diff --git a/crates/gitlawb-node/src/api/events.rs b/crates/gitlawb-node/src/api/events.rs index 45db8a0..c865720 100644 --- a/crates/gitlawb-node/src/api/events.rs +++ b/crates/gitlawb-node/src/api/events.rs @@ -2,24 +2,120 @@ use std::collections::HashMap; -use axum::extract::{Path, Query, State}; +use axum::extract::{Extension, Path, Query, State}; use axum::Json; +use crate::auth::AuthenticatedDid; use crate::error::Result; use crate::state::AppState; +/// Hard ceiling on rows any ref-update feed returns in one request. Shared by the +/// shared collector's clamp and the per-handler request caps so they can't drift. +const MAX_VISIBLE_REF_UPDATES: i64 = 200; + +/// Collect up to `limit` ref-update rows visible to `caller`, newest first, +/// paging past rows the feed gate drops. Filtering after a plain SQL `LIMIT` +/// under-serves an anonymous caller whenever the newest rows name private local +/// repos (#114): the older, visible rows are never fetched, so a small limit can +/// return zero. Over-fetch in bounded pages until `limit` visible rows are +/// collected or the scan window is spent. Fail-closed: any DB error propagates +/// rather than serving ungated rows, and the scan cap only ever returns fewer +/// rows. Rows matching no local repo pass through (remote/gossip-only). Shared by +/// the REST global feed (#114) and the GraphQL `ref_updates` resolver (#112) so +/// the one gate cannot drift between the two surfaces. +pub(crate) async fn collect_visible_ref_updates( + db: &crate::db::Db, + repo: Option<&str>, + limit: i64, + caller: Option<&str>, +) -> Result> { + // 128 rows per DB round-trip. The page size is a parameter on the inner fn + // only so tests can force multi-page offset paging over a small dataset. 
+ collect_visible_ref_updates_inner(db, repo, limit, caller, 128).await +} + +async fn collect_visible_ref_updates_inner( + db: &crate::db::Db, + repo: Option<&str>, + limit: i64, + caller: Option<&str>, + page: i64, +) -> Result> { + // Clamp the effective limit inside the shared collector so both callers are + // bounded: REST already caps at MAX_VISIBLE_REF_UPDATES, but the GraphQL + // resolver passes its caller-provided limit straight through, which would + // otherwise let a large request return unbounded rows and scan unbounded DB + // rows. + let bounded_limit = limit.clamp(0, MAX_VISIBLE_REF_UPDATES); + let want = bounded_limit as usize; + let mut visible = Vec::with_capacity(want); + if want == 0 { + return Ok(visible); + } + + // Gate inputs loaded once; DB errors abort (fail closed, never serve). + let deduped = db.list_all_repos_deduped().await?; + // Quarantined mirrors are excluded from the deduped set, and quarantine must + // be withheld from every surface INCLUDING the owner: it's a status decided + // at admission, checked separately from the mirror's (untrustworthy) + // visibility fields. A folded is_public=false cannot enforce that here — + // visibility_check short-circuits to Allow for the owner before is_public is + // read, so an owner-matched row would leak. Instead, drop any row that names a + // quarantined repo in the loop below, before the visibility gate runs, so the + // drop bypasses that owner short-circuit entirely. + let quarantined = db.list_quarantined_repos().await?; + let ids: Vec = deduped.iter().map(|r| r.id.clone()).collect(); + let rules = db.list_visibility_rules_for_repos(&ids).await?; + + // Never scan fewer rows than the caller asked for (no regression vs the old + // single LIMIT), but cap the walk so a feed of newest-private rows can't + // force an unbounded scan. The cap only fails safe (may return fewer). 
+ let max_scan = bounded_limit.max(2_048); + let mut offset: i64 = 0; + while offset < max_scan { + let rows = db.list_ref_updates_page(repo, page, offset).await?; + let fetched = rows.len() as i64; + for u in rows { + // Quarantine denies unconditionally, before the visibility gate, so + // even a caller matching the mirror's owner_did cannot read the row. + if quarantined + .iter() + .any(|q| crate::visibility::ref_update_row_names_repo(q, &u.repo)) + { + continue; + } + if crate::visibility::ref_update_row_visible(&deduped, &rules, caller, &u.repo) { + visible.push(u); + if visible.len() == want { + return Ok(visible); + } + } + } + offset += fetched; + if fetched < page { + break; // page under-filled → table exhausted + } + } + Ok(visible) +} + /// GET /api/v1/events/ref-updates?limit=50 pub async fn list_ref_updates( State(state): State, Query(params): Query>, + auth: Option>, ) -> Result> { let limit = params .get("limit") .and_then(|v| v.parse::().ok()) .unwrap_or(50) - .min(200); + .clamp(0, MAX_VISIBLE_REF_UPDATES); + + // Fail-closed visibility gate (#114), applied before the limit via paging so + // an anon caller still gets the latest visible events, not a short page. + let caller = auth.as_ref().map(|e| e.0 .0.as_str()); + let updates = collect_visible_ref_updates(&state.db, None, limit, caller).await?; - let updates = state.db.list_ref_updates(limit).await?; let events: Vec = updates .iter() .map(|u| { @@ -50,87 +146,102 @@ pub async fn list_repo_events( State(state): State, Path((owner, repo_name)): Path<(String, String)>, Query(params): Query>, + auth: Option>, ) -> Result> { + // The lower bound of this clamp is load-bearing, not just an upper cap: the + // local ref-cert half below is bounded only by `all_events.truncate(limit as + // usize)`, which bypasses the shared collector. A negative limit would wrap to + // usize::MAX and leave that truncate a no-op. 
Do not relax to `.min` here (the + // global feed can, since its limit is re-clamped inside the collector). let limit = params .get("limit") .and_then(|v| v.parse::().ok()) .unwrap_or(50) - .min(200); + .clamp(0, MAX_VISIBLE_REF_UPDATES); - // Look up the repo record once so we can use the full owner DID - let repo_record = state.db.get_repo(&owner, &repo_name).await.ok().flatten(); + // Gate this handler in two layers (#112/#114). First, a repo-root read gate on + // THIS repo: authorize_repo_read returns RepoNotFound (→ 404) when the repo is + // quarantined, visibility-denied, or not hosted here, so the local ref + // certificates (keyed by the unique repo record id) are served only to a caller + // who may read this repo. A repo this node does not host returns 404: it holds no + // visibility record for it, so it fails closed (remote gossip is read via the + // global /api/v1/events/ref-updates feed). Second, the gossip half below is + // filtered per row: received_ref_updates rows are keyed by a lossy, non-unique + // wire slug, so the repo-root gate alone would leak a colliding private repo's + // rows — the shared collector's row gate closes that. + let caller = auth.as_ref().map(|e| e.0 .0.as_str()); + let (record, _rules) = + crate::api::authorize_repo_read(&state, &owner, &repo_name, caller, "/").await?; // Build the repo identifier using the FULL DID key part (not the 8-char URL truncation). // Gossip events are stored as "{full_key_part}/{repo_name}" (e.g. "z6MksXZDfullkeyhere/myrepo"), // but the URL only carries the first 8 chars of the key. Without the full slug the // WHERE repo = '...' query never matches and the events tab appears empty. 
- let repo_id_str = if let Some(ref record) = repo_record { - format!( - "{}/{}", - record - .owner_did - .split(':') - .next_back() - .unwrap_or(&record.owner_did), - repo_name - ) - } else { - format!("{owner}/{repo_name}") - }; + let repo_id_str = format!( + "{}/{}", + record + .owner_did + .split(':') + .next_back() + .unwrap_or(&record.owner_did), + repo_name + ); - // Fetch local ref certificates for this repo (if the repo exists on this node) - let cert_events: Vec = if let Some(ref record) = repo_record { - state - .db - .list_ref_certificates(&record.id) - .await - .unwrap_or_default() - .iter() - .map(|c| { - serde_json::json!({ - "type": "local_cert", - "id": c.id, - "repo": repo_id_str, - "ref_name": c.ref_name, - "old_sha": c.old_sha, - "new_sha": c.new_sha, - "pusher_did": c.pusher_did, - "node_did": c.node_did, - "timestamp": c.issued_at, - "source": "local", - }) - }) - .collect() - } else { - vec![] - }; - - // Fetch gossipsub received ref updates for this repo (uses full slug built above) - let gossip_events: Vec = state + // Fetch this repo's local ref certificates (keyed by the unique record id, so no + // slug-collision concern). DB errors propagate as 500 rather than being swallowed + // into an empty 200, matching the gossip half below. + let cert_events: Vec = state .db - .list_repo_ref_updates(&repo_id_str, limit) - .await - .unwrap_or_default() + .list_ref_certificates(&record.id) + .await? 
.iter() - .map(|u| { + .map(|c| { serde_json::json!({ - "type": "gossipsub", - "id": u.id, - "repo": u.repo, - "ref_name": u.ref_name, - "old_sha": u.old_sha, - "new_sha": u.new_sha, - "pusher_did": u.pusher_did, - "node_did": u.node_did, - "timestamp": u.timestamp, - "cert_id": u.cert_id, - "received_at": u.received_at, - "from_peer": u.from_peer, - "source": "gossipsub", + "type": "local_cert", + "id": c.id, + "repo": repo_id_str, + "ref_name": c.ref_name, + "old_sha": c.old_sha, + "new_sha": c.new_sha, + "pusher_did": c.pusher_did, + "node_did": c.node_did, + "timestamp": c.issued_at, + "source": "local", }) }) .collect(); + // Fetch gossipsub received ref updates for this repo (uses full slug built above), + // filtered per row by the SAME shared gate the cross-repo feeds use. The slug is + // the non-unique wire form {last-segment}/{name}: two owners (e.g. + // did:web:a:alice and did:web:b:alice) collide on `alice/name`, so a plain + // `WHERE repo = slug` query would serve a colliding PRIVATE repo's rows to anyone + // allowed to read this one. collect_visible_ref_updates drops any row whose slug + // matches a local repo the caller cannot read (fail-closed), and propagates DB + // errors instead of swallowing them. + let gossip_events: Vec = + collect_visible_ref_updates(&state.db, Some(&repo_id_str), limit, caller) + .await? 
+ .iter() + .map(|u| { + serde_json::json!({ + "type": "gossipsub", + "id": u.id, + "repo": u.repo, + "ref_name": u.ref_name, + "old_sha": u.old_sha, + "new_sha": u.new_sha, + "pusher_did": u.pusher_did, + "node_did": u.node_did, + "timestamp": u.timestamp, + "cert_id": u.cert_id, + "received_at": u.received_at, + "from_peer": u.from_peer, + "source": "gossipsub", + }) + }) + .collect(); + // Merge both lists let mut all_events: Vec = cert_events; all_events.extend(gossip_events); @@ -150,3 +261,1099 @@ pub async fn list_repo_events( serde_json::json!({ "events": all_events, "count": count }), )) } + +#[cfg(test)] +mod ref_updates_feed_tests { + use crate::db::{ReceivedRefUpdate, RefCertificate, RepoRecord}; + use crate::test_support::{signed_request_as, test_state}; + use axum::body::Body; + use axum::http::{Method, Request, StatusCode}; + use axum::Router; + use chrono::Utc; + use sqlx::PgPool; + use tower::ServiceExt; + + const OWNER: &str = "did:key:z6MkOwner"; + + fn repo(id: &str, owner_did: &str, name: &str, is_public: bool) -> RepoRecord { + let now = Utc::now(); + RepoRecord { + id: id.into(), + name: name.into(), + owner_did: owner_did.into(), + description: None, + is_public, + default_branch: "main".into(), + created_at: now, + updated_at: now, + disk_path: format!("/tmp/{id}"), + forked_from: None, + machine_id: None, + } + } + + fn ref_row(id: &str, slug: &str) -> ReceivedRefUpdate { + ReceivedRefUpdate { + id: id.into(), + node_did: "did:key:z6MkNode".into(), + pusher_did: "did:key:z6MkPusher".into(), + repo: slug.into(), + ref_name: "refs/heads/main".into(), + old_sha: "0".repeat(40), + new_sha: "a".repeat(40), + timestamp: Utc::now().to_rfc3339(), + cert_id: None, + received_at: Utc::now().to_rfc3339(), + from_peer: "peer1".into(), + } + } + + fn router(state: crate::state::AppState) -> Router { + Router::new() + .route( + "/api/v1/events/ref-updates", + axum::routing::get(super::list_ref_updates), + ) + .with_state(state) + } + + fn anon_get() -> 
Request { + Request::builder() + .method(Method::GET) + .uri("/api/v1/events/ref-updates") + .body(Body::empty()) + .expect("request builder") + } + + async fn body_json(resp: axum::response::Response) -> serde_json::Value { + let bytes = axum::body::to_bytes(resp.into_body(), usize::MAX) + .await + .expect("body bytes"); + serde_json::from_slice(&bytes).expect("json body") + } + + /// Repo slugs present in the `events` array of the feed response. + fn slugs(v: &serde_json::Value) -> Vec { + v["events"] + .as_array() + .expect("events array") + .iter() + .filter_map(|e| e["repo"].as_str().map(str::to_string)) + .collect() + } + + fn count(v: &serde_json::Value) -> u64 { + v["count"].as_u64().expect("count number") + } + + // --- repo-scoped events endpoint (list_repo_events) gate tests --- + // The handler serves one repo's ref certificates + received gossip ref-updates. + // authorize_repo_read gates the whole handler on repo-root read visibility: + // allow → serve both datasets; deny / quarantine / not-hosted → opaque 404. + + fn repo_events_router(state: crate::state::AppState) -> Router { + Router::new() + .route( + "/api/v1/repos/{owner}/{repo}/events", + axum::routing::get(super::list_repo_events), + ) + .with_state(state) + } + + fn ref_cert(id: &str, repo_id: &str) -> RefCertificate { + RefCertificate { + id: id.into(), + repo_id: repo_id.into(), + ref_name: "refs/heads/main".into(), + old_sha: "0".repeat(40), + new_sha: "b".repeat(40), + pusher_did: "did:key:z6MkPusher".into(), + node_did: "did:key:z6MkNode".into(), + signature: "sig".into(), + issued_at: Utc::now().to_rfc3339(), + } + } + + fn anon_repo_events(owner: &str, name: &str) -> Request { + Request::builder() + .method(Method::GET) + .uri(format!("/api/v1/repos/{owner}/{name}/events")) + .body(Body::empty()) + .expect("request builder") + } + + // Scenario 1 — load-bearing RED→GREEN: anon must not get a private local + // repo's row, and `count` must reflect the filtered set. 
+ #[sqlx::test] + async fn feed_private_repo_dropped_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert!( + slugs(&body).is_empty(), + "anon must not see a private local repo's ref update, got {:?}", + slugs(&body) + ); + assert_eq!(count(&body), 0, "count must reflect the filtered set"); + } + + // Scenario 2 — owner still sees their own private repo's row. + #[sqlx::test] + async fn feed_private_repo_kept_for_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + + let resp = router(state) + .oneshot(signed_request_as( + OWNER, + Method::GET, + "/api/v1/events/ref-updates", + Body::empty(), + )) + .await + .unwrap(); + let body = body_json(resp).await; + assert_eq!(slugs(&body), vec!["z6MkOwner/widget".to_string()]); + assert_eq!(count(&body), 1); + } + + // Scenario 3 — mixed feed: anon sees only the public row; count == 1. 
+ #[sqlx::test] + async fn feed_mixed_anon_gets_only_public(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .create_repo(&repo("priv", OWNER, "secret", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u_pub", "z6MkOwner/openrepo")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u_priv", "z6MkOwner/secret")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + let body = body_json(resp).await; + assert_eq!(slugs(&body), vec!["z6MkOwner/openrepo".to_string()]); + assert_eq!(count(&body), 1); + } + + // Scenario 4 — alias fail-closed: private repo's row stored in full-DID form. + #[sqlx::test] + async fn feed_full_did_slug_dropped_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", "did:key:zABC", "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "did:key:zABC/widget")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + let body = body_json(resp).await; + assert!(slugs(&body).is_empty(), "full-DID alias must be dropped"); + assert_eq!(count(&body), 0); + } + + // Scenario 5 — truncated-key fail-closed: key-prefix owner form (a 7-char prefix here). + #[sqlx::test] + async fn feed_truncated_key_slug_dropped_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", "did:key:zABCDEFGH", "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "zABCDEF/widget")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + let body = body_json(resp).await; + assert!( + slugs(&body).is_empty(), + "truncated-key alias must be dropped" + ); + assert_eq!(count(&body), 0); + } + + // Scenario 6 — remote slug (no local match) is returned to anon. 
+ #[sqlx::test] + async fn feed_remote_slug_kept_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "zZZZOTHER/gadget")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + let body = body_json(resp).await; + assert_eq!(slugs(&body), vec!["zZZZOTHER/gadget".to_string()]); + assert_eq!(count(&body), 1); + } + + // Scenario 7 (#114 P2) — a small limit must page past the newest rows when + // they are private, so the older public rows are still returned instead of a + // short/empty page. Before the gate moved ahead of the limit this returned 0. + // RED→GREEN. + #[sqlx::test] + async fn feed_small_limit_pages_past_newest_private(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .create_repo(&repo("priv", OWNER, "secret", false)) + .await + .unwrap(); + // 3 older PUBLIC rows … + for i in 0..3 { + let mut r = ref_row(&format!("pub{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:0{i}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + // … then 5 NEWER PRIVATE rows (the newest in the feed). + for i in 0..5 { + let mut r = ref_row(&format!("priv{i}"), "z6MkOwner/secret"); + r.timestamp = format!("2026-07-01T10:00:1{i}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + + let req = Request::builder() + .method(Method::GET) + .uri("/api/v1/events/ref-updates?limit=3") + .body(Body::empty()) + .expect("request builder"); + let resp = router(state).oneshot(req).await.unwrap(); + let body = body_json(resp).await; + // The 3-row limit is filled from the older public rows, not left short. 
+ assert_eq!( + count(&body), + 3, + "limit must be filled from older public rows" + ); + assert!( + slugs(&body).iter().all(|s| s == "z6MkOwner/openrepo"), + "returned rows must all be the public repo's, got {:?}", + slugs(&body) + ); + } + + // A negative limit on the GLOBAL feed must return zero, not the whole visible + // set. Unlike the repo feed, this handler has no local `truncate`; its guard is + // the shared collector's `clamp(0, MAX)` (want==0 short-circuits before any + // scan), so the handler-level clamp here is a consistency measure, not the + // load-bearing one. Seeded with 5 visible public rows so an unbounded return + // would be 5; asserting 0 proves the clamp chain holds. + #[sqlx::test] + async fn feed_negative_limit_returns_empty(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + for i in 0..5 { + let mut r = ref_row(&format!("pub{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:0{i}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + + let req = Request::builder() + .method(Method::GET) + .uri("/api/v1/events/ref-updates?limit=-1") + .body(Body::empty()) + .expect("request builder"); + let resp = router(state).oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + count(&body_json(resp).await), + 0, + "negative limit must clamp to 0, not return the full visible set" + ); + } + + // Scenario 8 (#114 P2) — multi-page paging: a page smaller than the dataset + // must still collect the requested visible rows from older pages, advancing + // the offset without skipping or duplicating. page=2 over 5 newest-private + + // 3 older-public rows spans four fetches (offset 0→2→4→6). Guards the offset + // paging that the single-page feed tests above can't reach. 
+ #[sqlx::test] + async fn collect_visible_pages_across_page_boundary(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .create_repo(&repo("priv", OWNER, "secret", false)) + .await + .unwrap(); + for i in 0..3 { + let mut r = ref_row(&format!("pub{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:0{i}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + for i in 0..5 { + let mut r = ref_row(&format!("priv{i}"), "z6MkOwner/secret"); + r.timestamp = format!("2026-07-01T10:00:1{i}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + + let got = super::collect_visible_ref_updates_inner(&state.db, None, 3, None, 2) + .await + .unwrap(); + // All 3 older public rows, collected across four pages … + let got_slugs: Vec<&str> = got.iter().map(|u| u.repo.as_str()).collect(); + assert_eq!(got_slugs, vec!["z6MkOwner/openrepo"; 3]); + // … each exactly once (no duplicate rows across page boundaries). + let unique: std::collections::HashSet<&str> = got.iter().map(|u| u.id.as_str()).collect(); + assert_eq!(unique.len(), 3, "no row returned twice across pages"); + } + + // Scenario 9 — an oversized limit (the GraphQL resolver passes its + // caller-provided limit uncapped) must be clamped inside the shared collector + // so it can't return unbounded rows or scan unbounded DB rows. + #[sqlx::test] + async fn collect_visible_clamps_oversized_limit(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + // 201 public rows — one more than the 200 cap. 
+ for i in 0..201 { + let mut r = ref_row(&format!("pub{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:00.{i:04}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + + let got = super::collect_visible_ref_updates_inner(&state.db, None, 100_000, None, 128) + .await + .unwrap(); + assert_eq!(got.len(), 200, "oversized limit must clamp to 200"); + } + + // Scenario 10 — a quarantined mirror is withheld from every listing surface. + // It is excluded from list_all_repos_deduped, so without the collector's + // explicit quarantine drop the gate would misclassify the row as remote and + // serve it to anon. + #[sqlx::test] + async fn feed_quarantined_mirror_withheld_from_anon(pool: PgPool) { + let state = test_state(pool).await; + // Quarantined mirror: admitted but unvalidated, withheld from listings. + state + .db + .upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + + let resp = router(state).oneshot(anon_get()).await.unwrap(); + let body = body_json(resp).await; + assert!( + slugs(&body).is_empty(), + "quarantined mirror's ref-update must be withheld from anon, got {:?}", + slugs(&body) + ); + } + + // Scenario 10b — a quarantined mirror must be withheld even from a caller who + // matches its owner_did, not just from anon. is_public=false cannot enforce + // this: visibility_check short-circuits to Allow for the owner BEFORE is_public + // is read, so quarantine has to deny before that check runs. The anon test + // above never exercises that owner short-circuit; this one does (RED before + // the collector's explicit quarantine drop). upsert_mirror_repo stores the + // owner as the bare short key, so the matching caller is the bare form. 
+ #[sqlx::test] + async fn feed_quarantined_mirror_withheld_from_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + + let got = + super::collect_visible_ref_updates_inner(&state.db, None, 50, Some("z6MkQuar"), 128) + .await + .unwrap(); + let got_slugs: Vec<&str> = got.iter().map(|u| u.repo.as_str()).collect(); + assert!( + got_slugs.is_empty(), + "quarantined mirror must be withheld from its own owner, got {got_slugs:?}" + ); + } + + // Scenario 10c — a quarantined repo whose owner_did is a full did:key must be + // withheld from that full-DID owner, the exact identity require_signature + // injects on the live path. This is the reachable shape once an operator + // quarantines a canonical repo via set_repo_quarantine. RED before the drop: + // the owner short-circuit keeps the row for the full-DID caller. + #[sqlx::test] + async fn feed_quarantined_full_did_repo_withheld_from_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("q1", "did:key:z6MkQuar", "secret", false)) + .await + .unwrap(); + let touched = state.db.set_repo_quarantine("q1", true).await.unwrap(); + assert_eq!(touched, 1, "quarantine flag must be set on the repo"); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + + let got = super::collect_visible_ref_updates_inner( + &state.db, + None, + 50, + Some("did:key:z6MkQuar"), + 128, + ) + .await + .unwrap(); + let got_slugs: Vec<&str> = got.iter().map(|u| u.repo.as_str()).collect(); + assert!( + got_slugs.is_empty(), + "quarantined full-DID repo must be withheld from its owner, got {got_slugs:?}" + ); + } + + // Must-not: the quarantine drop withholds ONLY the rows it names, never an + // unrelated visible row. 
A servable public repo alongside two quarantined + // mirrors — the public row is served, both quarantined rows withheld. This is + // the drop's `.any() == false → serve` branch over a NON-EMPTY (multi-element) + // quarantined set, which the single-repo tests above never reach. + #[sqlx::test] + async fn feed_quarantine_drop_does_not_suppress_unrelated_rows(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + state + .db + .upsert_mirror_repo("z6MkOther", "hidden", "/tmp/o", None, true) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("pub1", "z6MkOwner/openrepo")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("q1", "z6MkQuar/secret")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("q2", "z6MkOther/hidden")) + .await + .unwrap(); + + let got = super::collect_visible_ref_updates_inner(&state.db, None, 50, None, 128) + .await + .unwrap(); + let got_slugs: Vec<&str> = got.iter().map(|u| u.repo.as_str()).collect(); + assert_eq!( + got_slugs, + vec!["z6MkOwner/openrepo"], + "quarantine must withhold only its own rows, still serving unrelated visible ones" + ); + } + + // The live REST handler (not just the collector) must withhold a quarantined + // repo from an authenticated owner. Drives list_ref_updates through the router + // with the owner's full DID as caller — the identity require_signature injects. 
+ #[sqlx::test] + async fn feed_quarantined_repo_withheld_from_owner_via_router(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("q1", "did:key:z6MkQuar", "secret", false)) + .await + .unwrap(); + state.db.set_repo_quarantine("q1", true).await.unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + + let req = signed_request_as( + "did:key:z6MkQuar", + Method::GET, + "/api/v1/events/ref-updates", + Body::empty(), + ); + let resp = router(state).oneshot(req).await.unwrap(); + let body = body_json(resp).await; + assert!( + slugs(&body).is_empty(), + "quarantined repo must be withheld from its owner via the REST handler, got {:?}", + slugs(&body) + ); + } + + // RED→GREEN: anon must not read a private repo's ref metadata; a denied read is + // an opaque 404, not a 200 carrying the cert/gossip rows. + #[sqlx::test] + async fn repo_events_private_repo_404_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_certificate(&ref_cert("c1", "r1")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkOwner", "widget")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NOT_FOUND, + "anon read of a private repo's events must be an opaque 404" + ); + } + + // Owner reads their own private repo → 200 with BOTH datasets (cert + gossip), + // guarding against a one-dataset half-fix. 
+ #[sqlx::test] + async fn repo_events_private_repo_served_to_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_certificate(&ref_cert("c1", "r1")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(signed_request_as( + OWNER, + Method::GET, + "/api/v1/repos/z6MkOwner/widget/events", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!( + count(&body), + 2, + "owner sees both the cert and the gossip row" + ); + let sources: Vec<&str> = body["events"] + .as_array() + .expect("events array") + .iter() + .filter_map(|e| e["source"].as_str()) + .collect(); + assert!( + sources.contains(&"local"), + "cert row must be present, got {sources:?}" + ); + assert!( + sources.contains(&"gossipsub"), + "gossip row must be present, got {sources:?}" + ); + } + + // Anon reads a PUBLIC repo → 200 with data (positive control: the gate must not + // over-withhold). + #[sqlx::test] + async fn repo_events_public_repo_served_to_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .insert_ref_certificate(&ref_cert("c1", "pub")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/openrepo")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkOwner", "openrepo")) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(count(&body_json(resp).await), 2); + } + + // Anon reads a quarantined mirror → 404 (withheld without disclosing existence + // via authorize_repo_read's quarantine short-circuit). 
+ #[sqlx::test] + async fn repo_events_quarantined_mirror_404_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkQuar", "secret")) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + // Authenticated non-owner with no visibility grant → 404 (visibility_check deny + // path, distinct from the anonymous case). + #[sqlx::test] + async fn repo_events_private_repo_404_for_non_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(signed_request_as( + "did:key:z6MkStranger", + Method::GET, + "/api/v1/repos/z6MkOwner/widget/events", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + // RED→GREEN characterization of the deliberate behavior change: a repo NOT + // hosted here (no repos row) but with a received gossip row under a matching + // last-segment slug was served a populated 200 pre-gate; the gate closes it to + // 404 (this node holds no visibility record for a not-hosted repo, so it fails + // closed). Every other scenario seeds a local row and is blind to this path. + #[sqlx::test] + async fn repo_events_not_local_with_gossip_404_for_anon(pool: PgPool) { + let state = test_state(pool).await; + // No create_repo → get_repo returns None. A did:web-style short last segment + // ("alice") makes the stored gossip slug equal the URL owner, so pre-gate the + // not-local fallback slug matched and served the row. 
+ state + .db + .insert_ref_update(&ref_row("u1", "alice/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("alice", "widget")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::NOT_FOUND, + "a repo this node does not host must 404, not serve its gossip" + ); + } + + // A private LOCAL did:web repo denies anon → 404. Complements the not-local test: + // this proves anon cannot read a private did:web repo; the not-local test is what + // exercises the truncated-owner resolution path. + #[sqlx::test] + async fn repo_events_did_web_private_local_404_for_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("r1", "did:web:example.com:alice", "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "alice/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("alice", "widget")) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::NOT_FOUND); + } + + // RED→GREEN: the repo-root gate authorizes a record, but gossip rows are keyed by + // the lossy, non-unique wire slug {last-segment}/{name}. Two did:web owners + // (good.com:alice, evil.com:alice) collide on `alice/widget`, so without per-row + // filtering an anon read of the PUBLIC repo is served the colliding PRIVATE repo's + // gossip. The gossip half must drop it (fail-closed on the shared slug). + #[sqlx::test] + async fn repo_events_gossip_slug_collision_withheld_from_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", "did:web:good.com:alice", "widget", true)) + .await + .unwrap(); + state + .db + .create_repo(&repo("priv", "did:web:evil.com:alice", "widget", false)) + .await + .unwrap(); + // One gossip row under the shared last-segment slug (how both repos' rows key). 
+ state + .db + .insert_ref_update(&ref_row("collide", "alice/widget")) + .await + .unwrap(); + + // Full owner_did in the URL so get_repo resolves the PUBLIC repo deterministically + // (exact owner_did match; the private repo only LIKE-matches a short owner). + let resp = repo_events_router(state) + .oneshot(anon_repo_events("did:web:good.com:alice", "widget")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "public repo is anon-readable (cert half)" + ); + let body = body_json(resp).await; + let gossip_served = body["events"] + .as_array() + .expect("events array") + .iter() + .any(|e| e["source"].as_str() == Some("gossipsub")); + assert!( + !gossip_served, + "collision leak: anon read of public alice/widget served gossip keyed to a private sibling: {}", + body["events"] + ); + } + + // Authenticated non-owner reads a PUBLIC repo → 200 with data. Exercises + // visibility_check's is_public Allow branch with a Some(caller), which the + // anon-public and non-owner-private tests do not cover together. + #[sqlx::test] + async fn repo_events_public_repo_served_to_authenticated_non_owner(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkOwner/openrepo")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(signed_request_as( + "did:key:z6MkStranger", + Method::GET, + "/api/v1/repos/z6MkOwner/openrepo/events", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!(count(&body_json(resp).await), 1); + } + + // did:web OWNER reads their own private repo → 200 with both datasets. Confirms the + // last-segment slug ("alice/widget") serves gossip to an authorized non-did:key + // owner (the happy-path complement to the did:web deny test); exercises the gossip + // KEEP branch of the shared collector for a did:web caller. 
+ #[sqlx::test] + async fn repo_events_did_web_owner_reads_own_gossip(pool: PgPool) { + let state = test_state(pool).await; + let owner = "did:web:example.com:alice"; + state + .db + .create_repo(&repo("r1", owner, "widget", false)) + .await + .unwrap(); + state + .db + .insert_ref_certificate(&ref_cert("c1", "r1")) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "alice/widget")) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(signed_request_as( + owner, + Method::GET, + "/api/v1/repos/did:web:example.com:alice/widget/events", + Body::empty(), + )) + .await + .unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + let body = body_json(resp).await; + assert_eq!(count(&body), 2, "did:web owner sees cert + gossip"); + let sources: Vec<&str> = body["events"] + .as_array() + .expect("events array") + .iter() + .filter_map(|e| e["source"].as_str()) + .collect(); + assert!( + sources.contains(&"gossipsub"), + "did:web owner's own gossip must be served, got {sources:?}" + ); + } + + // An oversized limit is clamped at this handler (parity with the global feed). 
+ #[sqlx::test] + async fn repo_events_oversized_limit_clamped(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + for i in 0..201 { + let mut r = ref_row(&format!("g{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:00.{i:04}+00:00"); + state.db.insert_ref_update(&r).await.unwrap(); + } + + let req = Request::builder() + .method(Method::GET) + .uri("/api/v1/repos/z6MkOwner/openrepo/events?limit=100000") + .body(Body::empty()) + .expect("request builder"); + let resp = repo_events_router(state).oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + count(&body_json(resp).await), + 200, + "limit must clamp to MAX_VISIBLE_REF_UPDATES" + ); + } + + // A negative limit must floor to 0 at this handler, not wrap to usize::MAX and + // leave the local ref-cert list untruncated. The bug lives in the LOCAL half's + // `truncate(limit as usize)` (the gossip half is already clamped in the shared + // collector), so the repo is seeded with local certs and no gossip rows to keep + // the assertion load-bearing. 
+ #[sqlx::test] + async fn repo_events_negative_limit_clamped(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + for i in 0..3 { + let mut c = ref_cert(&format!("c{i}"), "pub"); + c.ref_name = format!("refs/heads/b{i}"); + state.db.insert_ref_certificate(&c).await.unwrap(); + } + + let req = Request::builder() + .method(Method::GET) + .uri("/api/v1/repos/z6MkOwner/openrepo/events?limit=-1") + .body(Body::empty()) + .expect("request builder"); + let resp = repo_events_router(state).oneshot(req).await.unwrap(); + assert_eq!(resp.status(), StatusCode::OK); + assert_eq!( + count(&body_json(resp).await), + 0, + "negative limit must clamp to 0, not leave the local set untruncated" + ); + } + + // A mirror released from quarantine becomes readable → 200 (complements the + // quarantined→404 test; guards against the gate staying closed after release). + #[sqlx::test] + async fn repo_events_released_mirror_served_to_anon(pool: PgPool) { + let state = test_state(pool).await; + state + .db + .upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + state + .db + .insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + // upsert_mirror_repo builds the id as "{owner_short}/{name}". + state + .db + .set_repo_quarantine("z6MkQuar/secret", false) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkQuar", "secret")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::OK, + "a released mirror must be readable again" + ); + } + + // A DB error in the gate fails closed as 500, not swallowed into an empty 200 (the + // regression the old get_repo().ok().flatten() allowed). Inject by dropping a + // column get_repo selects so its query errors. 
+ #[sqlx::test] + async fn repo_events_db_error_fails_closed_500(pool: PgPool) { + let state = test_state(pool.clone()).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", true)) + .await + .unwrap(); + sqlx::query("ALTER TABLE repos DROP COLUMN is_public") + .execute(&pool) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkOwner", "widget")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::INTERNAL_SERVER_ERROR, + "a DB error must fail closed (500), never serve an empty 200" + ); + } + + // Symmetric to the gate DB-error test: a DB error in the CERT fetch (after the gate + // passes) must also fail closed as 500, not an empty 200. Drop a column + // list_ref_certificates selects so its query errors. (sqlx::test gives each test its + // own isolated database, so the schema change cannot bleed into other tests.) + #[sqlx::test] + async fn repo_events_cert_db_error_fails_closed_500(pool: PgPool) { + let state = test_state(pool.clone()).await; + state + .db + .create_repo(&repo("r1", OWNER, "widget", true)) + .await + .unwrap(); + sqlx::query("ALTER TABLE ref_certificates DROP COLUMN signature") + .execute(&pool) + .await + .unwrap(); + + let resp = repo_events_router(state) + .oneshot(anon_repo_events("z6MkOwner", "widget")) + .await + .unwrap(); + assert_eq!( + resp.status(), + StatusCode::INTERNAL_SERVER_ERROR, + "a DB error in the cert fetch must fail closed (500), never an empty 200" + ); + } +} diff --git a/crates/gitlawb-node/src/api/mod.rs b/crates/gitlawb-node/src/api/mod.rs index ea4591f..6c6088f 100644 --- a/crates/gitlawb-node/src/api/mod.rs +++ b/crates/gitlawb-node/src/api/mod.rs @@ -407,7 +407,6 @@ mod authz_guard { ("list_labels", "#120"), ("list_repo_bounties", "#120"), ("get_star_status", "#120"), - ("list_repo_events", "#94 (PR #113)"), ("list_webhooks", "#94 (PR #113)"), ("list_replicas", "PR #113"), ("list_protected_branches", "PR #113"), diff --git 
a/crates/gitlawb-node/src/api/repos.rs b/crates/gitlawb-node/src/api/repos.rs index b74f3f6..4de9def 100644 --- a/crates/gitlawb-node/src/api/repos.rs +++ b/crates/gitlawb-node/src/api/repos.rs @@ -1213,17 +1213,26 @@ pub async fn git_receive_pack( } // Broadcast ref update to GraphQL subscription listeners — one per ref. + // Gated on `announce`: /graphql/ws is unauthenticated (mounted after + // the optional_signature layer), and the subscription resolver has no + // caller to gate against, so only publicly-readable ref updates may + // reach anonymous subscribers. Mirrors the gossip (above) and Arweave + // (below) sends, which are already `announce`-gated. Without this a + // private-repo push would leak live ref metadata over the socket — + // the subscription analog of #112/#114. let now_ts = chrono::Utc::now().to_rfc3339(); - for (ref_name, old_sha, new_sha) in &ref_updates_clone { - let _ = ref_update_tx.send(crate::state::RefUpdateBroadcast { - repo: repo_slug.clone(), - ref_name: ref_name.clone(), - old_sha: old_sha.clone(), - new_sha: new_sha.clone(), - pusher_did: pusher_did_clone.clone(), - node_did: node_did_str.clone(), - timestamp: now_ts.clone(), - }); + if announce { + for (ref_name, old_sha, new_sha) in &ref_updates_clone { + let _ = ref_update_tx.send(crate::state::RefUpdateBroadcast { + repo: repo_slug.clone(), + ref_name: ref_name.clone(), + old_sha: old_sha.clone(), + new_sha: new_sha.clone(), + pusher_did: pusher_did_clone.clone(), + node_did: node_did_str.clone(), + timestamp: now_ts.clone(), + }); + } } // Arweave permanent anchoring — fire for each ref update. @@ -1779,6 +1788,32 @@ mod tests { } } + /// `announce` is the single boolean that gates every network-facing emission + /// of a push: gossip, Arweave anchoring, and the GraphQL subscription + /// broadcast (the last one added in this change). 
It must be false for a repo + /// the anonymous public cannot read, or the unauthenticated `/graphql/ws` + /// subscription leaks live private-repo ref metadata. Pin both directions of + /// the decision the broadcast now sits behind. No disk access: a non-announce + /// repo returns early, and a public repo with no path-scoped rule skips the + /// withheld walk. + #[tokio::test] + async fn replication_announce_false_for_private_true_for_public() { + let dummy = std::path::PathBuf::from("/nonexistent"); + + // Private: no rules at all. + let (announce, _) = replication_withheld_set(None, OWNER_DID, false, dummy.clone()).await; + assert!(!announce, "private repo (no rules) must not announce"); + + // Private: empty rule set, is_public=false → still not listable at root. + let (announce, _) = + replication_withheld_set(Some(vec![]), OWNER_DID, false, dummy.clone()).await; + assert!(!announce, "private repo (empty rules) must not announce"); + + // Public: empty rule set, is_public=true → listable at root, announces. + let (announce, _) = replication_withheld_set(Some(vec![]), OWNER_DID, true, dummy).await; + assert!(announce, "public repo must announce"); + } + /// A rejection must be a 403 Forbidden (authenticated but not authorized), /// not a 400 — some git/CI clients retry 400s. fn assert_forbidden(rejection: Option) { diff --git a/crates/gitlawb-node/src/db/mod.rs b/crates/gitlawb-node/src/db/mod.rs index 31ff72f..e4b8e5a 100644 --- a/crates/gitlawb-node/src/db/mod.rs +++ b/crates/gitlawb-node/src/db/mod.rs @@ -1069,6 +1069,22 @@ impl Db { Ok(rows.into_iter().map(row_to_repo).collect()) } + /// Repos currently quarantined (admitted as mirrors but withheld from every + /// listing surface). 
`list_all_repos_deduped` excludes these (its `DEDUP_CTE` + /// filters `quarantined = FALSE`), so a gate that resolves a slug against the + /// deduped set must also match against these and fail closed, or a quarantined + /// repo's row is misclassified as remote/gossip-only and served. + pub async fn list_quarantined_repos(&self) -> Result> { + let rows = sqlx::query( + "SELECT id, name, owner_did, description, is_public, default_branch, + created_at, updated_at, disk_path, forked_from, machine_id + FROM repos WHERE quarantined = TRUE", + ) + .fetch_all(&self.pool) + .await?; + Ok(rows.into_iter().map(row_to_repo).collect()) + } + /// Count of distinct logical repos (mirror + canonical collapsed). Uses the /// same did:key-aware owner-key grouping as `DEDUP_CTE` (the CASE must stay /// byte-identical); the marker/tiebreak only decide which row would survive, @@ -2109,58 +2125,38 @@ impl Db { Ok(()) } - pub async fn list_ref_updates(&self, limit: i64) -> Result> { - let rows = sqlx::query( - "SELECT id, node_did, pusher_did, repo, ref_name, old_sha, new_sha, timestamp, - cert_id, received_at, from_peer - FROM received_ref_updates ORDER BY timestamp DESC LIMIT $1", - ) - .bind(limit) - .fetch_all(&self.pool) - .await?; - Ok(rows.into_iter().map(row_to_ref_update).collect()) - } - - pub async fn list_repo_ref_updates( - &self, - repo: &str, - limit: i64, - ) -> Result> { - let rows = sqlx::query( - "SELECT id, node_did, pusher_did, repo, ref_name, old_sha, new_sha, timestamp, - cert_id, received_at, from_peer - FROM received_ref_updates WHERE repo = $1 ORDER BY timestamp DESC LIMIT $2", - ) - .bind(repo) - .bind(limit) - .fetch_all(&self.pool) - .await?; - Ok(rows.into_iter().map(row_to_ref_update).collect()) - } - - /// Filtered ref updates — optionally scoped to a specific repo. - pub async fn list_ref_updates_filtered( + /// One page of ref updates (newest first), optionally scoped to one repo. 
+ /// The `(timestamp DESC, id DESC)` order gives a stable tiebreak so offset + /// paging does not skip or duplicate rows when timestamps collide. Used by + /// the visibility-gated feed collector, which pages past dropped private rows + /// so a small limit still returns the latest visible events (#114). + pub async fn list_ref_updates_page( &self, repo: Option<&str>, limit: i64, + offset: i64, ) -> Result> { let rows = if let Some(r) = repo { sqlx::query( "SELECT id, node_did, pusher_did, repo, ref_name, old_sha, new_sha, timestamp, cert_id, received_at, from_peer - FROM received_ref_updates WHERE repo=$1 ORDER BY timestamp DESC LIMIT $2", + FROM received_ref_updates WHERE repo=$1 + ORDER BY timestamp DESC, id DESC LIMIT $2 OFFSET $3", ) .bind(r) .bind(limit) + .bind(offset) .fetch_all(&self.pool) .await? } else { sqlx::query( "SELECT id, node_did, pusher_did, repo, ref_name, old_sha, new_sha, timestamp, cert_id, received_at, from_peer - FROM received_ref_updates ORDER BY timestamp DESC LIMIT $1", + FROM received_ref_updates + ORDER BY timestamp DESC, id DESC LIMIT $1 OFFSET $2", ) .bind(limit) + .bind(offset) .fetch_all(&self.pool) .await? }; @@ -3346,6 +3342,42 @@ mod dedup_db_tests { ); } + /// A PRIVATE canonical repo and a PUBLIC mirror row for the same + /// (owner, name) collapse to a single survivor whose `is_public` is the + /// canonical `false`, not the mirror's `true`. `upsert_mirror_repo` always + /// writes `is_public=true`, so without this the deduped set could carry a + /// public flag for a locally-private repo and the ref-updates feed gate + /// would over-serve. Pins the DEDUP_CTE tiebreak so a future regression + /// that flips the survivor can't leak silently. + #[sqlx::test] + async fn deduped_private_canonical_beats_public_mirror(pool: PgPool) { + let db = db(pool).await; + // Private canonical row (rec() forces is_public=true, so build inline). 
+ let mut canonical = rec( + "uuid-private-canonical", + "did:key:z6Mkwbud", + "nipmod", + "private canonical", + "2026-01-15T00:00:00Z", + "2026-01-15T00:00:00Z", + ); + canonical.is_public = false; + db.create_repo(&canonical).await.unwrap(); + // Public mirror row for the same (owner, name): id = "z6Mkwbud/nipmod", + // is_public = true. + db.upsert_mirror_repo("z6Mkwbud", "nipmod", "/srv/mirror", None, false) + .await + .unwrap(); + + let out = db.list_all_repos_deduped().await.unwrap(); + assert_eq!(out.len(), 1, "the pair collapses to one logical repo"); + assert_eq!(out[0].owner_did, "did:key:z6Mkwbud", "canonical row wins"); + assert!( + !out[0].is_public, + "survivor keeps the canonical private is_public=false, not the mirror's true" + ); + } + /// upsert_mirror_repo's own rows dedupe against a canonical twin (proves the /// real mirror writer's row shape is classified correctly). #[sqlx::test] diff --git a/crates/gitlawb-node/src/graphql/query.rs b/crates/gitlawb-node/src/graphql/query.rs index 2caeefb..55148e0 100644 --- a/crates/gitlawb-node/src/graphql/query.rs +++ b/crates/gitlawb-node/src/graphql/query.rs @@ -49,13 +49,30 @@ impl QueryRoot { &self, ctx: &Context<'_>, repo: Option, - #[graphql(default = 20)] limit: i64, + #[graphql( + default = 20, + desc = "Max 200; larger requests return the newest 200 rows (no continuation cursor)." + )] + limit: i64, ) -> Result> { let db = ctx.data_unchecked::>(); - let updates = db - .list_ref_updates_filtered(repo.as_deref(), limit) - .await - .map_err(|e| async_graphql::Error::new(e.to_string()))?; + + // Gate each row on the same "/" visibility decision the repos resolver + // uses, so anonymous callers get no row for a local repo they can't read + // (#112). The shared collector applies the fail-closed gate *before* the + // limit (paging past dropped private rows) so a small limit still returns + // the latest visible events, and keeps this surface byte-identical to the + // REST feed (#114). 
The row slug is peer-supplied, so the pure filter + // treats it as untrusted input; remote (no local match) rows pass. + let caller = ctx + .data::() + .ok() + .map(|d| d.0.as_str()); + let updates = + crate::api::events::collect_visible_ref_updates(db, repo.as_deref(), limit, caller) + .await + .map_err(|e| async_graphql::Error::new(e.to_string()))?; + Ok(updates .into_iter() .map(|u| RefUpdateType { @@ -94,3 +111,288 @@ impl QueryRoot { Ok(t.map(AgentTaskType::from)) } } + +#[cfg(test)] +mod tests { + use crate::db::{Db, ReceivedRefUpdate, RepoRecord}; + use chrono::Utc; + use sqlx::PgPool; + use std::sync::Arc; + + const OWNER: &str = "did:key:z6MkOwner"; + + async fn db(pool: PgPool) -> Arc { + let db = Db::for_testing(pool); + db.run_migrations().await.unwrap(); + Arc::new(db) + } + + fn schema(db: Arc) -> super::super::GitlawbSchema { + let (ref_tx, _) = tokio::sync::broadcast::channel(16); + let (task_tx, _) = tokio::sync::broadcast::channel(16); + super::super::build_schema(db, ref_tx, task_tx) + } + + fn repo(id: &str, owner_did: &str, name: &str, is_public: bool) -> RepoRecord { + RepoRecord { + id: id.into(), + name: name.into(), + owner_did: owner_did.into(), + description: None, + is_public, + default_branch: "main".into(), + created_at: Utc::now(), + updated_at: Utc::now(), + disk_path: format!("/srv/{id}"), + forked_from: None, + machine_id: None, + } + } + + fn ref_row(id: &str, slug: &str) -> ReceivedRefUpdate { + ReceivedRefUpdate { + id: id.into(), + node_did: "did:key:z6MkNode".into(), + pusher_did: "did:key:z6MkPusher".into(), + repo: slug.into(), + ref_name: "refs/heads/main".into(), + old_sha: "0".repeat(40), + new_sha: "a".repeat(40), + timestamp: Utc::now().to_rfc3339(), + cert_id: None, + received_at: Utc::now().to_rfc3339(), + from_peer: "peer1".into(), + } + } + + /// Count `refUpdates` rows in a GraphQL response. 
+ fn count(resp: &async_graphql::Response) -> usize { + assert!(resp.errors.is_empty(), "graphql errors: {:?}", resp.errors); + let async_graphql::Value::Object(obj) = &resp.data else { + panic!("data not an object: {:?}", resp.data); + }; + let async_graphql::Value::List(rows) = obj.get("refUpdates").expect("refUpdates key") + else { + panic!("refUpdates not a list"); + }; + rows.len() + } + + async fn anon(schema: &super::super::GitlawbSchema, query: &str) -> async_graphql::Response { + schema.execute(async_graphql::Request::new(query)).await + } + + async fn authed( + schema: &super::super::GitlawbSchema, + query: &str, + did: &str, + ) -> async_graphql::Response { + schema + .execute( + async_graphql::Request::new(query) + .data(crate::auth::AuthenticatedDid(did.to_string())), + ) + .await + } + + // Scenario 1 — anon must not get a private local repo's row on the + // repo:Some branch. This is the load-bearing RED→GREEN case. + #[sqlx::test] + async fn ref_updates_private_repo_dropped_for_anon(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + let schema = schema(db); + // The GraphQL `repo` arg is the raw slug DB filter, so it must equal the + // stored slug to select the row at all — this is the exact leak path. + let q = r#"{ refUpdates(repo: "z6MkOwner/widget") { refName newSha pusherDid } }"#; + assert_eq!(count(&anon(&schema, q).await), 0); + } + + // Scenario 2 — owner still sees their own private repo's row. 
+ #[sqlx::test] + async fn ref_updates_private_repo_kept_for_owner(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u1", "z6MkOwner/widget")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates(repo: "z6MkOwner/widget") { refName } }"#; + assert_eq!(count(&authed(&schema, q, OWNER).await), 1); + } + + // Scenario 3 — unfiltered (repo:None): anon gets only the public row. + #[sqlx::test] + async fn ref_updates_unfiltered_anon_gets_only_public(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + db.create_repo(&repo("priv", OWNER, "secret", false)) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u_pub", "z6MkOwner/openrepo")) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u_priv", "z6MkOwner/secret")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates { repo refName } }"#; + let resp = anon(&schema, q).await; + assert_eq!(count(&resp), 1); + // The one row returned must be the public repo's. + let async_graphql::Value::Object(obj) = &resp.data else { + unreachable!() + }; + let async_graphql::Value::List(rows) = obj.get("refUpdates").unwrap() else { + unreachable!() + }; + let async_graphql::Value::Object(row) = &rows[0] else { + unreachable!() + }; + assert_eq!( + row.get("repo").unwrap(), + &async_graphql::Value::from("z6MkOwner/openrepo") + ); + } + + // Scenario 4 — alias fail-closed: private repo's row stored full-DID form. + #[sqlx::test] + async fn ref_updates_full_did_slug_dropped_for_anon(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("r1", "did:key:zABC", "widget", false)) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u1", "did:key:zABC/widget")) + .await + .unwrap(); + let schema = schema(db); + // repo:None so the slug is not the DB filter key (which is verbatim); + // the gate must still drop it. 
+ let q = r#"{ refUpdates { repo } }"#; + assert_eq!(count(&anon(&schema, q).await), 0); + } + + // Scenario 5 — truncated-key fail-closed: 8-char-prefix owner form. + #[sqlx::test] + async fn ref_updates_truncated_key_slug_dropped_for_anon(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("r1", "did:key:zABCDEFGH", "widget", false)) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u1", "zABCDEF/widget")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates { repo } }"#; + assert_eq!(count(&anon(&schema, q).await), 0); + } + + // Scenario 6 — remote slug (no local match) is returned to anon. + #[sqlx::test] + async fn ref_updates_remote_slug_kept_for_anon(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("r1", OWNER, "widget", false)) + .await + .unwrap(); + // Row whose slug matches no local repo (different owner + name). + db.insert_ref_update(&ref_row("u1", "zZZZOTHER/gadget")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates { repo } }"#; + assert_eq!(count(&anon(&schema, q).await), 1); + } + + // Scenario 7 (#114 P2) — a small limit must page past the newest rows when + // they are private, so the older public rows are still returned. Before the + // gate moved ahead of the limit this returned 0 (the newest `limit` rows were + // all private and got filtered after the SQL LIMIT). RED→GREEN. + #[sqlx::test] + async fn ref_updates_small_limit_pages_past_newest_private(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("pub", OWNER, "openrepo", true)) + .await + .unwrap(); + db.create_repo(&repo("priv", OWNER, "secret", false)) + .await + .unwrap(); + // 3 older PUBLIC rows … + for i in 0..3 { + let mut r = ref_row(&format!("pub{i}"), "z6MkOwner/openrepo"); + r.timestamp = format!("2026-07-01T10:00:0{i}+00:00"); + db.insert_ref_update(&r).await.unwrap(); + } + // … then 5 NEWER PRIVATE rows (the newest in the feed). 
+ for i in 0..5 { + let mut r = ref_row(&format!("priv{i}"), "z6MkOwner/secret"); + r.timestamp = format!("2026-07-01T10:00:1{i}+00:00"); + db.insert_ref_update(&r).await.unwrap(); + } + let schema = schema(db); + // limit 3 < the 5 newest (all private): anon must still get 3 public rows. + let q = r#"{ refUpdates(limit: 3) { repo } }"#; + let resp = anon(&schema, q).await; + assert_eq!(count(&resp), 3, "paging must reach the older public rows"); + let async_graphql::Value::Object(obj) = &resp.data else { + unreachable!() + }; + let async_graphql::Value::List(rows) = obj.get("refUpdates").unwrap() else { + unreachable!() + }; + for row in rows { + let async_graphql::Value::Object(r) = row else { + unreachable!() + }; + assert_eq!( + r.get("repo").unwrap(), + &async_graphql::Value::from("z6MkOwner/openrepo"), + "every returned row must be the public repo's" + ); + } + } + + // Scenario 8 — a quarantined mirror is withheld on the GraphQL surface too. + // Guards that the resolver keeps delegating to the shared collector (where the + // quarantine fold lives); a REST-only test would miss a resolver that stopped. + #[sqlx::test] + async fn ref_updates_quarantined_mirror_dropped_for_anon(pool: PgPool) { + let db = db(pool).await; + db.upsert_mirror_repo("z6MkQuar", "secret", "/tmp/q", None, true) + .await + .unwrap(); + db.insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates { repo } }"#; + assert_eq!(count(&anon(&schema, q).await), 0); + } + + // Scenario 8b — the GraphQL surface also withholds a quarantined repo from an + // authenticated OWNER, not just anon. Without the collector's quarantine drop + // the owner short-circuit in visibility_check keeps the row on this surface + // too, so the REST owner test alone would not guard the resolver. 
+ #[sqlx::test] + async fn ref_updates_quarantined_repo_dropped_for_owner(pool: PgPool) { + let db = db(pool).await; + db.create_repo(&repo("q1", "did:key:z6MkQuar", "secret", false)) + .await + .unwrap(); + db.set_repo_quarantine("q1", true).await.unwrap(); + db.insert_ref_update(&ref_row("u1", "z6MkQuar/secret")) + .await + .unwrap(); + let schema = schema(db); + let q = r#"{ refUpdates { repo } }"#; + assert_eq!(count(&authed(&schema, q, "did:key:z6MkQuar").await), 0); + } +} diff --git a/crates/gitlawb-node/src/graphql/subscription.rs b/crates/gitlawb-node/src/graphql/subscription.rs index 8fd0b30..6c639bc 100644 --- a/crates/gitlawb-node/src/graphql/subscription.rs +++ b/crates/gitlawb-node/src/graphql/subscription.rs @@ -11,6 +11,15 @@ pub struct SubscriptionRoot; #[Subscription] impl SubscriptionRoot { + /// Live ref-update stream. `/graphql/ws` is mounted outside the + /// `optional_signature` layer, so this resolver has NO caller identity and + /// cannot gate per-subscriber — it relays whatever enters the broadcast + /// channel to any anonymous client. Its visibility safety therefore rests + /// entirely on the WRITE side: the push handler only sends a + /// `RefUpdateBroadcast` for repos the anonymous public may read (inside its + /// `if announce` block, `api/repos.rs`). This is a single-point invariant — + /// any new sender to `ref_update_tx` MUST be `announce`-gated, or private-repo + /// ref metadata leaks here to unauthenticated subscribers (#112/#114 class). async fn ref_updates( &self, ctx: &Context<'_>, diff --git a/crates/gitlawb-node/src/visibility.rs b/crates/gitlawb-node/src/visibility.rs index 335d261..b1b7336 100644 --- a/crates/gitlawb-node/src/visibility.rs +++ b/crates/gitlawb-node/src/visibility.rs @@ -4,7 +4,8 @@ //! based on the repo's visibility rules with a fallback to the legacy //! `is_public` flag. It performs no I/O so it is exhaustively unit tested. 
-use crate::db::VisibilityRule; +use crate::db::{RepoRecord, VisibilityRule}; +use std::collections::HashMap; use unicode_normalization::UnicodeNormalization; #[derive(Debug, PartialEq, Eq)] @@ -127,6 +128,103 @@ pub fn listable_at_root( visibility_check(rules, is_public, owner_did, caller, "/") == Decision::Allow } +/// Whether a single `received_ref_updates` row (identified by its peer-supplied +/// `row_repo` slug) should be shown to `caller` (None = anonymous) on the +/// cross-repo ref-updates feeds (#112 GraphQL, #114 REST). +/// +/// Pure and I/O-free: both call sites load the deduped local repo set and its +/// visibility rules once per request and pass them in, so the gate logic lives +/// here and visibility.rs keeps its "no I/O" property. +/// +/// The slug is written verbatim from the inbound gossip/notify message, so it is +/// untrusted input, not a canonical key. The decision is fail-closed by +/// construction: the only KEEP paths are (a) a slug with no `/` (cannot name a +/// local `owner/name` pair, so remote by definition), (b) all matched local +/// records are readable, and (c) no local record matches (remote/gossip-only). +/// Any local match the caller cannot read at root DROPs the row. There is no +/// catch-all keep on unexpected state. +/// +/// Slug/record owner keys are matched prefix-tolerantly (one is a prefix of the +/// other), covering the exact short-key, the full `did:key:` form, and the +/// URL-truncated 8-char form. Prefix over-match can only over-drop a genuinely +/// remote row (fail-safe), never over-serve a private local one. +/// +/// The live call sites are the #112 (GraphQL) and #114 (REST) feed handlers, +/// added in the following units; exercised by the unit tests below meanwhile. 
+pub fn ref_update_row_visible( + deduped: &[RepoRecord], + rules_by_repo: &HashMap>, + caller: Option<&str>, + row_repo: &str, +) -> bool { + // A slug with no '/' cannot name a local owner/name pair — remote by + // definition (same branch as "matches nothing local") → KEEP. + if row_repo.rsplit_once('/').is_none() { + return true; + } + + // Match each local record with the shared slug predicate (one matcher, so + // this gate and the collector's quarantine drop cannot disagree about which + // rows a repo owns), then fail closed on any matched record the caller + // cannot read at root. + for record in deduped { + if !ref_update_row_names_repo(record, row_repo) { + continue; + } + let rules = rules_by_repo + .get(&record.id) + .map(Vec::as_slice) + .unwrap_or(&[]); + if !listable_at_root(rules, record.is_public, &record.owner_did, caller) { + return false; + } + } + + // Reached only if every matched local record is readable, or nothing local + // matched (remote/gossip-only). Both are the KEEP paths; there is no + // default-keep on unexpected state — an unreadable match already returned. + true +} + +/// Whether `row_repo`'s peer-supplied slug names the local `record`. The single +/// match predicate shared by the feed gate ([`ref_update_row_visible`]) and the +/// quarantine hard-drop in the ref-update collector, so the two cannot diverge +/// about which rows a repo owns — a second, drifting matcher is exactly the #134 +/// slug-collision class. +/// +/// A record matches when the slug's `name` half is equal and one owner key is a +/// prefix of the other (prefix-tolerant: exact short key, full `did:key:` form, +/// and the URL-truncated 8-char form). The slug owner is reduced to its last +/// ':'-delimited segment — the SAME lossy form the emitter broadcasts +/// (`git_receive_pack` in api/repos.rs builds the wire slug as +/// `{last-segment-of-owner_did}/{name}`), so a repo's own rows always arrive +/// keyed by that trailing segment. 
+/// +/// This intentionally diverges from `did_matches` / `DEDUP_CTE`, which strip +/// only bare `did:key:` and keep other DID methods whole: those compare trusted, +/// canonical DIDs, while this slug is untrusted and already method-stripped by +/// the emitter. Applying the keep-whole rule here would fail open — a private +/// `did:web:host:user` repo's short slug `user/name` would not prefix-match the +/// whole record and would leak. The price is a fail-SAFE over-match when a +/// remote owner shares both a trailing segment and a repo name with a local +/// private repo (negligible for full did:key ids; only did:web / truncated forms +/// collide): it can hide a genuinely remote row, never serve a private one. +pub fn ref_update_row_names_repo(record: &RepoRecord, row_repo: &str) -> bool { + let Some((owner_part, name)) = row_repo.rsplit_once('/') else { + return false; + }; + if record.name != name { + return false; + } + let row_key = owner_part.split(':').next_back().unwrap_or(owner_part); + let record_key = record + .owner_did + .split(':') + .next_back() + .unwrap_or(&record.owner_did); + record_key.starts_with(row_key) || row_key.starts_with(record_key) +} + /// The subtree path globs that `caller` (None = anonymous) may NOT read, given /// the repo's rules. Whole-repo ("/") rules are excluded: a denied whole-repo /// read is handled by the 404 gate before a clone ever starts. 
Each remaining @@ -403,6 +501,258 @@ mod tests { )); } + // ── ref_update_row_visible (feed gate) ────────────────────────────────── + + fn rec(id: &str, owner_did: &str, name: &str, is_public: bool) -> RepoRecord { + RepoRecord { + id: id.into(), + name: name.into(), + owner_did: owner_did.into(), + description: None, + is_public, + default_branch: "main".into(), + created_at: Utc::now(), + updated_at: Utc::now(), + disk_path: format!("/srv/{id}"), + forked_from: None, + machine_id: None, + } + } + + fn rules_for(entries: &[(&str, &[VisibilityRule])]) -> HashMap> { + entries + .iter() + .map(|(id, rs)| (id.to_string(), rs.to_vec())) + .collect() + } + + #[test] + fn feed_public_local_repo_kept_for_anon() { + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", true)]; + let rules = HashMap::new(); + assert!(ref_update_row_visible( + &deduped, + &rules, + None, + "z6MkOwner/widget" + )); + } + + #[test] + fn feed_private_local_repo_dropped_for_anon_kept_for_owner() { + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", false)]; + let rules = HashMap::new(); + // Anonymous → drop. + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "z6MkOwner/widget" + )); + // Owner (full DID) → keep. + assert!(ref_update_row_visible( + &deduped, + &rules, + Some("did:key:z6MkOwner"), + "z6MkOwner/widget" + )); + } + + #[test] + fn feed_root_rule_reader_reincluded() { + // Private repo (is_public=false) with a root rule granting a named + // reader. Delegates to listable_at_root: anon and non-reader denied, + // named reader allowed. 
+ let deduped = [rec("r1", OWNER, "widget", false)]; + let root = [rule("/", VisibilityMode::A, &["did:key:z6MkFriend"])]; + let rules = rules_for(&[("r1", &root)]); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "z6MkOwner/widget" + )); + assert!(!ref_update_row_visible( + &deduped, + &rules, + Some("did:key:z6MkStranger"), + "z6MkOwner/widget" + )); + assert!(ref_update_row_visible( + &deduped, + &rules, + Some("did:key:z6MkFriend"), + "z6MkOwner/widget" + )); + } + + #[test] + fn feed_alias_full_did_slug_dropped_for_anon() { + // Owner stored full-DID; slug also carries the full-DID form. Still + // matches (row_key normalizes to the short key) → drop. Round 1's + // string-match would have leaked this. + let deduped = [rec("r1", "did:key:zABC", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "did:key:zABC/widget" + )); + } + + #[test] + fn feed_truncated_key_slug_dropped_for_anon() { + // Slug carries an 8-char URL-truncated prefix of the owner key; still + // matches via prefix tolerance → drop. Round 2's get_repo path leaked. + let deduped = [rec("r1", "did:key:zABCDEFGH", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "zABCDEF/widget" + )); + } + + #[test] + fn feed_mirror_coexistence_private_canonical_dropped_for_anon() { + // Pure-level mirror-coexistence: the deduped set contains only the + // private canonical record for (owner,name). A matching slug drops for + // anon. (DB-level dedup survivor property is pinned separately.) + let deduped = [rec("uuid-canonical", "did:key:z6Mkwbud", "nipmod", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "z6Mkwbud/nipmod" + )); + } + + #[test] + fn feed_empty_owner_slug_matches_and_drops() { + // Slug "/name": empty owner_part → row_key "" → starts_with("") matches + // every same-named record. 
Fail-safe pin for a private repo. + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible(&deduped, &rules, None, "/widget")); + } + + #[test] + fn feed_one_char_owner_slug_matches_and_drops() { + // 1-char owner prefix that the private repo's key starts with → match. + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible(&deduped, &rules, None, "z/widget")); + } + + #[test] + fn feed_remote_slug_no_match_kept() { + // Different owner key, no prefix relation → no local match → keep. + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", false)]; + let rules = HashMap::new(); + assert!(ref_update_row_visible( + &deduped, + &rules, + None, + "zZZZOTHER/widget" + )); + } + + #[test] + fn feed_private_didweb_short_slug_dropped_for_anon() { + // The canonical, load-bearing case for the last-segment normalization: + // the emitter broadcasts a did:web repo's rows under the SHORT slug + // `{last-segment}/{name}` (publish_ref_update, api/repos.rs). A private + // such repo must drop that slug for anon. A did:key-aware (keep-whole) + // rule would fail to match `alice` against `did:web:host:alice` and leak. + let deduped = [rec("r1", "did:web:host:alice", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "alice/widget" + )); + } + + #[test] + fn feed_multi_segment_did_slug_dropped_for_anon() { + // A private repo owned by a multi-segment DID must also fail closed when a + // peer *crafts* the row under the full-DID slug form. This form is + // non-canonical (the emitter only ever broadcasts the short slug above), + // but the gate still drops it: both sides reduce to the last ':' segment + // ("user"), so they match. Fail-safe against a hand-forged slug. 
+ let deduped = [rec("r1", "did:web:host:user", "widget", false)]; + let rules = HashMap::new(); + assert!(!ref_update_row_visible( + &deduped, + &rules, + None, + "did:web:host:user/widget" + )); + } + + #[test] + fn feed_multi_segment_did_slug_kept_for_public() { + // Symmetric keep-side: a PUBLIC multi-segment-DID repo's row must still be + // returned to anon after the last-':'-segment normalization — guards + // against a regression that over-drops legitimate did:web rows. + let deduped = [rec("r1", "did:web:host:user", "widget", true)]; + let rules = HashMap::new(); + assert!(ref_update_row_visible( + &deduped, + &rules, + None, + "did:web:host:user/widget" + )); + } + + #[test] + fn feed_malformed_slug_no_slash_kept_no_panic() { + let deduped = [rec("r1", "did:key:z6MkOwner", "widget", false)]; + let rules = HashMap::new(); + assert!(ref_update_row_visible(&deduped, &rules, None, "noslug")); + } + + #[test] + fn feed_empty_deduped_set_keeps_any_slug() { + let deduped: [RepoRecord; 0] = []; + let rules = HashMap::new(); + assert!(ref_update_row_visible( + &deduped, + &rules, + None, + "z6MkOwner/widget" + )); + assert!(ref_update_row_visible(&deduped, &rules, None, "anything")); + } + + // The shared slug matcher underpinning both the feed gate and the collector's + // quarantine drop: it must recognize a repo's own row across every owner-DID + // form (bare short key, full did:key, URL-truncated prefix) and must not + // over-match a different name or an unrelated owner. The quarantine drop calls + // this directly, so its match contract is load-bearing for withholding a + // quarantined mirror from a caller who matches the mirror's owner_did. + #[test] + fn names_repo_matches_owner_forms_and_rejects_mismatches() { + // Bare short owner key (the form upsert_mirror_repo stores). 
+ let bare = rec("m", "z6MkQuar", "secret", false); + assert!(ref_update_row_names_repo(&bare, "z6MkQuar/secret")); + // Full did:key owner; the short-segment slug still matches via last-segment. + let full = rec("c", "did:key:z6MkQuar", "secret", false); + assert!(ref_update_row_names_repo(&full, "z6MkQuar/secret")); + assert!(ref_update_row_names_repo(&full, "did:key:z6MkQuar/secret")); + // URL-truncated 8-char prefix slug → prefix-tolerant match. + let long = rec("l", "did:key:z6MkQuarLONGKEY", "secret", false); + assert!(ref_update_row_names_repo(&long, "z6MkQuar/secret")); + // Different name, different owner, and no-slash → no match. + assert!(!ref_update_row_names_repo(&bare, "z6MkQuar/other")); + assert!(!ref_update_row_names_repo(&bare, "z6MkOther/secret")); + assert!(!ref_update_row_names_repo(&bare, "noslash")); + } + // #101: a deny rule must withhold a path that denotes the same characters in // a different Unicode normalization form. Without NFC normalization at the // matcher seam, an NFC-authored rule byte-compares unequal to an NFD-stored