diff --git a/crates/mqdb-cluster/src/cluster/db/constraint_store.rs b/crates/mqdb-cluster/src/cluster/db/constraint_store.rs index 73b427c..bf28581 100644 --- a/crates/mqdb-cluster/src/cluster/db/constraint_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/constraint_store.rs @@ -429,19 +429,41 @@ impl ConstraintStore { /// # Panics /// Panics if the internal lock is poisoned. - #[allow(clippy::cast_possible_truncation)] #[must_use] pub fn export_for_partition(&self, partition: PartitionId) -> Vec { let constraints = self.constraints.read().unwrap(); - let matching: Vec<_> = constraints + let iter = constraints .iter() - .filter(|(_, c)| c.partition() == partition) - .collect(); + .filter(|(_, c)| c.partition() == partition); + Self::serialize_entries(iter) + } + + /// Exports every constraint regardless of partition. + /// + /// Constraints are cluster-wide broadcast state — every node needs the full + /// catalog so cascade, RESTRICT, and unique enforcement work uniformly. Per- + /// partition snapshots use this so a joining node receives the complete + /// constraint set rather than only the constraints whose + /// `schema_partition(entity)` happens to land on a partition it owns. + /// + /// # Panics + /// Panics if the internal lock is poisoned. + #[must_use] + pub fn export_all(&self) -> Vec { + let constraints = self.constraints.read().unwrap(); + Self::serialize_entries(constraints.iter()) + } + #[allow(clippy::cast_possible_truncation)] + fn serialize_entries<'a, I>(iter: I) -> Vec + where + I: Iterator, + { + let entries: Vec<_> = iter.collect(); let mut buf = Vec::new(); - buf.extend_from_slice(&(matching.len() as u32).to_be_bytes()); + buf.extend_from_slice(&(entries.len() as u32).to_be_bytes()); - for (key, constraint) in matching { + for (key, constraint) in entries { let key_bytes = key.as_bytes(); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); buf.extend_from_slice(key_bytes); @@ -981,4 +1003,42 @@ mod tests { } } } + + #[test] + fn export_all_carries_every_constraint_regardless_of_partition() { + let src = ConstraintStore::new(node(1)); + let dst = ConstraintStore::new(node(2)); + + let entities = ["alpha", "beta", "gamma", "delta", "epsilon"]; + for entity in &entities { + src.add(ClusterConstraint::unique( + entity, + &format!("uniq_{entity}"), + "email", + )) + .unwrap(); + } + + let partitions: std::collections::HashSet<_> = src + .list_all() + .iter() + .map(ClusterConstraint::partition) + .collect(); + assert!( + partitions.len() > 1, + "test prerequisite: constraints must span more than one partition" + ); + + let payload = src.export_all(); + let imported = dst.import_constraints(&payload).unwrap(); + assert_eq!(imported, entities.len()); + + for entity in &entities { + let name = format!("uniq_{entity}"); + assert!( + dst.get(entity, &name).is_some(), + "every constraint must arrive via export_all, regardless of partition: {entity}:{name}" + ); + } + } } diff --git a/crates/mqdb-cluster/src/cluster/db/schema_store.rs b/crates/mqdb-cluster/src/cluster/db/schema_store.rs index 2d9ae47..7e7e927 100644 --- a/crates/mqdb-cluster/src/cluster/db/schema_store.rs +++ b/crates/mqdb-cluster/src/cluster/db/schema_store.rs @@ -240,19 +240,38 @@ impl SchemaStore { /// # Panics /// Panics if the internal lock is poisoned. - #[allow(clippy::cast_possible_truncation)] #[must_use] pub fn export_for_partition(&self, partition: PartitionId) -> Vec { let schemas = self.schemas.read().unwrap(); - let matching: Vec<_> = schemas - .iter() - .filter(|(_, s)| s.partition() == partition) - .collect(); + let iter = schemas.iter().filter(|(_, s)| s.partition() == partition); + Self::serialize_entries(iter) + } + + /// Exports every schema regardless of partition. + /// + /// Schemas are cluster-wide broadcast state — every node needs the full + /// catalog to validate writes locally. Per-partition snapshots use this so + /// a joining node receives the complete schema set, not only the schemas + /// whose `schema_partition(entity)` happens to land on a partition it owns. + /// + /// # Panics + /// Panics if the internal lock is poisoned. + #[must_use] + pub fn export_all(&self) -> Vec { + let schemas = self.schemas.read().unwrap(); + Self::serialize_entries(schemas.iter()) + } + #[allow(clippy::cast_possible_truncation)] + fn serialize_entries<'a, I>(iter: I) -> Vec + where + I: Iterator, + { + let entries: Vec<_> = iter.collect(); let mut buf = Vec::new(); - buf.extend_from_slice(&(matching.len() as u32).to_be_bytes()); + buf.extend_from_slice(&(entries.len() as u32).to_be_bytes()); - for (key, schema) in matching { + for (key, schema) in entries { let key_bytes = key.as_bytes(); buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes()); buf.extend_from_slice(key_bytes); @@ -498,4 +517,33 @@ mod tests { } } } + + #[test] + fn export_all_carries_every_schema_regardless_of_partition() { + let src = SchemaStore::new(node(1)); + let dst = SchemaStore::new(node(2)); + + let entities = ["alpha", "beta", "gamma", "delta", "epsilon"]; + for entity in &entities { + src.register(entity, b"{}").unwrap(); + } + + let partitions: std::collections::HashSet<_> = + src.list().iter().map(ClusterSchema::partition).collect(); + assert!( + partitions.len() > 1, + "test prerequisite: schemas must span more than one partition" + ); + + let payload = src.export_all(); + let imported = dst.import_schemas(&payload).unwrap(); + assert_eq!(imported, entities.len()); + + for entity in &entities { + assert!( + dst.get(entity).is_some(), + "every schema must arrive via export_all, regardless of partition: {entity}" + ); + } + } } diff --git a/crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs b/crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs index 4202734..a7664cc 100644 --- a/crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs +++ b/crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs @@ -48,10 +48,7 @@ impl StoreManager { entity::DB_DATA, self.db_data.export_for_partition(partition), ), - ( - entity::DB_SCHEMA, - self.db_schema.export_for_partition(partition), - ), + (entity::DB_SCHEMA, self.db_schema.export_all()), ( entity::DB_INDEX, self.db_index.export_for_partition(partition), @@ -61,10 +58,7 @@ impl StoreManager { self.db_unique.export_for_partition(partition), ), (entity::DB_FK, self.db_fk.export_for_partition(partition)), - ( - entity::DB_CONSTRAINT, - self.db_constraints.export_for_partition(partition), - ), + (entity::DB_CONSTRAINT, self.db_constraints.export_all()), ]; buf.extend_from_slice(&(store_data.len() as u8).to_be_bytes()); @@ -382,21 +376,18 @@ mod tests { assert_eq!(dst_has, up == target, "db_unique for {entity}"); } - assert_eq!( + assert!( dst.db_schema.get(entity).is_some(), - src.db_schema.get(entity).unwrap().partition() == target, - "db_schema for {entity}" + "db_schema for {entity} must arrive via any partition snapshot \ + — schemas are broadcast state and need to be uniform on every node" ); let constraint_name = format!("uniq_{entity}"); - assert_eq!( + assert!( dst.db_constraints.get(entity, &constraint_name).is_some(), - src.db_constraints - .get(entity, &constraint_name) - .unwrap() - .partition() - == target, - "db_constraints for {entity}" + "db_constraints {entity}:{constraint_name} must arrive via any \ + partition snapshot — constraints are broadcast state and need to \ + be uniform on every node" ); let fk_id = format!("fk-{entity}"); @@ -469,4 +460,89 @@ mod tests { "destination reverse index must hold c1 after import — without the rebuild step this would be empty", ); } + + #[test] + fn partition_snapshot_carries_full_schema_and_constraint_catalog() { + use crate::cluster::db::{ClusterConstraint, OnDeleteAction}; + + let src = StoreManager::new(node(1)); + let dst = StoreManager::new(node(2)); + + let entities = ["alpha", "beta", "gamma", "delta", "epsilon", "zeta"]; + for entity in &entities { + src.db_schema.register(entity, b"{\"fields\":[]}").unwrap(); + src.db_constraints + .add(ClusterConstraint::unique( + entity, + &format!("uniq_{entity}"), + "email", + )) + .unwrap(); + src.db_constraints + .add(ClusterConstraint::foreign_key( + entity, + &format!("fk_{entity}"), + "parent_id", + "alpha", + "id", + OnDeleteAction::Cascade, + )) + .unwrap(); + } + + src.db_data.create("alpha", "rec-1", b"{}", 1_000).unwrap(); + let any_partition = src.db_data.get("alpha", "rec-1").unwrap().partition(); + + let schema_partitions: std::collections::HashSet<_> = src + .db_schema + .list() + .iter() + .map(crate::cluster::db::ClusterSchema::partition) + .collect(); + let constraint_partitions: std::collections::HashSet<_> = src + .db_constraints + .list_all() + .iter() + .map(ClusterConstraint::partition) + .collect(); + assert!( + schema_partitions.len() > 1, + "test prerequisite: schemas must span more than one partition" + ); + assert!( + constraint_partitions.len() > 1, + "test prerequisite: constraints must span more than one partition" + ); + + let payload = src.export_partition(any_partition); + dst.import_partition(&payload).expect("import"); + + for entity in &entities { + assert!( + dst.db_schema.get(entity).is_some(), + "every schema must arrive via any partition snapshot, regardless of partition: {entity}" + ); + let uniq_name = format!("uniq_{entity}"); + assert!( + dst.db_constraints.get(entity, &uniq_name).is_some(), + "every unique constraint must arrive via any partition snapshot: {entity}:{uniq_name}" + ); + let fk_name = format!("fk_{entity}"); + assert!( + dst.db_constraints.get(entity, &fk_name).is_some(), + "every fk constraint must arrive via any partition snapshot: {entity}:{fk_name}" + ); + } + + assert_eq!( + dst.db_schema.list().len(), + entities.len(), + "destination schema catalog count must match source" + ); + assert_eq!( + dst.db_constraints.list_all().len(), + entities.len() * 2, + "destination constraint catalog count must match source" + ); + } }