Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 142 additions & 66 deletions crates/mqdb-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1138,11 +1138,17 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) ->
);
}

let Some(identity) =
crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await
else {
return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found");
};
let identity =
match crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await {
Ok(Some(v)) => v,
Ok(None) => {
return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found");
}
Err(e) => {
warn!("password change: failed to read identity {canonical_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to read identity");
}
};

let email_verified = identity
.get("email_verified")
Expand All @@ -1155,12 +1161,18 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) ->
);
}

let Some(cred) = crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await
else {
return Response::error(
mqdb_core::ErrorCode::NotFound,
"no credentials found (OAuth-only account)",
);
let cred = match crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await {
Ok(Some(v)) => v,
Ok(None) => {
return Response::error(
mqdb_core::ErrorCode::NotFound,
"no credentials found (OAuth-only account)",
);
}
Err(e) => {
warn!("password change: failed to read credentials {canonical_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to read credentials");
}
};

let Some(stored_hash) = cred.get("password_hash").and_then(|v| v.as_str()) else {
Expand All @@ -1185,13 +1197,17 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) ->
}
};

crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_credentials",
canonical_id,
&json!({"password_hash": new_hash}),
)
.await;
.await
{
error!("password change: failed to update credentials for {canonical_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password");
}

Response::ok(json!({"status": "password changed"}))
}
Expand All @@ -1216,11 +1232,17 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu
);
}

let Some(identity) =
crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await
else {
return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found");
};
let identity =
match crate::db_helpers::read_entity_db(ctx.db, "_identities", canonical_id).await {
Ok(Some(v)) => v,
Ok(None) => {
return Response::error(mqdb_core::ErrorCode::NotFound, "identity not found");
}
Err(e) => {
warn!("password reset start: failed to read identity {canonical_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to read identity");
}
};

let email_lower = email.to_lowercase();
let email_matches = if let Some(crypto) = ctx.identity_crypto {
Expand All @@ -1247,31 +1269,38 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu
email,
);

if let Some(challenges) = crate::db_helpers::list_entities_db(
match crate::db_helpers::list_entities_db(
ctx.db,
"_verification_challenges",
&format!("target_hash={target_hash}"),
)
.await
{
for challenge in challenges {
let status = challenge
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("");
if status != "pending" && status != "delivered" {
continue;
}
if let Some(id) = challenge.get("id").and_then(|v| v.as_str()) {
crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
id,
&json!({"status": "expired"}),
)
.await;
Ok(challenges) => {
for challenge in challenges {
let status = challenge
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("");
if status != "pending" && status != "delivered" {
continue;
}
if let Some(id) = challenge.get("id").and_then(|v| v.as_str())
&& let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
id,
&json!({"status": "expired"}),
)
.await
{
warn!("password reset start: failed to expire stale challenge {id}: {e}");
}
}
}
Err(e) => {
warn!("password reset start: failed to list existing challenges: {e}");
}
}

let challenge_id = crate::http::challenge_utils::uuid_v4();
Expand All @@ -1297,9 +1326,11 @@ async fn handle_password_reset_start_mqtt(ctx: &AdminContext<'_>, payload: &Valu
"expires_at": expires_at.to_string(),
});

if !crate::db_helpers::create_entity_db(ctx.db, "_verification_challenges", &challenge_data)
.await
if let Err(e) =
crate::db_helpers::create_entity_db(ctx.db, "_verification_challenges", &challenge_data)
.await
{
error!("password reset start: failed to create reset challenge for {canonical_id}: {e}");
return Response::error(
mqdb_core::ErrorCode::Internal,
"failed to create reset challenge",
Expand Down Expand Up @@ -1369,11 +1400,19 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
);
}

let Some(challenge) =
crate::db_helpers::read_entity_db(ctx.db, "_verification_challenges", challenge_id).await
else {
return Response::error(mqdb_core::ErrorCode::NotFound, "challenge not found");
};
let challenge =
match crate::db_helpers::read_entity_db(ctx.db, "_verification_challenges", challenge_id)
.await
{
Ok(Some(v)) => v,
Ok(None) => {
return Response::error(mqdb_core::ErrorCode::NotFound, "challenge not found");
}
Err(e) => {
warn!("password reset submit: failed to read challenge {challenge_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to read challenge");
}
};

let purpose = challenge
.get("purpose")
Expand Down Expand Up @@ -1412,13 +1451,16 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
.unwrap_or(0);
let now = crate::http::challenge_utils::now_unix();
if expires_at > 0 && now >= expires_at {
crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
challenge_id,
&json!({"status": "expired"}),
)
.await;
.await
{
warn!("password reset submit: failed to mark challenge {challenge_id} expired: {e}");
}
return Response::error(mqdb_core::ErrorCode::BadRequest, "challenge has expired");
}

Expand All @@ -1431,13 +1473,16 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
.and_then(Value::as_u64)
.unwrap_or(5);
if attempts >= max_attempts {
crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
challenge_id,
&json!({"status": "failed"}),
)
.await;
.await
{
warn!("password reset submit: failed to mark challenge {challenge_id} failed: {e}");
}
return Response::error(mqdb_core::ErrorCode::BadRequest, "too many attempts");
}

Expand All @@ -1454,23 +1499,29 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
} else {
status
};
crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
challenge_id,
&json!({"attempts": new_attempts, "status": new_status}),
)
.await;
.await
{
warn!("password reset submit: failed to record failed attempt on {challenge_id}: {e}");
}
return Response::error(mqdb_core::ErrorCode::Unauthorized, "invalid code");
}

crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_verification_challenges",
challenge_id,
&json!({"status": "verified", "attempts": new_attempts}),
)
.await;
.await
{
warn!("password reset submit: failed to mark challenge {challenge_id} verified: {e}");
}

let new_hash = match crate::http::credentials::hash_password(new_password) {
Ok(h) => h,
Expand All @@ -1486,35 +1537,60 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
.unwrap_or("");

let existing_creds =
crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await;
match crate::db_helpers::read_entity_db(ctx.db, "_credentials", canonical_id).await {
Ok(opt) => opt,
Err(e) => {
warn!("password reset submit: failed to read credentials {canonical_id}: {e}");
return Response::error(
mqdb_core::ErrorCode::Internal,
"failed to read credentials",
);
}
};
if existing_creds.is_some() {
crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_credentials",
canonical_id,
&json!({"password_hash": new_hash}),
)
.await;
} else {
crate::db_helpers::create_entity_db(
ctx.db,
"_credentials",
&json!({
"id": canonical_id,
"password_hash": new_hash,
"email_hash": target_hash,
}),
)
.await;
.await
{
error!("password reset submit: failed to update credentials for {canonical_id}: {e}");
return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password");
}
} else if let Err(e) = crate::db_helpers::create_entity_db(
ctx.db,
"_credentials",
&json!({
"id": canonical_id,
"password_hash": new_hash,
"email_hash": target_hash,
}),
)
.await
{
error!("password reset submit: failed to create credentials for {canonical_id}: {e}");
return Response::error(
mqdb_core::ErrorCode::Internal,
"failed to create credentials",
);
}

crate::db_helpers::update_entity_db(
if let Err(e) = crate::db_helpers::update_entity_db(
ctx.db,
"_identities",
canonical_id,
&json!({"email_verified": true}),
)
.await;
.await
{
error!("password reset submit: failed to mark email_verified for {canonical_id}: {e}");
return Response::error(
mqdb_core::ErrorCode::Internal,
"failed to mark email verified",
);
}

Response::ok(json!({"status": "password_reset"}))
}
Loading
Loading