Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,23 @@ All notable changes to this project will be documented in this file.

Each entry lists the date and the crate versions that were released.

## 2026-05-19 — mqdb-cluster 0.3.5

### Removed

- `StoreManager::clear_partition` (the 15-line aggregator at `partition_io.rs:206`). It had **no callers anywhere in the codebase** — production, tests, or otherwise — and aggregated per-partition wipes across stores that included the broadcast catalog (schemas, constraints). With schemas and constraints now treated as broadcast state on the export side (every partition snapshot ships the full catalog, per the 0.3.2 / 0.3.4 work), the aggregator was wrong-by-construction: a partition wipe must not drop catalog entries that every node is supposed to hold.
- `SchemaStore::clear_partition` and its `clear_partition_removes_only_target` unit test. The method was referenced only by its own test; nothing in the migration, rebalance, or snapshot paths called it.
- `ConstraintStore::clear_partition` and its `clear_partition_removes_only_target` unit test. Same dead-code-with-self-test status as above.
- Per-partition `clear_partition` on `DbDataStore`, `IndexStore`, `UniqueStore`, `FkValidationStore`, and the MQTT-side stores (sessions, retained, topics, wildcards, inflight, offsets, idempotency, qos2) is intentionally preserved — those stores are genuinely partition-scoped.

### Changed

- Added struct-level doc comments on `SchemaStore` and `ConstraintStore` documenting the broadcast-catalog invariant: every node holds the full catalog, every partition snapshot ships all entries, and there is intentionally no per-partition clearing API. Makes the invariant explicit at the point where a future contributor would otherwise consider adding a clearing method.

### Notes

- Follow-up to the broadcast-catalog model established by PR #61 (the export-side counterpart). 83 deletions across `partition_io.rs`, `schema_store.rs`, and `constraint_store.rs`; 12 lines of documentation added.

## 2026-05-06 — mqdb-cli 0.7.6

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/mqdb-cluster/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqdb-cluster"
version = "0.3.4"
version = "0.3.5"
publish = false
edition.workspace = true
license = "AGPL-3.0-only"
Expand Down
41 changes: 6 additions & 35 deletions crates/mqdb-cluster/src/cluster/db/constraint_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@ impl std::fmt::Display for ConstraintStoreError {

impl std::error::Error for ConstraintStoreError {}

/// Cluster-wide constraint catalog (unique + foreign-key definitions).
///
/// Constraints are broadcast state: every node holds the full catalog and
/// every partition snapshot ships all entries. There is intentionally no
/// per-partition clearing API — a partition wipe must not drop catalog
/// entries that every node is supposed to hold.
pub struct ConstraintStore {
node_id: NodeId,
constraints: RwLock<HashMap<String, ClusterConstraint>>,
Expand Down Expand Up @@ -533,15 +539,6 @@ impl ConstraintStore {

Ok(imported)
}

/// # Panics
/// Panics if the internal lock is poisoned.
pub fn clear_partition(&self, partition: PartitionId) -> usize {
let mut constraints = self.constraints.write().unwrap();
let before = constraints.len();
constraints.retain(|_, c| c.partition() != partition);
before - constraints.len()
}
}

impl std::fmt::Debug for ConstraintStore {
Expand Down Expand Up @@ -978,32 +975,6 @@ mod tests {
}
}

#[test]
fn clear_partition_removes_only_target() {
let store = ConstraintStore::new(node(1));
let entities = ["alpha", "beta", "gamma", "delta"];
for entity in &entities {
store
.add(ClusterConstraint::unique(
entity,
&format!("uniq_{entity}"),
"email",
))
.unwrap();
}

let target = store.get("alpha", "uniq_alpha").unwrap().partition();
let removed = store.clear_partition(target);
assert!(removed >= 1);

for entity in &entities {
let c = store.get(entity, &format!("uniq_{entity}"));
if let Some(c) = c {
assert_ne!(c.partition(), target);
}
}
}

#[test]
fn export_all_carries_every_constraint_regardless_of_partition() {
let src = ConstraintStore::new(node(1));
Expand Down
34 changes: 6 additions & 28 deletions crates/mqdb-cluster/src/cluster/db/schema_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,12 @@ impl std::fmt::Display for SchemaStoreError {

impl std::error::Error for SchemaStoreError {}

/// Cluster-wide schema catalog.
///
/// Schemas are broadcast state: every node holds the full catalog and every
/// partition snapshot ships all entries. There is intentionally no
/// per-partition clearing API — a partition wipe must not drop catalog
/// entries that every node is supposed to hold.
pub struct SchemaStore {
node_id: NodeId,
schemas: RwLock<HashMap<String, ClusterSchema>>,
Expand Down Expand Up @@ -341,15 +347,6 @@ impl SchemaStore {

Ok(imported)
}

/// # Panics
/// Panics if the internal lock is poisoned.
pub fn clear_partition(&self, partition: PartitionId) -> usize {
let mut schemas = self.schemas.write().unwrap();
let before = schemas.len();
schemas.retain(|_, s| s.partition() != partition);
before - schemas.len()
}
}

impl std::fmt::Debug for SchemaStore {
Expand Down Expand Up @@ -499,25 +496,6 @@ mod tests {
}
}

#[test]
fn clear_partition_removes_only_target() {
let store = SchemaStore::new(node(1));
store.register("alpha", b"{}").unwrap();
store.register("beta", b"{}").unwrap();
store.register("gamma", b"{}").unwrap();

let target = store.get("alpha").unwrap().partition();
let removed = store.clear_partition(target);

assert!(removed >= 1);
assert!(store.get("alpha").is_none());
for entity in ["beta", "gamma"] {
if let Some(s) = store.get(entity) {
assert_ne!(s.partition(), target);
}
}
}

#[test]
fn export_all_carries_every_schema_regardless_of_partition() {
let src = SchemaStore::new(node(1));
Expand Down
20 changes: 0 additions & 20 deletions crates/mqdb-cluster/src/cluster/store_manager/partition_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,26 +196,6 @@ impl StoreManager {
}
}
}

pub fn clear_partition(&self, partition: PartitionId) -> usize {
let mut total_cleared = 0;
total_cleared += self.sessions.clear_partition(partition);
total_cleared += self.qos2.clear_partition(partition);
total_cleared += self.subscriptions.clear_partition(partition);
total_cleared += self.retained.clear_partition(partition);
total_cleared += self.topics.clear_partition(partition);
total_cleared += self.wildcards.clear_partition(partition);
total_cleared += self.inflight.clear_partition(partition);
total_cleared += self.offsets.clear_partition(partition);
total_cleared += self.idempotency.clear_partition(partition);
total_cleared += self.db_data.clear_partition(partition);
total_cleared += self.db_schema.clear_partition(partition);
total_cleared += self.db_index.clear_partition(partition);
total_cleared += self.db_unique.clear_partition(partition);
total_cleared += self.db_fk.clear_partition(partition);
total_cleared += self.db_constraints.clear_partition(partition);
total_cleared
}
}

#[cfg(test)]
Expand Down
Loading