Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.

Each entry lists the date and the crate versions that were released.

## 2026-05-23 — mqdb-agent 0.8.3

### Fixed

- MQTT password change and reset (`$DB/_auth/password/change`, `$DB/_auth/password/reset/submit`) updated `_credentials` and returned success without invalidating any HTTP sessions for the same user, leaving every cookie-backed session live until its 24h TTL — the HTTP-only fix in 0.8.2 (#68) only closed the HTTP scope of #37. `SessionStore`, `JtiRevocationStore`, and the existing HTTP-session-invalidation step now reach both MQTT handlers via Option-wrapped `Arc` references threaded through `MqdbAgent` → `MessageContext` → `AdminContext` (Option because the cluster-agent code path does not run an HTTP server). After the MQTT credential write succeeds, `invalidate_http_sessions` calls `destroy_others_by_canonical_id(canonical_id, None)` (no live caller session at MQTT request time) and revokes the returned JTIs via the new `JtiRevocationStore::revoke_many`.
- Cleanup along the way: `Session`/`NewSession`/`SessionRef` now carry the `jti` directly (captured when the session's JWT is minted), so `destroy_others_by_canonical_id` returns JTIs rather than JWTs and `handle_logout` no longer decodes its own session's JWT to find the JTI. `mint_callback_jwt` returns `(jwt, jti)` so all 3 callers (callback, register, login) pass the JTI through to `SessionStore::create`; the dev-login session keeps an empty JTI and is filtered out of revocation results. `verify_jwt_ignore_expiry` is no longer called from the password-change/logout paths (only the refresh path still needs it). `HttpServerConfig` now owns the `Arc<SessionStore>` and `Arc<JtiRevocationStore>` so the same instances back both the HTTP server and the MQTT handler task — closing the architectural gap noted as the reason for partial scope in #68.
- Code-review nit from #68 addressed: `JtiRevocationStore::revoke` now `warn!`s when the `MAX_REVOKED_JTIS` cap is hit and the JTI is dropped (was silent).
- Test coverage: 2 new unit tests in `session_store.rs` (`destroy_others_skips_empty_jti_sessions`, `revoke_many_revokes_all_jtis`); the existing 3 destroy-others tests rewritten around JTIs instead of JWTs to match the new return type.

## 2026-05-23 — mqdb-agent 0.8.2

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ A successful `POST /auth/password/change` keeps the caller's session and destroy

### Password Change & Reset MQTT API

Password change and reset are also available over MQTT 5.0 request-response for JWT-authenticated users. The MQTT path currently updates `_credentials` but does not invalidate HTTP sessions for the same user — tracked in issue #69.
Password change and reset are also available over MQTT 5.0 request-response for JWT-authenticated users. The MQTT path destroys every HTTP session for the affected user and revokes their JTIs, matching the HTTP-path behavior.

| Topic | Payload | Description |
|-------|---------|-------------|
Expand Down
2 changes: 1 addition & 1 deletion crates/mqdb-agent/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqdb-agent"
version = "0.8.2"
version = "0.8.3"
edition.workspace = true
license = "Apache-2.0"
authors.workspace = true
Expand Down
25 changes: 25 additions & 0 deletions crates/mqdb-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub(super) struct MessageContext<'a> {
pub auth_rate_limiter: &'a RateLimiter,
#[cfg(feature = "http-api")]
pub identity_crypto: Option<&'a Arc<crate::http::IdentityCrypto>>,
#[cfg(feature = "http-api")]
pub session_store: Option<&'a Arc<crate::http::SessionStore>>,
#[cfg(feature = "http-api")]
pub jti_revocation: Option<&'a Arc<crate::http::JtiRevocationStore>>,
}

#[allow(clippy::too_many_lines)]
Expand Down Expand Up @@ -90,6 +94,10 @@ pub(super) async fn handle_message(ctx: &MessageContext<'_>, message: Message) {
auth_rate_limiter: ctx.auth_rate_limiter,
#[cfg(feature = "http-api")]
identity_crypto: ctx.identity_crypto,
#[cfg(feature = "http-api")]
session_store: ctx.session_store,
#[cfg(feature = "http-api")]
jti_revocation: ctx.jti_revocation,
};
handle_admin_operation(&admin_ctx, admin_op).await;
return;
Expand Down Expand Up @@ -277,6 +285,10 @@ struct AdminContext<'a> {
auth_rate_limiter: &'a RateLimiter,
#[cfg(feature = "http-api")]
identity_crypto: Option<&'a Arc<crate::http::IdentityCrypto>>,
#[cfg(feature = "http-api")]
session_store: Option<&'a Arc<crate::http::SessionStore>>,
#[cfg(feature = "http-api")]
jti_revocation: Option<&'a Arc<crate::http::JtiRevocationStore>>,
}

#[allow(clippy::too_many_lines)]
Expand Down Expand Up @@ -1105,6 +1117,15 @@ async fn dispatch_vault_admin_mqtt(
}
}

#[cfg(feature = "http-api")]
fn invalidate_http_sessions(ctx: &AdminContext<'_>, canonical_id: &str) {
let (Some(sessions), Some(jtis)) = (ctx.session_store, ctx.jti_revocation) else {
return;
};
let revoked = sessions.destroy_others_by_canonical_id(canonical_id, None);
jtis.revoke_many(&revoked);
}

#[cfg(feature = "http-api")]
async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) -> Response {
use serde_json::json;
Expand Down Expand Up @@ -1209,6 +1230,8 @@ async fn handle_password_change_mqtt(ctx: &AdminContext<'_>, payload: &Value) ->
return Response::error(mqdb_core::ErrorCode::Internal, "failed to update password");
}

invalidate_http_sessions(ctx, canonical_id);

Response::ok(json!({"status": "password changed"}))
}

Expand Down Expand Up @@ -1592,5 +1615,7 @@ async fn handle_password_reset_submit_mqtt(ctx: &AdminContext<'_>, payload: &Val
);
}

invalidate_http_sessions(ctx, canonical_id);

Response::ok(json!({"status": "password_reset"}))
}
10 changes: 10 additions & 0 deletions crates/mqdb-agent/src/agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub struct MqdbAgent {
pub(super) auth_rate_limiter: Arc<RateLimiter>,
#[cfg(feature = "http-api")]
pub(super) identity_crypto: Option<Arc<crate::http::IdentityCrypto>>,
#[cfg(feature = "http-api")]
pub(super) session_store: Option<Arc<crate::http::SessionStore>>,
#[cfg(feature = "http-api")]
pub(super) jti_revocation: Option<Arc<crate::http::JtiRevocationStore>>,
pub(super) license_expires_at: Option<u64>,
#[cfg(feature = "opentelemetry")]
pub(super) telemetry_config: Option<mqtt5::telemetry::TelemetryConfig>,
Expand Down Expand Up @@ -75,6 +79,10 @@ impl MqdbAgent {
auth_rate_limiter: Arc::new(RateLimiter::new(10)),
#[cfg(feature = "http-api")]
identity_crypto: None,
#[cfg(feature = "http-api")]
session_store: None,
#[cfg(feature = "http-api")]
jti_revocation: None,
license_expires_at: None,
#[cfg(feature = "opentelemetry")]
telemetry_config: None,
Expand Down Expand Up @@ -225,6 +233,8 @@ impl MqdbAgent {
#[allow(clippy::missing_panics_doc)]
pub fn with_http_config(mut self, config: crate::http::HttpServerConfig) -> Self {
self.identity_crypto.clone_from(&config.identity_crypto);
self.session_store = Some(Arc::clone(&config.session_store));
self.jti_revocation = Some(Arc::clone(&config.jti_revocation));
*self.http_config.lock().expect("http_config lock") = Some(config);
self
}
Expand Down
8 changes: 8 additions & 0 deletions crates/mqdb-agent/src/agent/tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ impl MqdbAgent {
let auth_rate_limiter = Arc::clone(&self.auth_rate_limiter);
#[cfg(feature = "http-api")]
let identity_crypto = self.identity_crypto.clone();
#[cfg(feature = "http-api")]
let session_store = self.session_store.clone();
#[cfg(feature = "http-api")]
let jti_revocation = self.jti_revocation.clone();

tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(100)).await;
Expand Down Expand Up @@ -131,6 +135,10 @@ impl MqdbAgent {
auth_rate_limiter: &auth_rate_limiter,
#[cfg(feature = "http-api")]
identity_crypto: identity_crypto.as_ref(),
#[cfg(feature = "http-api")]
session_store: session_store.as_ref(),
#[cfg(feature = "http-api")]
jti_revocation: jti_revocation.as_ref(),
};
handle_message(&ctx, message).await;
} else {
Expand Down
48 changes: 19 additions & 29 deletions crates/mqdb-agent/src/http/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub struct ServerState {
pub mqtt_client: Arc<MqttClient>,
pub db_access: Arc<dyn DbAccess>,
pub frontend_redirect_uri: Option<String>,
pub session_store: SessionStore,
pub session_store: Arc<SessionStore>,
pub ticket_expiry_secs: u64,
pub cookie_secure: bool,
pub cors_origin: Option<String>,
pub ticket_rate_limiter: RateLimiter,
pub login_rate_limiter: RateLimiter,
pub register_rate_limiter: RateLimiter,
pub jti_revocation: JtiRevocationStore,
pub jti_revocation: Arc<JtiRevocationStore>,
pub trust_proxy: bool,
pub identity_crypto: Option<Arc<IdentityCrypto>>,
pub ownership_config: Arc<OwnershipConfig>,
Expand Down Expand Up @@ -266,10 +266,11 @@ pub async fn handle_callback(state: &ServerState, query: &str) -> HttpResponse {
persist_oauth_tokens(state, &link_key, &canonical_id, refresh_token, &identity).await;
}

let jwt = mint_callback_jwt(state, &canonical_id, &identity);
let (jwt, jti) = mint_callback_jwt(state, &canonical_id, &identity);

let Some(session_id) = state.session_store.create(NewSession {
jwt,
jti,
canonical_id,
provider: provider.to_string(),
provider_sub: identity.provider_sub.clone(),
Expand Down Expand Up @@ -621,26 +622,27 @@ fn mint_callback_jwt(
state: &ServerState,
canonical_id: &str,
identity: &ProviderIdentity,
) -> String {
) -> (String, String) {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_or(0, |d| d.as_secs());

let jti = JtiRevocationStore::generate_jti();
let claims = json!({
"sub": canonical_id,
"iss": state.jwt_config.issuer,
"aud": state.jwt_config.audience,
"exp": now + state.jwt_config.expiry_secs,
"iat": now,
"jti": JtiRevocationStore::generate_jti(),
"jti": jti,
"email": identity.email,
"name": identity.name,
"picture": identity.picture,
"provider": identity.provider,
"provider_sub": identity.provider_sub,
});

sign_jwt(&claims, &state.jwt_config)
(sign_jwt(&claims, &state.jwt_config), jti)
}

pub async fn handle_refresh(state: &ServerState, body: &[u8]) -> HttpResponse {
Expand Down Expand Up @@ -957,10 +959,9 @@ pub fn handle_logout(state: &ServerState, headers: &HeaderMap) -> HttpResponse {

if let Some(session_id) = parse_session_id(cookie_header) {
if let Some(session) = state.session_store.get(session_id)
&& let Some(payload) = verify_jwt_ignore_expiry(&session.jwt, &state.jwt_config)
&& let Some(jti) = payload.get("jti").and_then(|v| v.as_str())
&& !session.jti.is_empty()
{
state.jti_revocation.revoke(jti);
state.jti_revocation.revoke(&session.jti);
}
state.session_store.destroy(session_id);
}
Expand Down Expand Up @@ -1740,9 +1741,10 @@ pub async fn handle_register(state: &ServerState, body: &[u8], client_ip: &str)
return json_response_with_credentials(500, &json!({"error": "registration failed"}), cors);
}

let jwt = mint_callback_jwt(state, &canonical_id, &identity);
let (jwt, jti) = mint_callback_jwt(state, &canonical_id, &identity);
let Some(session_id) = state.session_store.create(NewSession {
jwt,
jti,
canonical_id: canonical_id.clone(),
provider: "email".to_string(),
provider_sub,
Expand Down Expand Up @@ -1867,10 +1869,11 @@ pub async fn handle_login(state: &ServerState, body: &[u8], client_ip: &str) ->
picture: picture.clone(),
email_verified: verified,
};
let jwt = mint_callback_jwt(state, canonical_id, &identity);
let (jwt, jti) = mint_callback_jwt(state, canonical_id, &identity);

let Some(session_id) = state.session_store.create(NewSession {
jwt,
jti,
canonical_id: canonical_id.to_string(),
provider: "email".to_string(),
provider_sub: canonical_id.to_string(),
Expand Down Expand Up @@ -2298,20 +2301,6 @@ pub async fn handle_verify_status(state: &ServerState, headers: &HeaderMap) -> H
)
}

fn revoke_jwt_jtis(state: &ServerState, jwts: &[String]) {
for jwt in jwts {
let Some(payload) = verify_jwt_ignore_expiry(jwt, &state.jwt_config) else {
warn!("failed to decode destroyed session JWT; JTI not revoked");
continue;
};
let Some(jti) = payload.get("jti").and_then(|v| v.as_str()) else {
warn!("destroyed session JWT missing jti claim; not revoked");
continue;
};
state.jti_revocation.revoke(jti);
}
}

async fn verify_stored_password(
state: &ServerState,
canonical_id: &str,
Expand Down Expand Up @@ -2442,10 +2431,10 @@ pub async fn handle_password_change(
);
}

let revoked_jwts = state
let revoked_jtis = state
.session_store
.destroy_others_by_canonical_id(&canonical_id, Some(current_session_id));
revoke_jwt_jtis(state, &revoked_jwts);
state.jti_revocation.revoke_many(&revoked_jtis);

json_response_with_credentials(200, &json!({"status": "password changed"}), cors)
}
Expand Down Expand Up @@ -2798,10 +2787,10 @@ pub async fn handle_password_reset_submit(
)
.await;

let revoked_jwts = state
let revoked_jtis = state
.session_store
.destroy_others_by_canonical_id(canonical_id, None);
revoke_jwt_jtis(state, &revoked_jwts);
state.jti_revocation.revoke_many(&revoked_jtis);

json_response_with_credentials(200, &json!({"status": "password_reset"}), cors)
}
Expand Down Expand Up @@ -2845,6 +2834,7 @@ pub async fn handle_dev_login(state: &ServerState, body: &[u8]) -> HttpResponse

let Some(session_id) = state.session_store.create(NewSession {
jwt: String::new(),
jti: String::new(),
canonical_id: canonical_id.clone(),
provider: "dev".to_string(),
provider_sub: "dev-local".to_string(),
Expand Down
2 changes: 1 addition & 1 deletion crates/mqdb-agent/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ pub use jwt_signer::{JwtSigningAlgorithm, JwtSigningConfig};
pub use providers::google::GoogleProvider;
pub use providers::{Provider, ProviderConfig, ProviderRegistry};
pub use server::{HttpServer, HttpServerConfig};
pub use session_store::SessionStore;
pub use session_store::{JtiRevocationStore, SessionStore};
6 changes: 4 additions & 2 deletions crates/mqdb-agent/src/http/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ pub struct HttpServerConfig {
pub vault_backend: Option<Arc<dyn VaultBackend>>,
pub auth_rate_limit: u32,
pub email_auth: bool,
pub session_store: Arc<SessionStore>,
pub jti_revocation: Arc<JtiRevocationStore>,
}

pub struct HttpServer {
Expand Down Expand Up @@ -79,14 +81,14 @@ impl HttpServer {
mqtt_client: self.mqtt_client,
db_access: self.config.db_access,
frontend_redirect_uri: self.config.frontend_redirect_uri,
session_store: SessionStore::new(),
session_store: self.config.session_store,
ticket_expiry_secs: self.config.ticket_expiry_secs,
cookie_secure: self.config.cookie_secure,
cors_origin: self.config.cors_origin,
ticket_rate_limiter: RateLimiter::new(self.config.ticket_rate_limit),
login_rate_limiter: RateLimiter::new(if no_rate_limit { u32::MAX } else { 10 }),
register_rate_limiter: RateLimiter::new(if no_rate_limit { u32::MAX } else { 5 }),
jti_revocation: JtiRevocationStore::new(),
jti_revocation: self.config.jti_revocation,
trust_proxy: self.config.trust_proxy,
identity_crypto: self.config.identity_crypto,
ownership_config: self.config.ownership_config,
Expand Down
Loading
Loading