diff --git a/CHANGELOG.md b/CHANGELOG.md index fea2a5c..634a326 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,24 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Two more ecosystem integration cookbooks under `docs/integrations/`, each with + a runnable, offline companion wired into `make ci`: + - **ChainWeaver compiled flows as capabilities** (#95): a `ChainWeaverDriver` + wraps a compiled flow behind the `Driver` protocol so the flow runs through + the normal policy/audit pipeline and produces a kernel-visible `ActionTrace`. + A flow-step failure is translated into a `DriverError` that preserves the + flow id and failing step. ChainWeaver stays an optional dependency. New + [`docs/integrations/chainweaver.md`](docs/integrations/chainweaver.md) and + [`examples/chainweaver_flow.py`](examples/chainweaver_flow.py). + - **Policy guardrails for statistical evaluation artifacts** (#96): a generic, + producer-agnostic `assess_artifact()` layer lets an agent summarize an + evaluation artifact while gating deployment/rollout recommendations on its + support diagnostics (multi-signal: `support_health`, `decision_stable`, + `warnings`, `recommendation.intent`). Denied actions are downgraded to a + manual-review recommendation whose reason is recorded in `ActionTrace.args`. + No statistical estimation is added and no producer dependency is taken. New + [`docs/integrations/evaluation_artifacts.md`](docs/integrations/evaluation_artifacts.md) + and [`examples/evaluation_artifact_policy.py`](examples/evaluation_artifact_policy.py). - `ActionTrace.result_summary` (#93): successful invocations now record a redaction-safe summary of the firewalled `Frame` (`fact_count`, `row_count`, `warning_count`, `has_handle` — counts/flags only, never raw driver data), so diff --git a/Makefile b/Makefile index b4886a2..b014013 100644 --- a/Makefile +++ b/Makefile @@ -23,5 +23,7 @@ example: python examples/readme_quickstart.py python examples/contextweaver_policy_flow.py python examples/repository_safety_check.py + python examples/chainweaver_flow.py + python examples/evaluation_artifact_policy.py ci: fmt-check lint type test example diff --git a/README.md b/README.md index b8d2d09..5f983b7 100644 --- a/README.md +++ b/README.md @@ -189,6 +189,8 @@ See [docs/agent-context/invariants.md](docs/agent-context/invariants.md) for the - [Integrations (MCP, HTTPDriver)](docs/integrations.md) - [contextweaver: policy before action](docs/integrations/contextweaver.md) - [Repository safety checks as a capability](docs/integrations/repository_safety_check.md) + - [ChainWeaver compiled flows as capabilities](docs/integrations/chainweaver.md) + - [Policy guardrails for evaluation artifacts](docs/integrations/evaluation_artifacts.md) - [Designing capabilities](docs/capabilities.md) - [Context Firewall](docs/context_firewall.md) diff --git a/docs/integrations.md b/docs/integrations.md index dfbc8cd..a80f5af 100644 --- a/docs/integrations.md +++ b/docs/integrations.md @@ -377,3 +377,14 @@ projects and external checkers. Each has a runnable, offline companion under — gate a high-impact action behind a deterministic check that shells out to a local command (e.g. VibeGuard), with the result recorded in the audit trace. Companion: [`examples/repository_safety_check.py`](../examples/repository_safety_check.py). +- [ChainWeaver compiled flows as policy-controlled capabilities](integrations/chainweaver.md) + — wrap a ChainWeaver compiled flow behind the `Driver` protocol so it runs + through the normal policy/audit pipeline; step failures surface as a + `DriverError` that preserves the flow id and failing step. ChainWeaver stays + an optional dependency. + Companion: [`examples/chainweaver_flow.py`](../examples/chainweaver_flow.py). +- [Policy guardrails for statistical evaluation artifacts](integrations/evaluation_artifacts.md) + — let an agent summarize an evaluation artifact while gating deployment/rollout + recommendations on its support diagnostics; the downgrade reason is recorded in + the audit trace. Producer-agnostic; no statistical estimation in agent-kernel. + Companion: [`examples/evaluation_artifact_policy.py`](../examples/evaluation_artifact_policy.py). diff --git a/docs/integrations/chainweaver.md b/docs/integrations/chainweaver.md new file mode 100644 index 0000000..8400ff3 --- /dev/null +++ b/docs/integrations/chainweaver.md @@ -0,0 +1,104 @@ +# ChainWeaver compiled flows as policy-controlled capabilities + +[ChainWeaver](https://github.com/dgenio/ChainWeaver) is the Weaver-ecosystem +orchestration layer: it *compiles* a multi-step flow that an agent can run. +agent-kernel owns authorization, execution, and audit. Wrapping a compiled flow +as a capability means the flow runs through the normal pipeline +(policy → token → invoke → firewall → trace) instead of as an out-of-band side +channel — so a flow invocation is policy-checked and auditable like any other +capability (weaver-spec **I-02**). + +This page describes the pattern. The runnable companion is +[`examples/chainweaver_flow.py`](../../examples/chainweaver_flow.py), which is +deterministic, offline, and depends on no ChainWeaver package. + +> ChainWeaver is **not** a required dependency. The adapter only relies on a +> compiled flow exposing a `run(inputs)` method and a `flow_id` attribute. The +> example ships tiny `CompiledFlow` / `FlowExecutionError` stand-ins so it runs +> in CI; in production you pass a real compiled flow to `ChainWeaverDriver`. + +## The pattern + +``` +agent invokes flows.summarize_release + │ + ▼ +ChainWeaverDriver.execute() → compiled_flow.run(inputs) + │ │ + │ ├─ all steps ok → RawResult → Frame + ActionTrace + │ └─ step raises → FlowExecutionError + ▼ │ + DriverError (flow id + failing step preserved) ◄───────────┘ +``` + +| Component | Role | +|---|---| +| `CompiledFlow` | A ChainWeaver compiled flow: ordered, named steps run over a shared context. | +| `ChainWeaverDriver` | Implements the `Driver` protocol; maps a capability operation to a flow and runs it. | +| `flows.summarize_release` | A `READ` capability whose implementation is the compiled flow. | + +## The adapter + +`ChainWeaverDriver` implements the `Driver` protocol and runs the flow bound to +the capability's operation: + +```python +class ChainWeaverDriver: + def __init__(self, flows: dict[str, CompiledFlow], *, driver_id: str = "chainweaver") -> None: + self._flows = dict(flows) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + operation = str(ctx.args.get("operation", ctx.capability_id)) + flow = self._flows.get(operation) + if flow is None: + raise DriverError(f"... no flow for operation='{operation}'.") + inputs = {k: v for k, v in ctx.args.items() if k != "operation"} + try: + output = flow.run(inputs) + except FlowExecutionError as exc: + raise DriverError( + f"ChainWeaver flow '{exc.flow_id}' failed at step '{exc.step}': {exc.cause}" + ) from exc + return RawResult(capability_id=ctx.capability_id, data=output, + metadata={"flow_id": flow.flow_id, "steps": [s.name for s in flow.steps]}) +``` + +## Errors preserve ChainWeaver context + +A real ChainWeaver flow raises its own exception when a step fails. The adapter +**translates** that native error into a kernel `DriverError`, carrying the flow +id and the failing step into the message rather than leaking a raw traceback. +`Kernel.invoke()` then wraps it as +`All drivers failed for capability '...'. Last error: ChainWeaver flow '' +failed at step '': `, so the orchestration context survives for the +caller — and a failed run still records an `ActionTrace` (with `error` set), so +I-02 holds even on failure. + +## Audit trail + +A successful invocation is recorded like any capability: + +```python +action_id = await run_flow(kernel, principal, {"release": "v1.4.0", "changes": [...]}) +trace = kernel.explain(action_id) +# trace.driver_id == "chainweaver" +# trace.result_summary == {"fact_count": ..., "row_count": ..., "warning_count": ..., "has_handle": ...} +``` + +## Non-goals + +- agent-kernel does not compile flows — that is ChainWeaver's job. +- ChainWeaver is never a required dependency. +- Wrapping a flow does not bypass policy: the flow capability is granted and + invoked through the same pipeline as every other capability. + +## Related + +- `examples/chainweaver_flow.py` — runnable, offline. +- [ChainWeaver](https://github.com/dgenio/ChainWeaver) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/docs/integrations/evaluation_artifacts.md b/docs/integrations/evaluation_artifacts.md new file mode 100644 index 0000000..0fc2b0e --- /dev/null +++ b/docs/integrations/evaluation_artifacts.md @@ -0,0 +1,105 @@ +# Policy guardrails for statistical evaluation artifacts + +Agents increasingly consume *evaluation artifacts* — structured reports such as +an offline policy-evaluation result (e.g. from +[`skdr-eval`](https://github.com/dgenio/skdr-eval)). These artifacts are easy to +misuse: an agent that reads a favorable headline estimate and recommends +deployment, while ignoring the support diagnostics, uncertainty, and warnings, +turns a *caveated* result into an *unconditional* action. + +agent-kernel already separates **reading** from **acting** through safety +classes. This page adds a small, generic policy layer on top: an agent may +always *summarize* an artifact (with caveats), but recommending **deployment** +or **automatic rollout** is gated on the artifact's diagnostics. + +The runnable companion is +[`examples/evaluation_artifact_policy.py`](../../examples/evaluation_artifact_policy.py), +which is deterministic, offline, and uses fixture artifacts. + +> agent-kernel does **not** implement offline policy evaluation or any +> statistical estimation, and takes no dependency on a specific producer. The +> policy reads documented fields off a plain dict artifact, so it works for any +> producer — not just `skdr-eval`. + +## Summarizing evidence vs. acting on evidence + +This is the distinction the guardrail enforces: + +| Action | Capability | Safety class | Gated? | +|---|---|---|---| +| Summarize the artifact and its caveats | `eval.summarize_artifact` | `READ` | No — always allowed. | +| Recommend deployment / rollout | `eval.recommend_deployment` | `WRITE` | Yes — only when diagnostics are healthy. | +| Recommend manual review / better logs | `eval.recommend_manual_review` | `WRITE` | The downgrade target when deployment is denied. | + +Summarizing a high-risk result is fine — it informs the human. Recommending +deployment *as if the result were reliable* is what the policy blocks. + +## The generic assessment + +`assess_artifact(artifact)` is producer-agnostic. It inspects documented fields +and returns stable decision codes: + +```python +decision = assess_artifact(artifact) +# decision.allowed_actions → e.g. ("allow_summary", "allow_manual_review_recommendation", ...) +# decision.denied_actions → e.g. ("deny_deployment_recommendation", "deny_automatic_rollout") +# decision.reasons → e.g. ("support_health=high_risk", "decision is not stable") +# decision.allows_deployment → bool gate the host branches on +``` + +Fields inspected (all optional; missing fields default to the safest reading): + +| Field | Meaning | +|---|---| +| `support_health` | `"ok"` / `"caution"` / `"high_risk"`. | +| `decision_stable` | Whether the comparison is robust to reasonable perturbation. | +| `warnings` | Producer warnings (e.g. low ESS, poor overlap). | +| `recommendation.intent` | The artifact's own steer (`"deploy"`, `"hold"`, …). | +| `uncertainty` / `limitations` | Surfaced as caveats in the summary. | + +Deployment is permitted **only** when several signals agree: `support_health` +is `ok`, the decision is stable, there are no warnings, and the artifact does +not itself recommend holding. This is deliberately *not* a single-metric gate — +a good point estimate with poor support is still blocked. + +| `support_health` | Deployment | Outcome | +|---|---|---| +| `ok` (stable, no warnings) | allowed | `allow_summary` + deployment recommendation | +| `caution` | denied | downgraded to manual-review recommendation | +| `high_risk` | denied | downgraded + `require_human_review` | + +## Audit trail records why + +When deployment is denied, the host does not grant `eval.recommend_deployment`; +instead it invokes `eval.recommend_manual_review` with the reasons in the call +args. Because the kernel only redacts args for `memory.`-prefixed capability ids, +these `eval.*` capabilities keep their args in `ActionTrace.args`, so the audit +trace records *why* the action was downgraded: + +```python +capability_id, action_id = await act_on_artifact(kernel, principal, artifact) +trace = kernel.explain(action_id) +# capability_id == "eval.recommend_manual_review" +# trace.args["reason"] == "support_health=high_risk; decision is not stable; 2 warning(s): ..." +# trace.args["downgraded_from"] == "recommend_deployment" +``` + +## Non-goals + +- No OPE / statistical estimation in agent-kernel. +- No hard dependency on `skdr-eval` or any producer. +- No decision based on a single numeric metric. + +## Aligning with `weaver-spec` + +If `weaver-spec` publishes a formal `EvaluationArtifact` contract, the field +names read by `assess_artifact` should be aligned to it; the decision codes here +(`allow_summary`, `allow_manual_review_recommendation`, `require_human_review`, +`deny_deployment_recommendation`, `deny_automatic_rollout`) are intended to be a +stable, producer-neutral vocabulary in the meantime. + +## Related + +- `examples/evaluation_artifact_policy.py` — runnable, offline. +- [skdr-eval](https://github.com/dgenio/skdr-eval) +- [weaver-spec](https://github.com/dgenio/weaver-spec) diff --git a/examples/chainweaver_flow.py b/examples/chainweaver_flow.py new file mode 100644 index 0000000..00040ab --- /dev/null +++ b/examples/chainweaver_flow.py @@ -0,0 +1,264 @@ +"""chainweaver_flow.py — wrap a ChainWeaver compiled flow as a capability. + +The written walkthrough lives in ``docs/integrations/chainweaver.md``. This +script is the runnable companion. It shows the Weaver-ecosystem pattern for +running a *ChainWeaver* compiled flow (an orchestration of ordered steps) +through the normal agent-kernel pipeline so the flow becomes a policy-checked, +audited capability rather than an out-of-band side channel: + + 1. A compiled flow is wrapped behind a ``ChainWeaverDriver``. The driver + implements the ``Driver`` protocol and runs the flow when the capability + is invoked. + 2. Running the wrapped flow through ``Kernel.invoke()`` produces a + kernel-visible execution record (an ``ActionTrace``) exactly like any + other capability — so I-02 (every execution is audited) holds. + 3. When a flow step fails, the driver translates ChainWeaver's native + ``FlowExecutionError`` into a kernel ``DriverError`` that *preserves* the + flow id and the failing step, so the audit trail and the caller keep the + orchestration context. + +``chainweaver`` is **not** a dependency of this example. ``CompiledFlow`` and +``FlowExecutionError`` are tiny, deterministic stand-ins for the compiled-flow +object and native exception a real ChainWeaver build would hand you, so the +demo runs offline and in CI. In production you point ``ChainWeaverDriver`` at a +real compiled flow; the driver only relies on a ``run(inputs)`` method and a +``flow_id`` attribute, which keeps ChainWeaver an optional dependency. + +Run with: ``python examples/chainweaver_flow.py`` +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import Any + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.errors import DriverError +from agent_kernel.models import CapabilityRequest, ImplementationRef, RawResult + +_SECRET = "example-secret-do-not-use-in-prod" + + +# ── ChainWeaver stand-ins (a real build supplies these) ───────────────────── + + +class FlowExecutionError(Exception): + """Stand-in for ChainWeaver's native flow-execution exception. + + Carries the orchestration context a real ChainWeaver error would: which + flow failed and at which step. The :class:`ChainWeaverDriver` translates + this into a kernel :class:`~agent_kernel.errors.DriverError` so the context + survives into the audit trail instead of leaking a raw stack trace. + """ + + def __init__(self, flow_id: str, step: str, cause: str) -> None: + super().__init__(f"flow '{flow_id}' failed at step '{step}': {cause}") + self.flow_id = flow_id + self.step = step + self.cause = cause + + +@dataclass(slots=True) +class FlowStep: + """One named step in a compiled flow.""" + + name: str + run: Callable[[dict[str, Any]], dict[str, Any]] + """Transforms the running context dict and returns the keys it produced.""" + + +@dataclass(slots=True) +class CompiledFlow: + """Deterministic stand-in for a ChainWeaver compiled flow. + + Executes its :attr:`steps` in order, threading a shared context dict. A step + that raises is reported as a :class:`FlowExecutionError` naming the flow and + the failing step — the shape a real ChainWeaver error carries. + """ + + flow_id: str + steps: list[FlowStep] = field(default_factory=list) + + def run(self, inputs: dict[str, Any]) -> dict[str, Any]: + """Run every step in order and return the accumulated context.""" + context: dict[str, Any] = dict(inputs) + for step in self.steps: + try: + context.update(step.run(context)) + except Exception as exc: # translate to ChainWeaver's error shape + raise FlowExecutionError(self.flow_id, step.name, str(exc)) from exc + return context + + +# ── The adapter: a compiled flow behind the Driver protocol ───────────────── + + +class ChainWeaverDriver: + """Driver that runs ChainWeaver compiled flows as capabilities. + + Operations map to compiled flows. ``execute`` runs the flow named by + ``ctx.args['operation']`` (falling back to ``ctx.capability_id``) with the + remaining args as inputs. A :class:`FlowExecutionError` is re-raised as a + :class:`~agent_kernel.errors.DriverError` that keeps the flow id and failing + step, so the orchestration context is preserved for the caller and the + audit trail. + """ + + def __init__(self, flows: dict[str, CompiledFlow], *, driver_id: str = "chainweaver") -> None: + self._flows = dict(flows) + self._driver_id = driver_id + + @property + def driver_id(self) -> str: + """Unique identifier for this driver.""" + return self._driver_id + + async def execute(self, ctx: ExecutionContext) -> RawResult: + """Run the compiled flow bound to this capability's operation.""" + operation = str(ctx.args.get("operation", ctx.capability_id)) + flow = self._flows.get(operation) + if flow is None: + raise DriverError( + f"ChainWeaverDriver '{self._driver_id}' has no flow for operation='{operation}'." + ) + inputs = {k: v for k, v in ctx.args.items() if k != "operation"} + try: + output = flow.run(inputs) + except FlowExecutionError as exc: + raise DriverError( + f"ChainWeaver flow '{exc.flow_id}' failed at step '{exc.step}': {exc.cause}" + ) from exc + return RawResult( + capability_id=ctx.capability_id, + data=output, + metadata={"flow_id": flow.flow_id, "steps": [s.name for s in flow.steps]}, + ) + + +# ── A small, deterministic release-notes flow ─────────────────────────────── + + +def _collect_changes(context: dict[str, Any]) -> dict[str, Any]: + """Step 1: normalise the raw change list, failing loudly when absent.""" + changes = context.get("changes") + if not changes: + raise ValueError("no 'changes' provided to summarize") + return {"changes": [str(c).strip() for c in changes]} + + +def _classify(context: dict[str, Any]) -> dict[str, Any]: + """Step 2: split changes into highlights (feat/fix) and the rest.""" + highlights = [c for c in context["changes"] if c.startswith(("feat", "fix"))] + return {"highlights": highlights} + + +def _render_summary(context: dict[str, Any]) -> dict[str, Any]: + """Step 3: render the final, audit-friendly summary payload.""" + return { + "release": context.get("release", "unreleased"), + "change_count": len(context["changes"]), + "highlights": context["highlights"], + } + + +def build_release_flow() -> CompiledFlow: + """A three-step compiled flow that summarizes release notes.""" + return CompiledFlow( + flow_id="release_notes_summary", + steps=[ + FlowStep("collect_changes", _collect_changes), + FlowStep("classify", _classify), + FlowStep("render_summary", _render_summary), + ], + ) + + +def build_kernel() -> Kernel: + """Wire a kernel with one capability backed by the compiled flow.""" + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="flows.summarize_release", + name="Summarize Release", + description="Run the ChainWeaver release-notes summary flow", + safety_class=SafetyClass.READ, + tags=["flows", "release", "summary", "chainweaver"], + impl=ImplementationRef(driver_id="chainweaver", operation="summarize_release"), + ) + ) + router = StaticRouter(routes={"flows.summarize_release": ["chainweaver"]}) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret=_SECRET), + router=router, + ) + kernel.register_driver(ChainWeaverDriver(flows={"summarize_release": build_release_flow()})) + return kernel + + +async def run_flow(kernel: Kernel, principal: Principal, args: dict[str, Any]) -> str: + """Invoke the flow-backed capability and return its audit ``action_id``.""" + request = CapabilityRequest( + capability_id="flows.summarize_release", + goal="summarize the release notes for the next tag", + ) + token = kernel.get_token(request, principal, justification="") + frame = await kernel.invoke( + token, + principal=principal, + args={"operation": "summarize_release", **args}, + ) + return frame.action_id + + +async def main() -> None: + kernel = build_kernel() + # A release agent: may read (run flows), not an admin. + agent = Principal(principal_id="release-bot", roles=["reader"]) + + print("=== ChainWeaver compiled flow as a policy-controlled capability ===") + + print("\n=== Success: the wrapped flow runs and is audited ===") + action_id = await run_flow( + kernel, + agent, + { + "release": "v1.4.0", + "changes": ["feat: add export", "fix: token drift", "chore: bump deps"], + }, + ) + trace = kernel.explain(action_id) + print(f" audited: action_id={trace.action_id} driver={trace.driver_id}") + print(f" result_summary={trace.result_summary}") + assert trace.driver_id == "chainweaver", "the flow must run through the ChainWeaver driver" + assert trace.error is None, "the success path must not record an error" + + print("\n=== Failure: a step error is preserved as kernel context ===") + try: + await run_flow(kernel, agent, {"release": "v1.4.1", "changes": []}) + except DriverError as exc: + # The kernel wraps the driver error, but the ChainWeaver context — the + # flow id and the failing step — is preserved inside the message. + message = str(exc) + print(f" DriverError: {message}") + assert "release_notes_summary" in message, "flow id must survive into the kernel error" + assert "collect_changes" in message, "failing step must survive into the kernel error" + else: # pragma: no cover - defensive + raise SystemExit("Expected DriverError when the flow's first step fails") + + print("\n✓ chainweaver_flow.py complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/evaluation_artifact_policy.py b/examples/evaluation_artifact_policy.py new file mode 100644 index 0000000..bfc1ecb --- /dev/null +++ b/examples/evaluation_artifact_policy.py @@ -0,0 +1,345 @@ +"""evaluation_artifact_policy.py — gate agent actions on evaluation artifacts. + +The written walkthrough lives in ``docs/integrations/evaluation_artifacts.md``. +This script is the runnable companion. It shows the Weaver-ecosystem pattern for +letting an agent *summarize* a statistical/model-evaluation artifact (such as an +offline policy-evaluation report) while *denying* high-impact actions — +deployment or automatic rollout — when the artifact's support diagnostics say +the headline estimate is not trustworthy. + +The key distinction the policy enforces: + + * **Summarizing evidence** is always allowed (with caveats surfaced). + * **Acting on evidence** (recommending deployment / automatic rollout) is + gated: it is downgraded to a *manual-review* recommendation whenever the + support diagnostics are weak, the decision is unstable, or warnings exist. + +``assess_artifact`` is a generic, producer-agnostic policy layer: it inspects +documented fields (``support_health``, ``warnings``, ``uncertainty``, +``decision_stable``, ``recommendation.intent``, ``limitations``) on a plain +dict artifact, so it works for any producer — not just ``skdr-eval``. agent-kernel +does **not** implement offline policy evaluation or any statistical estimation, +and takes no dependency on a specific producer; the artifacts here are fixtures. + +Run with: ``python examples/evaluation_artifact_policy.py`` +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any + +from agent_kernel import ( + Capability, + CapabilityRegistry, + HMACTokenProvider, + InMemoryDriver, + Kernel, + Principal, + SafetyClass, + StaticRouter, +) +from agent_kernel.drivers.base import ExecutionContext +from agent_kernel.models import CapabilityRequest, ImplementationRef + +_SECRET = "example-secret-do-not-use-in-prod" + +# Stable decision codes (see docs/integrations/evaluation_artifacts.md). Kept as +# string constants so callers branch on a stable vocabulary, not on prose. +ALLOW_SUMMARY = "allow_summary" +ALLOW_MANUAL_REVIEW_RECOMMENDATION = "allow_manual_review_recommendation" +REQUIRE_HUMAN_REVIEW = "require_human_review" +DENY_DEPLOYMENT_RECOMMENDATION = "deny_deployment_recommendation" +DENY_AUTOMATIC_ROLLOUT = "deny_automatic_rollout" + +# Support-health states an artifact may report, safest first when defaulting. +_HEALTH_OK = "ok" +_HEALTH_CAUTION = "caution" +_HEALTH_HIGH_RISK = "high_risk" +_KNOWN_HEALTH = frozenset({_HEALTH_OK, _HEALTH_CAUTION, _HEALTH_HIGH_RISK}) + + +@dataclass(slots=True) +class ArtifactDecision: + """The outcome of assessing an evaluation artifact for a deployment intent. + + ``allowed_actions`` / ``denied_actions`` carry the stable decision codes; the + convenience flag :attr:`allows_deployment` is the top-level gate the host + uses to decide whether to grant the high-impact capability. ``reasons`` + explains *why* — these strings are recorded in the audit trace when an + action is downgraded. + """ + + support_health: str + allowed_actions: tuple[str, ...] + denied_actions: tuple[str, ...] + reasons: tuple[str, ...] = field(default_factory=tuple) + + @property + def allows_deployment(self) -> bool: + """True only when recommending deployment is not a denied action.""" + return DENY_DEPLOYMENT_RECOMMENDATION not in self.denied_actions + + +def assess_artifact(artifact: dict[str, Any]) -> ArtifactDecision: + """Decide which actions an agent may take given an evaluation *artifact*. + + Generic and producer-agnostic: reads documented fields off a plain dict. + Deployment is permitted only when *several* signals agree — support health + is ``ok``, the decision is stable, no warnings are present, and the + artifact's own recommendation does not say to hold. Relying on more than a + single numeric metric is deliberate (a good point estimate with poor support + must still be blocked). + + Args: + artifact: A mapping with optional keys ``support_health`` (``"ok"`` / + ``"caution"`` / ``"high_risk"``), ``warnings`` (list), + ``decision_stable`` (bool), ``uncertainty`` (mapping/str), + ``recommendation`` (mapping with ``intent``), and ``limitations`` + (list). Missing fields default to the safest interpretation. + + Returns: + An :class:`ArtifactDecision`. Summarizing is always allowed; deployment + is allowed only when every gating signal is satisfied. + """ + # Default to — and normalise any unknown value to — the safest state, so a + # missing or garbage support_health can never read as deployable. + raw_health = str(artifact.get("support_health", _HEALTH_HIGH_RISK)) + support_health = raw_health if raw_health in _KNOWN_HEALTH else _HEALTH_HIGH_RISK + warnings = list(artifact.get("warnings") or []) + decision_stable = bool(artifact.get("decision_stable", False)) + recommendation = artifact.get("recommendation") or {} + intent = ( + str(recommendation.get("intent", "")).lower() if isinstance(recommendation, dict) else "" + ) + + reasons: list[str] = [] + if support_health != _HEALTH_OK: + reasons.append(f"support_health={support_health}") + if not decision_stable: + reasons.append("decision is not stable") + if warnings: + reasons.append(f"{len(warnings)} warning(s): {', '.join(str(w) for w in warnings)}") + if intent in {"hold", "do_not_deploy", "manual_review"}: + reasons.append(f"artifact recommends '{intent}'") + + # Summarizing evidence is always allowed. + allowed: list[str] = [ALLOW_SUMMARY] + denied: list[str] = [] + + if reasons: + # Acting on the evidence is downgraded to a manual-review recommendation. + allowed.append(ALLOW_MANUAL_REVIEW_RECOMMENDATION) + denied.extend([DENY_DEPLOYMENT_RECOMMENDATION, DENY_AUTOMATIC_ROLLOUT]) + if support_health == _HEALTH_HIGH_RISK: + allowed.append(REQUIRE_HUMAN_REVIEW) + + return ArtifactDecision( + support_health=support_health, + allowed_actions=tuple(allowed), + denied_actions=tuple(denied), + reasons=tuple(reasons), + ) + + +def build_kernel() -> Kernel: + """Wire a kernel with a summarize READ and two gated WRITE recommendations.""" + registry = CapabilityRegistry() + registry.register( + Capability( + capability_id="eval.summarize_artifact", + name="Summarize Evaluation Artifact", + description="Summarize an evaluation artifact and surface its caveats", + safety_class=SafetyClass.READ, + tags=["eval", "artifact", "summarize", "report"], + impl=ImplementationRef(driver_id="memory", operation="summarize_artifact"), + ) + ) + registry.register( + Capability( + capability_id="eval.recommend_deployment", + name="Recommend Deployment", + description="Recommend deploying/rolling out the evaluated candidate", + safety_class=SafetyClass.WRITE, + tags=["eval", "deploy", "rollout", "recommendation"], + impl=ImplementationRef(driver_id="memory", operation="recommend_deployment"), + ) + ) + registry.register( + Capability( + capability_id="eval.recommend_manual_review", + name="Recommend Manual Review", + description="Recommend human review / improving logs instead of deploying", + safety_class=SafetyClass.WRITE, + tags=["eval", "manual", "review", "recommendation"], + impl=ImplementationRef(driver_id="memory", operation="recommend_manual_review"), + ) + ) + + driver = InMemoryDriver() + + def summarize_artifact(ctx: ExecutionContext) -> dict[str, Any]: + artifact = ctx.args.get("artifact", {}) + decision = assess_artifact(artifact) + return { + "artifact_type": artifact.get("artifact_type", "evaluation"), + "support_health": decision.support_health, + "caveats": list(artifact.get("limitations") or []) + list(decision.reasons), + } + + def recommend_deployment(ctx: ExecutionContext) -> dict[str, Any]: + return {"recommendation": "deploy", "candidate": ctx.args.get("candidate", "candidate")} + + def recommend_manual_review(ctx: ExecutionContext) -> dict[str, Any]: + return {"recommendation": "manual_review", "reason": ctx.args.get("reason", "")} + + driver.register_handler("summarize_artifact", summarize_artifact) + driver.register_handler("recommend_deployment", recommend_deployment) + driver.register_handler("recommend_manual_review", recommend_manual_review) + + router = StaticRouter( + routes={ + "eval.summarize_artifact": ["memory"], + "eval.recommend_deployment": ["memory"], + "eval.recommend_manual_review": ["memory"], + } + ) + kernel = Kernel( + registry=registry, + token_provider=HMACTokenProvider(secret=_SECRET), + router=router, + ) + kernel.register_driver(driver) + return kernel + + +async def summarize(kernel: Kernel, principal: Principal, artifact: dict[str, Any]) -> str: + """Summarize the artifact (always allowed) and return the audit ``action_id``.""" + request = CapabilityRequest( + capability_id="eval.summarize_artifact", + goal="summarize the evaluation artifact and its caveats", + ) + token = kernel.get_token(request, principal, justification="") + frame = await kernel.invoke( + token, + principal=principal, + args={"operation": "summarize_artifact", "artifact": artifact}, + ) + return frame.action_id + + +async def act_on_artifact( + kernel: Kernel, principal: Principal, artifact: dict[str, Any] +) -> tuple[str, str]: + """Gate the deployment recommendation on the artifact assessment. + + Returns ``(capability_id, action_id)`` for whichever action was taken: the + deployment recommendation when the artifact is trustworthy, otherwise the + downgraded manual-review recommendation whose audit args record *why*. + """ + decision = assess_artifact(artifact) + if decision.allows_deployment: + request = CapabilityRequest( + capability_id="eval.recommend_deployment", + goal="recommend deploying the evaluated candidate", + ) + token = kernel.get_token( + request, + principal, + justification="evaluation support is healthy and the decision is stable", + ) + frame = await kernel.invoke( + token, + principal=principal, + args={"operation": "recommend_deployment", "candidate": artifact.get("candidate", "")}, + ) + return ("eval.recommend_deployment", frame.action_id) + + # Downgrade: record the reasons in the audit args so the trace explains why + # deployment was denied. The kernel only redacts args for "memory."-prefixed + # capability ids, so these eval.* caps keep their args in ActionTrace.args + # (the driver_id="memory" InMemoryDriver above is unrelated to that redaction). + request = CapabilityRequest( + capability_id="eval.recommend_manual_review", + goal="recommend manual review because the evaluation support is weak", + ) + token = kernel.get_token( + request, + principal, + justification="downgraded from deployment: weak support diagnostics", + ) + frame = await kernel.invoke( + token, + principal=principal, + args={ + "operation": "recommend_manual_review", + "reason": "; ".join(decision.reasons), + "downgraded_from": "recommend_deployment", + }, + ) + return ("eval.recommend_manual_review", frame.action_id) + + +async def handle( + kernel: Kernel, principal: Principal, label: str, artifact: dict[str, Any] +) -> None: + """Summarize then (conditionally) act on one artifact, printing the audit.""" + print(f"\n=== Artifact: {label} (support_health={artifact.get('support_health')}) ===") + summary_action = await summarize(kernel, principal, artifact) + print(f" summarize: allowed — action_id={summary_action} (always permitted)") + + capability_id, action_id = await act_on_artifact(kernel, principal, artifact) + trace = kernel.explain(action_id) + if capability_id == "eval.recommend_deployment": + print(f" action: deployment recommended — action_id={trace.action_id}") + else: + print(f" action: DOWNGRADED to manual review — action_id={trace.action_id}") + print(f" audited reason: {trace.args.get('reason')}") + assert trace.args.get("reason"), "the downgrade reason must be recorded in the audit trace" + + +async def main() -> None: + kernel = build_kernel() + # An analyst agent: may read and write recommendations, not an admin. + analyst = Principal(principal_id="eval-analyst", roles=["reader", "writer"]) + + print("=== Policy guardrails for statistical evaluation artifacts ===") + + healthy = { + "artifact_type": "offline_policy_evaluation", + "support_health": "ok", + "decision_stable": True, + "warnings": [], + "uncertainty": {"ci_width": "narrow"}, + "recommendation": {"intent": "deploy"}, + "limitations": [], + "candidate": "policy-v2", + } + cautious = { + "artifact_type": "offline_policy_evaluation", + "support_health": "caution", + "decision_stable": True, + "warnings": ["moderate overlap"], + "recommendation": {"intent": "deploy"}, + "candidate": "policy-v2", + } + risky = { + "artifact_type": "offline_policy_evaluation", + "support_health": "high_risk", + "decision_stable": False, + "warnings": ["low effective sample size", "poor overlap"], + "recommendation": {"intent": "hold"}, + "limitations": ["estimate extrapolates beyond logged support"], + "candidate": "policy-v2", + } + + await handle(kernel, analyst, "healthy", healthy) + await handle(kernel, analyst, "caution", cautious) + await handle(kernel, analyst, "high_risk", risky) + + print("\n✓ evaluation_artifact_policy.py complete.") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tests/test_chainweaver_flow.py b/tests/test_chainweaver_flow.py new file mode 100644 index 0000000..b8a0338 --- /dev/null +++ b/tests/test_chainweaver_flow.py @@ -0,0 +1,74 @@ +"""Tests for the ChainWeaver integration example (issue #95). + +Verifies the two acceptance-critical behaviors of +``examples/chainweaver_flow.py``: running a wrapped compiled flow produces a +kernel-visible execution record (an ``ActionTrace``), and a failing flow step +surfaces as a ``DriverError`` that preserves the ChainWeaver context (flow id +and failing step). +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path +from types import ModuleType + +import pytest + +from agent_kernel import Principal +from agent_kernel.errors import DriverError + +_EXAMPLES = Path(__file__).resolve().parent.parent / "examples" + + +def _load_example(name: str) -> ModuleType: + """Import an example module by file path (examples are not a package).""" + spec = importlib.util.spec_from_file_location(name, _EXAMPLES / f"{name}.py") + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules[name] = module # let dataclass field resolution find the module + spec.loader.exec_module(module) + return module + + +cw = _load_example("chainweaver_flow") + + +def test_compiled_flow_runs_steps_in_order() -> None: + """The compiled-flow stand-in threads context through every step.""" + flow = cw.build_release_flow() + result = flow.run({"release": "v1.0.0", "changes": ["feat: a", "chore: b"]}) + assert result["change_count"] == 2 + assert result["highlights"] == ["feat: a"] + + +def test_compiled_flow_raises_flow_error_with_context() -> None: + """A failing step is reported with the flow id and failing step name.""" + flow = cw.build_release_flow() + with pytest.raises(cw.FlowExecutionError) as excinfo: + flow.run({"release": "v1.0.0", "changes": []}) + assert excinfo.value.flow_id == "release_notes_summary" + assert excinfo.value.step == "collect_changes" + + +async def test_wrapped_flow_produces_audit_trace() -> None: + """Invoking the flow-backed capability records a kernel-visible trace.""" + kernel = cw.build_kernel() + principal = Principal(principal_id="release-bot", roles=["reader"]) + action_id = await cw.run_flow( + kernel, principal, {"release": "v2.0.0", "changes": ["feat: x", "fix: y"]} + ) + trace = kernel.explain(action_id) + assert trace.capability_id == "flows.summarize_release" + assert trace.driver_id == "chainweaver" + assert trace.error is None + assert trace.result_summary is not None + + +async def test_flow_failure_preserves_chainweaver_context() -> None: + """A flow step failure surfaces as DriverError keeping flow id + step.""" + kernel = cw.build_kernel() + principal = Principal(principal_id="release-bot", roles=["reader"]) + with pytest.raises(DriverError, match="release_notes_summary.*collect_changes"): + await cw.run_flow(kernel, principal, {"release": "v2.0.1", "changes": []}) diff --git a/tests/test_evaluation_artifact_policy.py b/tests/test_evaluation_artifact_policy.py new file mode 100644 index 0000000..90c7a44 --- /dev/null +++ b/tests/test_evaluation_artifact_policy.py @@ -0,0 +1,144 @@ +"""Tests for the evaluation-artifact policy example (issue #96). + +Covers the three required support-health states (``ok`` / ``caution`` / +``high_risk``), confirms the gate relies on more than a single signal, and +verifies that a downgraded action records *why* in the audit trace. +""" + +from __future__ import annotations + +import importlib.util +import sys +from pathlib import Path +from types import ModuleType + +import pytest + +from agent_kernel import Principal + +_EXAMPLES = Path(__file__).resolve().parent.parent / "examples" + + +def _load_example(name: str) -> ModuleType: + """Import an example module by file path (examples are not a package).""" + spec = importlib.util.spec_from_file_location(name, _EXAMPLES / f"{name}.py") + assert spec is not None and spec.loader is not None + module = importlib.util.module_from_spec(spec) + sys.modules[name] = module # let dataclass field resolution find the module + spec.loader.exec_module(module) + return module + + +eap = _load_example("evaluation_artifact_policy") + + +def test_ok_artifact_allows_deployment() -> None: + """A healthy, stable artifact with no warnings permits deployment.""" + artifact = { + "support_health": "ok", + "decision_stable": True, + "warnings": [], + "recommendation": {"intent": "deploy"}, + } + decision = eap.assess_artifact(artifact) + assert decision.allows_deployment is True + assert decision.denied_actions == () + assert eap.ALLOW_SUMMARY in decision.allowed_actions + + +def test_caution_artifact_downgrades_without_human_review() -> None: + """A caution artifact denies deployment but does not force human review.""" + artifact = { + "support_health": "caution", + "decision_stable": True, + "warnings": ["moderate overlap"], + "recommendation": {"intent": "deploy"}, + } + decision = eap.assess_artifact(artifact) + assert decision.allows_deployment is False + assert eap.DENY_DEPLOYMENT_RECOMMENDATION in decision.denied_actions + assert eap.DENY_AUTOMATIC_ROLLOUT in decision.denied_actions + assert eap.ALLOW_MANUAL_REVIEW_RECOMMENDATION in decision.allowed_actions + assert eap.REQUIRE_HUMAN_REVIEW not in decision.allowed_actions + + +def test_high_risk_artifact_requires_human_review() -> None: + """A high_risk artifact denies deployment and requires human review.""" + artifact = { + "support_health": "high_risk", + "decision_stable": False, + "warnings": ["low effective sample size", "poor overlap"], + "recommendation": {"intent": "hold"}, + } + decision = eap.assess_artifact(artifact) + assert decision.allows_deployment is False + assert eap.DENY_DEPLOYMENT_RECOMMENDATION in decision.denied_actions + assert eap.REQUIRE_HUMAN_REVIEW in decision.allowed_actions + assert decision.reasons # at least one reason recorded + + +def test_gate_is_not_single_metric() -> None: + """Good support health is insufficient when the decision is unstable.""" + artifact = { + "support_health": "ok", + "decision_stable": False, + "warnings": [], + "recommendation": {"intent": "deploy"}, + } + decision = eap.assess_artifact(artifact) + assert decision.allows_deployment is False + assert "decision is not stable" in "; ".join(decision.reasons) + + +def test_missing_fields_default_to_safest() -> None: + """An artifact missing diagnostics is treated as high_risk, not deployable.""" + decision = eap.assess_artifact({}) + assert decision.support_health == "high_risk" + assert decision.allows_deployment is False + + +def test_unknown_support_health_normalised_to_safest() -> None: + """An unrecognised support_health value cannot read as deployable.""" + decision = eap.assess_artifact( + {"support_health": "looks_fine", "decision_stable": True, "warnings": []} + ) + assert decision.support_health == "high_risk" + assert decision.allows_deployment is False + + +@pytest.mark.parametrize( + ("artifact", "expected_capability"), + [ + ( + { + "support_health": "ok", + "decision_stable": True, + "warnings": [], + "recommendation": {"intent": "deploy"}, + }, + "eval.recommend_deployment", + ), + ( + { + "support_health": "high_risk", + "decision_stable": False, + "warnings": ["low ESS"], + "recommendation": {"intent": "hold"}, + }, + "eval.recommend_manual_review", + ), + ], +) +async def test_act_on_artifact_audits_decision( + artifact: dict[str, object], expected_capability: str +) -> None: + """The chosen action is audited; a downgrade records its reason.""" + kernel = eap.build_kernel() + analyst = Principal(principal_id="eval-analyst", roles=["reader", "writer"]) + capability_id, action_id = await eap.act_on_artifact(kernel, analyst, artifact) + assert capability_id == expected_capability + trace = kernel.explain(action_id) + assert trace.capability_id == expected_capability + if expected_capability == "eval.recommend_manual_review": + assert trace.args.get("reason"), "downgrade must record the reason in the audit trace" + assert trace.args.get("downgraded_from") == "recommend_deployment"