Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions crates/buzz-acp/src/acp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,41 @@ pub fn resolve_model_switch_method(
None
}

/// Reports whether `desired_model` is listed in either pre-extracted catalog half.
///
/// Applies the same match as [`resolve_model_switch_method`], but takes the
/// already-extracted pieces that
/// [`AgentModelCapabilities`](crate::pool::AgentModelCapabilities) caches — the
/// model-category `configOptions` and the `models` state — because the
/// idle-path pre-cancel guard holds those halves rather than the full
/// `session/new` JSON.
///
/// A model counts as present when it equals the string `value` of any entry in
/// any config option's `options` array, or the string `modelId` of any entry in
/// the `availableModels` array of `available_models`. Missing keys and values of
/// an unexpected JSON type are treated as "not listed".
pub fn model_in_catalog(
    config_options: &[serde_json::Value],
    available_models: Option<&serde_json::Value>,
    desired_model: &str,
) -> bool {
    let wanted = Some(desired_model);

    // Half one: every `options[].value` across all supplied config options.
    let listed_in_config = config_options
        .iter()
        .filter_map(|entry| entry.get("options")?.as_array())
        .flatten()
        .any(|choice| choice.get("value").and_then(serde_json::Value::as_str) == wanted);

    // Half two: `availableModels[].modelId`; only scanned when half one missed.
    listed_in_config
        || available_models
            .and_then(|state| state.get("availableModels")?.as_array())
            .into_iter()
            .flatten()
            .any(|entry| entry.get("modelId").and_then(serde_json::Value::as_str) == wanted)
}

// ─── Drop: kill child process ─────────────────────────────────────────────────

impl Drop for AcpClient {
Expand Down Expand Up @@ -1783,6 +1818,58 @@ mod tests {
);
}

// ── model_in_catalog tests ────────────────────────────────────────────

#[test]
fn model_in_catalog_true_when_in_config_options() {
    // A single config option offering two selectable model values.
    let model_option = serde_json::json!({
        "configId": "model",
        "category": "model",
        "options": [
            { "value": "claude-sonnet-4-20250514" },
            { "value": "claude-opus-4-20250514" }
        ]
    });
    let wanted = "claude-opus-4-20250514";

    // No `models` half is supplied, so the hit has to come from the options.
    let found = super::model_in_catalog(&[model_option], None, wanted);
    assert!(found);
}

#[test]
fn model_in_catalog_true_when_in_available_models() {
    // Only the `models` half is populated; the config-options half is empty.
    let models_state = serde_json::json!({
        "currentModelId": "gpt-5",
        "availableModels": [
            { "modelId": "gpt-5" },
            { "modelId": "o3-pro" }
        ]
    });
    let no_config_options: &[serde_json::Value] = &[];

    let found = super::model_in_catalog(no_config_options, Some(&models_state), "o3-pro");
    assert!(found);
}

#[test]
fn model_in_catalog_false_when_absent_from_both_halves() {
    // Both halves are populated, but neither lists the requested model.
    let model_option = serde_json::json!({
        "configId": "model",
        "options": [{ "value": "claude-sonnet-4-20250514" }]
    });
    let models_state = serde_json::json!({
        "availableModels": [{ "modelId": "gpt-5" }]
    });

    let found = super::model_in_catalog(&[model_option], Some(&models_state), "nonexistent-model");
    assert!(!found);
}

#[test]
fn model_in_catalog_false_when_both_halves_empty() {
    // With nothing cached in either half, no model can be found.
    let no_config_options: Vec<serde_json::Value> = Vec::new();

    let found = super::model_in_catalog(&no_config_options, None, "anything");
    assert!(!found);
}

// ── Error variant display ─────────────────────────────────────────────

#[test]
Expand Down
106 changes: 100 additions & 6 deletions crates/buzz-acp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use filter::SubscriptionRule;
use futures_util::FutureExt;
use nostr::{PublicKey, ToBech32};
use pool::{
AgentPool, ControlSignal, OwnedAgent, PromptContext, PromptOutcome, PromptResult, PromptSource,
SessionState,
AgentPool, ControlSignal, IdleSwitchResult, OwnedAgent, PromptContext, PromptOutcome,
PromptResult, PromptSource, SessionState,
};
use queue::{EventQueue, QueuedEvent, ThreadTags};
use relay::{HarnessRelay, RelayEventPublisher};
Expand Down Expand Up @@ -718,11 +718,25 @@ fn handle_relay_observer_control_event(
};

let command_type = payload.get("type").and_then(|value| value.as_str());
if command_type != Some("cancel_turn") {
tracing::debug!(payload = %payload, "ignoring unknown observer control frame");
return;
match command_type {
Some("cancel_turn") => {
handle_cancel_turn_control(&payload, pool, observer);
}
Some("switch_model") => {
handle_switch_model_control(&payload, pool, observer);
}
_ => {
tracing::debug!(payload = %payload, "ignoring unknown observer control frame");
}
}
}

/// Handle a `cancel_turn` control frame: signal the in-flight task to cancel.
fn handle_cancel_turn_control(
payload: &serde_json::Value,
pool: &mut AgentPool,
observer: Option<&observer::ObserverHandle>,
) {
let Some(channel_id) = payload
.get("channelId")
.and_then(|value| value.as_str())
Expand Down Expand Up @@ -751,6 +765,83 @@ fn handle_relay_observer_control_event(
}
}

/// Process a `switch_model` observer control frame (Phase 3a, Option ii).
///
/// The frame must carry a `channelId` that parses as a UUID and a string
/// `modelId`; if either is missing the frame is logged at warn level and
/// dropped without emitting a result.
///
/// Busy path (a task-map entry exists for the channel): `SwitchModel` is
/// delivered over the in-flight task's control oneshot. The task is expected
/// to cancel the turn, set `desired_model`, and requeue the batch so it re-runs
/// on a fresh session under the new model. A catalog miss on this path only
/// surfaces post-cancel via `create_session_and_apply_model` (the turn restarts
/// on the unchanged model plus an `unsupported_model` result).
///
/// Idle path (no task-map entry): the pool validates the model against the
/// cached catalog *before* invalidating (pre-cancel guard), then sets
/// `desired_model` and invalidates. The override becomes visible on the
/// agent's next turn.
///
/// When an observer is attached, a `control_result` event is emitted whose
/// `status` is one of `sent`, `turn_ending`, `switched`, `unsupported_model`,
/// or `no_active_turn`.
fn handle_switch_model_control(
    payload: &serde_json::Value,
    pool: &mut AgentPool,
    observer: Option<&observer::ObserverHandle>,
) {
    let channel_field = payload.get("channelId").and_then(|raw| raw.as_str());
    let Some(channel_id) = channel_field.and_then(|text| text.parse::<Uuid>().ok()) else {
        tracing::warn!("observer switch_model control frame missing valid channelId");
        return;
    };

    let model_field = payload.get("modelId").and_then(|raw| raw.as_str());
    let Some(model_id) = model_field else {
        tracing::warn!("observer switch_model control frame missing modelId");
        return;
    };

    // A task_map entry for this channel is what marks a turn as in flight.
    // During a turn the agent has been moved out of the pool, which leaves the
    // control oneshot as the only way to reach it; idle channels have no entry.
    let turn_in_flight = pool
        .task_map()
        .values()
        .any(|meta| meta.channel_id == Some(channel_id));

    let status = if turn_in_flight {
        // Busy path. A `false` from the signal helper means the oneshot was
        // already used this turn (an earlier cancel/interrupt): the turn is on
        // its way out and the switch cannot land on it.
        let signal = ControlSignal::SwitchModel(model_id.to_string());
        let delivered = signal_in_flight_task(pool, channel_id, signal);
        if delivered {
            "sent"
        } else {
            "turn_ending"
        }
    } else {
        // Idle path. The pool checks the cached catalog before invalidating.
        let outcome = pool.switch_idle_agent_model(channel_id, model_id);
        match outcome {
            IdleSwitchResult::Switched => "switched",
            IdleSwitchResult::UnsupportedModel => "unsupported_model",
            IdleSwitchResult::NoIdleAgent => "no_active_turn",
        }
    };

    // Without an attached observer there is nowhere to report the outcome.
    let Some(sink) = observer else {
        return;
    };

    let context = observer::ObserverContext {
        channel_id: Some(channel_id.to_string()),
        session_id: None,
        turn_id: None,
    };
    let result_body = serde_json::json!({
        "type": "switch_model",
        "status": status,
        "modelId": model_id,
    });
    sink.emit("control_result", None, &context, result_body);
}

/// Maximum crashes in a 60-second window before a slot's circuit opens.
const CIRCUIT_BREAKER_THRESHOLD: usize = 3;
/// Window for circuit-breaker crash counting.
Expand Down Expand Up @@ -1046,6 +1137,7 @@ async fn tokio_main() -> Result<()> {
state: SessionState::default(),
model_capabilities: None,
desired_model: config.model.clone(),
model_overridden: false,
protocol_version,
}));
}
Expand Down Expand Up @@ -1482,6 +1574,7 @@ async fn tokio_main() -> Result<()> {
state: SessionState::default(),
model_capabilities: None,
desired_model: config.model.clone(),
model_overridden: false,
protocol_version,
};
pool.return_agent(agent);
Expand Down Expand Up @@ -2161,8 +2254,8 @@ fn signal_in_flight_task(

if let Some(meta) = entry {
if let Some(tx) = meta.control_tx.take() {
let _ = tx.send(mode);
tracing::info!(channel = %channel_id, ?mode, "control signal sent to in-flight task");
let _ = tx.send(mode);
return true;
}
}
Expand Down Expand Up @@ -3487,6 +3580,7 @@ mod error_outcome_emission_tests {
state: Default::default(),
model_capabilities: None,
desired_model: None,
model_overridden: false,
// Error branches under test never read this; 1 is the legacy
// non-systemPrompt path, the simplest valid value.
protocol_version: 1,
Expand Down
Loading
Loading