From f3fd8eca9170ce507297d3be42aef87c13326fc4 Mon Sep 17 00:00:00 2001 From: Claude Date: Sun, 14 Jun 2026 14:20:38 +0000 Subject: [PATCH] fix(firewall): close redaction-leak egress paths and add canary suite MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Route every Frame/trace egress through the firewall redactor so secrets and PII cannot escape the I-01 boundary via a non-summary path: - #149: redact() fails closed at max_depth — scrub leaf strings, elide nested containers instead of returning them verbatim. - #150: HandleStore.expand() redacts projected rows (inline secrets in grant-permitted fields are scrubbed); expansion Frames carry warnings. - #151: Firewall.apply_stream() holds back a per-field overlap window via a new StreamRedactor so a secret split across chunks is reassembled before emission. - #172: ActionTrace.args (all capabilities) and driver error text pass through the redactor before persistence; memory.* payload stripping unchanged. - #206: new tests/test_firewall_canary.py asserts planted canaries never appear in any egress (summary/table/raw Frames, expansion, streaming, trace args/errors, adapter payloads). Docs (security.md, context_firewall.md) and CHANGELOG updated, including the honest cross-chunk whitespace-pattern limit. 
https://claude.ai/code/session_01Gq2ooVRbX8rxi5d3dvREN6 --- CHANGELOG.md | 31 +++ docs/context_firewall.md | 21 ++ docs/security.md | 13 +- src/weaver_kernel/firewall/redaction.py | 129 ++++++++++-- src/weaver_kernel/firewall/transform.py | 66 ++++++- src/weaver_kernel/handles.py | 14 +- src/weaver_kernel/kernel/_invoke.py | 54 +++-- tests/test_firewall_boundary.py | 9 +- tests/test_firewall_canary.py | 250 ++++++++++++++++++++++++ 9 files changed, 547 insertions(+), 40 deletions(-) create mode 100644 tests/test_firewall_canary.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a7548f..19e419f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,37 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Fixed +- **Firewall redaction now fails closed at the depth boundary (#149).** + `redact()` previously returned any subtree nested at/below `max_depth` + verbatim, so PII/secrets nested beyond the cap reached the LLM unscanned. The + boundary now scrubs leaf strings and *elides* nested containers + (`[REDACTED: nested data beyond depth limit]`) instead of returning them raw. +- **Handle expansion is now redacted (#150).** `HandleStore.expand()` routes its + projected rows through the same `redact()` the firewall applies on first + invocation, so a secret inline in a grant-permitted field (e.g. a Bearer token + in a `note` value) is scrubbed on the expand path. Expansion Frames now carry + redaction `warnings`. +- **Cross-chunk redaction safety for streaming Frames (#151).** + `Firewall.apply_stream()` keeps a per-field `StreamRedactor` that holds back a + trailing overlap window, so a secret whose characters are split across two + streamed chunks is reassembled and redacted before either half is emitted. + Documented limit: patterns containing internal whitespace split exactly at the + held boundary may still evade detection (see `docs/security.md`). 
+- **Trace argument and error redaction extended beyond `memory.*` (#172).** + `ActionTrace.args` for **every** capability — and driver `error` text — now + pass through the firewall redactor before persistence, so the trace store no + longer becomes a sensitive-data sink when arguments carry secrets/PII or when a + `DriverError` embeds a raw response body. Memory-payload stripping for + `memory.*` capabilities is unchanged. + +### Added +- **Secret-canary regression suite (#206).** `tests/test_firewall_canary.py` + plants distinctive canary secrets and asserts they never appear in any kernel + egress — summary/table/raw Frames, handle expansions, streamed chunks, trace + args/errors, and adapter-rendered payloads — turning the I-01 boundary into an + executable invariant and a regression net for the fixes above. + ## [0.11.0] - 2026-06-13 ### Added diff --git a/docs/context_firewall.md b/docs/context_firewall.md index a7faa5f..af65471 100644 --- a/docs/context_firewall.md +++ b/docs/context_firewall.md @@ -60,6 +60,27 @@ When a capability has `SensitivityTag.PII` or `SensitivityTag.PCI`: Principals with the `pii_reader` role bypass `allowed_fields` enforcement. +Redaction is applied on **every** path that returns data to the LLM, not just +the first `transform()`: + +- **Depth boundary (fail-closed).** The `max_depth` cap bounds recursion cost. + At the boundary, scalar strings are still pattern-scrubbed, but a nested + container is *elided* (`[REDACTED: nested data beyond depth limit]`) rather + than returned verbatim — a deeply nested subtree never reaches the LLM + unscanned. +- **Handle expansion.** `HandleStore.expand()` runs its projected rows through + the same `redact()` as the first invocation, so a secret inline in a + permitted field (e.g. a token in a `note` value) is scrubbed on expand too. 
+- **Streaming.** `Firewall.apply_stream()` keeps a per-field `StreamRedactor` + that holds back a trailing overlap window, so a secret split across two + chunks is reassembled and redacted before either half is emitted. Patterns + containing internal whitespace (phone/SSN/spaced card numbers) split exactly + at the held boundary may still evade detection — see `docs/security.md`. + +Invocation **arguments** recorded on `ActionTrace.args`, and driver **error** +text, are run through the same redactor before persistence, so the trace store +never becomes a sensitive-data sink (see `docs/security.md`). + ## Summarization Summaries are produced deterministically: diff --git a/docs/security.md b/docs/security.md index 2fcead8..31b1ccf 100644 --- a/docs/security.md +++ b/docs/security.md @@ -9,12 +9,14 @@ | Token forgery / tampering | HMAC-SHA256 signature; any bit flip → `TokenInvalid` | | Token replay after expiry | Expiry checked on every `verify()` call | | Context injection via raw tool output | Firewall always transforms `RawResult → Frame`; raw data never reaches LLM by default | -| PII / PCI leakage | Redaction + `allowed_fields` enforcement in the firewall | +| PII / PCI leakage | Redaction + `allowed_fields` enforcement in the firewall, applied on every egress path (summary/table/raw, handle expansion, streaming) | +| PII / secret leak below the depth budget | Redaction fails *closed* at `max_depth`: leaf strings are scrubbed; nested containers are elided rather than returned verbatim (#149) | +| Inline secret leak via handle expansion | `HandleStore.expand()` runs projected rows through the firewall redactor, so a secret in a permitted field is scrubbed (#150) | +| Cross-chunk secret split in streaming | `Firewall.apply_stream()` holds back a per-field overlap window so a secret spanning two chunks is reassembled before redaction (#151) | | Privilege escalation via WRITE/DESTRUCTIVE | Policy engine enforces role requirements | | Audit evasion | Every 
`invoke()` creates an immutable `ActionTrace` | | Handle scope escape (expand exceeds grant) | Handles persist grant constraints; `HandleStore.expand` rechecks `max_rows`, `allowed_fields`, `scope`, and principal binding (#76) | -| Memory exfiltration via tool output | `SensitivityTag.MEMORY` capabilities gate sensitive reads and durable writes; `ActionTrace.args` redacts payload-like fields for `memory.*` capabilities (#75) | -| Raw memory payload reaching audit log | Kernel strips `payload`/`content`/`value`/`memory`/`text`/`body` from `ActionTrace.args` for `memory.*` capabilities | +| Sensitive data reaching the audit log via args/errors | `ActionTrace.args` and driver `error` text are run through the firewall redactor for **every** capability; memory payloads (`payload`/`content`/`value`/`memory`/`text`/`body`) are additionally stripped wholesale for `memory.*` capabilities (#75, #172) | | Scanned content / raw result reaching audit log | `ActionTrace.result_summary` is built only from the post-firewall `Frame` (counts and flags, never raw driver data), so the audit trail records an invocation's outcome without re-introducing the data the firewall removed | ## Token scopes @@ -126,6 +128,11 @@ audit.db` exits non-zero on any divergence (see [cli.md](cli.md)). - The `WEAVER_KERNEL_SECRET` must be kept secret. Rotate it if compromised. - The default `InMemoryDriver` has no persistence — suitable for testing only. - PII redaction is heuristic (regex-based). It is not a substitute for proper data governance. +- Streaming redaction (`Firewall.apply_stream`) reassembles patterns split across + chunks by holding back a bounded overlap window. A contiguous secret + (JWT/Bearer/API-key/connection-string body) is not split across a commit + boundary while the held buffer stays within its bound (four times the overlap window; beyond that the buffer is force-committed and a token may be severed), but a pattern containing internal whitespace (phone, SSN, spaced card + number) split exactly at the held boundary may still evade detection.
- Rate limiting is enforced per `(principal_id, capability_id)` pair using a sliding window. Default limits: 60 READ / 10 WRITE / 2 DESTRUCTIVE invocations per 60-second window. Principals with the `"service"` role receive 10× the default limits. Limits are diff --git a/src/weaver_kernel/firewall/redaction.py b/src/weaver_kernel/firewall/redaction.py index 7c91ce7..a21e156 100644 --- a/src/weaver_kernel/firewall/redaction.py +++ b/src/weaver_kernel/firewall/redaction.py @@ -65,12 +65,39 @@ """Matches connection strings containing embedded credentials (``scheme://user:pass@host``).""" _REDACTED = "[REDACTED]" +_DEPTH_ELIDED = "[REDACTED: nested data beyond depth limit]" def _is_sensitive_field_name(name: str) -> bool: return name.lower() in _SENSITIVE_FIELDS +def _redact_string(data: str) -> tuple[str, list[str]]: + """Redact inline sensitive patterns from a single string. + + Pure leaf helper shared by :func:`redact` and :class:`StreamRedactor` so + the pattern set lives in exactly one place. + + Args: + data: The string to scrub. + + Returns: + A tuple of ``(redacted_string, warnings)``. + """ + original = data + data = _EMAIL_RE.sub(_REDACTED, data) + data = _PHONE_RE.sub(_REDACTED, data) + data = _CARD_RE.sub(_REDACTED, data) + data = _SSN_RE.sub(_REDACTED, data) + data = _BEARER_RE.sub(_REDACTED, data) + data = _JWT_RE.sub(_REDACTED, data) + data = _API_KEY_RE.sub(r"\1" + _REDACTED, data) + data = _CONN_STR_RE.sub(r"\1" + _REDACTED + r"\2", data) + if data != original: + return data, ["String value contained sensitive patterns and was redacted."] + return data, [] + + def redact( data: Any, *, @@ -84,6 +111,13 @@ def redact( all others are removed. Sensitive field names are replaced with ``[REDACTED]`` regardless. + The ``max_depth`` cap bounds recursion cost; it must **fail closed**. 
At + the boundary, scalar strings are still pattern-redacted (a leaf scan is + cheap and cannot recurse), but any nested container is *elided* and + replaced with a marker rather than returned verbatim — a deeply nested + subtree must never reach the LLM unscanned (the I-01 boundary; see + ``docs/agent-context/invariants.md``). + Args: data: The data to redact. allowed_fields: If non-empty, only keep these field names in dicts. @@ -94,10 +128,18 @@ def redact( A tuple of ``(redacted_data, warnings)`` where *warnings* is a list of human-readable strings describing what was redacted. """ - warnings: list[str] = [] - if depth >= max_depth: - return data, warnings + # Fail closed at the depth boundary: scrub leaf strings, elide nested + # containers (they would otherwise flow through unredacted). + if isinstance(data, str): + return _redact_string(data) + if isinstance(data, dict | list): + return _DEPTH_ELIDED, [ + "Nested data beyond the configured max_depth was elided (not scanned)." + ] + return data, [] + + warnings: list[str] = [] if isinstance(data, dict): result: dict[str, Any] = {} @@ -127,17 +169,74 @@ def redact( return redacted_list, warnings if isinstance(data, str): - original = data - data = _EMAIL_RE.sub(_REDACTED, data) - data = _PHONE_RE.sub(_REDACTED, data) - data = _CARD_RE.sub(_REDACTED, data) - data = _SSN_RE.sub(_REDACTED, data) - data = _BEARER_RE.sub(_REDACTED, data) - data = _JWT_RE.sub(_REDACTED, data) - data = _API_KEY_RE.sub(r"\1" + _REDACTED, data) - data = _CONN_STR_RE.sub(r"\1" + _REDACTED + r"\2", data) - if data != original: - warnings.append("String value contained sensitive patterns and was redacted.") - return data, warnings + return _redact_string(data) return data, warnings + + +# Characters that can appear *inside* a contiguous secret token (JWT, Bearer +# value, API key, connection-string body). A commit boundary is never placed +# inside a run of these, so such a token is never split across chunks. 
+_TOKEN_CHAR_RE = re.compile(r"[A-Za-z0-9._~+/:=@-]") + +# How many trailing characters of a string stream are held back before +# emission so a pattern split across two chunks is reassembled first. +_STREAM_OVERLAP = 256 + + +class StreamRedactor: + """Redacts an incrementally delivered text stream with cross-chunk safety. + + A per-chunk regex pass cannot catch a secret whose characters are split + across two chunks (e.g. ``"...eyJ"`` then ``"abc.def..."``). This buffer + holds back the trailing :data:`_STREAM_OVERLAP` characters of the stream + and only commits text once enough right-context has arrived, so a pattern + straddling a chunk boundary is reassembled before either half is emitted. + + Commit boundaries are placed only at non-token separators, so a contiguous + secret (JWT/Bearer/API-key/connection-string body) is never severed across + a commit. Patterns that contain internal whitespace (phone, SSN, spaced + card numbers) and are split exactly at the held boundary may still evade + detection — a documented limit, mirrored in ``docs/security.md``. + + The redactor is single-stream and stateful: feed chunks in order, then + call :meth:`flush` once at end-of-stream. + """ + + __slots__ = ("_pending", "_overlap", "_max_pending") + + def __init__(self, *, overlap: int = _STREAM_OVERLAP) -> None: + self._pending = "" + self._overlap = overlap + # Bound the buffer: a single unbroken token longer than this is + # force-committed rather than held indefinitely (memory safety). + self._max_pending = overlap * 4 + + def feed(self, text: str) -> tuple[str, list[str]]: + """Accept the next chunk; return ``(redacted_committed_text, warnings)``. + + The returned text is the portion now safe to emit; the trailing + overlap window is retained until a later :meth:`feed` or :meth:`flush`. 
+ """ + if text: + self._pending += text + if len(self._pending) <= self._overlap: + return "", [] + cut = len(self._pending) - self._overlap + if len(self._pending) <= self._max_pending: + # Back the cut off a contiguous token so we never sever one. + while cut > 0 and _TOKEN_CHAR_RE.match(self._pending[cut - 1]): + cut -= 1 + if cut <= 0: + return "", [] + committed = self._pending[:cut] + self._pending = self._pending[cut:] + return _redact_string(committed) + + def flush(self) -> tuple[str, list[str]]: + """Redact and return any buffered remainder at end-of-stream.""" + if not self._pending: + return "", [] + out = _redact_string(self._pending) + self._pending = "" + return out diff --git a/src/weaver_kernel/firewall/transform.py b/src/weaver_kernel/firewall/transform.py index 5be1a86..0d82d9f 100644 --- a/src/weaver_kernel/firewall/transform.py +++ b/src/weaver_kernel/firewall/transform.py @@ -17,7 +17,7 @@ ResponseMode, ) from .budgets import Budgets -from .redaction import redact +from .redaction import StreamRedactor, redact from .summarize import summarize logger = logging.getLogger(__name__) @@ -232,9 +232,15 @@ async def apply_stream( budget caps that apply to a single-shot :meth:`transform` apply to *every* chunk — PII never leaks even when results stream in. + Cross-chunk redaction safety: top-level string fields are routed + through a per-field :class:`StreamRedactor`, which holds back a + trailing overlap window so a secret whose characters span two chunks + is reassembled and redacted before either half is emitted. Non-string + and nested values are redacted per chunk by :meth:`transform`. + Mode escalation across chunks (e.g. dropping from ``table`` to ``summary`` as budget drains) is the caller's responsibility — the - Firewall itself is stateless. ``Kernel.invoke_stream`` orchestrates + Firewall itself does not escalate. ``Kernel.invoke_stream`` orchestrates escalation via :class:`BudgetManager.suggested_mode`. 
The synthetic key ``"__is_final__"`` on a chunk is stripped before @@ -256,9 +262,13 @@ async def apply_stream( Yields: :class:`Frame` chunks with ``is_final`` set on the last one. """ + redactors: dict[str, StreamRedactor] = {} async for chunk in response_chunks: is_final = bool(chunk.get("__is_final__", False)) - payload = {k: v for k, v in chunk.items() if k != "__is_final__"} + raw_payload = {k: v for k, v in chunk.items() if k != "__is_final__"} + payload, stream_warnings = _apply_stream_redactors( + raw_payload, redactors, is_final=is_final + ) synthetic_raw = RawResult( capability_id=capability_id, data=payload, @@ -272,6 +282,8 @@ async def apply_stream( response_mode=response_mode, constraints=constraints, ) + if stream_warnings: + frame = replace(frame, warnings=[*frame.warnings, *stream_warnings]) if is_final: frame = replace(frame, is_final=True) yield frame @@ -295,6 +307,54 @@ def _make_table(self, data: Any, *, max_rows: int) -> list[dict[str, Any]]: return result +def _apply_stream_redactors( + payload: dict[str, Any], + redactors: dict[str, StreamRedactor], + *, + is_final: bool, +) -> tuple[dict[str, Any], list[str]]: + """Route a chunk's top-level string fields through per-field redactors. + + String values are fed to a :class:`StreamRedactor` (created lazily per + field) so patterns split across chunks are reassembled before emission. + Non-string values are passed through unchanged — :meth:`Firewall.transform` + still redacts them per chunk. On the final chunk every active redactor is + flushed, including fields absent from the final payload (their held tail is + re-attached under the original key) so no buffered text is dropped. + + Args: + payload: The chunk payload (``__is_final__`` already stripped). + redactors: Mutable per-field redactor state carried across chunks. + is_final: Whether this is the last chunk (triggers flush). + + Returns: + ``(redacted_payload, warnings)``. 
+ """ + out: dict[str, Any] = {} + warnings: list[str] = [] + for key, value in payload.items(): + if isinstance(value, str): + redactor = redactors.setdefault(key, StreamRedactor()) + committed, warns = redactor.feed(value) + if is_final: + tail, tail_warns = redactor.flush() + committed += tail + warns = [*warns, *tail_warns] + out[key] = committed + warnings.extend(warns) + else: + out[key] = value + if is_final: + for key, redactor in redactors.items(): + if key in out: + continue + tail, tail_warns = redactor.flush() + if tail: + out[key] = tail + warnings.extend(tail_warns) + return out, warnings + + def _cap_facts(facts: list[str], max_chars: int) -> list[str]: """Return as many facts as fit within *max_chars* total.""" total = 0 diff --git a/src/weaver_kernel/handles.py b/src/weaver_kernel/handles.py index 8dcdfb2..9b0fde1 100644 --- a/src/weaver_kernel/handles.py +++ b/src/weaver_kernel/handles.py @@ -7,6 +7,7 @@ from typing import Any from .errors import HandleConstraintViolation, HandleExpired, HandleNotFound +from .firewall.redaction import redact from .models import Frame, Handle, Provenance, ResponseMode from .policy_reasons import DenialReason @@ -319,11 +320,22 @@ def expand( else: table_preview = [{"value": r} for r in rows] + # ── Redaction ─────────────────────────────────────────────────────────── + # expand() builds its Frame directly from the raw stored dataset, which + # is persisted pre-firewall. Field-level grant constraints + # (allowed_fields / scope) are already enforced above, but a permitted + # field can still carry inline secrets (e.g. a Bearer token in a `note` + # value). Route the projected rows through the same redactor the + # Firewall applies on first invocation so the I-01 boundary holds on the + # expansion path too (see docs/agent-context/invariants.md). 
+ redacted_preview, warnings = redact(table_preview) + return Frame( action_id=action_id, capability_id=handle.capability_id, response_mode=response_mode, - table_preview=table_preview, + table_preview=redacted_preview, + warnings=warnings, handle=handle, provenance=Provenance( capability_id=handle.capability_id, diff --git a/src/weaver_kernel/kernel/_invoke.py b/src/weaver_kernel/kernel/_invoke.py index 3abfcf5..3edee5f 100644 --- a/src/weaver_kernel/kernel/_invoke.py +++ b/src/weaver_kernel/kernel/_invoke.py @@ -19,12 +19,13 @@ import logging import uuid from dataclasses import replace -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, cast from ..drivers.base import Driver, ExecutionContext from ..enums import SensitivityTag from ..errors import DriverError from ..firewall.budget_manager import BudgetManager +from ..firewall.redaction import redact from ..models import ( ActionTrace, Capability, @@ -50,22 +51,41 @@ def _redact_args_for_trace(capability_id: str, args: dict[str, Any]) -> dict[str, Any]: - """Strip raw memory payloads from :class:`ActionTrace.args`. - - Memory capabilities (``capability_id`` starting with ``"memory."``) may - carry durable text the principal is committing to or fetching from - long-term memory. Tracing the raw payload would defeat the I-01 boundary - the :class:`Firewall` enforces for outputs — so we apply an equivalent - input-side redaction at trace-record time. Keys are preserved (so audit - can confirm a payload was provided); sensitive values become - ``"[REDACTED]"``. Non-memory capabilities are returned unchanged. + """Redact sensitive values from :class:`ActionTrace.args` before storage. + + The trace store is the long-lived audit record; if invocation arguments + carry user content, secrets passed as parameters, or PII, storing them raw + makes the store itself a sensitive-data sink — undermining the I-01 + boundary the :class:`Firewall` enforces on *outputs*. Two layers apply: + + 1. 
**Memory payload stripping.** Memory capabilities (``capability_id`` + starting with ``"memory."``) carry durable free text under known keys + (``payload``, ``content``, …); those values are replaced wholesale with + ``"[REDACTED]"`` (keys preserved so audit can confirm a payload was + provided). + 2. **General pattern/field redaction for every capability.** All args are + then passed through the same :func:`~weaver_kernel.firewall.redaction.redact` + used on driver output, so inline secrets/PII and sensitive field names + are scrubbed regardless of the capability namespace (#172). """ - if not capability_id.startswith(_MEMORY_CAPABILITY_PREFIX): - return args - return { - k: ("[REDACTED]" if k.lower() in _MEMORY_SENSITIVE_ARG_KEYS else v) - for k, v in args.items() - } + if capability_id.startswith(_MEMORY_CAPABILITY_PREFIX): + args = { + k: ("[REDACTED]" if k.lower() in _MEMORY_SENSITIVE_ARG_KEYS else v) + for k, v in args.items() + } + redacted, _ = redact(args) + return cast(dict[str, Any], redacted) + + +def _redact_trace_text(text: str) -> str: + """Scrub inline secrets/PII from free text before it enters a trace. + + ``DriverError`` messages can embed raw response bodies (e.g. up to 200 + characters of an HTTP error body), so error text recorded on an + :class:`ActionTrace` is run through the firewall's string redactor first. 
+ """ + redacted, _ = redact(text) + return cast(str, redacted) def _frame_result_summary(frame: Frame) -> dict[str, Any]: @@ -166,7 +186,7 @@ def record_failure_trace( response_mode=response_mode, driver_id="", sensitivity=sensitivity, - error=error_message, + error=_redact_trace_text(error_message), ) ) diff --git a/tests/test_firewall_boundary.py b/tests/test_firewall_boundary.py index 929e717..d6c05c0 100644 --- a/tests/test_firewall_boundary.py +++ b/tests/test_firewall_boundary.py @@ -237,7 +237,14 @@ def test_action_trace_redacts_memory_payload_arg() -> None: def test_action_trace_keeps_non_memory_args_verbatim() -> None: - """The redaction is scoped to memory.* — other capabilities are untouched.""" + """Benign non-memory args survive the trace redaction pass unchanged. + + Since #172 every capability's args pass through ``redact()``, but values + with no sensitive field name or inline pattern (and non-string scalars) + are returned verbatim — so audit value is preserved for ordinary args. + The complementary case (a secret in a non-memory arg *is* scrubbed) is + pinned by the canary suite in ``test_firewall_canary.py``. + """ registry = CapabilityRegistry() cap = Capability( capability_id="billing.refund", diff --git a/tests/test_firewall_canary.py b/tests/test_firewall_canary.py new file mode 100644 index 0000000..78fbe57 --- /dev/null +++ b/tests/test_firewall_canary.py @@ -0,0 +1,250 @@ +"""Secret-canary regression suite covering every Frame egress path (#206). + +Where ``test_firewall_boundary.py`` pins individual mechanisms, this suite +pins the *global* property the I-01 boundary promises: a distinctive secret +planted in driver output must never appear, verbatim, in **any** kernel +egress a downstream consumer (LLM, audit reader, log sink) can read — +regardless of which path produced it. + +Each canary string exists nowhere except this file, so a failure prints the +exact string that leaked and names the path. 
The suite is the regression net +for the redaction-leak fixes in #149 (depth fail-open), #150 (handle +expansion), #151 (cross-chunk streaming), and #172 (trace args/errors). A new +egress path or response mode should add a case here. +""" + +from __future__ import annotations + +import asyncio +import json +from collections.abc import AsyncIterator +from typing import Any + +import pytest + +from weaver_kernel import ( + Capability, + CapabilityRegistry, + Firewall, + HandleStore, + HMACTokenProvider, + InMemoryDriver, + Kernel, + Principal, + SafetyClass, + StaticRouter, + TraceStore, +) +from weaver_kernel.adapters._base import frame_to_payload +from weaver_kernel.drivers.base import ExecutionContext +from weaver_kernel.errors import DriverError +from weaver_kernel.firewall.redaction import StreamRedactor, redact +from weaver_kernel.models import CapabilityRequest, Handle + +# ── Canaries — these strings exist nowhere else in the codebase ──────────────── + +_CANARY_BEARER = "Bearer canary-zzz-9999-do-not-leak-token" +_CANARY_JWT = "eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJjYW5hcnkifQ.Qk9HVVNfQ0FOQVJZX1NJRw" +_CANARY_API_KEY_VALUE = "ZZZZ_CANARYKEY_ABCDEF12345" +_CANARY_CONN = "postgresql://canary:zzz-secret-pw@db.invalid/main" +_CANARY_EMAIL = "canary.victim@example.invalid" + +# Every canary that the redaction pattern set is expected to catch. +_ALL_CANARIES = ( + _CANARY_BEARER, + _CANARY_JWT, + _CANARY_API_KEY_VALUE, + _CANARY_CONN, + _CANARY_EMAIL, +) + + +def _assert_no_canary(blob: str, *, path: str, canaries: tuple[str, ...] 
= _ALL_CANARIES) -> None: + for canary in canaries: + assert canary not in blob, f"{path}: canary {canary!r} escaped the firewall" + + +def _dump(obj: object) -> str: + """Serialize anything to JSON text for a global negative assertion.""" + return json.dumps(obj, default=lambda o: getattr(o, "__dict__", str(o))) + + +# ── #149: depth fail-open ────────────────────────────────────────────────────── + + +def test_canary_below_max_depth_is_not_returned_verbatim() -> None: + """A secret nested below ``max_depth`` must not flow through unredacted.""" + nested = {"l0": {"l1": {"l2": {"l3": {"note": f"auth={_CANARY_BEARER}"}}}}} + redacted, _warnings = redact(nested, max_depth=3) + _assert_no_canary(_dump(redacted), path="redact(depth>max_depth)") + + +def test_canary_string_at_depth_boundary_is_scrubbed() -> None: + """A scalar string sitting *at* the depth boundary is still scrubbed.""" + # The list element is visited at depth == max_depth. + redacted, _warnings = redact({"a": {"b": {"c": [f"key {_CANARY_BEARER}"]}}}, max_depth=3) + _assert_no_canary(_dump(redacted), path="redact(string at boundary)") + + +# ── #150: handle expansion ───────────────────────────────────────────────────── + + +def _store_with_secret_rows() -> tuple[HandleStore, Handle]: + store = HandleStore() + rows = [ + {"id": 1, "name": "Alice", "note": f"call token {_CANARY_BEARER}"}, + {"id": 2, "name": "Bob", "note": f"db {_CANARY_CONN}"}, + ] + handle = store.store("cap.customers", rows, principal_id="agent-1") + return store, handle + + +def test_canary_never_leaks_through_handle_expand() -> None: + """expand() must redact inline secrets in permitted fields (#150).""" + store, handle = _store_with_secret_rows() + frame = store.expand(handle, query={}, principal_id="agent-1") + _assert_no_canary(_dump(frame), path="HandleStore.expand") + # The non-secret data is still present (redaction is surgical). 
+ assert any("Alice" in _dump(row) for row in frame.table_preview) + + +def test_canary_never_leaks_through_field_projected_expand() -> None: + """A permitted field carrying a secret is scrubbed, not just dropped.""" + store, handle = _store_with_secret_rows() + frame = store.expand(handle, query={"fields": ["id", "note"]}, principal_id="agent-1") + _assert_no_canary(_dump(frame), path="HandleStore.expand(fields=note)") + + +# ── #151: cross-chunk streaming ──────────────────────────────────────────────── + + +def _collect_stream_text(parts: list[str]) -> str: + fw = Firewall() + + async def _chunks() -> AsyncIterator[dict[str, Any]]: + for part in parts: + yield {"text": part} + yield {"__is_final__": True} + + async def _run() -> str: + out: list[str] = [] + async for frame in fw.apply_stream( + _chunks(), + action_id="s1", + capability_id="cap.stream", + principal_id="u1", + principal_roles=["reader"], + response_mode="summary", + ): + out.append(_dump(frame)) + return "".join(out) + + return asyncio.run(_run()) + + +@pytest.mark.parametrize( + "parts", + [ + ["prefix Bearer canary-zzz-9999", "-do-not-leak-token suffix"], + ["eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJjYW5hcnkifQ", ".Qk9HVVNfQ0FOQVJZX1NJRw end"], + ["host postgresql://canary:zzz-secret", "-pw@db.invalid/main tail"], + ], +) +def test_canary_split_across_chunks_is_redacted(parts: list[str]) -> None: + """A secret whose characters span two chunks must not leak (#151).""" + combined = _collect_stream_text(parts) + _assert_no_canary(combined, path="Firewall.apply_stream(split)") + + +def test_stream_redactor_preserves_full_text_minus_secret() -> None: + """Holdback must not drop non-secret text — only the secret is replaced.""" + redactor = StreamRedactor(overlap=8) + emitted = "" + for piece in ["hello wor", "ld and ", f"{_CANARY_BEARER}", " bye"]: + text, _warns = redactor.feed(piece) + emitted += text + tail, _warns = redactor.flush() + emitted += tail + assert "hello world and" in emitted + assert "bye" in 
emitted + _assert_no_canary(emitted, path="StreamRedactor", canaries=(_CANARY_BEARER,)) + + +# ── #172: trace args and error text ──────────────────────────────────────────── + + +def _build_kernel( + handler: Any, *, capability_id: str = "billing.refund" +) -> tuple[Kernel, Principal]: + registry = CapabilityRegistry() + cap = Capability( + capability_id=capability_id, + name="refund", + description="issue refund", + safety_class=SafetyClass.WRITE, + ) + registry.register(cap) + driver = InMemoryDriver(driver_id="billing") + driver.register_handler(capability_id, handler) + router = StaticRouter(routes={capability_id: ["billing"]}) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret="test-secret-do-not-use-in-prod"), + router=router, + handle_store=HandleStore(), + trace_store=TraceStore(), + ) + kernel.register_driver(driver) + return kernel, Principal(principal_id="agent-1", roles=["admin"]) + + +def _grant_and_invoke( + kernel: Kernel, principal: Principal, capability_id: str, args: dict[str, Any] +): + request = CapabilityRequest(capability_id=capability_id, goal="canary test") + grant = kernel.grant_capability(request, principal, justification="canary regression coverage") + return asyncio.run( + kernel.invoke(grant.token, principal=principal, args=args, response_mode="summary") + ) + + +def test_canary_in_non_memory_arg_is_redacted_in_trace() -> None: + """A secret passed as an argument to a non-memory capability is scrubbed (#172).""" + kernel, principal = _build_kernel(lambda _ctx: [{"ok": True}]) + _grant_and_invoke( + kernel, + principal, + "billing.refund", + {"reference": f"authorize {_CANARY_BEARER}", "amount": 42}, + ) + trace = kernel._trace_store.list_all()[-1] + _assert_no_canary(_dump(trace.args), path="ActionTrace.args") + # Non-sensitive scalar args are preserved. 
+ assert trace.args.get("amount") == 42 + + +def test_canary_in_driver_error_is_redacted_in_trace() -> None: + """A secret embedded in a driver error message is scrubbed before tracing (#172).""" + + def _boom(_ctx: ExecutionContext) -> Any: + raise DriverError(f"upstream rejected token {_CANARY_BEARER}") + + kernel, principal = _build_kernel(_boom) + with pytest.raises(DriverError): + _grant_and_invoke(kernel, principal, "billing.refund", {"amount": 1}) + trace = kernel._trace_store.list_all()[-1] + assert trace.error is not None + _assert_no_canary(trace.error, path="ActionTrace.error", canaries=(_CANARY_BEARER,)) + + +# ── Adapter-rendered egress ──────────────────────────────────────────────────── + + +def test_canary_never_leaks_through_adapter_payload() -> None: + """The adapter tool-result payload renders only a redacted Frame.""" + kernel, principal = _build_kernel( + lambda _ctx: [{"id": 1, "secret_note": f"api_key={_CANARY_API_KEY_VALUE}"}] + ) + frame = _grant_and_invoke(kernel, principal, "billing.refund", {"amount": 1}) + payload = frame_to_payload(frame) + _assert_no_canary(_dump(payload), path="frame_to_payload")