From cb059f25bc99c17a8ce490122dd25b8f0d4837c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Wed, 20 May 2026 16:01:11 -0700 Subject: [PATCH 1/2] propagate db errors through vault DbAccess trait --- crates/mqdb-agent/src/agent/handlers.rs | 169 +++++++++++++++++------- crates/mqdb-agent/src/db_helpers.rs | 131 +++++++++++++++--- crates/mqdb-agent/src/vault_backend.rs | 21 +-- crates/mqdb-vault/src/backend.rs | 122 ++++++++++++++--- crates/mqdb-vault/src/ops.rs | 36 +++-- 5 files changed, 368 insertions(+), 111 deletions(-) diff --git a/crates/mqdb-agent/src/agent/handlers.rs b/crates/mqdb-agent/src/agent/handlers.rs index f37f8e2..392aab1 100644 --- a/crates/mqdb-agent/src/agent/handlers.rs +++ b/crates/mqdb-agent/src/agent/handlers.rs @@ -1138,11 +1138,17 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) -> ); } - let Some(identity) = - crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await - else { - return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found"); - }; + let identity = + match crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => { + return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found"); + } + Err(e) => { + warn!("password change: failed to read identity {canonical_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to read identity"); + } + }; let email_verified = identity .get("email_verified") @@ -1155,12 +1161,18 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) -> ); } - let Some(cred) = crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await - else { - return Response::error( - mqdb_core::ErrorCode::NotFound, - "no credentials found (OAuth-only account)", - ); + let cred = match crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => { + return Response::error( + mqdb_core::ErrorCode::NotFound, + "no credentials found (OAuth-only account)", + ); + } + Err(e) => { + warn!("password change: failed to read credentials {canonical_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to read credentials"); + } }; let Some(stored_hash) = cred.get("password_hash").and_then(|v| v.as_str()) else { @@ -1185,13 +1197,17 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) -> } }; - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_credentials", canonical_id, &json!({"password_hash": new_hash}), ) - .await; + .await + { + error!("password change: failed to update credentials for {canonical_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password"); + } Response::ok(json!({"status": "password changed"})) } @@ -1216,11 +1232,17 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu ); } - let Some(identity) = - crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await - else { - return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found"); - }; + let identity = + match crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => { + return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found"); + } + Err(e) => { + warn!("password reset start: failed to read identity {canonical_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to read identity"); + } + }; let email_lower = email.to_lowercase(); let email_matches = if let Some(crypto) = ctx.identity_crypto { @@ -1262,14 +1284,16 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu if status != "pending" && status != "delivered" { continue; } - if let Some(id) = challenge.get("id").and_then(|v| v.as_str()) { - crate::db_helpers::update_entity_db( + if let Some(id) = challenge.get("id").and_then(|v| v.as_str()) + && let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_verification_challenges", id, &json!({"status": "expired"}), ) - .await; + .await + { + warn!("password reset start: failed to expire stale challenge {id}: {e}"); } } } @@ -1369,11 +1393,19 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val ); } - let Some(challenge) = - crate::db_helpers::read_entity_db(ctx.db, "_verification_challenges", challenge_id).await - else { - return Response::error(mqdb_core::ErrorCode::NotFound, "challenge not found"); - }; + let challenge = + match crate::db_helpers::read_entity_db(ctx.db, "_verification_challenges", challenge_id) + .await + { + Ok(Some(v)) => v, + Ok(None) => { + return Response::error(mqdb_core::ErrorCode::NotFound, "challenge not found"); + } + Err(e) => { + warn!("password reset submit: failed to read challenge {challenge_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to read challenge"); + } + }; let purpose = challenge .get("purpose") @@ -1412,13 +1444,16 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val .unwrap_or(0); let now = crate::http::challenge_utils::now_unix(); if expires_at > 0 && now >= expires_at { - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_verification_challenges", challenge_id, &json!({"status": "expired"}), ) - .await; + .await + { + warn!("password reset submit: failed to mark challenge {challenge_id} expired: {e}"); + } return Response::error(mqdb_core::ErrorCode::BadRequest, "challenge has expired"); } @@ -1431,13 +1466,16 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val .and_then(Value::as_u64) .unwrap_or(5); if attempts >= max_attempts { - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_verification_challenges", challenge_id, &json!({"status": "failed"}), ) - .await; + .await + { + warn!("password reset submit: failed to mark challenge {challenge_id} failed: {e}"); + } return Response::error(mqdb_core::ErrorCode::BadRequest, "too many attempts"); } @@ -1454,23 +1492,29 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val } else { status }; - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_verification_challenges", challenge_id, &json!({"attempts": new_attempts, "status": new_status}), ) - .await; + .await + { + warn!("password reset submit: failed to record failed attempt on {challenge_id}: {e}"); + } return Response::error(mqdb_core::ErrorCode::Unauthorized, "invalid code"); } - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_verification_challenges", challenge_id, &json!({"status": "verified", "attempts": new_attempts}), ) - .await; + .await + { + warn!("password reset submit: failed to mark challenge {challenge_id} verified: {e}"); + } let new_hash = match crate::http::credentials::hash_password(new_password) { Ok(h) => h, @@ -1486,35 +1530,60 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val .unwrap_or(""); let existing_creds = - crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await; + match crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await { + Ok(opt) => opt, + Err(e) => { + warn!("password reset submit: failed to read credentials {canonical_id}: {e}"); + return Response::error( + mqdb_core::ErrorCode::Internal, + "failed to read credentials", + ); + } + }; if existing_creds.is_some() { - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_credentials", canonical_id, &json!({"password_hash": new_hash}), ) - .await; - } else { - crate::db_helpers::create_entity_db( - ctx.db, - "_credentials", - &json!({ - "id": canonical_id, - "password_hash": new_hash, - "email_hash": target_hash, - }), - ) - .await; + .await + { + error!("password reset submit: failed to update credentials for {canonical_id}: {e}"); + return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password"); + } + } else if !crate::db_helpers::create_entity_db( + ctx.db, + "_credentials", + &json!({ + "id": canonical_id, + "password_hash": new_hash, + "email_hash": target_hash, + }), + ) + .await + { + error!("password reset submit: failed to create credentials for {canonical_id}"); + return Response::error( + mqdb_core::ErrorCode::Internal, + "failed to create credentials", + ); } - crate::db_helpers::update_entity_db( + if let Err(e) = crate::db_helpers::update_entity_db( ctx.db, "_identities", canonical_id, &json!({"email_verified": true}), ) - .await; + .await + { + error!("password reset submit: failed to mark email_verified for {canonical_id}: {e}"); + return Response::error( + mqdb_core::ErrorCode::Internal, + "failed to mark email verified", + ); + } Response::ok(json!({"status": "password_reset"})) } diff --git a/crates/mqdb-agent/src/db_helpers.rs b/crates/mqdb-agent/src/db_helpers.rs index c5cfa85..913961c 100644 --- a/crates/mqdb-agent/src/db_helpers.rs +++ b/crates/mqdb-agent/src/db_helpers.rs @@ -3,6 +3,7 @@ use crate::database::{CallerContext, Database}; use crate::vault_backend::{DbAccess, VaultFuture}; +use mqdb_core::error::Error; use mqdb_core::types::ScopeConfig; use mqdb_core::{Filter, FilterOp}; use mqtt5::client::MqttClient; @@ -18,13 +19,27 @@ pub async fn create_entity_db(db: &Database, entity: &str, data: &Value) -> bool .is_ok() } -pub async fn read_entity_db(db: &Database, entity: &str, id: &str) -> Option { - db.read(entity.to_string(), id.to_string(), vec![], None) +/// # Errors +/// Returns the underlying storage error. `Ok(None)` is returned when the record does not exist. +pub async fn read_entity_db(db: &Database, entity: &str, id: &str) -> Result, Error> { + match db + .read(entity.to_string(), id.to_string(), vec![], None) .await - .ok() + { + Ok(v) => Ok(Some(v)), + Err(Error::NotFound { .. }) => Ok(None), + Err(e) => Err(e), + } } -pub async fn update_entity_db(db: &Database, entity: &str, id: &str, data: &Value) -> bool { +/// # Errors +/// Returns `Error::NotFound` if the record does not exist, or any other storage/validation error from the underlying update. +pub async fn update_entity_db( + db: &Database, + entity: &str, + id: &str, + data: &Value, +) -> Result { let scope = ScopeConfig::default(); let caller = CallerContext { sender: None, @@ -39,7 +54,6 @@ pub async fn update_entity_db(db: &Database, entity: &str, id: &str, data: &Valu &caller, ) .await - .is_ok() } pub async fn list_entities_db(db: &Database, entity: &str, filter: &str) -> Option> { @@ -124,24 +138,95 @@ pub async fn create_entity_mqtt(client: &MqttClient, entity: &str, data: &Value) .unwrap_or(false) } -pub async fn read_entity_mqtt(client: &MqttClient, entity: &str, id: &str) -> Option { +/// # Errors +/// Returns `Error::Internal` when the MQTT round-trip fails or the broker returns a non-ok status without a `not_found` code; `Ok(None)` when the record does not exist. +pub async fn read_entity_mqtt( + client: &MqttClient, + entity: &str, + id: &str, +) -> Result, Error> { let topic = format!("$DB/{entity}/{id}"); - let payload = mqtt_rr(client, &topic, vec![], std::time::Duration::from_secs(5)).await?; - let response: Value = serde_json::from_slice(&payload).ok()?; - response.get("data").cloned() + let Some(payload) = mqtt_rr(client, &topic, vec![], std::time::Duration::from_secs(5)).await + else { + return Err(Error::Internal(format!( + "mqtt read failed: no response for {entity}/{id}" + ))); + }; + let response: Value = serde_json::from_slice(&payload) + .map_err(|e| Error::Internal(format!("mqtt read response decode failed: {e}")))?; + let status = response + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if status == "ok" { + Ok(response.get("data").cloned()) + } else { + let code = response + .get("error") + .and_then(|e| e.get("code")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + if code == "not_found" { + Ok(None) + } else { + let message = response + .get("error") + .and_then(|e| e.get("message")) + .and_then(|v| v.as_str()) + .unwrap_or("mqtt read returned non-ok status"); + Err(Error::Internal(format!( + "mqtt read failed for {entity}/{id}: {message}" + ))) + } + } } -pub async fn update_entity_mqtt(client: &MqttClient, entity: &str, id: &str, data: &Value) -> bool { +/// # Errors +/// Returns `Error::NotFound` if the broker reports `not_found`, `Error::Internal` for any other MQTT or status failure. +pub async fn update_entity_mqtt( + client: &MqttClient, + entity: &str, + id: &str, + data: &Value, +) -> Result { let topic = format!("$DB/{entity}/{id}/update"); let payload = serde_json::to_vec(data).unwrap_or_default(); let Some(resp) = mqtt_rr(client, &topic, payload, std::time::Duration::from_secs(5)).await else { - return false; + return Err(Error::Internal(format!( + "mqtt update failed: no response for {entity}/{id}" + ))); }; - serde_json::from_slice::(&resp) - .ok() - .and_then(|v| v.get("status").and_then(|s| s.as_str()).map(|s| s == "ok")) - .unwrap_or(false) + let response: Value = serde_json::from_slice(&resp) + .map_err(|e| Error::Internal(format!("mqtt update response decode failed: {e}")))?; + let status = response + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if status == "ok" { + Ok(response.get("data").cloned().unwrap_or(Value::Null)) + } else { + let code = response + .get("error") + .and_then(|e| e.get("code")) + .and_then(|v| v.as_str()) + .unwrap_or(""); + if code == "not_found" { + Err(Error::NotFound { + entity: entity.to_string(), + id: id.to_string(), + }) + } else { + let message = response + .get("error") + .and_then(|e| e.get("message")) + .and_then(|v| v.as_str()) + .unwrap_or("mqtt update returned non-ok status"); + Err(Error::Internal(format!( + "mqtt update failed for {entity}/{id}: {message}" + ))) + } + } } pub async fn list_entities_mqtt( @@ -170,7 +255,11 @@ pub async fn list_entities_mqtt( } impl DbAccess for Database { - fn read_entity<'a>(&'a self, entity: &'a str, id: &'a str) -> VaultFuture<'a, Option> { + fn read_entity<'a>( + &'a self, + entity: &'a str, + id: &'a str, + ) -> VaultFuture<'a, Result, Error>> { Box::pin(read_entity_db(self, entity, id)) } fn update_entity<'a>( @@ -178,7 +267,7 @@ impl DbAccess for Database { entity: &'a str, id: &'a str, data: Value, - ) -> VaultFuture<'a, bool> { + ) -> VaultFuture<'a, Result> { Box::pin(async move { update_entity_db(self, entity, id, &data).await }) } fn list_entities<'a>( @@ -205,7 +294,11 @@ impl MqttDbAccess { } impl DbAccess for MqttDbAccess { - fn read_entity<'a>(&'a self, entity: &'a str, id: &'a str) -> VaultFuture<'a, Option> { + fn read_entity<'a>( + &'a self, + entity: &'a str, + id: &'a str, + ) -> VaultFuture<'a, Result, Error>> { Box::pin(async move { read_entity_mqtt(&self.client, entity, id).await }) } fn update_entity<'a>( @@ -213,7 +306,7 @@ impl DbAccess for MqttDbAccess { entity: &'a str, id: &'a str, data: Value, - ) -> VaultFuture<'a, bool> { + ) -> VaultFuture<'a, Result> { Box::pin(async move { update_entity_mqtt(&self.client, entity, id, &data).await }) } fn list_entities<'a>( diff --git a/crates/mqdb-agent/src/vault_backend.rs b/crates/mqdb-agent/src/vault_backend.rs index 04a447d..db371df 100644 --- a/crates/mqdb-agent/src/vault_backend.rs +++ b/crates/mqdb-agent/src/vault_backend.rs @@ -46,14 +46,14 @@ pub trait DbAccess: Send + Sync { &'a self, entity: &'a str, id: &'a str, - ) -> VaultFuture<'a, Option>; + ) -> VaultFuture<'a, Result, mqdb_core::error::Error>>; fn update_entity<'a>( &'a self, entity: &'a str, id: &'a str, data: serde_json::Value, - ) -> VaultFuture<'a, bool>; + ) -> VaultFuture<'a, Result>; fn list_entities<'a>( &'a self, @@ -148,16 +148,21 @@ impl DbAccess for NoopDbAccess { &'a self, _entity: &'a str, _id: &'a str, - ) -> VaultFuture<'a, Option> { - Box::pin(async { None }) + ) -> VaultFuture<'a, Result, mqdb_core::error::Error>> { + Box::pin(async { Ok(None) }) } fn update_entity<'a>( &'a self, - _entity: &'a str, - _id: &'a str, + entity: &'a str, + id: &'a str, _data: serde_json::Value, - ) -> VaultFuture<'a, bool> { - Box::pin(async { false }) + ) -> VaultFuture<'a, Result> { + Box::pin(async move { + Err(mqdb_core::error::Error::NotFound { + entity: entity.to_string(), + id: id.to_string(), + }) + }) } fn list_entities<'a>( &'a self, diff --git a/crates/mqdb-vault/src/backend.rs b/crates/mqdb-vault/src/backend.rs index 3c3b4f5..e75bab1 100644 --- a/crates/mqdb-vault/src/backend.rs +++ b/crates/mqdb-vault/src/backend.rs @@ -21,7 +21,7 @@ use mqdb_core::transport::{Request, Response, VaultConstraintData}; use mqdb_core::types::{OwnershipConfig, OwnershipDecision}; use serde_json::{Value, json}; use std::sync::Arc; -use tracing::{debug, error}; +use tracing::{debug, error, warn}; pub struct VaultBackendImpl { key_store: Arc, @@ -271,8 +271,15 @@ impl VaultBackend for VaultBackendImpl { } self.check_rate_limit(canonical_id)?; - let Some(identity) = db.read_entity("_identities", canonical_id).await else { - return Err(VaultError::NotFound("identity not found".into())); + let identity = match db.read_entity("_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => return Err(VaultError::NotFound("identity not found".into())), + Err(e) => { + warn!("admin_enable: failed to read identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to read identity: {e}" + ))); + } }; if identity .get("vault_enabled") @@ -300,8 +307,15 @@ impl VaultBackend for VaultBackendImpl { "vault_migration_status": "pending", "vault_migration_mode": "encrypt", }); - db.update_entity("_identities", canonical_id, migration_start) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_start) + .await + { + warn!("admin_enable: failed to mark identity {canonical_id} migration_start: {e}"); + return Err(VaultError::Internal(format!( + "failed to update identity: {e}" + ))); + } let batch = ops::batch_vault_operation( db, @@ -313,8 +327,15 @@ impl VaultBackend for VaultBackendImpl { .await; let migration_done = json!({"vault_migration_status": "complete"}); - db.update_entity("_identities", canonical_id, migration_done) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_done) + .await + { + warn!("admin_enable: failed to mark identity {canonical_id} migration_done: {e}"); + return Err(VaultError::Internal(format!( + "failed to finalize identity: {e}" + ))); + } let mut body = json!({"status": "enabled", "records_encrypted": batch.succeeded}); if batch.failed > 0 || !batch.entities_skipped.is_empty() { @@ -338,8 +359,15 @@ impl VaultBackend for VaultBackendImpl { Box::pin(async move { self.check_rate_limit(canonical_id)?; - let Some(identity) = db.read_entity("_identities", canonical_id).await else { - return Err(VaultError::NotFound("identity not found".into())); + let identity = match db.read_entity("_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => return Err(VaultError::NotFound("identity not found".into())), + Err(e) => { + warn!("admin_unlock: failed to read identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to read identity: {e}" + ))); + } }; if !identity .get("vault_enabled") @@ -421,8 +449,15 @@ impl VaultBackend for VaultBackendImpl { Box::pin(async move { self.check_rate_limit(canonical_id)?; - let Some(identity) = db.read_entity("_identities", canonical_id).await else { - return Err(VaultError::NotFound("identity not found".into())); + let identity = match db.read_entity("_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => return Err(VaultError::NotFound("identity not found".into())), + Err(e) => { + warn!("admin_disable: failed to read identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to read identity: {e}" + ))); + } }; if !identity .get("vault_enabled") @@ -454,8 +489,15 @@ impl VaultBackend for VaultBackendImpl { "vault_migration_status": "pending", "vault_migration_mode": "decrypt", }); - db.update_entity("_identities", canonical_id, migration_start) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_start) + .await + { + warn!("admin_disable: failed to mark identity {canonical_id} migration_start: {e}"); + return Err(VaultError::Internal(format!( + "failed to update identity: {e}" + ))); + } let batch = ops::batch_vault_operation( db, @@ -473,8 +515,15 @@ impl VaultBackend for VaultBackendImpl { "vault_migration_status": "complete", "vault_migration_mode": null, }); - db.update_entity("_identities", canonical_id, identity_update) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, identity_update) + .await + { + warn!("admin_disable: failed to finalize identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to finalize identity: {e}" + ))); + } let mut body = json!({"status": "disabled", "records_decrypted": batch.succeeded}); if batch.failed > 0 || !batch.entities_skipped.is_empty() { @@ -502,8 +551,15 @@ impl VaultBackend for VaultBackendImpl { } self.check_rate_limit(canonical_id)?; - let Some(identity) = db.read_entity("_identities", canonical_id).await else { - return Err(VaultError::NotFound("identity not found".into())); + let identity = match db.read_entity("_identities", canonical_id).await { + Ok(Some(v)) => v, + Ok(None) => return Err(VaultError::NotFound("identity not found".into())), + Err(e) => { + warn!("admin_change: failed to read identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to read identity: {e}" + ))); + } }; if !identity .get("vault_enabled") @@ -549,8 +605,15 @@ impl VaultBackend for VaultBackendImpl { "vault_old_check": check_token, "vault_old_salt": old_salt_b64_encoded, }); - db.update_entity("_identities", canonical_id, migration_start) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_start) + .await + { + warn!("admin_change: failed to mark identity {canonical_id} migration_start: {e}"); + return Err(VaultError::Internal(format!( + "failed to update identity: {e}" + ))); + } let batch = ops::batch_vault_re_encrypt(db, ownership, canonical_id, &old_crypto, &new_crypto) @@ -562,8 +625,15 @@ impl VaultBackend for VaultBackendImpl { "vault_old_check": null, "vault_old_salt": null, }); - db.update_entity("_identities", canonical_id, migration_done) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_done) + .await + { + warn!("admin_change: failed to finalize identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to finalize identity: {e}" + ))); + } let mut body = json!({"status": "changed", "records_re_encrypted": batch.succeeded}); if batch.failed > 0 || !batch.entities_skipped.is_empty() { @@ -583,7 +653,15 @@ impl VaultBackend for VaultBackendImpl { canonical_id: &'a str, ) -> VaultFuture<'a, VaultResult> { Box::pin(async move { - let identity = db.read_entity("_identities", canonical_id).await; + let identity = match db.read_entity("_identities", canonical_id).await { + Ok(opt) => opt, + Err(e) => { + warn!("admin_status: failed to read identity {canonical_id}: {e}"); + return Err(VaultError::Internal(format!( + "failed to read identity: {e}" + ))); + } + }; let vault_enabled = identity .as_ref() .and_then(|i| i.get("vault_enabled")) diff --git a/crates/mqdb-vault/src/ops.rs b/crates/mqdb-vault/src/ops.rs index 1556e78..5c726e7 100644 --- a/crates/mqdb-vault/src/ops.rs +++ b/crates/mqdb-vault/src/ops.rs @@ -69,10 +69,12 @@ pub async fn batch_vault_operation( VaultMode::Encrypt => crypto.encrypt_record(entity, &id, &mut data, &skip), VaultMode::Decrypt => crypto.decrypt_record(entity, &id, &mut data, &skip), } - if db.update_entity(entity, &id, data).await { - result.succeeded += 1; - } else { - result.failed += 1; + match db.update_entity(entity, &id, data).await { + Ok(_) => result.succeeded += 1, + Err(e) => { + warn!("vault migration update failed for {entity}/{id}: {e}"); + result.failed += 1; + } } } } @@ -105,10 +107,12 @@ pub async fn batch_vault_re_encrypt( let skip: Vec<&str> = vec![owner_field.as_str()]; old_crypto.decrypt_record(entity, &id, &mut data, &skip); new_crypto.encrypt_record(entity, &id, &mut data, &skip); - if db.update_entity(entity, &id, data).await { - result.succeeded += 1; - } else { - result.failed += 1; + match db.update_entity(entity, &id, data).await { + Ok(_) => result.succeeded += 1, + Err(e) => { + warn!("vault migration update failed for {entity}/{id}: {e}"); + result.failed += 1; + } } } } @@ -162,8 +166,12 @@ pub async fn resume_pending_migration( "vault_old_check": null, "vault_old_salt": null, }); - db.update_entity("_identities", canonical_id, migration_done) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, migration_done) + .await + { + warn!("vault migration: failed to mark identity {canonical_id} migration_done: {e}"); + } if mode == "decrypt" { let disable_vault = json!({ @@ -171,8 +179,12 @@ pub async fn resume_pending_migration( "vault_salt": null, "vault_check": null, }); - db.update_entity("_identities", canonical_id, disable_vault) - .await; + if let Err(e) = db + .update_entity("_identities", canonical_id, disable_vault) + .await + { + warn!("vault migration: failed to disable vault on identity {canonical_id}: {e}"); + } vault_key_store.remove(canonical_id); } From 31608a24879a3f934f61664a4f3a606d57f1767d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Wed, 20 May 2026 19:07:25 -0700 Subject: [PATCH 2/2] propagate db errors through create and list helpers --- crates/mqdb-agent/src/agent/handlers.rs | 53 +++++++------ crates/mqdb-agent/src/db_helpers.rs | 101 +++++++++++++++++++----- crates/mqdb-agent/src/vault_backend.rs | 18 +++-- crates/mqdb-vault/src/ops.rs | 20 +++-- 4 files changed, 137 insertions(+), 55 deletions(-) diff --git a/crates/mqdb-agent/src/agent/handlers.rs b/crates/mqdb-agent/src/agent/handlers.rs index 392aab1..9739616 100644 --- a/crates/mqdb-agent/src/agent/handlers.rs +++ b/crates/mqdb-agent/src/agent/handlers.rs @@ -1269,33 +1269,38 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu email, ); - if let Some(challenges) = crate::db_helpers::list_entities_db( + match crate::db_helpers::list_entities_db( ctx.db, "_verification_challenges", &format!("target_hash={target_hash}"), ) .await { - for challenge in challenges { - let status = challenge - .get("status") - .and_then(|v| v.as_str()) - .unwrap_or(""); - if status != "pending" && status != "delivered" { - continue; - } - if let Some(id) = challenge.get("id").and_then(|v| v.as_str()) - && let Err(e) = crate::db_helpers::update_entity_db( - ctx.db, - "_verification_challenges", - id, - &json!({"status": "expired"}), - ) - .await - { - warn!("password reset start: failed to expire stale challenge {id}: {e}"); + Ok(challenges) => { + for challenge in challenges { + let status = challenge + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if status != "pending" && status != "delivered" { + continue; + } + if let Some(id) = challenge.get("id").and_then(|v| v.as_str()) + && let Err(e) = crate::db_helpers::update_entity_db( + ctx.db, + "_verification_challenges", + id, + &json!({"status": "expired"}), + ) + .await + { + warn!("password reset start: failed to expire stale challenge {id}: {e}"); + } } } + Err(e) => { + warn!("password reset start: failed to list existing challenges: {e}"); + } } let challenge_id = crate::http::challenge_utils::uuid_v4(); @@ -1321,9 +1326,11 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu "expires_at": expires_at.to_string(), }); - if !crate::db_helpers::create_entity_db(ctx.db, "_verification_challenges", &challenge_data) - .await + if let Err(e) = + crate::db_helpers::create_entity_db(ctx.db, "_verification_challenges", &challenge_data) + .await { + error!("password reset start: failed to create reset challenge for {canonical_id}: {e}"); return Response::error( mqdb_core::ErrorCode::Internal, "failed to create reset challenge", @@ -1552,7 +1559,7 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val error!("password reset submit: failed to update credentials for {canonical_id}: {e}"); return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password"); } - } else if !crate::db_helpers::create_entity_db( + } else if let Err(e) = crate::db_helpers::create_entity_db( ctx.db, "_credentials", &json!({ @@ -1563,7 +1570,7 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val ) .await { - error!("password reset submit: failed to create credentials for {canonical_id}"); + error!("password reset submit: failed to create credentials for {canonical_id}: {e}"); return Response::error( mqdb_core::ErrorCode::Internal, "failed to create credentials", diff --git a/crates/mqdb-agent/src/db_helpers.rs b/crates/mqdb-agent/src/db_helpers.rs index 913961c..3347171 100644 --- a/crates/mqdb-agent/src/db_helpers.rs +++ b/crates/mqdb-agent/src/db_helpers.rs @@ -12,11 +12,12 @@ use ring::rand::{SecureRandom, SystemRandom}; use serde_json::{Value, json}; use std::sync::Arc; -pub async fn create_entity_db(db: &Database, entity: &str, data: &Value) -> bool { +/// # Errors +/// Returns any storage, validation, or constraint error from the underlying create. +pub async fn create_entity_db(db: &Database, entity: &str, data: &Value) -> Result { let scope = ScopeConfig::default(); db.create(entity.to_string(), data.clone(), None, None, None, &scope) .await - .is_ok() } /// # Errors @@ -56,7 +57,13 @@ pub async fn update_entity_db( .await } -pub async fn list_entities_db(db: &Database, entity: &str, filter: &str) -> Option> { +/// # Errors +/// Returns any storage error from the underlying list. An empty result is `Ok(vec![])`, not an error. +pub async fn list_entities_db( + db: &Database, + entity: &str, + filter: &str, +) -> Result, Error> { let filters = if let Some((field, value)) = filter.split_once('=') { vec![Filter::new( field.to_string(), @@ -68,7 +75,6 @@ pub async fn list_entities_db(db: &Database, entity: &str, filter: &str) -> Opti }; db.list(entity.to_string(), filters, vec![], None, vec![], None) .await - .ok() } fn resp_topic() -> String { @@ -125,17 +131,39 @@ async fn mqtt_rr( result.ok()?.ok() } -pub async fn create_entity_mqtt(client: &MqttClient, entity: &str, data: &Value) -> bool { +/// # Errors +/// Returns `Error::Internal` when the MQTT round-trip fails or the broker returns a non-ok status. +pub async fn create_entity_mqtt( + client: &MqttClient, + entity: &str, + data: &Value, +) -> Result { let topic = format!("$DB/{entity}/create"); let payload = serde_json::to_vec(data).unwrap_or_default(); let Some(resp) = mqtt_rr(client, &topic, payload, std::time::Duration::from_secs(5)).await else { - return false; + return Err(Error::Internal(format!( + "mqtt create failed: no response for {entity}" + ))); }; - serde_json::from_slice::(&resp) - .ok() - .and_then(|v| v.get("status").and_then(|s| s.as_str()).map(|s| s == "ok")) - .unwrap_or(false) + let response: Value = serde_json::from_slice(&resp) + .map_err(|e| Error::Internal(format!("mqtt create response decode failed: {e}")))?; + let status = response + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if status == "ok" { + Ok(response.get("data").cloned().unwrap_or(Value::Null)) + } else { + let message = response + .get("error") + .and_then(|e| e.get("message")) + .and_then(|v| v.as_str()) + .unwrap_or("mqtt create returned non-ok status"); + Err(Error::Internal(format!( + "mqtt create failed for {entity}: {message}" + ))) + } } /// # Errors @@ -229,11 +257,13 @@ pub async fn update_entity_mqtt( } } +/// # Errors +/// Returns `Error::Internal` when the MQTT round-trip fails, the response is undecodable, or the broker returns a non-ok status. pub async fn list_entities_mqtt( client: &MqttClient, entity: &str, filter: &str, -) -> Option> { +) -> Result, Error> { let topic = format!("$DB/{entity}/list"); let list_payload = if let Some((field, value)) = filter.split_once('=') { serde_json::to_vec(&json!({ @@ -243,15 +273,40 @@ pub async fn list_entities_mqtt( } else { vec![] }; - let payload = mqtt_rr( + let Some(payload) = mqtt_rr( client, &topic, list_payload, std::time::Duration::from_secs(10), ) - .await?; - let response: Value = serde_json::from_slice(&payload).ok()?; - response.get("data").and_then(|v| v.as_array()).cloned() + .await + else { + return Err(Error::Internal(format!( + "mqtt list failed: no response for {entity}" + ))); + }; + let response: Value = serde_json::from_slice(&payload) + .map_err(|e| Error::Internal(format!("mqtt list response decode failed: {e}")))?; + let status = response + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or(""); + if status == "ok" { + Ok(response + .get("data") + .and_then(|v| v.as_array()) + .cloned() + .unwrap_or_default()) + } else { + let message = response + .get("error") + .and_then(|e| e.get("message")) + .and_then(|v| v.as_str()) + .unwrap_or("mqtt list returned non-ok status"); + Err(Error::Internal(format!( + "mqtt list failed for {entity}: {message}" + ))) + } } impl DbAccess for Database { @@ -274,10 +329,14 @@ impl DbAccess for Database { &'a self, entity: &'a str, filter: &'a str, - ) -> VaultFuture<'a, Option>> { + ) -> VaultFuture<'a, Result, Error>> { Box::pin(list_entities_db(self, entity, filter)) } - fn create_entity<'a>(&'a self, entity: &'a str, data: Value) -> VaultFuture<'a, bool> { + fn create_entity<'a>( + &'a self, + entity: &'a str, + data: Value, + ) -> VaultFuture<'a, Result> { Box::pin(async move { create_entity_db(self, entity, &data).await }) } } @@ -313,10 +372,14 @@ impl DbAccess for MqttDbAccess { &'a self, entity: &'a str, filter: &'a str, - ) -> VaultFuture<'a, Option>> { + ) -> VaultFuture<'a, Result, Error>> { Box::pin(async move { list_entities_mqtt(&self.client, entity, filter).await }) } - fn create_entity<'a>(&'a self, entity: &'a str, data: Value) -> VaultFuture<'a, bool> { + fn create_entity<'a>( + &'a self, + entity: &'a str, + data: Value, + ) -> VaultFuture<'a, Result> { Box::pin(async move { create_entity_mqtt(&self.client, entity, &data).await }) } } diff --git a/crates/mqdb-agent/src/vault_backend.rs b/crates/mqdb-agent/src/vault_backend.rs index db371df..ce33637 100644 --- a/crates/mqdb-agent/src/vault_backend.rs +++ b/crates/mqdb-agent/src/vault_backend.rs @@ -59,13 +59,13 @@ pub trait DbAccess: Send + Sync { &'a self, entity: &'a str, filter: &'a str, - ) -> VaultFuture<'a, Option>>; + ) -> VaultFuture<'a, Result, mqdb_core::error::Error>>; fn create_entity<'a>( &'a self, entity: &'a str, data: serde_json::Value, - ) -> VaultFuture<'a, bool>; + ) -> VaultFuture<'a, Result>; } pub trait VaultBackend: Send + Sync { @@ -168,15 +168,19 @@ impl DbAccess for NoopDbAccess { &'a self, _entity: &'a str, _filter: &'a str, - ) -> VaultFuture<'a, Option>> { - Box::pin(async { None }) + ) -> VaultFuture<'a, Result, mqdb_core::error::Error>> { + Box::pin(async { Ok(Vec::new()) }) } fn create_entity<'a>( &'a self, - _entity: &'a str, + entity: &'a str, _data: serde_json::Value, - ) -> VaultFuture<'a, bool> { - Box::pin(async { false }) + ) -> VaultFuture<'a, Result> { + Box::pin(async move { + Err(mqdb_core::error::Error::Internal(format!( + "NoopDbAccess: create_entity not supported for {entity}" + ))) + }) } } diff --git a/crates/mqdb-vault/src/ops.rs b/crates/mqdb-vault/src/ops.rs index 5c726e7..98e26bf 100644 --- a/crates/mqdb-vault/src/ops.rs +++ b/crates/mqdb-vault/src/ops.rs @@ -55,9 +55,13 @@ pub async fn batch_vault_operation( }; for (entity, owner_field) in &ownership.entity_owner_fields { let filter = format!("{owner_field}={canonical_id}"); - let Some(records) = db.list_entities(entity, &filter).await else { - result.entities_skipped.push(entity.clone()); - continue; + let records = match db.list_entities(entity, &filter).await { + Ok(records) => records, + Err(e) => { + warn!("vault migration: failed to list {entity} for {canonical_id}: {e}"); + result.entities_skipped.push(entity.clone()); + continue; + } }; for record in records { let Some(id) = record.get("id").and_then(|v| v.as_str()).map(String::from) else { @@ -95,9 +99,13 @@ pub async fn batch_vault_re_encrypt( }; for (entity, owner_field) in &ownership.entity_owner_fields { let filter = format!("{owner_field}={canonical_id}"); - let Some(records) = db.list_entities(entity, &filter).await else { - result.entities_skipped.push(entity.clone()); - continue; + let records = match db.list_entities(entity, &filter).await { + Ok(records) => records, + Err(e) => { + warn!("vault re-encrypt: failed to list {entity} for {canonical_id}: {e}"); + result.entities_skipped.push(entity.clone()); + continue; + } }; for record in records { let Some(id) = record.get("id").and_then(|v| v.as_str()).map(String::from) else {