Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 66 additions & 6 deletions crates/mqdb-cluster/src/cluster/db/constraint_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -429,19 +429,41 @@ impl ConstraintStore {

/// # Panics
/// Panics if the internal lock is poisoned.
#[allow(clippy::cast_possible_truncation)]
#[must_use]
pub fn export_for_partition(&self, partition: PartitionId) -> Vec<u8> {
let constraints = self.constraints.read().unwrap();
let matching: Vec<_> = constraints
let iter = constraints
.iter()
.filter(|(_, c)| c.partition() == partition)
.collect();
.filter(|(_, c)| c.partition() == partition);
Self::serialize_entries(iter)
}

/// Exports every constraint regardless of partition.
///
/// Constraints are cluster-wide broadcast state — every node needs the full
/// catalog so cascade, RESTRICT, and unique enforcement work uniformly. Per-
/// partition snapshots use this so a joining node receives the complete
/// constraint set rather than only the constraints whose
/// `schema_partition(entity)` happens to land on a partition it owns.
///
/// # Panics
/// Panics if the internal lock is poisoned.
#[must_use]
pub fn export_all(&self) -> Vec<u8> {
let constraints = self.constraints.read().unwrap();
Self::serialize_entries(constraints.iter())
}

#[allow(clippy::cast_possible_truncation)]
fn serialize_entries<'a, I>(iter: I) -> Vec<u8>
where
I: Iterator<Item = (&'a String, &'a ClusterConstraint)>,
{
let entries: Vec<_> = iter.collect();
let mut buf = Vec::new();
buf.extend_from_slice(&(matching.len() as u32).to_be_bytes());
buf.extend_from_slice(&(entries.len() as u32).to_be_bytes());

for (key, constraint) in matching {
for (key, constraint) in entries {
let key_bytes = key.as_bytes();
buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
buf.extend_from_slice(key_bytes);
Expand Down Expand Up @@ -981,4 +1003,42 @@ mod tests {
}
}
}

#[test]
fn export_all_carries_every_constraint_regardless_of_partition() {
let src = ConstraintStore::new(node(1));
let dst = ConstraintStore::new(node(2));

let entities = ["alpha", "beta", "gamma", "delta", "epsilon"];
for entity in &entities {
src.add(ClusterConstraint::unique(
entity,
&format!("uniq_{entity}"),
"email",
))
.unwrap();
}

let partitions: std::collections::HashSet<_> = src
.list_all()
.iter()
.map(ClusterConstraint::partition)
.collect();
assert!(
partitions.len() > 1,
"test prerequisite: constraints must span more than one partition"
);

let payload = src.export_all();
let imported = dst.import_constraints(&payload).unwrap();
assert_eq!(imported, entities.len());

for entity in &entities {
let name = format!("uniq_{entity}");
assert!(
dst.get(entity, &name).is_some(),
"every constraint must arrive via export_all, regardless of partition: {entity}:{name}"
);
}
}
}
62 changes: 55 additions & 7 deletions crates/mqdb-cluster/src/cluster/db/schema_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,38 @@ impl SchemaStore {

/// # Panics
/// Panics if the internal lock is poisoned.
#[allow(clippy::cast_possible_truncation)]
#[must_use]
pub fn export_for_partition(&self, partition: PartitionId) -> Vec<u8> {
let schemas = self.schemas.read().unwrap();
let matching: Vec<_> = schemas
.iter()
.filter(|(_, s)| s.partition() == partition)
.collect();
let iter = schemas.iter().filter(|(_, s)| s.partition() == partition);
Self::serialize_entries(iter)
}

/// Exports every schema regardless of partition.
///
/// Schemas are cluster-wide broadcast state — every node needs the full
/// catalog to validate writes locally. Per-partition snapshots use this so
/// a joining node receives the complete schema set, not only the schemas
/// whose `schema_partition(entity)` happens to land on a partition it owns.
///
/// # Panics
/// Panics if the internal lock is poisoned.
#[must_use]
pub fn export_all(&self) -> Vec<u8> {
let schemas = self.schemas.read().unwrap();
Self::serialize_entries(schemas.iter())
}

#[allow(clippy::cast_possible_truncation)]
fn serialize_entries<'a, I>(iter: I) -> Vec<u8>
where
I: Iterator<Item = (&'a String, &'a ClusterSchema)>,
{
let entries: Vec<_> = iter.collect();
let mut buf = Vec::new();
buf.extend_from_slice(&(matching.len() as u32).to_be_bytes());
buf.extend_from_slice(&(entries.len() as u32).to_be_bytes());

for (key, schema) in matching {
for (key, schema) in entries {
let key_bytes = key.as_bytes();
buf.extend_from_slice(&(key_bytes.len() as u16).to_be_bytes());
buf.extend_from_slice(key_bytes);
Expand Down Expand Up @@ -498,4 +517,33 @@ mod tests {
}
}
}

#[test]
fn export_all_carries_every_schema_regardless_of_partition() {
let src = SchemaStore::new(node(1));
let dst = SchemaStore::new(node(2));

let entities = ["alpha", "beta", "gamma", "delta", "epsilon"];
for entity in &entities {
src.register(entity, b"{}").unwrap();
}

let partitions: std::collections::HashSet<_> =
src.list().iter().map(ClusterSchema::partition).collect();
assert!(
partitions.len() > 1,
"test prerequisite: schemas must span more than one partition"
);

let payload = src.export_all();
let imported = dst.import_schemas(&payload).unwrap();
assert_eq!(imported, entities.len());

for entity in &entities {
assert!(
dst.get(entity).is_some(),
"every schema must arrive via export_all, regardless of partition: {entity}"
);
}
}
}
112 changes: 94 additions & 18 deletions crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ impl StoreManager {
entity::DB_DATA,
self.db_data.export_for_partition(partition),
),
(
entity::DB_SCHEMA,
self.db_schema.export_for_partition(partition),
),
(entity::DB_SCHEMA, self.db_schema.export_all()),
(
entity::DB_INDEX,
self.db_index.export_for_partition(partition),
Expand All @@ -61,10 +58,7 @@ impl StoreManager {
self.db_unique.export_for_partition(partition),
),
(entity::DB_FK, self.db_fk.export_for_partition(partition)),
(
entity::DB_CONSTRAINT,
self.db_constraints.export_for_partition(partition),
),
(entity::DB_CONSTRAINT, self.db_constraints.export_all()),
];

buf.extend_from_slice(&(store_data.len() as u8).to_be_bytes());
Expand Down Expand Up @@ -382,21 +376,18 @@ mod tests {
assert_eq!(dst_has, up == target, "db_unique for {entity}");
}

assert_eq!(
assert!(
dst.db_schema.get(entity).is_some(),
src.db_schema.get(entity).unwrap().partition() == target,
"db_schema for {entity}"
"db_schema for {entity} must arrive via any partition snapshot \
— schemas are broadcast state and need to be uniform on every node"
);

let constraint_name = format!("uniq_{entity}");
assert_eq!(
assert!(
dst.db_constraints.get(entity, &constraint_name).is_some(),
src.db_constraints
.get(entity, &constraint_name)
.unwrap()
.partition()
== target,
"db_constraints for {entity}"
"db_constraints {entity}:{constraint_name} must arrive via any \
partition snapshot — constraints are broadcast state and need to \
be uniform on every node"
);

let fk_id = format!("fk-{entity}");
Expand Down Expand Up @@ -469,4 +460,89 @@ mod tests {
"destination reverse index must hold c1 after import — without the rebuild step this would be empty",
);
}

#[test]
fn partition_snapshot_carries_full_schema_and_constraint_catalog() {
use crate::cluster::db::{ClusterConstraint, OnDeleteAction};

let src = StoreManager::new(node(1));
let dst = StoreManager::new(node(2));

let entities = ["alpha", "beta", "gamma", "delta", "epsilon", "zeta"];
for entity in &entities {
src.db_schema.register(entity, b"{\"fields\":[]}").unwrap();
src.db_constraints
.add(ClusterConstraint::unique(
entity,
&format!("uniq_{entity}"),
"email",
))
.unwrap();
src.db_constraints
.add(ClusterConstraint::foreign_key(
entity,
&format!("fk_{entity}"),
"parent_id",
"alpha",
"id",
OnDeleteAction::Cascade,
))
.unwrap();
}

src.db_data.create("alpha", "rec-1", b"{}", 1_000).unwrap();
let any_partition = src.db_data.get("alpha", "rec-1").unwrap().partition();

let schema_partitions: std::collections::HashSet<_> = src
.db_schema
.list()
.iter()
.map(crate::cluster::db::ClusterSchema::partition)
.collect();
let constraint_partitions: std::collections::HashSet<_> = src
.db_constraints
.list_all()
.iter()
.map(ClusterConstraint::partition)
.collect();
assert!(
schema_partitions.len() > 1,
"test prerequisite: schemas must span more than one partition"
);
assert!(
constraint_partitions.len() > 1,
"test prerequisite: constraints must span more than one partition"
);

let payload = src.export_partition(any_partition);
dst.import_partition(&payload).expect("import");

for entity in &entities {
assert!(
dst.db_schema.get(entity).is_some(),
"every schema must arrive via any partition snapshot, regardless of partition: {entity}"
);
let uniq_name = format!("uniq_{entity}");
assert!(
dst.db_constraints.get(entity, &uniq_name).is_some(),
"every unique constraint must arrive via any partition snapshot: {entity}:{uniq_name}"
);
let fk_name = format!("fk_{entity}");
assert!(
dst.db_constraints.get(entity, &fk_name).is_some(),
"every fk constraint must arrive via any partition snapshot: {entity}:{fk_name}"
);
}

assert_eq!(
dst.db_schema.list().len(),
entities.len(),
"destination schema catalog count must match source"
);
assert_eq!(
dst.db_constraints.list_all().len(),
entities.len() * 2,
"destination constraint catalog count must match source"
);
}
}
Loading