From 1da9927aa120c874cb2fc8bae862756d95bd26db Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 28 Apr 2026 13:35:56 +0530 Subject: [PATCH 1/2] parallel quest smoke test --- docker-compose-test-with-kafka.yaml | 28 +++++++++++++++------------- docker-compose-test.yaml | 2 +- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/docker-compose-test-with-kafka.yaml b/docker-compose-test-with-kafka.yaml index 1dd2185bb..06f285c08 100644 --- a/docker-compose-test-with-kafka.yaml +++ b/docker-compose-test-with-kafka.yaml @@ -70,19 +70,21 @@ services: image: ghcr.io/parseablehq/quest:main platform: linux/amd64 pull_policy: always - command: [ - "load", - "http://parseable:8000", - "parseableadmin", - "parseableadmin", - "20", - "10", - "5m", - "minio:9000", - "parseable", - "supersecret", - "parseable" - ] + entrypoint: + - sh + - -c + - | + STREAM=$$(head /dev/urandom | tr -dc a-z | head -c10) && \ + ./quest.test -test.v -test.parallel=10 \ + -mode=load \ + -query-url=http://parseable:8000 \ + -stream=$$STREAM \ + -query-user=parseableadmin \ + -query-pass=parseableadmin \ + -minio-url=minio:9000 \ + -minio-user=parseable \ + -minio-pass=supersecret \ + -minio-bucket=parseable depends_on: parseable: condition: service_healthy diff --git a/docker-compose-test.yaml b/docker-compose-test.yaml index 70b2af76a..6137321e3 100644 --- a/docker-compose-test.yaml +++ b/docker-compose-test.yaml @@ -65,7 +65,7 @@ services: platform: linux/amd64 pull_policy: always command: [ - "load", + "load-parallel", "http://parseable:8000", "parseableadmin", "parseableadmin", From a65fb0d67092141c7a4ba986df6648de83727d0b Mon Sep 17 00:00:00 2001 From: Anant Vindal Date: Tue, 28 Apr 2026 15:55:49 +0530 Subject: [PATCH 2/2] fix: rbac deadlock, comment resource util --- src/handlers/http/middleware.rs | 31 ++- src/handlers/http/modal/query/querier_rbac.rs | 229 +++++++++--------- src/handlers/http/modal/query/querier_role.rs | 39 +-- src/handlers/http/modal/server.rs | 23 +- src/handlers/http/rbac.rs | 165 ++++++------- src/handlers/http/role.rs | 77 +++--- src/rbac/mod.rs | 77 ++++-- 7 files changed, 338 insertions(+), 303 deletions(-) diff --git a/src/handlers/http/middleware.rs b/src/handlers/http/middleware.rs index 73ad3fca4..46ed49713 100644 --- a/src/handlers/http/middleware.rs +++ b/src/handlers/http/middleware.rs @@ -459,16 +459,27 @@ pub async fn refresh_token( roles_to_permission(user_roles, tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)), &tenant_id, ); - } else if let Some(users) = users().get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) - && let Some(user) = users.get(&userid) - { - mut_sessions().track_new( - userid.clone(), - key.clone(), - Utc::now() + EXPIRY_DURATION, - roles_to_permission(user.roles(), tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)), - &tenant_id, - ); + } else { + // Clone user roles under USERS read lock, then drop it before + // acquiring SESSIONS write lock to preserve lock ordering. + let user_roles = if let Some(users) = + users().get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) + && let Some(user) = users.get(&userid) + { + Some(user.roles()) + } else { + None + }; + // USERS read lock dropped here + if let Some(user_roles) = user_roles { + mut_sessions().track_new( + userid.clone(), + key.clone(), + Utc::now() + EXPIRY_DURATION, + roles_to_permission(user_roles, tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)), + &tenant_id, + ); + } } } Ok(()) diff --git a/src/handlers/http/modal/query/querier_rbac.rs b/src/handlers/http/modal/query/querier_rbac.rs index 576a500b8..ce1d411ae 100644 --- a/src/handlers/http/modal/query/querier_rbac.rs +++ b/src/handlers/http/modal/query/querier_rbac.rs @@ -51,7 +51,6 @@ pub async fn post_user( let tenant_id = get_tenant_id_from_request(&req); let caller_userid = get_user_from_request(&req)?; validator::user_role_name(&username)?; - let mut metadata = get_metadata(&tenant_id).await?; let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? @@ -73,23 +72,28 @@ pub async fn post_user( if !non_existent_roles.is_empty() { return Err(RBACError::RolesDoNotExist(non_existent_roles)); } - let _guard = UPDATE_LOCK.lock().await; - if Users.contains(&username, &tenant_id) - || metadata - .users - .iter() - .any(|user| matches!(&user.ty, UserType::Native(basic) if basic.username == username)) - { - return Err(RBACError::UserExists(username)); - } - let (mut user, password) = user::User::new_basic(username.clone(), tenant_id.clone(), false); - // add user roles - user.roles.clone_from(&user_roles); - metadata.users.push(user.clone()); + let (user, password) = { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + + if Users.contains(&username, &tenant_id) + || metadata.users.iter().any( + |user| matches!(&user.ty, UserType::Native(basic) if basic.username == username), + ) + { + return Err(RBACError::UserExists(username)); + } + + let (mut user, password) = + user::User::new_basic(username.clone(), tenant_id.clone(), false); + user.roles.clone_from(&user_roles); + metadata.users.push(user.clone()); + put_metadata(&metadata, &tenant_id).await?; + (user, password) + }; - put_metadata(&metadata, &tenant_id).await?; - // let created_role = user_roles.clone(); + // Update in-memory state after releasing the lock Users.put_user(user.clone()); if let Err(e) = sync_user_creation(&req, user, &None, &tenant_id, &caller_userid).await { @@ -108,13 +112,12 @@ pub async fn delete_user( let tenant_id = get_tenant_id_from_request(&req); let caller_userid = get_user_from_request(&req)?; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let _guard = UPDATE_LOCK.lock().await; - // fail this request if the user does not exist + + // Validations before acquiring the lock if !Users.contains(&userid, &tenant_id) { return Err(RBACError::UserDoesNotExist); }; - // find username by userid, for native users, username is userid, for oauth users, we need to look up let username = if let Some(users) = users().get(tenant) && let Some(user) = users.get(&userid) { @@ -123,46 +126,51 @@ pub async fn delete_user( return Err(RBACError::UserDoesNotExist); }; - // delete from parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - metadata.users.retain(|user| user.userid() != userid); - - // also delete from user groups - let user_groups = Users.get_user_groups(&userid, &tenant_id); - let mut groups_to_update = Vec::new(); - - for user_group in user_groups { - if let Some(groups) = write_user_groups().get_mut(tenant) - && let Some(ug) = groups.get_mut(&user_group) - && let Some(users) = users().get(tenant) - && let Some(user) = users.get(&userid) - { - let userid = match &user.ty { - UserType::Native(basic) => basic.username.clone(), - UserType::OAuth(oauth) => oauth.userid.clone(), - UserType::ApiKey(api_key) => api_key.userid.clone(), - }; - ug.remove_users_by_user_ids(HashSet::from_iter([userid]))?; - groups_to_update.push(ug.clone()); - } else { - // User not found, skip or log as needed - continue; + { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + metadata.users.retain(|user| user.userid() != userid); + + // also delete from user groups + let user_groups = Users.get_user_groups(&userid, &tenant_id); + let mut groups_to_update = Vec::new(); + + for user_group in user_groups { + if let Some(groups) = write_user_groups().get_mut(tenant) + && let Some(ug) = groups.get_mut(&user_group) + && let Some(users) = users().get(tenant) + && let Some(user) = users.get(&userid) + { + let userid = match &user.ty { + UserType::Native(basic) => basic.username.clone(), + UserType::OAuth(oauth) => oauth.userid.clone(), + UserType::ApiKey(api_key) => api_key.userid.clone(), + }; + ug.remove_users_by_user_ids(HashSet::from_iter([userid]))?; + groups_to_update.push(ug.clone()); + } else { + // User not found, skip or log as needed + continue; + } + + for updated_group in &groups_to_update { + if let Some(existing) = metadata + .user_groups + .iter_mut() + .find(|ug| ug.name == updated_group.name) + { + existing.clone_from(updated_group); + } else { + metadata.user_groups.push(updated_group.clone()); + } + } + + put_metadata(&metadata, &tenant_id).await?; } } - // For each updated group, replace in place if found; otherwise push - for updated_group in &groups_to_update { - if let Some(existing) = metadata - .user_groups - .iter_mut() - .find(|ug| ug.name == updated_group.name) - { - existing.clone_from(updated_group); - } else { - metadata.user_groups.push(updated_group.clone()); - } - } - put_metadata(&metadata, &tenant_id).await?; + // Update in-memory state after releasing the lock + Users.delete_user(&userid, &tenant_id); if let Err(e) = sync_user_deletion_with_ingestors(&req, &userid, &tenant_id, &caller_userid).await @@ -170,8 +178,6 @@ pub async fn delete_user( tracing::error!("{e}"); }; - // update in mem table - Users.delete_user(&userid, &tenant_id); Ok(HttpResponse::Ok().json(format!("deleted user: {username}"))) } @@ -214,21 +220,21 @@ pub async fn add_roles_to_user( return Err(RBACError::RolesDoNotExist(non_existent_roles)); } - // update parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - if let Some(user) = metadata - .users - .iter_mut() - .find(|user| user.userid() == userid) { - user.roles.extend(roles_to_add.clone()); - } else { - // should be unreachable given state is always consistent - return Err(RBACError::UserDoesNotExist); + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.userid() == userid) + { + user.roles.extend(roles_to_add.clone()); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; } - put_metadata(&metadata, &tenant_id).await?; - // sync to other nodes before updating in-memory (which invalidates sessions) if let Err(e) = sync_users_with_roles_with_ingestors( &req, @@ -243,7 +249,7 @@ pub async fn add_roles_to_user( tracing::error!("Failed to sync role addition to cluster nodes: {e}"); } - // update in mem table (this invalidates the user's session) + // Update in-memory state after releasing the lock Users.add_roles(&userid.clone(), roles_to_add.clone(), &tenant_id); Ok(format!("Roles updated successfully for {username}")) @@ -260,13 +266,11 @@ pub async fn remove_roles_from_user( let tenant_id = get_tenant_id_from_request(&req); let caller_userid = get_user_from_request(&req)?; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - let _guard = UPDATE_LOCK.lock().await; if !Users.contains(&userid, &tenant_id) { return Err(RBACError::UserDoesNotExist); }; - // find username by userid, for native users, username is userid, for oauth users, we need to look up let username = if let Some(users) = users().get(tenant) && let Some(user) = users.get(&userid) { @@ -276,8 +280,6 @@ pub async fn remove_roles_from_user( }; let mut non_existent_roles = Vec::new(); - - // check if the role exists roles_to_remove.iter().for_each(|r| { if let Some(tenant_roles) = roles().get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) && tenant_roles.get(r).is_none() @@ -290,7 +292,6 @@ pub async fn remove_roles_from_user( return Err(RBACError::RolesDoNotExist(non_existent_roles)); } - // check for role not present with user let user_roles: HashSet = HashSet::from_iter(Users.get_role(&userid, &tenant_id)); let roles_not_with_user: HashSet = HashSet::from_iter(roles_to_remove.difference(&user_roles).cloned()); @@ -300,7 +301,6 @@ pub async fn remove_roles_from_user( ))); } - // In multi-tenant, prevent removing the admin role from the tenant owner if PARSEABLE.options.is_multi_tenant() && roles_to_remove.contains("admin") && let Some(tid) = tenant_id.as_deref() @@ -317,23 +317,23 @@ pub async fn remove_roles_from_user( } } - // update parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - if let Some(user) = metadata - .users - .iter_mut() - .find(|user| user.userid() == userid) { - let diff: HashSet = - HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); - user.roles = diff; - } else { - // should be unreachable given state is always consistent - return Err(RBACError::UserDoesNotExist); + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.userid() == userid) + { + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; } - put_metadata(&metadata, &tenant_id).await?; - // sync to other nodes before updating in-memory (which invalidates sessions) if let Err(e) = sync_users_with_roles_with_ingestors( &req, @@ -348,7 +348,7 @@ pub async fn remove_roles_from_user( tracing::error!("Failed to sync role removal to cluster nodes: {e}"); } - // update in mem table (this invalidates the user's session) + // Update in-memory state after releasing the lock Users.remove_roles(&userid.clone(), roles_to_remove.clone(), &tenant_id); Ok(HttpResponse::Ok().json(format!("Roles updated successfully for {username}"))) @@ -361,30 +361,31 @@ pub async fn post_gen_password( userid: web::Path, ) -> Result { let username = userid.into_inner(); - let mut new_password = String::default(); - let mut new_hash = String::default(); let tenant_id = get_tenant_id_from_request(&req); let caller_userid = get_user_from_request(&req)?; - let mut metadata = get_metadata(&tenant_id).await?; - - let _guard = UPDATE_LOCK.lock().await; - let user::PassCode { password, hash } = user::Basic::gen_new_password(); - new_password.clone_from(&password); - new_hash.clone_from(&hash); - if let Some(user) = metadata - .users - .iter_mut() - .filter_map(|user| match user.ty { - user::UserType::Native(ref mut user) => Some(user), - _ => None, - }) - .find(|user| user.username == username) - { - user.password_hash.clone_from(&hash); - } else { - return Err(RBACError::UserDoesNotExist); - } - put_metadata(&metadata, &tenant_id).await?; + + let (new_password, new_hash) = { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; + (password, hash) + }; + + // Update in-memory state after releasing the lock Users.change_password_hash(&username, &new_hash, &tenant_id); if let Err(e) = sync_password_reset_with_ingestors(req, &username, &caller_userid).await { diff --git a/src/handlers/http/modal/query/querier_role.rs b/src/handlers/http/modal/query/querier_role.rs index 321efa9b0..78943c9af 100644 --- a/src/handlers/http/modal/query/querier_role.rs +++ b/src/handlers/http/modal/query/querier_role.rs @@ -27,6 +27,7 @@ use crate::{ handlers::http::{ cluster::{sync_role_delete, sync_role_update}, modal::utils::rbac_utils::{get_metadata, put_metadata}, + rbac::UPDATE_LOCK, role::RoleError, }, parseable::DEFAULT_TENANT, @@ -74,10 +75,12 @@ pub async fn put( return Err(RoleError::SuperAdminPrivilege); } - let mut metadata = get_metadata(&tenant_id).await?; - metadata.roles.insert(name.clone(), role.clone()); - - put_metadata(&metadata, &tenant_id).await?; + { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + metadata.roles.insert(name.clone(), role.clone()); + put_metadata(&metadata, &tenant_id).await?; + } mut_roles() .entry(tenant.to_owned()) .or_default() @@ -142,26 +145,28 @@ pub async fn delete( return Err(RoleError::ProtectedRole); } - // check if the role is being used by any user or group - let mut metadata = get_metadata(&tenant_id).await?; - if metadata.users.iter().any(|user| user.roles.contains(&name)) { - return Err(RoleError::RoleInUse); - } - if metadata - .user_groups - .iter() - .any(|user_group| user_group.roles.contains(&name)) { - return Err(RoleError::RoleInUse); + let _guard = UPDATE_LOCK.lock().await; + // check if the role is being used by any user or group + let mut metadata = get_metadata(&tenant_id).await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + if metadata + .user_groups + .iter() + .any(|user_group| user_group.roles.contains(&name)) + { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + put_metadata(&metadata, &tenant_id).await?; } - metadata.roles.remove(&name); - put_metadata(&metadata, &tenant_id).await?; mut_roles() .entry(tenant.to_owned()) .or_default() .remove(&name); - // mut_roles().remove(&name); // refresh the sessions of all users using this role // for this, iterate over all user_groups and users and create a hashset of users diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 29a977c3d..0bad411fa 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -30,7 +30,6 @@ use crate::handlers::http::middleware::IntraClusterRequest; use crate::handlers::http::modal::initialize_hot_tier_metadata_on_startup; use crate::handlers::http::prism_base_path; use crate::handlers::http::query; -use crate::handlers::http::resource_check; use crate::handlers::http::targets; use crate::handlers::http::users::dashboards; use crate::handlers::http::users::filters; @@ -44,7 +43,6 @@ use crate::sync::sync_start; use actix_web::Resource; use actix_web::Scope; -use actix_web::middleware::from_fn; use actix_web::web; use actix_web::web::resource; use actix_web_prometheus::PrometheusMetrics; @@ -78,12 +76,8 @@ impl ParseableServer for Server { .service( web::scope(&base_path()) .service(Self::get_correlation_webscope()) - .service(Self::get_query_factory().wrap(from_fn( - resource_check::check_resource_utilization_middleware, - ))) - .service(Self::get_ingest_factory().wrap(from_fn( - resource_check::check_resource_utilization_middleware, - ))) + .service(Self::get_query_factory()) + .service(Self::get_ingest_factory()) .service(Self::get_liveness_factory()) .service(Self::get_readiness_factory()) .service(Self::get_about_factory()) @@ -96,9 +90,7 @@ impl ParseableServer for Server { .service(Self::get_oauth_webscope()) .service(Self::get_user_role_webscope()) .service(Self::get_roles_webscope()) - .service(Self::get_counts_webscope().wrap(from_fn( - resource_check::check_resource_utilization_middleware, - ))) + .service(Self::get_counts_webscope()) .service(Self::get_alerts_webscope()) .service(Self::get_targets_webscope()) .service(Self::get_metrics_webscope()) @@ -111,9 +103,7 @@ impl ParseableServer for Server { .service(Server::get_prism_datasets()) .service(Self::get_dataset_stats_webscope()), ) - .service(Self::get_ingest_otel_factory().wrap(from_fn( - resource_check::check_resource_utilization_middleware, - ))) + .service(Self::get_ingest_otel_factory()) .service(Self::get_generated()); } @@ -460,10 +450,7 @@ impl Server { .route( web::post() .to(ingest::post_event) - .authorize_for_resource(Action::Ingest) - .wrap(from_fn( - resource_check::check_resource_utilization_middleware, - )), + .authorize_for_resource(Action::Ingest), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( diff --git a/src/handlers/http/rbac.rs b/src/handlers/http/rbac.rs index 552b52c53..3fa1315fb 100644 --- a/src/handlers/http/rbac.rs +++ b/src/handlers/http/rbac.rs @@ -129,7 +129,6 @@ pub async fn post_user( let userid = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); validator::user_role_name(&userid)?; - let mut metadata = get_metadata(&tenant_id).await?; let user_roles: HashSet = if let Some(body) = body { serde_json::from_value(body.into_inner())? @@ -148,33 +147,34 @@ pub async fn post_user( if !non_existent_roles.is_empty() { return Err(RBACError::RolesDoNotExist(non_existent_roles)); } - let _guard = UPDATE_LOCK.lock().await; - if Users.contains(&userid, &tenant_id) - || metadata.users.iter().any(|user| match &user.ty { - UserType::Native(basic) => basic.username == userid, - // OAuth users are created via the OIDC flow, and API key users - // are provisioned via the /apikeys endpoints. - UserType::OAuth(_) | UserType::ApiKey(_) => false, - }) - { - return Err(RBACError::UserExists(userid)); - } - let (user, password) = user::User::new_basic(userid.clone(), tenant_id.clone(), false); + let (user, password) = { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + + if Users.contains(&userid, &tenant_id) + || metadata.users.iter().any(|user| match &user.ty { + UserType::Native(basic) => basic.username == userid, + UserType::OAuth(_) => false, + UserType::ApiKey(_) => false, + }) + { + return Err(RBACError::UserExists(userid)); + } - metadata.users.push(user.clone()); + let (mut user, password) = user::User::new_basic(userid.clone(), tenant_id.clone(), false); + user.roles.clone_from(&user_roles); + metadata.users.push(user.clone()); + put_metadata(&metadata, &tenant_id).await?; + (user, password) + }; - put_metadata(&metadata, &tenant_id).await?; - let created_role = user_roles.clone(); - Users.put_user(user.clone()); - if !created_role.is_empty() { - add_roles_to_user( - req, - web::Path::::from(userid.clone()), - web::Json(created_role), - ) - .await?; + // Update in-memory state after releasing the lock + Users.put_user(user); + if !user_roles.is_empty() { + Users.add_roles(&userid, user_roles, &tenant_id); } + Ok(password) } @@ -192,28 +192,28 @@ pub async fn post_gen_password( { return Err(RBACError::ProtectedUser); } - let mut new_password = String::default(); - let mut new_hash = String::default(); - let mut metadata = get_metadata(&tenant_id).await?; - - let _guard = UPDATE_LOCK.lock().await; - let user::PassCode { password, hash } = user::Basic::gen_new_password(); - new_password.clone_from(&password); - new_hash.clone_from(&hash); - if let Some(user) = metadata - .users - .iter_mut() - .filter_map(|user| match user.ty { - user::UserType::Native(ref mut user) => Some(user), - _ => None, - }) - .find(|user| user.username == userid) - { - user.password_hash.clone_from(&hash); - } else { - return Err(RBACError::UserDoesNotExist); - } - put_metadata(&metadata, &tenant_id).await?; + let (new_password, new_hash) = { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == userid) + { + user.password_hash.clone_from(&hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; + (password, hash) + }; + + // Update in-memory state after releasing the lock Users.change_password_hash(&userid, &new_hash, &tenant_id); Ok(new_password) @@ -286,25 +286,22 @@ pub async fn delete_user( ) -> Result { let userid = userid.into_inner(); let tenant_id = get_tenant_id_from_request(&req); - let _guard = UPDATE_LOCK.lock().await; - // if user is a part of any groups then don't allow deletion + + // Validations before acquiring the lock (read-only in-memory checks) if !Users.get_user_groups(&userid, &tenant_id).is_empty() { return Err(RBACError::InvalidDeletionRequest(format!( "User: {userid} should not be a part of any groups" ))); } - // fail this request if the user is protected if let Some(p) = Users.is_protected(&userid, &tenant_id) && p { return Err(RBACError::ProtectedUser); } - // fail this request if the user does not exists if !Users.contains(&userid, &tenant_id) { return Err(RBACError::UserDoesNotExist); }; - // find username by userid, for native users, username is userid, for oauth users, we need to look up let username = if let Some(users) = users().get(tenant_id.as_deref().unwrap_or(DEFAULT_TENANT)) && let Some(user) = users.get(&userid) { @@ -313,12 +310,14 @@ pub async fn delete_user( return Err(RBACError::UserDoesNotExist); }; - // delete from parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - metadata.users.retain(|user| user.userid() != userid); - put_metadata(&metadata, &tenant_id).await?; + { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + metadata.users.retain(|user| user.userid() != userid); + put_metadata(&metadata, &tenant_id).await?; + } - // update in mem table + // Update in-memory state after releasing the lock Users.delete_user(&userid, &tenant_id); Ok(HttpResponse::Ok().json(format!("deleted user: {username}"))) @@ -381,21 +380,22 @@ pub async fn add_roles_to_user( return Err(RBACError::RolesDoNotExist(non_existent_roles)); } - // update parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - if let Some(user) = metadata - .users - .iter_mut() - .find(|user| user.userid() == userid) { - user.roles.extend(roles_to_add.clone()); - } else { - // should be unreachable given state is always consistent - return Err(RBACError::UserDoesNotExist); + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.userid() == userid) + { + user.roles.extend(roles_to_add.clone()); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; } - put_metadata(&metadata, &tenant_id).await?; - // update in mem table + // Update in-memory state after releasing the lock Users.add_roles(&userid.clone(), roles_to_add, &tenant_id); Ok(HttpResponse::Ok().json(format!("Roles updated successfully for {username}"))) @@ -455,23 +455,24 @@ pub async fn remove_roles_from_user( ))); } - // update parseable.json first - let mut metadata = get_metadata(&tenant_id).await?; - if let Some(user) = metadata - .users - .iter_mut() - .find(|user| user.userid() == userid) { - let diff: HashSet = - HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); - user.roles = diff; - } else { - // should be unreachable given state is always consistent - return Err(RBACError::UserDoesNotExist); + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.userid() == userid) + { + let diff: HashSet = + HashSet::from_iter(user.roles.difference(&roles_to_remove).cloned()); + user.roles = diff; + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata, &tenant_id).await?; } - put_metadata(&metadata, &tenant_id).await?; - // update in mem table + // Update in-memory state after releasing the lock Users.remove_roles(&userid.clone(), roles_to_remove, &tenant_id); Ok(HttpResponse::Ok().json(format!("Roles updated successfully for {username}"))) diff --git a/src/handlers/http/role.rs b/src/handlers/http/role.rs index abe8f3dc9..bc6761f69 100644 --- a/src/handlers/http/role.rs +++ b/src/handlers/http/role.rs @@ -25,6 +25,7 @@ use actix_web::{ web::{self, Json}, }; +use crate::handlers::http::rbac::UPDATE_LOCK; use crate::rbac::map::roles; use crate::rbac::role::model::{Role, RoleType, RoleUI}; use crate::{ @@ -55,21 +56,23 @@ pub async fn put( // validate the role name validator::user_role_name(&name).map_err(RoleError::ValidationError)?; - let mut metadata = get_metadata(&tenant_id).await?; - metadata.roles.insert(name.clone(), role.clone()); - - put_metadata(&metadata, &tenant_id).await?; + { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + metadata.roles.insert(name.clone(), role.clone()); + put_metadata(&metadata, &tenant_id).await?; + } - let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); + // Update in-memory state after releasing the lock + let tenant_str = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned(); mut_roles() - .entry(tenant_id.to_owned()) + .entry(tenant_str.clone()) .or_default() .insert(name.clone(), role.clone()); - // refresh the sessions of all users using this role - // for this, iterate over all user_groups and users and create a hashset of users + // Refresh sessions for users who have this role let mut session_refresh_users: HashSet = HashSet::new(); - if let Some(groups) = read_user_groups().get(tenant_id) { + if let Some(groups) = read_user_groups().get(tenant_str.as_str()) { for user_group in groups.values() { if user_group.roles.contains(&name) { session_refresh_users @@ -77,18 +80,15 @@ pub async fn put( } } } - - // iterate over all users to see if they have this role - if let Some(users) = users().get(tenant_id) { + if let Some(users) = users().get(tenant_str.as_str()) { for user in users.values() { if user.roles.contains(&name) { session_refresh_users.insert(user.userid().to_string()); } } } - for userid in session_refresh_users { - mut_sessions().remove_user(&userid, tenant_id); + mut_sessions().remove_user(&userid, &tenant_str); } Ok(HttpResponse::Ok().finish()) @@ -138,26 +138,28 @@ pub async fn delete( return Err(RoleError::ProtectedRole); } - // check if the role is being used by any user or group - let mut metadata = get_metadata(&tenant_id).await?; - if metadata.users.iter().any(|user| user.roles.contains(&name)) { - return Err(RoleError::RoleInUse); - } - if metadata - .user_groups - .iter() - .any(|user_group| user_group.roles.contains(&name)) { - return Err(RoleError::RoleInUse); + let _guard = UPDATE_LOCK.lock().await; + // check if the role is being used by any user or group + let mut metadata = get_metadata(&tenant_id).await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + if metadata + .user_groups + .iter() + .any(|user_group| user_group.roles.contains(&name)) + { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + put_metadata(&metadata, &tenant_id).await?; } - metadata.roles.remove(&name); - put_metadata(&metadata, &tenant_id).await?; mut_roles() .entry(tenant.to_owned()) .or_default() .remove(&name); - // mut_roles().remove(&name); Ok(HttpResponse::Ok().finish()) } @@ -170,17 +172,16 @@ pub async fn put_default( ) -> Result { let name = name.into_inner(); let tenant_id = get_tenant_id_from_request(&req); - let mut metadata = get_metadata(&tenant_id).await?; - metadata.default_role = Some(name.clone()); - DEFAULT_ROLE - .write() - // .unwrap() - .insert( - tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned(), - Some(name), - ); - // *DEFAULT_ROLE.lock().unwrap() = Some(name); - put_metadata(&metadata, &tenant_id).await?; + { + let _guard = UPDATE_LOCK.lock().await; + let mut metadata = get_metadata(&tenant_id).await?; + metadata.default_role = Some(name.clone()); + put_metadata(&metadata, &tenant_id).await?; + } + DEFAULT_ROLE.write().insert( + tenant_id.as_deref().unwrap_or(DEFAULT_TENANT).to_owned(), + Some(name), + ); Ok(HttpResponse::Ok().finish()) } diff --git a/src/rbac/mod.rs b/src/rbac/mod.rs index 1d952bbab..2e1d520dd 100644 --- a/src/rbac/mod.rs +++ b/src/rbac/mod.rs @@ -149,38 +149,60 @@ impl Users { // caller ensures that this operation is valid for the user pub fn change_password_hash(&self, userid: &str, hash: &String, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) + let found = if let Some(users) = mut_users().get_mut(tenant_id) && let Some(User { ty: UserType::Native(user), .. }) = users.get_mut(userid) { user.password_hash.clone_from(hash); - mut_sessions().remove_user(userid, tenant_id); + true + } else { + false }; + // USERS write lock dropped — acquire SESSIONS lock separately to + // avoid USERS→SESSIONS ordering that inverts the auth-path's + // SESSIONS→USERS ordering (deadlock). + if found { + mut_sessions().remove_user(userid, tenant_id); + } } pub fn add_roles(&self, userid: &str, roles: HashSet, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) + let new_perms = if let Some(users) = mut_users().get_mut(tenant_id) && let Some(user) = users.get_mut(userid) { user.roles.extend(roles); - let new_perms = roles_to_permission(user.roles(), tenant_id); - mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + Some(roles_to_permission(user.roles(), tenant_id)) + } else { + None }; + // USERS write lock dropped — acquire SESSIONS lock separately to + // avoid USERS→SESSIONS ordering that inverts the auth-path's + // SESSIONS→USERS ordering (deadlock). + if let Some(new_perms) = new_perms { + mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + } } pub fn remove_roles(&self, userid: &str, roles: HashSet, tenant_id: &Option) { let tenant_id = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = mut_users().get_mut(tenant_id) + let new_perms = if let Some(users) = mut_users().get_mut(tenant_id) && let Some(user) = users.get_mut(userid) { let diff = HashSet::from_iter(user.roles.difference(&roles).cloned()); user.roles = diff; - let new_perms = roles_to_permission(user.roles(), tenant_id); - mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + Some(roles_to_permission(user.roles(), tenant_id)) + } else { + None }; + // USERS write lock dropped — acquire SESSIONS lock separately to + // avoid USERS→SESSIONS ordering that inverts the auth-path's + // SESSIONS→USERS ordering (deadlock). + if let Some(new_perms) = new_perms { + mut_sessions().refresh_user_permissions(userid, tenant_id, &new_perms); + } } pub fn contains(&self, userid: &str, tenant_id: &Option) -> bool { @@ -262,29 +284,36 @@ impl Users { get_tenant_id_from_key(&key) }; let tenant = tenant_id.as_deref().unwrap_or(DEFAULT_TENANT); - if let Some(users) = users().get(tenant) + // Clone user data under USERS read lock, then drop it before + // acquiring SESSIONS write lock. This avoids USERS→SESSIONS + // ordering that could participate in a deadlock with the + // auth-path's SESSIONS→USERS ordering. + let verified = if let Some(users) = users().get(tenant) && let Some( user @ User { ty: UserType::Native(basic_user), .. }, ) = users.get(username) + && basic_user.verify_password(password) { - // if user exists and password matches - // add this user to auth map - if basic_user.verify_password(password) { - let mut sessions = mut_sessions(); - sessions.track_new( - username.clone(), - key.clone(), - DateTime::::MAX_UTC, - roles_to_permission(user.roles(), tenant), - &user.tenant, - ); - return sessions - .check_auth(&key, action, context_stream, context_user) - .expect("entry for this key just added"); - } + Some((user.roles(), user.tenant.clone())) + } else { + None + }; + // USERS read lock dropped here + if let Some((user_roles, user_tenant)) = verified { + let mut sessions = mut_sessions(); + sessions.track_new( + username.clone(), + key.clone(), + DateTime::::MAX_UTC, + roles_to_permission(user_roles, tenant), + &user_tenant, + ); + return sessions + .check_auth(&key, action, context_stream, context_user) + .expect("entry for this key just added"); } Response::UnAuthorized