fix(persistence): close 8 CodeRabbit PR #136 durability findings + aof_manifest split #144
…loss)

A dropped SpillCompletion is permanent data loss, not a benign event as the old comment claimed. By the time a completion is produced the key has already been evicted from RAM, and the manifest add_file + cold_index insert happen ONLY when the event loop consumes the completion (apply_spill_completions). So try_send(Full) -> drop left the .mpf file orphaned on disk (never recorded in the manifest) and lost its keys permanently after restart.

CodeRabbit flagged this on PR #136 (review 4420584384); validated against the merged code.

Fixes (issue #137):
- send_one_completion now blocks (send_timeout loop) until the event loop drains a slot, applying backpressure instead of dropping. Deadlock-free: the dedicated spill thread is the only blocker and the eviction path enqueues SpillRequests with try_send (and refuses to free RAM unless the enqueue succeeds), so there is no cyclic wait. stop_flag keeps shutdown from wedging the join.
- shutdown() now drains and returns the thread's final-flush completions so the caller (drain_and_shutdown_spill) can still apply them, closing the shutdown-time loss window.
- The run loop drains request_rx on stop_flag before its final flush, so spills queued at shutdown are processed rather than abandoned in the channel.
- persistence_tick: apply_spill_completions + the shutdown path share a new apply_completion_vec helper.

Validated in OrbStack VM (cgroup-capped), red->green TDD:
- full_completion_channel_blocks_instead_of_dropping (RED: 1/2 delivered -> GREEN: 2/2, none dropped)
- shutdown_drains_and_returns_unapplied_completions (RED: 0 -> GREEN: 1)
- full spill_thread module 10/10 green; fmt + clippy clean under default and --no-default-features --features runtime-tokio,jemalloc.

author: Tin Dang <tin.dang@trustifytechnology.com>
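The blocking-send behaviour described in this commit can be sketched with the standard library alone. This is a minimal illustration, not the project's code: `Completion` and `deliver_through_one_slot` are hypothetical names, and because stable `std::sync::mpsc` has no `send_timeout`, a `try_send` retry with a short sleep stands in for the send_timeout loop the commit mentions.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the real SpillCompletion payload.
#[derive(Debug, PartialEq)]
struct Completion(u64);

/// Retry until the consumer frees a slot. The value is handed back to the
/// caller (never silently dropped) if the receiver is gone or shutdown was
/// requested while the channel was full.
fn send_one_completion(
    tx: &SyncSender<Completion>,
    stop_flag: &AtomicBool,
    mut completion: Completion,
) -> Result<(), Completion> {
    loop {
        match tx.try_send(completion) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(back)) => return Err(back),
            Err(TrySendError::Full(back)) => {
                if stop_flag.load(Ordering::Acquire) {
                    return Err(back);
                }
                completion = back;
                thread::sleep(Duration::from_millis(1));
            }
        }
    }
}

/// Pushes `n` completions through a one-slot channel while the consumer lags.
fn deliver_through_one_slot(n: u64) -> Vec<u64> {
    let (tx, rx) = sync_channel::<Completion>(1);
    let stop_flag = Arc::new(AtomicBool::new(false));
    let producer = {
        let stop_flag = Arc::clone(&stop_flag);
        thread::spawn(move || {
            for i in 0..n {
                // An Err here would carry the undelivered completion back.
                let _ = send_one_completion(&tx, &stop_flag, Completion(i));
            }
        })
    };
    // The consumer starts late, so the single slot fills and the producer waits.
    thread::sleep(Duration::from_millis(20));
    let received: Vec<u64> = rx.iter().map(|c| c.0).collect();
    producer.join().unwrap();
    received
}
```

Calling `deliver_through_one_slot(5)` exercises the full-channel case: the producer waits for the slow consumer instead of discarding completions. Returning the undelivered value in `Err` mirrors the idea that a shutdown path can still apply it.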
… wholesale

An entry that passes the `value_bytes.len() <= INLINE_MAX_VALUE_BYTES` (3500) pre-screen can still fail `build_kv_spill_batch` with InvalidData when it does not fit a fresh 4KB inline leaf (e.g. a large key plus an incompressible ~3.5KB value). The old code turned that single bad candidate into one wholesale `success: false` completion for the ENTIRE inline buffer, and those keys were already evicted from RAM, so the whole flush was lost.

CodeRabbit flagged this on PR #136 (review 4420584384); validated against the merged code.

Fix (issue #137): on inline-batch build OR write failure, fall back to writing each inline entry as its own single-page (+overflow) file via the existing `build_kv_spill_pages` path. This is the same path oversized entries already use, which succeeds where the batch fails because the value goes to overflow pages. Only an entry that cannot be written even with overflow (key too large for a leaf) now fails, and it fails in isolation. Extracted the per-entry logic into a shared `spill_single_entry` helper used by both the oversized loop and the fallback. Also corrected the stale `SPILL_COMPLETION_DROPPED` doc comment.

Validated in OrbStack VM (cgroup-capped), red->green TDD:
- inline_batch_failure_falls_back_to_per_entry_spill: an 800B key + incompressible 3500B value (RED: 0 salvaged -> GREEN: 1 salvaged, file readable on disk).
- full spill_thread module 11/11 green; fmt + clippy clean under default and --no-default-features --features runtime-tokio,jemalloc.

author: Tin Dang <tin.dang@trustifytechnology.com>
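The batch-then-per-entry fallback can be illustrated with a toy model. Everything below is an assumption made for illustration: the size limits, the `Entry` type, and the helper bodies do not reflect the real page format; only the control flow (one batch attempt, then isolated per-entry writes) follows the commit message.

```rust
/// Hypothetical entry type; the real spill entries carry more metadata.
struct Entry {
    key: Vec<u8>,
    value: Vec<u8>,
}

const LEAF_BYTES: usize = 4096; // assumed inline-leaf capacity for this sketch
const MAX_KEY_BYTES: usize = 1024; // assumed per-leaf key limit for this sketch

/// Toy batch builder: fails as a whole if any entry cannot fit one leaf.
fn build_batch(entries: &[Entry]) -> Result<(), String> {
    for e in entries {
        if e.key.len() + e.value.len() > LEAF_BYTES {
            return Err("entry does not fit an inline leaf".to_string());
        }
    }
    Ok(())
}

/// Toy single-entry writer: the value may overflow, the key may not.
fn spill_single_entry(e: &Entry) -> Result<(), String> {
    if e.key.len() > MAX_KEY_BYTES {
        Err("key too large for a leaf".to_string())
    } else {
        Ok(())
    }
}

/// Returns (indices written, indices failed). A batch failure no longer fails
/// every entry: each one is retried on its own and fails in isolation.
fn spill_inline_buffer(entries: &[Entry]) -> (Vec<usize>, Vec<usize>) {
    if build_batch(entries).is_ok() {
        return ((0..entries.len()).collect(), Vec::new());
    }
    let mut written = Vec::new();
    let mut failed = Vec::new();
    for (idx, e) in entries.iter().enumerate() {
        match spill_single_entry(e) {
            Ok(()) => written.push(idx),
            Err(_) => failed.push(idx),
        }
    }
    (written, failed)
}

/// One small entry, one 800B-key + 3500B-value entry, one oversized key.
fn fallback_demo() -> (Vec<usize>, Vec<usize>) {
    let entries = vec![
        Entry { key: vec![1; 8], value: vec![2; 64] },
        Entry { key: vec![3; 800], value: vec![4; 3500] },
        Entry { key: vec![5; 2000], value: vec![6; 16] },
    ];
    spill_inline_buffer(&entries)
}
```

In `fallback_demo` the second entry defeats the batch, yet the first two entries are still written individually and only the oversized-key entry is reported as failed.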
…ration

spawn_tokio_connection, spawn_monoio_connection and spawn_migrated_monoio_connection all thread the disk-offload spill context (spill_sender / spill_file_id / disk_offload_dir) into the ConnectionContext, but spawn_migrated_tokio_connection built it with None / Cell(0) / None. After an affinity migration the tokio connection therefore fell back to the non-spilling eviction path even when disk-offload was enabled, so cold-tier durability silently depended on whether the connection had migrated.

CodeRabbit flagged this on PR #136 (review 4420584384); validated against the merged code (issue #141).

Fix: thread spill_sender / spill_file_id / disk_offload_dir through spawn_migrated_tokio_connection and pass them to ConnectionContext::new, mirroring the three sibling spawn paths. Updated both call sites in event_loop.rs.

Validation: this is mechanical parity with three already-correct sibling functions in the same file. A genuine red/green unit test is disproportionate here: ConnectionContext::new takes 26 args and is consumed inside a spawned task, so asserting the migrated context's spill fields would require stubbing the entire context or simulating a live cross-shard connection migration. Verified instead by: clean build + clippy under both default (monoio) and --no-default-features --features runtime-tokio,jemalloc, and structural parity (all four spawn paths now pass the spill context). The behavioural guarantee is covered by the existing disk-offload read-through integration suite.

author: Tin Dang <tin.dang@trustifytechnology.com>
…ard accept

run_embedded() diverged from the production main.rs startup path on two functional points (CodeRabbit PR #136 review 4420584384, issue #142):

1. Recovery only ran when `persistence_dir` was set, so `--appendonly no` + disk-offload (persistence_dir = None, disk_offload_base = Some) skipped `Shard::restore_from_persistence` entirely, and cold-tier keys were unrecoverable after an embedded restart. Now gated on `should_run_recovery(persistence_dir, disk_offload_enabled)` = persistence OR disk-offload, recovering from `config.dir` when persistence_dir is None (mirrors main.rs:702-704).
2. Embedded forced `per_shard_accept = cfg!(target_os = "linux")`, re-enabling the tokio/Linux per-shard accept bind-order race that PR #136 disabled in main.rs. Now `false` (central accept), matching main.rs.

Validated in OrbStack VM (cgroup-capped), red->green TDD under --no-default-features --features runtime-tokio,jemalloc (embedded is runtime-tokio-only): recovery_runs_for_disk_offload_without_persistence (RED with the disk-offload term dropped -> GREEN). fmt + clippy clean under both default (monoio) and tokio,jemalloc.

Note: a full embedded+disk-offload+restart integration test was not added. There is no run_embedded test harness to extend and standing it up for a 2-line parity fix is disproportionate; the behavioural recovery path is the same one covered by tests/crash_recovery_disk_offload_no_aof.rs on main.rs. The extracted should_run_recovery predicate unit-tests the exact gate that regressed.

author: Tin Dang <tin.dang@trustifytechnology.com>
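The recovery gate from point 1 reduces to a small predicate. The sketch below assumes parameter types (`Option<&Path>`, `bool`) that the commit message does not state, and `recovery_dir` is a hypothetical helper added only to show the fallback to the configured directory.

```rust
use std::path::Path;

/// Recovery runs when either persistence or disk-offload is configured.
/// Parameter types are assumptions; the commit only names the arguments.
fn should_run_recovery(persistence_dir: Option<&Path>, disk_offload_enabled: bool) -> bool {
    persistence_dir.is_some() || disk_offload_enabled
}

/// Hypothetical helper: recover from the persistence dir when set,
/// otherwise from the configured data directory.
fn recovery_dir<'a>(persistence_dir: Option<&'a Path>, config_dir: &'a Path) -> &'a Path {
    persistence_dir.unwrap_or(config_dir)
}
```

The regressed case is `should_run_recovery(None, true)`: no persistence directory, disk-offload enabled. Dropping the second term of the OR would skip recovery for exactly that configuration.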
…tway

try_send_rewrite_per_shard fans the RewritePerShard message out to every shard writer with a blocking send. If a writer channel is disconnected the send fails and the function returned Err immediately, but the writers that ALREADY received the message decrement the PerShardRewriteCoord countdown when they fold, while the failed writer and the unsent tail never do. The countdown therefore never reaches zero, no shard runs the terminal branch, and AOF_REWRITE_IN_PROGRESS (set by bgrewriteaof_start_sharded before dispatch) is wedged true forever: every future BGREWRITEAOF is silently rejected for the lifetime of the process. Same wedged-flag family CodeRabbit flagged on PR #136 review 4420584384 (related to #138).

Fix: on send failure, call coord.mark_failed() then coord.shard_done() once for each shard from the failing index through the tail (the failed shard plus every unsent one). The already-dispatched writers still decrement on fold, so the countdown reaches zero exactly once; mark_failed() is published before the tail decrements, so whichever decrement is terminal takes the abort path, which keeps old_seq authoritative (no manifest commit) and clears AOF_REWRITE_IN_PROGRESS. The error log now names the offending writer index.

Validated in OrbStack VM (cgroup-capped), red/green TDD: rewrite_fan_out_partial_failure_clears_in_progress_flag disconnects shard-0's writer, asserts the fan-out returns Err AND the in-progress flag is cleared (RED before the accounting fix, when the flag stayed true; GREEN after). fmt + clippy clean under both default (monoio) and tokio,jemalloc.

author: Tin Dang <tin.dang@trustifytechnology.com>
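The countdown accounting described above can be modelled in a few lines. This is a sketch under stated assumptions: `Coord` is a simplified stand-in for PerShardRewriteCoord, the `in_progress` field stands in for the global AOF_REWRITE_IN_PROGRESS flag, and `fan_out` and `partial_failure_demo` are hypothetical names.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::mpsc::{channel, Sender};
use std::sync::Arc;

/// Simplified stand-in for the rewrite coordinator.
struct Coord {
    remaining: AtomicUsize,
    failed: AtomicBool,
    in_progress: AtomicBool,
}

impl Coord {
    fn new(shards: usize) -> Self {
        Coord {
            remaining: AtomicUsize::new(shards),
            failed: AtomicBool::new(false),
            in_progress: AtomicBool::new(true),
        }
    }

    fn mark_failed(&self) {
        self.failed.store(true, Ordering::Release);
    }

    /// The decrement that reaches zero is terminal and clears the flag.
    fn shard_done(&self) {
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            self.in_progress.store(false, Ordering::Release);
        }
    }
}

/// On the first failed send, account for the failed shard and the unsent tail
/// so the countdown can still reach zero.
fn fan_out(writers: &[Sender<Arc<Coord>>], coord: &Arc<Coord>) -> Result<(), usize> {
    for (idx, writer) in writers.iter().enumerate() {
        if writer.send(Arc::clone(coord)).is_err() {
            coord.mark_failed();
            for _ in idx..writers.len() {
                coord.shard_done();
            }
            return Err(idx);
        }
    }
    Ok(())
}

/// Three shards; the writer for shard 1 is disconnected before dispatch.
fn partial_failure_demo() -> (Result<(), usize>, bool, bool) {
    let coord = Arc::new(Coord::new(3));
    let (tx0, rx0) = channel::<Arc<Coord>>();
    let (tx1, rx1) = channel::<Arc<Coord>>();
    let (tx2, _rx2) = channel::<Arc<Coord>>();
    drop(rx1);
    let result = fan_out(&[tx0, tx1, tx2], &coord);
    let before_fold = coord.in_progress.load(Ordering::Acquire);
    // Shard 0 already received the message; its fold performs the last decrement.
    rx0.recv().unwrap().shard_done();
    let after_fold = coord.in_progress.load(Ordering::Acquire);
    (result, before_fold, after_fold)
}
```

In the demo, the failing index and the unsent tail contribute two decrements immediately, and the one writer that did receive the message contributes the third when it folds, so the flag clears exactly once.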
…n on abort

A per-shard BGREWRITEAOF reopens each shard's append file onto the NEW generation in phase 6 (do_rewrite_per_shard) BEFORE the coordinator decides whether the rewrite commits. If the rewrite then aborts (another shard failed to fold, or the final manifest commit errored), the manifest keeps old_seq and the abort path prunes the new-seq files, but the already-folded writers keep appending into the now-discarded new-seq incr. Recovery resolves base/incr by the committed seq (old_seq), so every post-abort write is silently lost. The old code acknowledged this with a "RESTART recommended" log; a server that is not restarted loses data.

CodeRabbit flagged this on PR #136 review 4420584384.

Fix: barrier-before-resume. PerShardRewriteCoord now publishes the committed generation exactly once from the terminal shard_done (new_seq on success, old_seq on abort/commit-failure) via a parking_lot Mutex<Option<u64>> + Condvar. After phase 7 each folded writer blocks in await_outcome and, if the committed seq != new_seq, reopens its append file onto the committed seq's incr (shard_incr_path_seq), so it resumes appending into the authoritative generation with no restart and no loss. On the happy path committed == new_seq and the barrier is a no-op beyond unblocking.

The barrier turns "every shard calls shard_done exactly once" into a LIVENESS requirement: a writer that decrements then blocks hangs forever if another shard never decrements. The one path that skips shard_done is a panic mid-fold (save_snapshot_to_bytes OOM-unwinding, issue #138), which under the default panic=unwind would now hang every healthy shard's AOF writer thread. Closed with a ShardDoneGuard: the fold creates it on entry and calls complete() on success (clean shard_done); on any ?-error or panic-unwind its Drop fires mark_failed + shard_done, so the countdown always closes, the terminal writer always publishes, and every waiter wakes and rolls back.

The guard now owns the single decrement for ALL exits, so both writer-task error arms (monoio + tokio) no longer decrement after do_rewrite_per_shard (was a double-decrement risk). This incidentally closes the #138 panic-wedge. Each per-shard writer runs on its own dedicated OS thread (main.rs spawns aof-writer-{sid} → block_on_local), so blocking one at the barrier never starves the thread that must publish: no deadlock on either runtime.

Validated in OrbStack VM (cgroup-capped), red/green TDD:
- rewrite_abort_reopens_writer_onto_committed_old_generation: drives one shard's real fold with a second shard pre-failed; asserts the new-gen incr is pruned AND post-abort appends through *file land in the committed old-gen incr (RED before phase 8, when writes went to the pruned inode; GREEN after).
- rewrite_abort_wakes_waiter_cross_thread: real condvar wait/notify across threads observes old_seq.
- rewrite_fold_panic_releases_barrier_for_other_writers: a ShardDoneGuard dropped during catch_unwind releases a blocked waiter with old_seq (hangs forever without the guard).

No regression: per-shard AOF SIGKILL recovery (crash_matrix_per_shard_aof, 3) and per-shard BGREWRITEAOF crash recovery (crash_matrix_per_shard_bgrewriteaof, 2) stay green. fmt + clippy clean under both default (monoio) and tokio,jemalloc.

author: Tin Dang <tin.dang@trustifytechnology.com>
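The barrier and the drop guard can be sketched as follows. This is an illustration under assumptions, not the project's implementation: it uses `std::sync::Mutex` and `Condvar` instead of parking_lot, the `Coord` fields and `run_rewrite` driver are hypothetical, and generation numbers 7 and 8 are arbitrary.

```rust
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Simplified coordinator: publishes the committed generation exactly once.
struct Coord {
    remaining: AtomicUsize,
    failed: AtomicBool,
    old_seq: u64,
    new_seq: u64,
    outcome: Mutex<Option<u64>>,
    published: Condvar,
}

impl Coord {
    fn new(shards: usize, old_seq: u64, new_seq: u64) -> Self {
        Coord {
            remaining: AtomicUsize::new(shards),
            failed: AtomicBool::new(false),
            old_seq,
            new_seq,
            outcome: Mutex::new(None),
            published: Condvar::new(),
        }
    }

    fn mark_failed(&self) {
        self.failed.store(true, Ordering::Release);
    }

    /// The terminal decrement decides the outcome and wakes every waiter.
    fn shard_done(&self) {
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            let failed = self.failed.load(Ordering::Acquire);
            let seq = if failed { self.old_seq } else { self.new_seq };
            *self.outcome.lock().unwrap() = Some(seq);
            self.published.notify_all();
        }
    }

    /// Blocks until the committed generation has been published.
    fn await_outcome(&self) -> u64 {
        let mut slot = self.outcome.lock().unwrap();
        loop {
            if let Some(seq) = *slot {
                return seq;
            }
            slot = self.published.wait(slot).unwrap();
        }
    }
}

/// Owns the single decrement for every exit path of a fold.
struct ShardDoneGuard<'a> {
    coord: &'a Coord,
    completed: bool,
}

impl<'a> ShardDoneGuard<'a> {
    fn new(coord: &'a Coord) -> Self {
        ShardDoneGuard { coord, completed: false }
    }

    /// Clean exit: decrement without marking the rewrite failed.
    fn complete(mut self) {
        self.completed = true;
        self.coord.shard_done();
    }
}

impl Drop for ShardDoneGuard<'_> {
    /// Error return or panic unwind: mark failed, then decrement.
    fn drop(&mut self) {
        if !self.completed {
            self.coord.mark_failed();
            self.coord.shard_done();
        }
    }
}

/// Two writers; returns the generation the healthy writer resumes on.
fn run_rewrite(second_shard_panics: bool) -> u64 {
    let coord = Arc::new(Coord::new(2, 7, 8));
    let healthy = {
        let coord = Arc::clone(&coord);
        thread::spawn(move || {
            ShardDoneGuard::new(&coord).complete();
            coord.await_outcome()
        })
    };
    let other = {
        let coord = Arc::clone(&coord);
        thread::spawn(move || {
            let guard = ShardDoneGuard::new(&coord);
            if second_shard_panics {
                panic!("simulated fold failure");
            }
            guard.complete();
        })
    };
    let _ = other.join();
    healthy.join().unwrap()
}
```

With `run_rewrite(true)` the second writer unwinds, its guard marks the rewrite failed and performs the missing decrement, and the healthy writer wakes with the old generation instead of hanging. With `run_rewrite(false)` both writers complete and the new generation is published.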
During a BGREWRITEAOF drain, both drain_pending_appends (legacy/TopLevel, default --shards 1 path) and drain_pending_appends_framed (per-shard) wrote the appended bytes and immediately acked AofAck::Synced, BEFORE the post-drain boundary sync_data(). Under appendfsync=always a client received +OK (durable) for an entry that a crash in the ack→fsync window would lose. CodeRabbit flagged this on PR #136 review 4420584384 (issue #140); the always-fsync durability contract requires the bytes to be on disk before the client is told Synced.

Fix: park the AppendSync ack senders in DrainOutcome.pending_acks instead of acking inside the drain. New sync_and_fulfill_drain() performs the boundary fsync and only then resolves the parked acks: Synced on success, or FsyncFailed plus a propagated IO error on failure (never a false Synced). All six drain sites across do_rewrite_per_shard, do_rewrite_single and do_rewrite_sharded route through it. A write error inside the drain still drops the parked acks with the outcome (RecvError → hard failure), unchanged.

Validated in OrbStack VM (cgroup-capped), red/green TDD:
- drain_framed_parks_appendsync_ack_until_boundary_fsync: asserts the drain PARKS the ack (pending_acks.len()==1, not sent) and that it resolves Synced only after sync_data() (RED before: no pending_acks field / acked in drain).
- drain_single_fulfills_fsync_failure_as_fsync_failed: the DEFAULT --shards 1 non-framed path must resolve FsyncFailed (never Synced) when the boundary fsync fails.

No regression: full aof lib suite (66) green; bgrewriteaof TTL emission tests green; per-shard AOF SIGKILL + BGREWRITEAOF crash-recovery suites green. fmt + clippy clean under both default (monoio) and tokio,jemalloc.

Note: tests/aof_fsync_err_subscribe_ordering single-shard variant (an #[ignore]d normal-append-path test, multi-shard twin passes) fails identically on HEAD without this change; it is pre-existing and unrelated to the rewrite-drain path touched here.

author: Tin Dang <tin.dang@trustifytechnology.com>
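The park-then-fulfill ordering from this commit can be sketched with plain channels. The types below are simplified assumptions: `AofAck` has only the two variants named in the message, the "file" is an in-memory buffer, the fsync is a closure so a failure can be simulated, and `drain`, `sync_and_fulfill` and `run_drain` are illustrative names rather than the project's signatures.

```rust
use std::io;
use std::sync::mpsc::{channel, Sender};

/// Simplified ack type with the two outcomes named in the commit message.
#[derive(Debug, PartialEq, Clone, Copy)]
enum AofAck {
    Synced,
    FsyncFailed,
}

/// Acks collected during the drain and resolved only after the fsync.
struct DrainOutcome {
    pending_acks: Vec<Sender<AofAck>>,
}

/// Writes the pending bytes but does NOT ack; the senders are parked.
fn drain(pending: Vec<(Vec<u8>, Sender<AofAck>)>, file: &mut Vec<u8>) -> DrainOutcome {
    let mut pending_acks = Vec::new();
    for (bytes, ack) in pending {
        file.extend_from_slice(&bytes);
        pending_acks.push(ack);
    }
    DrainOutcome { pending_acks }
}

/// Runs the boundary fsync, then resolves every parked ack with its result.
fn sync_and_fulfill(
    outcome: DrainOutcome,
    fsync: impl FnOnce() -> io::Result<()>,
) -> io::Result<()> {
    let result = fsync();
    let ack = if result.is_ok() { AofAck::Synced } else { AofAck::FsyncFailed };
    for tx in outcome.pending_acks {
        let _ = tx.send(ack); // the client may have gone away; ignore that case
    }
    result
}

/// Returns (ack was parked after the drain, ack finally delivered, fsync ok).
fn run_drain(fsync_ok: bool) -> (bool, AofAck, bool) {
    let (ack_tx, ack_rx) = channel();
    let mut file = Vec::new();
    let outcome = drain(vec![(b"SET k v".to_vec(), ack_tx)], &mut file);
    let parked = ack_rx.try_recv().is_err();
    let result = sync_and_fulfill(outcome, || {
        if fsync_ok {
            Ok(())
        } else {
            Err(io::Error::new(io::ErrorKind::Other, "simulated fsync failure"))
        }
    });
    (parked, ack_rx.recv().unwrap(), result.is_ok())
}
```

`run_drain(true)` shows the ack withheld until the sync succeeds; `run_drain(false)` shows the same ack resolving as `FsyncFailed` while the IO error propagates to the caller.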
…00-line cap

src/persistence/aof_manifest.rs had grown to 3058 lines, past the 1500-line cap in CLAUDE.md. CodeRabbit flagged this on PR #136 review 4420584384 (issue #143). Converted the file into a directory module and relocated cohesive chunks, preserving every public path via re-export.

Layout (all three files now under the cap):
- aof_manifest/mod.rs (1395): AofManifest state + manifest-level I/O (paths, initialize/load, parse_v1/v2, cleanup_orphans, write_manifest, advance, migrate, verify, global_max_lsn) + manifest-core unit tests.
- aof_manifest/shard_replay.rs (1159): the multi-part / per-shard replay free functions (replay_multi_part, replay_incr_resp, replay_incr_framed, replay_per_shard, replay_ordered_merge) + OrderedEntry + the replay unit tests.
- aof_manifest/shard_rewrite.rs (577): the per-shard rewrite inherent methods (initialize_multi, try_initialize_multi, advance_shard, prune_shard_files) as `impl AofManifest` + the shard-rewrite unit tests.

Mechanics:
- `use super::*` in each submodule pulls the parent's types, std/tracing imports, consts, and private helpers down (a child module sees the parent's private items), so no import duplication beyond the test modules.
- Relocated public replay API is re-exported from mod.rs (`pub use shard_replay::{OrderedEntry, replay_multi_part, replay_ordered_merge, replay_per_shard}`) so external callers (`crate::persistence::aof_manifest::replay_*`) keep resolving unchanged.
- Per CLAUDE.md the cap and the "tests stay in mod.rs" convention can't both hold for a 3058-line file (tests alone are ~1100 lines); tests were moved next to the code they cover, each submodule getting its own temp_dir() copy with a distinct directory prefix (replay-/rewrite-) so the independent static counters never collide.

No behaviour change.

Validated in OrbStack VM: 30 aof_manifest unit tests green; per-shard AOF SIGKILL recovery (3) and per-shard BGREWRITEAOF crash recovery (2) green (exercise the relocated replay_per_shard / advance_shard / initialize_multi via recovery). fmt + clippy clean under both default (monoio) and tokio,jemalloc.

author: Tin Dang <tin.dang@trustifytechnology.com>
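The module mechanics listed above (child modules reaching the parent's private items through `use super::*`, plus a re-export that keeps the public path stable) can be shown in a single file with inline modules. The function bodies, the private `seq` field, and the `incr_name` file-name format are invented for the sketch; only the module names and the shape of the split follow the commit message.

```rust
mod aof_manifest {
    /// Parent-module state; `seq` is private to `aof_manifest` and its children.
    pub struct AofManifest {
        seq: u64,
    }

    impl AofManifest {
        pub fn new() -> Self {
            AofManifest { seq: 1 }
        }
    }

    /// Private helper in the parent, still visible to child modules.
    /// The file-name format is made up for this sketch.
    fn incr_name(shard: usize, seq: u64) -> String {
        format!("shard-{shard}.incr.{seq}")
    }

    mod shard_replay {
        use super::*; // brings in AofManifest and the private helper

        /// Free function relocated into the submodule.
        pub fn replay_per_shard(manifest: &AofManifest, shards: usize) -> Vec<String> {
            (0..shards).map(|s| incr_name(s, manifest.seq)).collect()
        }
    }

    mod shard_rewrite {
        use super::*;

        /// Inherent methods can live in a child module's `impl` block.
        impl AofManifest {
            pub fn advance_shard(&mut self) -> u64 {
                self.seq += 1;
                self.seq
            }
        }
    }

    // Re-export so callers keep using `aof_manifest::replay_per_shard`.
    pub use shard_replay::replay_per_shard;
}
```

Callers outside the module see no difference: `aof_manifest::replay_per_shard(&manifest, 2)` and `manifest.advance_shard()` resolve exactly as they would if everything still lived in one file.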
📝 Walkthrough
This PR adds a per-shard AOF manifest (v1/v2) and per-shard replay, introduces a committed-generation barrier and panic-safe shard-done guard for BGREWRITEAOF coordination with deferred ack fulfillment across fsync boundaries, prevents spill-thread completion loss by blocking sends and returning final completions on shutdown, and preserves disk-offload state across migrated connections and startup recovery.
Changes: Per-Shard Persistence and Durability
Sequence Diagram(s)
sequenceDiagram
participant Client
participant AofWriterPool
participant PerShardWriter
participant PerShardRewriteCoord
participant Filesystem
Client->>AofWriterPool: send RewritePerShard(coord)
AofWriterPool->>PerShardWriter: RewritePerShard(coord)
PerShardWriter->>PerShardRewriteCoord: await_outcome()
PerShardRewriteCoord->>Filesystem: publish outcome (new_seq / old_seq)
PerShardRewriteCoord-->>PerShardWriter: wake with committed seq
PerShardWriter->>Filesystem: reopen append file on committed generation
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 5 passed
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/shard/conn_accept.rs (1)
277-320: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Gate `spawn_migrated_tokio_connection` behind `#[cfg(unix)]` (or add a non-Unix stub)
In `src/shard/conn_accept.rs`, `spawn_migrated_tokio_connection` is only `#[cfg(feature = "runtime-tokio")]`, but its signature/body use Unix-only std APIs (`std::os::unix::io::RawFd`, `FromRawFd`, and `std::net::TcpStream::from_raw_fd`). `src/shard/event_loop.rs` calls it under the same `runtime-tokio`-only guard, with no `unix` gating, so enabling `runtime-tokio` on non-Unix targets will not compile. Add `#[cfg(unix)]` around the function (and its call sites) or provide a non-Unix stub.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/shard/conn_accept.rs` around lines 277 - 320, The function spawn_migrated_tokio_connection uses Unix-only APIs but is only feature-gated by runtime-tokio; gate it for Unix targets (e.g. add #[cfg(all(feature = "runtime-tokio", unix))] to the function) or add a non-Unix stub with the same signature (behind #[cfg(all(feature = "runtime-tokio", not(unix)))] ) that returns early/panics with a clear message; also apply the same cfg change to every call site (e.g. in src/shard/event_loop.rs) so calls are only compiled on the matching cfg or call the stub, ensuring consistent signatures across targets.
🧹 Nitpick comments (4)
src/persistence/aof_manifest/shard_rewrite.rs (1)
73-106: 💤 Low value
Consider cleaning up incr files during rollback.
The rollback logic removes base RDB files for successfully-created shards, but doesn't remove the corresponding incr files created at line 87. While this is not a correctness issue (orphan cleanup will remove them on next load), it would be more complete to also remove incr files during rollback.
♻️ Optional: also remove incr files during rollback

     if let Err(e) = loop_result {
         // Rollback: remove base RDB files for all successfully-created shards.
         for sid in created_shards {
             let base = manifest.shard_base_path(sid);
             if let Err(re) = std::fs::remove_file(&base) {
                 warn!(
                     "initialize_multi rollback: failed to remove {}: {}",
                     base.display(),
                     re
                 );
             }
    +        let incr = manifest.shard_incr_path(sid);
    +        if let Err(re) = std::fs::remove_file(&incr) {
    +            warn!(
    +                "initialize_multi rollback: failed to remove {}: {}",
    +                incr.display(),
    +                re
    +            );
    +        }
         }
         return Err(e);
     }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/persistence/aof_manifest/shard_rewrite.rs` around lines 73 - 106, The rollback after loop_result currently only removes base RDB files for entries in created_shards; extend that rollback to also remove the corresponding incr files (use manifest.shard_incr_path(sid)) inside the same for sid in created_shards loop or a follow-up loop, attempting std::fs::remove_file and emitting the same warn! message on failure (include incr path.display() and the error) so orphan incr files are cleaned up on rollback.
src/persistence/aof_manifest/shard_replay.rs (3)
236-243: ⚡ Quick win
Replace `expect()` with infallible conversion pattern.
The `expect()` calls at lines 240 and 243 are technically unreachable (the bounds check at line 228 guarantees the slices are valid), but the coding guidelines discourage `expect()` in library code outside tests. The conversion from a length-matched slice to a fixed-size array is actually infallible.
♻️ Use infallible array conversion

    -    #[allow(clippy::unwrap_used)] // bounds-checked above; try_into is statically length-matched
    -    let raw_lsn = u64::from_le_bytes(data[offset..offset + 8].try_into().expect("8 bytes"));
    -    #[allow(clippy::unwrap_used)] // same bounds-check guarantee
    -    let len =
    -        u32::from_le_bytes(data[offset + 8..offset + 12].try_into().expect("4 bytes")) as usize;
    +    // SAFETY: bounds-checked at line 228; slice lengths match array sizes exactly.
    +    let raw_lsn = u64::from_le_bytes(
    +        <[u8; 8]>::try_from(&data[offset..offset + 8])
    +            .unwrap_or_else(|_| unreachable!()),
    +    );
    +    let len = u32::from_le_bytes(
    +        <[u8; 4]>::try_from(&data[offset + 8..offset + 12])
    +            .unwrap_or_else(|_| unreachable!()),
    +    ) as usize;

Alternatively, since the conversion is statically guaranteed, you could use the more idiomatic pattern with direct slice indexing and array construction:

    -    #[allow(clippy::unwrap_used)] // bounds-checked above; try_into is statically length-matched
    -    let raw_lsn = u64::from_le_bytes(data[offset..offset + 8].try_into().expect("8 bytes"));
    -    #[allow(clippy::unwrap_used)] // same bounds-check guarantee
    -    let len =
    -        u32::from_le_bytes(data[offset + 8..offset + 12].try_into().expect("4 bytes")) as usize;
    +    // SAFETY: bounds-checked at line 228; slice lengths match array sizes exactly.
    +    let lsn_bytes: [u8; 8] = data[offset..offset + 8]
    +        .try_into()
    +        .unwrap_or([0; 8]); // unreachable fallback
    +    let len_bytes: [u8; 4] = data[offset + 8..offset + 12]
    +        .try_into()
    +        .unwrap_or([0; 4]); // unreachable fallback
    +    let raw_lsn = u64::from_le_bytes(lsn_bytes);
    +    let len = u32::from_le_bytes(len_bytes) as usize;

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/persistence/aof_manifest/shard_replay.rs` around lines 236 - 243, Replace the infallible slice-to-array conversions that currently call expect() for raw_lsn and len with explicit fixed-size buffers and copy_from_slice to avoid panicking; e.g. create a [0u8;8] buffer, call copy_from_slice(&data[offset..offset+8]) and then u64::from_le_bytes(buf) for raw_lsn, and similarly a [0u8;4] buffer for len and u32::from_le_bytes(buf) as usize — apply these changes where raw_lsn and len are computed in shard_replay.rs.
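The `copy_from_slice` variant suggested in the prompt above might look like the following sketch. `parse_frame_header` is a hypothetical name; the 8-byte little-endian LSN followed by a 4-byte little-endian length is taken from the diff in this comment, and the bounds check is folded into the function via `slice::get` so the copies cannot hit a length mismatch.

```rust
/// Reads an 8-byte little-endian LSN followed by a 4-byte little-endian
/// payload length at `offset`. Returns None if the header is truncated.
fn parse_frame_header(data: &[u8], offset: usize) -> Option<(u64, usize)> {
    let end = offset.checked_add(12)?;
    let header = data.get(offset..end)?; // bounds check; header.len() == 12

    let mut lsn_bytes = [0u8; 8];
    lsn_bytes.copy_from_slice(&header[..8]);
    let mut len_bytes = [0u8; 4];
    len_bytes.copy_from_slice(&header[8..12]);

    let raw_lsn = u64::from_le_bytes(lsn_bytes);
    let len = u32::from_le_bytes(len_bytes) as usize;
    Some((raw_lsn, len))
}
```

Note that `copy_from_slice` itself panics on a length mismatch, so this form relies on the preceding bounds check just as the original `expect()` calls did; the difference is that no `expect`/`unwrap` appears in the source.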
577-607: 💤 Low value
Torn commit detection heuristic may have false positives for heterogeneous batches.
The comment at lines 586-591 notes that this is heuristic and assumes uniform cardinality per LSN. If a future emitter produces legitimately heterogeneous batches (e.g., single-shard ordered ops mixed with multi-shard ones), this would incorrectly drop valid entries.
The current implementation is correct for the documented use case (production emitters guarantee uniform cardinality), and the note at lines 588-591 explicitly acknowledges this limitation. No change needed, but consider adding a debug_assert or invariant check when future cross-shard TXN work lands.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/persistence/aof_manifest/shard_replay.rs` around lines 577 - 607, Add a debug-only invariant/assertion around the per-LSN cardinality check to catch heterogeneous batches during development: after computing counts (the BTreeMap named counts built from entries) and max_count, add a debug_assert (or a conditional check gated by cfg(debug_assertions) or a feature flag) that validates all LSNs either have count == max_count or clearly belong to a documented single-shard op type; keep the existing torn_lsns logic for runtime safety but emit a debug-level summary (or panic in debug mode) when the invariant is violated to surface false-positive scenarios early for the OrderedAcrossShards/torn commit detection code path.
510-523: 💤 Low value
Thread join panic handling uses non-informative error message.
When a shard thread panics, the error message is just "replay_per_shard worker thread panicked" without any indication of which shard panicked. For debugging, it would be helpful to include the shard index.
♻️ Include shard index in panic error message

     // Collect results in shard order.
     handles
         .into_iter()
    -    .map(|h| {
    +    .enumerate()
    +    .map(|(idx, h)| {
             h.join().unwrap_or_else(|_| {
                 Err(crate::error::MoonError::from(
                     crate::error::AofError::RewriteFailed {
    -                    detail: "replay_per_shard worker thread panicked".to_owned(),
    +                    detail: format!("replay_per_shard worker thread for shard {} panicked", idx),
                     },
                 ))
             })
         })
         .collect()

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/persistence/aof_manifest/shard_replay.rs` around lines 510 - 523, The panic handler for shard worker threads is not reporting which shard panicked; update the code that builds handles so you store the shard index with each handle (e.g., Vec<(usize, JoinHandle<...>)>) and then change this collection step to iterate with that index: replace handles.into_iter().map(|h| { ... }) with handles.into_iter().map(|(shard_idx, h)| h.join().unwrap_or_else(|_| Err(crate::error::MoonError::from(crate::error::AofError::RewriteFailed { detail: format!("replay_per_shard worker thread panicked for shard {}", shard_idx) })))) so the panic message includes the shard index; refer to the replay_per_shard worker creation site and the handles variable to add the index pairing.
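A standalone sketch of the suggestion above, with `String` standing in for the project's `MoonError` and hypothetical function names. It shows that `enumerate()` over handles collected in shard order is enough to attribute a panic to its shard.

```rust
use std::thread::{self, JoinHandle};

/// Joins workers in shard order; a panicked worker becomes an Err that
/// names its shard index. `String` stands in for the real error type.
fn collect_in_shard_order(
    handles: Vec<JoinHandle<Result<u32, String>>>,
) -> Vec<Result<u32, String>> {
    handles
        .into_iter()
        .enumerate()
        .map(|(idx, h)| {
            h.join()
                .unwrap_or_else(|_| Err(format!("worker thread for shard {idx} panicked")))
        })
        .collect()
}

/// Spawns three workers; the one for shard 1 panics.
fn join_demo() -> Vec<Result<u32, String>> {
    let handles: Vec<JoinHandle<Result<u32, String>>> = (0..3u32)
        .map(|shard| {
            thread::spawn(move || {
                if shard == 1 {
                    panic!("simulated replay failure");
                }
                Ok(shard * 10)
            })
        })
        .collect();
    collect_in_shard_order(handles)
}
```

Because the handles vector is built in shard order, the enumeration index equals the shard id; if handles were ever stored out of order, the index would have to be carried alongside each handle instead.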
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/storage/tiered/spill_thread.rs`:
- Around line 429-476: The stall in send_one_completion can block the spill
thread and let eviction.rs drop SpillRequest (via try_send) after the key is
removed from RAM, causing permanent data loss; fix by making the eviction path
provide end-to-end backpressure: in the code that enqueues SpillRequest (the
eviction path in eviction.rs that currently uses try_send), switch to a
blocking/awaiting send or loop until the request queue accepts the SpillRequest
(respecting stop_flag) so eviction does not remove the in-memory key until the
request is enqueued, or alternatively check the completion/request queue
capacity before evicting and postpone/remap the eviction if the queue is full;
keep references to send_one_completion, SpillRequest, and request_rx when
locating the changes.
---
Outside diff comments:
In `@src/shard/conn_accept.rs`:
- Around line 277-320: The function spawn_migrated_tokio_connection uses
Unix-only APIs but is only feature-gated by runtime-tokio; gate it for Unix
targets (e.g. add #[cfg(all(feature = "runtime-tokio", unix))] to the function)
or add a non-Unix stub with the same signature (behind #[cfg(all(feature =
"runtime-tokio", not(unix)))] ) that returns early/panics with a clear message;
also apply the same cfg change to every call site (e.g. in
src/shard/event_loop.rs) so calls are only compiled on the matching cfg or call
the stub, ensuring consistent signatures across targets.
---
Nitpick comments:
In `@src/persistence/aof_manifest/shard_replay.rs`:
- Around line 236-243: Replace the infallible slice-to-array conversions that
currently call expect() for raw_lsn and len with explicit fixed-size buffers and
copy_from_slice to avoid panicking; e.g. create a [0u8;8] buffer, call
copy_from_slice(&data[offset..offset+8]) and then u64::from_le_bytes(buf) for
raw_lsn, and similarly a [0u8;4] buffer for len and u32::from_le_bytes(buf) as
usize — apply these changes where raw_lsn and len are computed in
shard_replay.rs.
- Around line 577-607: Add a debug-only invariant/assertion around the per-LSN
cardinality check to catch heterogeneous batches during development: after
computing counts (the BTreeMap named counts built from entries) and max_count,
add a debug_assert (or a conditional check gated by cfg(debug_assertions) or a
feature flag) that validates all LSNs either have count == max_count or clearly
belong to a documented single-shard op type; keep the existing torn_lsns logic
for runtime safety but emit a debug-level summary (or panic in debug mode) when
the invariant is violated to surface false-positive scenarios early for the
OrderedAcrossShards/torn commit detection code path.
- Around line 510-523: The panic handler for shard worker threads is not
reporting which shard panicked; update the code that builds handles so you store
the shard index with each handle (e.g., Vec<(usize, JoinHandle<...>)>) and then
change this collection step to iterate with that index: replace
handles.into_iter().map(|h| { ... }) with handles.into_iter().map(|(shard_idx,
h)| h.join().unwrap_or_else(|_|
Err(crate::error::MoonError::from(crate::error::AofError::RewriteFailed {
detail: format!("replay_per_shard worker thread panicked for shard {}",
shard_idx) })))) so the panic message includes the shard index; refer to the
replay_per_shard worker creation site and the handles variable to add the index
pairing.
In `@src/persistence/aof_manifest/shard_rewrite.rs`:
- Around line 73-106: The rollback after loop_result currently only removes base
RDB files for entries in created_shards; extend that rollback to also remove the
corresponding incr files (use manifest.shard_incr_path(sid)) inside the same for
sid in created_shards loop or a follow-up loop, attempting std::fs::remove_file
and emitting the same warn! message on failure (include incr path.display() and
the error) so orphan incr files are cleaned up on rollback.
📒 Files selected for processing (10)
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/persistence/aof_manifest/mod.rs
- src/persistence/aof_manifest/shard_replay.rs
- src/persistence/aof_manifest/shard_rewrite.rs
- src/server/embedded.rs
- src/shard/conn_accept.rs
- src/shard/event_loop.rs
- src/shard/persistence_tick.rs
- src/storage/tiered/spill_thread.rs
… evict path
CodeRabbit's PR #144 review raised a 🟠 Major data-loss finding against the async-spill eviction path (inline on spill_thread.rs:476, citing eviction.rs:230-239): it claimed the eviction code removes a key from the hot tier BEFORE sending the SpillRequest, so a full/disconnected spill channel drops the request and loses data.
Verified against the live code: the finding is STALE, not a real defect. The production path `evict_one_async_spill` (eviction.rs:520-526) is fail-safe send-before-remove:
    if sender.try_send(req).is_err() { return false; } // key stays in RAM
    db.remove(key.as_bytes());                         // only after success
On a full/disconnected channel the send fails, the entry stays resident, and the eviction loop bails to retry next tick, so no acknowledged write is lost. The worst case under sustained backpressure is keys remaining resident (eventually an OOM write-rejection once at budget), which is correct fail-safe behaviour. All live callers (handler_monoio, handler_sharded, persistence_tick) route through this path.
What actually misled the reviewer were two stale artefacts, both fixed here with no behaviour change to the live path:
1. A stale doc comment that had drifted off `try_evict_if_needed_async_spill` and silently merged into `next_spill_file_id_seed`'s doc (missing blank line). It still described the OLD "remove from RAM, then best-effort send, drop request if channel full" ordering. Replaced with an accurate doc on the real function describing the fail-safe send-before-remove contract.
2. Dead code `try_evict_deferred` + its private helper `find_victim_for_policy` (zero callers in src/, tests/, benches/, examples/, verified by repo-root grep and `cargo check --all-targets` on both feature sets). This was the only code in the tree that actually embodied the dangerous remove-first pattern the reviewer feared (db.remove before any send). Removed so the live tree has exactly one eviction-spill path, and it is the fail-safe one.
Validated: clippy -D warnings clean on default (monoio) and tokio,jemalloc; 15 storage::eviction unit tests green; cargo check --all-targets green on both feature sets; fmt clean.
The review's second 🟠 Major (conn_accept.rs:277-320, gate spawn_migrated_tokio_connection behind cfg(unix)) is skipped: moon targets only Linux + macOS (both Unix), the function compiles and clippy-passes under runtime-tokio on every supported target, and the gap only affects unsupported Windows builds. Gating it would touch the event_loop.rs call site for zero supported-platform benefit.
author: Tin Dang <tin.dang@trustifytechnology.com>
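The send-before-remove contract described in this commit can be sketched in isolation. This is a minimal illustration, not the code in eviction.rs: the `HashMap` store, the reduced `SpillRequest` struct, the function name `evict_send_before_remove`, and the use of `std::sync::mpsc::sync_channel` in place of the project's channel are all assumptions made for the example.

```rust
use std::collections::HashMap;
use std::sync::mpsc::SyncSender;

/// Hypothetical stand-in for the real SpillRequest.
pub struct SpillRequest {
    pub key: Vec<u8>,
    pub value: Vec<u8>,
}

/// Evict `key` only if its spill request was accepted by the channel.
/// Returns false (and leaves the entry resident) when the channel is
/// full or disconnected, so the caller can retry on a later tick.
pub fn evict_send_before_remove(
    db: &mut HashMap<Vec<u8>, Vec<u8>>,
    sender: &SyncSender<SpillRequest>,
    key: &[u8],
) -> bool {
    let Some(value) = db.get(key).cloned() else {
        return false; // nothing to evict
    };
    let req = SpillRequest { key: key.to_vec(), value };
    if sender.try_send(req).is_err() {
        return false; // key stays in RAM
    }
    db.remove(key); // only after a successful enqueue
    true
}
```

With a `sync_channel(1)` that already holds one request, a second call returns false and the victim is still present in the map, which is the same scenario the reviewer was concerned about.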
Re: the outside-diff 🟠 Major on Skipping with reason. The observation is factually correct (the fn uses The gap only manifests on unsupported Windows builds, and adding the gate would require matching |
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/storage/eviction.rs (1)
272-285: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift
Handle `read_dir` iteration errors as hard failures for spill seeding.
Using `entries.flatten()` silently drops `ReadDir` errors. A partial scan can miss the highest `heap-*.mpf` id and return a seed that reuses an existing filename, risking cold-tier overwrite after restart. Please make incomplete scans fail the spill-seed path (e.g., return `Result<u64, io::Error>` and propagate), instead of continuing silently.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/storage/eviction.rs` around lines 272 - 285, The directory scan currently uses entries.flatten() which silently ignores ReadDir errors and can produce an incorrect spill seed; change the scan in the spill-seeding code (the loop that iterates over entries, the max_id calculation and the match that returns m + 1 or 1) to propagate any IO errors instead of flattening: stop using flatten, iterate over entries and handle each Result<DirEntry, io::Error> by returning Err(e) on error, change the function signature to return Result<u64, io::Error> (or propagate the error to the caller) so failed ReadDir iterations become hard failures, and adjust call sites to handle the Result accordingly. Ensure the final seed logic still returns Ok(m + 1) or Ok(1) on success.
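A sketch of what the requested error propagation could look like, assuming spill files are named `heap-<decimal id>.mpf` and that a missing directory should also count as a hard failure. The function name is borrowed from the commit message above; the body is illustrative and is not the code in eviction.rs.

```rust
use std::fs;
use std::io;
use std::path::Path;

/// Return the next spill file id: one past the highest `heap-<id>.mpf`
/// found in `dir`, or 1 when none exist. Any directory-iteration error
/// aborts the scan instead of being skipped.
pub fn next_spill_file_id_seed(dir: &Path) -> io::Result<u64> {
    let mut max_id: Option<u64> = None;
    for entry in fs::read_dir(dir)? {
        let entry = entry?; // propagate ReadDir errors; no flatten()
        let name = entry.file_name();
        let Some(name) = name.to_str() else { continue };
        let id = name
            .strip_prefix("heap-")
            .and_then(|rest| rest.strip_suffix(".mpf"))
            .and_then(|digits| digits.parse::<u64>().ok());
        if let Some(id) = id {
            max_id = Some(max_id.map_or(id, |m| m.max(id)));
        }
    }
    Ok(max_id.map_or(1, |m| m + 1))
}
```

Files that do not match the naming pattern are still skipped; only I/O failures during iteration are promoted to errors, since those are what can hide the highest existing id.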
CodeRabbit's PR #144 review (outside-diff 🟠 Major, conn_accept.rs:277-320) flagged that spawn_migrated_tokio_connection is gated only by #[cfg(feature = "runtime-tokio")] yet its signature takes std::os::unix::io::RawFd and reconstructs the socket via from_raw_fd (both Unix-only), so a runtime-tokio build on a non-Unix target would not compile.
Make the platform coupling explicit and consistent:
- conn_accept.rs: spawn_migrated_tokio_connection -> cfg(all(runtime-tokio, unix)) and its twin spawn_migrated_monoio_connection -> cfg(all(runtime-monoio, unix)).
- event_loop.rs: the three migration-drain call sites gated to match (the two runtime-agnostic blocks' tokio/monoio wrappers, plus the monoio-only third site via #[cfg(unix)]).
On every supported target (Linux + macOS, both Unix) `unix` is always true, so this is a no-op for behaviour and codegen. It only documents intent and stops a hypothetical non-Unix build from failing inside these functions.
Verified: clippy -D warnings clean and cargo check --all-targets green on both default (monoio) and tokio,jemalloc; fmt clean.
Scope note: this resolves the flagged surface but does NOT by itself make moon compile on a non-Unix target. The migration subsystem is Unix-coupled at the data-type level: MigrateConnectionPayload.fd (src/shard/dispatch.rs:295) is a RawFd, and the ShardMessage::MigrateConnection variant, pending_migrations Vec, and spsc handler all carry it ungated. Full non-Unix support would require cfg(unix)-gating that entire subsystem across dispatch.rs / spsc_handler.rs / handler_sharded. That is deliberately left out: moon targets only Linux + macOS, there is no Windows CI to verify against, and it touches hot-path dispatch for zero supported-platform benefit.
author: Tin Dang <tin.dang@trustifytechnology.com>
CodeRabbit's PR #144 review raised a 🟠 Major claiming the async-spill eviction path removes a key from the hot tier BEFORE enqueuing its SpillRequest, so a full/disconnected spill channel would drop the request and lose data. The live code (evict_one_async_spill) is already fail-safe (it try_sends BEFORE db.remove, and a failed send leaves the key resident for retry), but that invariant had no direct regression guard. This adds one.
async_spill_full_channel_keeps_victim_in_ram_no_data_loss:
- pre-fills a bounded(1) spill channel so the eviction's try_send observes a FULL channel (the reviewer's exact backpressure scenario);
- asserts try_evict_if_needed_async_spill surfaces OOM (no false success) AND the victim key is still resident (db.len()==1, key present), i.e. no data loss.
Red/green verified: GREEN on the live send-before-remove code; flipping the live ordering to remove-before-send locally turns it RED on the data-loss assertion (db.len() left:0, right:1), proving the test catches the regression. Live code restored byte-identical (test-only addition).
No behaviour change. clippy -D warnings clean and 16 storage::eviction unit tests green on both default (monoio) and tokio,jemalloc; fmt clean.
author: Tin Dang <tin.dang@trustifytechnology.com>
aof.rs had grown to 4379 lines — ~3x the project's 1500-line ceiling. Decompose
it into a directory module following the split convention (mod.rs + subfiles,
re-export via `pub use`, children pull parent privates via `use super::*`).
Pure code relocation: no logic, control flow, or signatures changed.
Layout (all under the cap):
- mod.rs (1075): module doc + imports + core types (AofAck, AckOutcome,
FsyncPolicy, AofMessage, PerShardRewriteCoord,
ShardDoneGuard, AofPoolSendError, consts) + the codec
(serialize_command, replay_aof) + codec/fsync tests.
The codec stays in the parent so children reach it via
`use super::*` with no sibling import (CLAUDE.md: mod.rs
"holds shared helpers + tests").
- pool.rs (1379): AofWriterPool + impl + pool_tests.
- writer_task.rs (1014): aof_writer_task, per_shard_aof_writer_task,
TEST_FAIL_WRITE_AT.
- rewrite.rs (941): generate_rewrite_commands .. rewrite_aof, drain/
do_rewrite paths + rewrite tests.
Cross-module wiring:
- Re-export only the moved public items from mod.rs (AofWriterPool,
generate_rewrite_commands, rewrite_aof, aof_writer_task,
per_shard_aof_writer_task) so external `persistence::aof::X` paths are
unchanged. serialize_command/replay_aof keep their paths (still in mod.rs).
- Widen to pub(crate) the rewrite items called across the new boundary by
pool_tests / writer_task: do_rewrite_per_shard/single/sharded,
drain_pending_appends{,_framed}, rewrite_aof_sharded_sync, and DrainOutcome
(struct + drained/pending_acks fields + fulfill_acks — pool_tests reads them).
- cfg-gate the imports and re-exports that are runtime-specific: rewrite_aof and
TEST_FAIL_WRITE_AT (tokio-only), do_rewrite_single/sharded and
drain_pending_appends (monoio-only). Matches each item's existing cfg so both
runtimes compile.
Verification (both runtimes, code-move so behaviour is unchanged):
- cargo check --all-targets: clean on default (monoio) and tokio,jemalloc.
- cargo clippy -- -D warnings: clean on both (CI-exact, no new lints in aof/).
- cargo fmt --check: clean.
- Unit tests: persistence::aof 66 pass (monoio) / 69 pass (tokio); 40 #[test]
fns preserved exactly (no test dropped or duplicated in redistribution).
- Integration (OrbStack moon-dev VM): crash_matrix_per_shard_bgrewriteaof 4 pass
(do_rewrite_per_shard + replay_aof end-to-end), aof_fsync_err_subscribe_ordering
3 pass. No failures.
author: Tin Dang <tin.dang@trustifytechnology.com>
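The split convention used here (parent holds shared helpers, children pull them in with `use super::*`, moved public items are re-exported with `pub use`) can be shown in miniature. In this sketch inline modules stand in for aof/mod.rs and aof/pool.rs, and the bodies of `serialize_command` and `AofWriterPool` are invented placeholders; only the wiring pattern is the point.

```rust
// Inline modules stand in for aof/mod.rs and aof/pool.rs.
pub mod aof {
    // Parent-private helper; children reach it through `use super::*`.
    fn serialize_command(name: &str) -> Vec<u8> {
        format!("*1\r\n${}\r\n{}\r\n", name.len(), name).into_bytes()
    }

    mod pool {
        use super::*;

        /// Placeholder pool that only buffers serialized commands.
        pub struct AofWriterPool {
            pub buffered: Vec<u8>,
        }

        impl AofWriterPool {
            pub fn new() -> Self {
                Self { buffered: Vec::new() }
            }

            pub fn append(&mut self, name: &str) {
                // Resolved via the glob import of the parent's private items.
                self.buffered.extend_from_slice(&serialize_command(name));
            }
        }
    }

    // Re-export so external callers keep using `aof::AofWriterPool`.
    pub use pool::AofWriterPool;
}
```

External code continues to write `aof::AofWriterPool` and never names `aof::pool`, which is why a pure relocation of this kind leaves public paths unchanged.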
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/persistence/aof/mod.rs`:
- Around line 543-557: The bug is that malformed top-level frames increment
count even though engine.replay_command is never called; remove the count += 1
from the early continue branches and instead increment count only after a
command has been successfully processed (e.g., after engine.replay_command
returns success). Update the match arms around Frame::Array handling so the
branches that skip processing do not mutate count, and ensure count is
incremented immediately after the successful replay path (reference symbols:
count, frame, Frame::Array, engine.replay_command).
- Around line 290-295: In await_outcome(), remove the final expect() and instead
extract the value using pattern matching: convert the current
while-wait-plus-expect into a loop that waits while self.outcome is None (using
self.outcome_cv.wait) and then uses if let Some(v) = self.outcome.lock().take()
{ return v } (or match) to return the u64; reference the symbols await_outcome,
self.outcome and self.outcome_cv and ensure you call take() (or match on the
Option) to consume and return the value without using expect/unwrap.
In `@src/persistence/aof/pool.rs`:
- Around line 238-242: The fire-and-forget helper try_send_append currently
swallows TrySendError and must record dropped appends like
try_send_append_ordered does: detect the TrySendError returned from
self.sender(shard_id).try_send(AofMessage::Append { lsn, bytes }) and on failure
increment the AOF_BACKPRESSURE_DROPPED metric (and/or use the same counter used
by try_send_append_ordered) and emit a concise log mentioning the shard_id and
lsn; mirror the exact error-handling/metrics/logging logic used in
try_send_append_ordered so both unordered and ordered paths count and report
dropped Append messages consistently.
In `@src/persistence/aof/rewrite.rs`:
- Around line 225-235: The current rewrite emits a relative PEXPIRE using
remaining_ms (created in the block where entry.has_expiry() and exp_ms =
entry.expires_at_ms(base_ts)), which extends TTLs after downtime; instead emit
an absolute-time PEXPIREAT using the absolute exp_ms value. Change the Frame
creation that currently builds Frame::BulkString(b"PEXPIRE") and
remaining_ms.to_string() to build Frame::BulkString(b"PEXPIREAT") and use
exp_ms.to_string() (keep the existing check exp_ms > now_ms and skip expired
keys), updating the serialize call accordingly so plain keys use absolute expiry
like the HashWithTtl branch.
- Around line 838-875: The AOF rewrite helpers (e.g., rewrite_aof_sync in
src/persistence/aof/rewrite.rs) snapshot only under read locks, so messages
still queued on the writer task channel (rx) can get serialized into the new
base and then appended again — causing double-apply; fix by ensuring the writer
task drains or stops accepting/flushes rx before invoking the rewrite helpers
(or alternately acquire the writer-side exclusive lock that prevents enqueueing
during snapshot), i.e., in writer_task.rs before calling rewrite_aof_sync (and
the counterpart used at 883-923), drain the rx channel (or atomically take and
hold pending messages) so no pending writes remain queued when the snapshot is
taken.
In `@src/persistence/aof/writer_task.rs`:
- Around line 39-43: The EverySec flush loop in aof_writer_task currently uses
interval.tick().await which can be starved by a ready receive arm; replace the
interval/tick usage with an explicit per-iteration sleep tied to last_fsync
(e.g. compute next_deadline = last_fsync + Duration::from_secs(1) and use
tokio::time::sleep_until(next_deadline.into()).await or tokio::time::sleep for
the remaining duration) and await that sleep inside the select so the timeout
cannot be permanently starved by other ready branches; update the code paths
that reference interval.tick() (including the initial tick consume) to the new
deadline-based sleep logic so the EverySec appendfsync window is honored.
📒 Files selected for processing (8)
- src/persistence/aof.rs
- src/persistence/aof/mod.rs
- src/persistence/aof/pool.rs
- src/persistence/aof/rewrite.rs
- src/persistence/aof/writer_task.rs
- src/shard/conn_accept.rs
- src/shard/event_loop.rs
- src/storage/eviction.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/storage/eviction.rs
- src/shard/conn_accept.rs
fn await_outcome(&self) -> u64 {
    let mut slot = self.outcome.lock();
    while slot.is_none() {
        self.outcome_cv.wait(&mut slot);
    }
    slot.expect("outcome is Some after the wait loop")
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Remove the expect() from await_outcome.
The wait loop already establishes the Some invariant, so this panic is avoidable and violates the library-code expect() ban.
Proposed fix
 fn await_outcome(&self) -> u64 {
     let mut slot = self.outcome.lock();
-    while slot.is_none() {
-        self.outcome_cv.wait(&mut slot);
-    }
-    slot.expect("outcome is Some after the wait loop")
+    loop {
+        if let Some(committed_seq) = *slot {
+            return committed_seq;
+        }
+        self.outcome_cv.wait(&mut slot);
+    }
 }
As per coding guidelines, "No unwrap() or expect() in library code outside tests. Use pattern matching, if let, or let-else."
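For readers who want to try the check-and-extract loop outside the codebase, here is a self-contained sketch. It swaps the project's parking_lot primitives for `std::sync::Mutex` and `Condvar` (so guard handling differs slightly), and `OutcomeSlot` and `publish` are hypothetical names introduced for the example.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical one-shot slot: a writer publishes a committed sequence
/// number, a waiter blocks until it is available.
pub struct OutcomeSlot {
    outcome: Mutex<Option<u64>>,
    outcome_cv: Condvar,
}

impl OutcomeSlot {
    pub fn new() -> Self {
        Self { outcome: Mutex::new(None), outcome_cv: Condvar::new() }
    }

    pub fn publish(&self, committed_seq: u64) {
        // Recover the guard even if another thread panicked while holding it.
        let mut slot = self.outcome.lock().unwrap_or_else(|e| e.into_inner());
        *slot = Some(committed_seq);
        self.outcome_cv.notify_all();
    }

    /// Block until a value is published. The `if let` both checks and
    /// extracts, so no `expect()` is needed after the wait.
    pub fn await_outcome(&self) -> u64 {
        let mut slot = self.outcome.lock().unwrap_or_else(|e| e.into_inner());
        loop {
            if let Some(committed_seq) = *slot {
                return committed_seq;
            }
            slot = self.outcome_cv.wait(slot).unwrap_or_else(|e| e.into_inner());
        }
    }
}
```

Because the value is copied out rather than taken, a second waiter arriving after publication also returns immediately.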
let (cmd, cmd_args) = match &frame {
    Frame::Array(arr) if !arr.is_empty() => {
        let name = match &arr[0] {
            Frame::BulkString(s) => s.as_ref(),
            Frame::SimpleString(s) => s.as_ref(),
            _ => {
                count += 1;
                continue;
            }
        };
        (name as &[u8], &arr[1..])
    }
    _ => {
        count += 1;
        continue;
Only count frames that were actually replayed.
These continue branches increment count even though engine.replay_command never runs, so malformed top-level frames are reported as successfully recovered.
Proposed fix
 Frame::Array(arr) if !arr.is_empty() => {
     let name = match &arr[0] {
         Frame::BulkString(s) => s.as_ref(),
         Frame::SimpleString(s) => s.as_ref(),
-        _ => {
-            count += 1;
-            continue;
-        }
+        _ => continue,
     };
     (name as &[u8], &arr[1..])
 }
-_ => {
-    count += 1;
-    continue;
-}
+_ => continue,
 };
 engine.replay_command(databases, cmd, cmd_args, &mut selected_db);
 count += 1;
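The counting rule (increment only after a command has actually been replayed) can be exercised with a reduced model. The `Frame` enum below is a hypothetical subset of the real type, and `replay_frames` with its callback stands in for the loop around `engine.replay_command`; neither is taken from mod.rs.

```rust
/// Hypothetical, reduced frame type; the real `Frame` has more variants.
pub enum Frame {
    BulkString(Vec<u8>),
    SimpleString(Vec<u8>),
    Integer(i64),
    Array(Vec<Frame>),
}

/// Replay `frames`, invoking `replay` for each well-formed command and
/// returning how many commands were actually replayed.
pub fn replay_frames<F>(frames: &[Frame], mut replay: F) -> usize
where
    F: FnMut(&[u8], &[Frame]),
{
    let mut count = 0;
    for frame in frames {
        let (cmd, cmd_args) = match frame {
            Frame::Array(arr) if !arr.is_empty() => {
                let name: &[u8] = match &arr[0] {
                    Frame::BulkString(s) | Frame::SimpleString(s) => s.as_slice(),
                    _ => continue, // malformed command name: skip, do not count
                };
                (name, &arr[1..])
            }
            _ => continue, // not a non-empty array: skip, do not count
        };
        replay(cmd, cmd_args);
        count += 1; // counted only after the command ran
    }
    count
}
```

Feeding it a mix of valid arrays, a bare integer, an empty array, and an array whose first element is not a string yields a count equal to the number of callback invocations, so the reported recovery total matches what was applied.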
pub fn try_send_append(&self, shard_id: usize, lsn: u64, bytes: Bytes) {
    let _ = self
        .sender(shard_id)
        .try_send(AofMessage::Append { lsn, bytes });
}
Count dropped fire-and-forget appends too.
Both append helpers discard TrySendError completely. Under appendfsync=everysec / no, a full writer channel now loses AOF entries without incrementing AOF_BACKPRESSURE_DROPPED or logging anything, even though this module documents dropped Append traffic as observable.
Proposed fix
 pub fn try_send_append(&self, shard_id: usize, lsn: u64, bytes: Bytes) {
-    let _ = self
-        .sender(shard_id)
-        .try_send(AofMessage::Append { lsn, bytes });
+    match self.sender(shard_id).try_send(AofMessage::Append { lsn, bytes }) {
+        Ok(()) => {}
+        Err(flume::TrySendError::Full(_)) => {
+            AOF_BACKPRESSURE_DROPPED.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
+            warn!("AOF writer channel full (shard {}): Append dropped", shard_id);
+        }
+        Err(flume::TrySendError::Disconnected(_)) => {
+            warn!("AOF writer channel disconnected (shard {}): Append dropped", shard_id);
+        }
+    }
 }
Apply the same handling to try_send_append_ordered.
Also applies to: 332-343
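A standalone version of the drop accounting, for experimentation. It uses `std::sync::mpsc::sync_channel` instead of flume, `eprintln!` instead of `warn!`, a reduced `AofMessage`, and returns a `bool` purely so the outcome is observable; all of these are assumptions of the sketch rather than properties of pool.rs.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{SyncSender, TrySendError};

/// Stand-in for the module's drop counter.
pub static AOF_BACKPRESSURE_DROPPED: AtomicU64 = AtomicU64::new(0);

/// Hypothetical, reduced message type.
pub enum AofMessage {
    Append { lsn: u64, bytes: Vec<u8> },
}

/// Fire-and-forget append that still accounts for every drop.
/// Returns true when the message was enqueued.
pub fn try_send_append(
    sender: &SyncSender<AofMessage>,
    shard_id: usize,
    lsn: u64,
    bytes: Vec<u8>,
) -> bool {
    match sender.try_send(AofMessage::Append { lsn, bytes }) {
        Ok(()) => true,
        Err(TrySendError::Full(_)) => {
            // Backpressure drop: count it and say which shard/lsn was lost.
            AOF_BACKPRESSURE_DROPPED.fetch_add(1, Ordering::Relaxed);
            eprintln!("AOF writer channel full (shard {shard_id}, lsn {lsn}): Append dropped");
            false
        }
        Err(TrySendError::Disconnected(_)) => {
            // Writer is gone: not backpressure, but still worth a log line.
            eprintln!("AOF writer channel disconnected (shard {shard_id}, lsn {lsn}): Append dropped");
            false
        }
    }
}
```

Keeping the Full and Disconnected arms separate means the backpressure counter reflects only genuine queue saturation, while a dead writer is reported without inflating that metric.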
// Generate PEXPIRE for keys with TTL
if entry.has_expiry() {
    let exp_ms = entry.expires_at_ms(base_ts);
    if exp_ms > now_ms {
        let remaining_ms = exp_ms - now_ms;
        let pexpire_frame = Frame::Array(framevec![
            Frame::BulkString(Bytes::from_static(b"PEXPIRE")),
            Frame::BulkString(key.to_bytes()),
            Frame::BulkString(Bytes::from(remaining_ms.to_string())),
        ]);
        serialize::serialize(&pexpire_frame, &mut buf);
Use absolute expiry when rewriting TTLs.
PEXPIRE stores a relative TTL from rewrite time, so a restart after downtime extends every rewritten key by the downtime interval. The HashWithTtl branch already uses absolute-time semantics via HPEXPIREAT; plain keys need the same treatment.
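The TTL drift is easy to see with concrete numbers. The helpers below are illustrative only (their names and the plain `Vec<Vec<u8>>` argument list are assumptions, not the rewrite.rs API); they model what a replaying server would compute for each command form.

```rust
/// Expiry a restarted server would compute when replaying a relative
/// `PEXPIRE remaining_ms` at `replay_now_ms`.
/// Precondition: `exp_ms > rewrite_now_ms` (the key was live at rewrite).
pub fn expiry_after_relative_replay(exp_ms: u64, rewrite_now_ms: u64, replay_now_ms: u64) -> u64 {
    let remaining_ms = exp_ms - rewrite_now_ms; // what the rewrite stored
    replay_now_ms + remaining_ms
}

/// Expiry computed when replaying an absolute `PEXPIREAT exp_ms`:
/// independent of when the replay happens.
pub fn expiry_after_absolute_replay(exp_ms: u64, _replay_now_ms: u64) -> u64 {
    exp_ms
}

/// Build the argument list for the rewritten TTL command, or None when
/// the key has already expired at rewrite time.
pub fn pexpireat_args(key: &[u8], exp_ms: u64, now_ms: u64) -> Option<Vec<Vec<u8>>> {
    if exp_ms <= now_ms {
        return None; // skip expired keys, as the existing check does
    }
    Some(vec![
        b"PEXPIREAT".to_vec(),
        key.to_vec(),
        exp_ms.to_string().into_bytes(),
    ])
}
```

For a key due at 10 000 ms, rewritten at 4 000 ms and replayed at 9 000 ms, the relative form yields 15 000 ms (extended by the 5 000 ms between rewrite and replay), while the absolute form stays at 10 000 ms.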
fn rewrite_aof_sync(db: &SharedDatabases, aof_path: &Path) -> Result<(), MoonError> {
    // Snapshot under read locks, build temp Database objects for RDB serialization
    let snapshot: Vec<Database> = db
        .iter()
        .map(|lock| {
            let guard = lock.read();
            let mut temp = Database::new();
            let now_ms = current_time_ms();
            for (k, v) in guard.data().iter() {
                if !v.is_expired_at(guard.base_timestamp(), now_ms) {
                    temp.set(k.to_bytes(), v.clone());
                }
            }
            temp
        })
        .collect();

    let rdb_bytes = crate::persistence::rdb::save_to_bytes(&snapshot)?;

    let tmp_path = aof_path.with_extension("aof.tmp");
    std::fs::write(&tmp_path, &rdb_bytes).map_err(|e| AofError::Io {
        path: tmp_path.clone(),
        source: e,
    })?;
    std::fs::rename(&tmp_path, aof_path).map_err(|e| AofError::RewriteFailed {
        detail: format!(
            "rename {} -> {}: {}",
            tmp_path.display(),
            aof_path.display(),
            e
        ),
    })?;

    info!(
        "AOF rewrite complete (RDB preamble): {} bytes",
        rdb_bytes.len()
    );
    Ok(())
The tokio rewrite helpers can double-apply queued writes.
These helpers snapshot under read locks only. In src/persistence/aof/writer_task.rs Lines 315-345, the tokio writer flushes the old file and calls them without draining rx, so a write that updates the DB before the snapshot but is still queued in the channel is serialized into the new base and then appended again after reopen. That breaks recovery for non-idempotent commands.
Also applies to: 883-923
let mut last_fsync = Instant::now();
#[cfg(feature = "runtime-tokio")]
let mut interval = tokio::time::interval(std::time::Duration::from_secs(1));
#[cfg(feature = "runtime-tokio")]
interval.tick().await; // consume first tick
Top-level tokio writer still has the starvable EverySec flush loop.
The per-shard tokio branch below already moved away from interval.tick() because an always-ready receive arm can starve it under sustained writes. aof_writer_task still uses the old pattern, so single-file tokio deployments can buffer writes well past one second and lose more than the configured appendfsync=everysec window on crash.
Also applies to: 372-378
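The deadline-from-last-fsync idea can be sketched without an async runtime. This example deliberately swaps tokio's `select!` and `sleep_until` for a blocking `recv_timeout` on a std channel, so it shows only the timing logic; `time_until_fsync`, `step`, and the `flushed` vector (a stand-in for write + fsync) are hypothetical names.

```rust
use std::time::{Duration, Instant};

const FSYNC_WINDOW: Duration = Duration::from_secs(1);

/// Time left until the next EverySec fsync is due, measured from the
/// last fsync rather than from a free-running interval.
pub fn time_until_fsync(last_fsync: Instant, now: Instant) -> Duration {
    (last_fsync + FSYNC_WINDOW).saturating_duration_since(now)
}

/// Hypothetical loop step: receive with a timeout bounded by the
/// deadline, then flush if the window has elapsed, even when messages
/// keep arriving.
pub fn step(
    rx: &std::sync::mpsc::Receiver<Vec<u8>>,
    buf: &mut Vec<u8>,
    last_fsync: &mut Instant,
    flushed: &mut Vec<Vec<u8>>,
) {
    let wait = time_until_fsync(*last_fsync, Instant::now());
    if let Ok(bytes) = rx.recv_timeout(wait) {
        buf.extend_from_slice(&bytes);
    }
    // Checked on every iteration, so a busy receive path cannot skip it.
    if time_until_fsync(*last_fsync, Instant::now()).is_zero() {
        flushed.push(std::mem::take(buf)); // stand-in for write + fsync
        *last_fsync = Instant::now();
    }
}
```

The property that matters is that the deadline check runs after every receive, so the flush cadence depends on elapsed time since the last fsync and not on which branch happened to be ready.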
`command::vector_search::tests::test_vector_metrics_increment_decrement` flaked under the tokio full-suite run (2861/1, "FT.DROPINDEX should decrement VECTOR_INDEXES"), passing in isolation.
Root cause is a test-isolation defect, not a production bug: `VECTOR_INDEXES` (src/vector/metrics.rs) is a process-global AtomicU64 shared by every test in the lib binary. 36 tests mutate it via ft_create/ft_dropindex; only 8 held `METRICS_LOCK` (a plain Mutex), which serialized those 8 against each other but did nothing about the ~28 lock-free mutators. The failing test reads the counter, drops an index, re-reads, and asserts `after_drop < before_drop`, a relative delta that a concurrent ft_create (+1) landing in the read-modify-read window cancels (N-1+1 = N). Only this assertion flaked because VECTOR_INDEXES is the one counter that moves both directions; the others assert `after > before` on monotonic-up counters.
Fix: RwLock isolation (production code unchanged):
- METRICS_LOCK: Mutex<()> -> RwLock<()>.
- The 8 delta-reader tests take `.write()` (exclusive), so no mutator runs during their critical section.
- The 28 mutator-only tests take `.read()` (shared). They keep running in parallel with each other and are excluded only during a reader's brief exclusive window. Zero added serialization between mutators.
- Upgrade the reader assertions to exact deltas (+1 create, +1 search, -1 drop), now deterministic under the write guard and stronger than the old inequalities.
Regression guard: metrics_write_guard_isolates_index_counter_from_concurrent_mutator, deterministic in BOTH directions. A spawned lock-respecting mutator blocks on `.read()` for a 20ms window while the test holds `.write()`, so the drop delta is isolated (GREEN). Flipping the test's `write()` to `read()` lets the mutator increment during the window and turns the assertion RED (verified: left:1 right:0, the exact original failure mode).
Verification:
- Red/green proven on the regression test (write=GREEN, read=RED).
- tokio full lib suite (the config that flaked): 2863 passed / 0 failed x2 rounds.
- vector_search module stress: 20 rounds x 16 threads, 0 failures.
- monoio metrics tests 11/0 (RwLock is runtime-agnostic).
- CI-exact clippy -D warnings + fmt clean on both runtimes.
author: Tin Dang <tin.dang@trustifytechnology.com>
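The reader/writer split described in this commit can be reproduced in a few lines. This sketch uses `std::sync::RwLock` and local stand-in statics rather than the real `VECTOR_INDEXES` and `METRICS_LOCK`, and `create_index` / `drop_index_and_measure` are hypothetical helpers that model a mutator-only test and a delta-reader test respectively.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;

/// Stand-ins for the process-global counter and the test-only lock.
pub static VECTOR_INDEXES: AtomicU64 = AtomicU64::new(0);
pub static METRICS_LOCK: RwLock<()> = RwLock::new(());

/// Mutator-only path: shared guard, so mutators still run in parallel.
pub fn create_index() {
    let _shared = METRICS_LOCK.read().unwrap_or_else(|e| e.into_inner());
    VECTOR_INDEXES.fetch_add(1, Ordering::SeqCst);
}

/// Delta-reader path: exclusive guard, so no mutator can move the
/// counter between the two loads. Returns (before, after) for one drop.
/// Assumes at least one index exists, so the counter does not wrap.
pub fn drop_index_and_measure() -> (u64, u64) {
    let _exclusive = METRICS_LOCK.write().unwrap_or_else(|e| e.into_inner());
    let before = VECTOR_INDEXES.load(Ordering::SeqCst);
    VECTOR_INDEXES.fetch_sub(1, Ordering::SeqCst);
    let after = VECTOR_INDEXES.load(Ordering::SeqCst);
    (before, after)
}
```

Even with several `create_index` threads running concurrently, `drop_index_and_measure` observes exactly `after == before - 1`, because every mutator has to wait for the exclusive guard to be released.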
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/command/vector_search/tests.rs`:
- Around line 3-6: Replace the standard library lock with parking_lot: change
the import from std::sync::RwLock to parking_lot::RwLock and update the static
METRICS_LOCK declaration to use that type; then remove all .unwrap() calls when
acquiring guards (e.g., change .read().unwrap()/.write().unwrap() to .read()/
.write()) for METRICS_LOCK and any other RwLock usages in this test file so the
guards use parking_lot’s infallible API.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5b288f65-464e-4589-ad9f-16eac7623b6b
📒 Files selected for processing (1)
src/command/vector_search/tests.rs
…ion entry

CI Lint gate requires a CHANGELOG.md entry (or the skip-changelog label) per PR. Document the PR #144 scope under [0.2.0-alpha] Unreleased: the 8 CodeRabbit PR #136 durability follow-ups, the two PR #144-review Majors (async-spill eviction regression test, cfg(unix) migration gate), the aof_manifest.rs and aof.rs decompositions, and the VECTOR_INDEXES RwLock test-isolation fix.

author: Tin Dang <tin.dang@trustifytechnology.com>
…cade)

Self-review of the VECTOR_INDEXES isolation fix surfaced two issues with the std::sync::RwLock guard:
1. CLAUDE.md mandates parking_lot locks over std::sync everywhere.
2. std::sync::RwLock poisons on panic. The isolation fix grew the lock-holder set from 8 to 36 tests, so a single failing assertion under the write guard would poison the lock and cascade PoisonError through all ~28 newly-added read-guard tests' .unwrap() calls, burying the real failing assertion under a wall of identical PoisonError panics.

Switch METRICS_LOCK to parking_lot::RwLock (const new() works as a static, same as the existing GATE_TEST_LOCK in src/command/persistence.rs). parking_lot does not poison, so a failing test now reports only its own assertion. Its guards are infallible, so the 29 read + 10 write sites drop their `.unwrap()`. Write-exclusive / read-shared semantics are identical, so the deterministic regression test's red/green behaviour is unchanged.

Verified: metrics tests 11/0 on both monoio and tokio; clippy -D warnings + fmt clean on both runtimes.

author: Tin Dang <tin.dang@trustifytechnology.com>
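The poisoning cascade this commit avoids can be reproduced with the standard library alone. The sketch below is illustrative and uses hypothetical names (`cascade`, a local `lock`), not the project's tests: one thread panics while holding the write guard of a `std::sync::RwLock`, and every later `.read().unwrap()` then panics with `PoisonError`, even though those readers did nothing wrong.

```rust
use std::sync::{Arc, RwLock};
use std::thread;

/// Returns how many of `readers` later lock users fail after one writer
/// panics while holding the exclusive guard. (Hypothetical helper.)
fn cascade(readers: usize) -> usize {
    let lock = Arc::new(RwLock::new(()));

    // The one test with a genuinely failing assertion, under the write guard.
    let writer_lock = Arc::clone(&lock);
    let _ = thread::spawn(move || {
        let _guard = writer_lock.write().unwrap();
        panic!("the real failing assertion");
    })
    .join();

    // Unrelated tests that only take the shared guard: each `.unwrap()`
    // now panics because std's RwLock was poisoned by the writer's panic.
    (0..readers)
        .map(|_| {
            let reader_lock = Arc::clone(&lock);
            thread::spawn(move || {
                let _guard = reader_lock.read().unwrap();
            })
            .join()
        })
        .filter(|result| result.is_err())
        .count()
}

fn main() {
    let failures = cascade(3);
    assert_eq!(failures, 3);
    println!("{failures} of 3 readers failed with PoisonError");
}
```

With a non-poisoning lock such as `parking_lot::RwLock`, the readers would acquire the guard normally and only the writer's own panic would be reported, which is the behaviour the commit relies on.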
Summary
Closes all 8 findings from the CodeRabbit review of #136 (review 4420584384) that the merged PR did not address: AOF / disk-offload durability bugs plus the `aof_manifest.rs` line-cap refactor. Each fix was validated with red/green TDD in the OrbStack Linux VM and is green on both runtimes.

Verification (all green): `cargo test --lib` monoio 3457 passed, tokio,jemalloc 2861 passed; `cargo clippy -- -D warnings` (default + tokio,jemalloc); `cargo fmt --check`. Per-shard AOF SIGKILL recovery + per-shard BGREWRITEAOF crash-recovery integration suites green.

Fixes (issue -> commit)
- `SpillCompletion` = permanent cold-tier data loss -> blocking `send_timeout` backpressure + drain-on-shutdown (6cac58f)
- `success:false` on inline-batch failure -> per-entry fallback salvage (deae60d)
- `spawn_migrated_tokio_connection` dropped spill context -> thread it through, mirroring the 3 sibling spawn paths (cf90800)
- `run_embedded` skipped cold recovery under `appendonly=no` + re-enabled the per-shard-accept race -> `should_run_recovery` gate + central accept (4828b3d)
- `AOF_REWRITE_IN_PROGRESS` forever -> `mark_failed` + `shard_done` the unsent tail (fa9afe9)
- `PerShardRewriteCoord` publishes the committed seq; writers await + reopen onto it) + `ShardDoneGuard` so a fold panic can't hang every other writer at the barrier (closes the #138 panic-wedge too) (c919ff5)
- `AppendSync` acked `Synced` before the boundary fsync (breaks `appendfsync=always`) -> park acks, fulfil `Synced`/`FsyncFailed` strictly after `sync_data` (73d9514)
- `aof_manifest.rs` (3058 lines) -> `aof_manifest/{mod.rs, shard_replay.rs, shard_rewrite.rs}`, all under cap, public paths preserved via `pub use` (14c572b)

Notes / design
- `aof-writer-{sid}` -> `block_on_local`) on both runtimes, so a synchronous barrier wait can never starve the thread that must publish.
- `[profile.release]` is `panic = "unwind"` (no `panic = "abort"`), so the `ShardDoneGuard` is what prevents a `save_snapshot_to_bytes` OOM-unwind from hanging the whole writer pool.
- `tests/aof_fsync_err_subscribe_ordering::aof_fsync_err_propagates_before_subscribe_single_shard` (an `#[ignore]`d normal-append-path test; the multi-shard twin passes) fails identically on the base commit.
- `aof.rs` itself is now 4379 lines (>1500) after these durability fixes; tracked separately from "Refactor: split src/persistence/aof_manifest.rs (>1500-line cap) into per-shard rewrite/replay submodules" #143, not split here.

Test plan
- `cargo test --lib` (monoio): 3457 pass
- `cargo test --lib --no-default-features --features runtime-tokio,jemalloc`: 2861 pass
- `cargo clippy -- -D warnings` (default + tokio,jemalloc)
- `cargo fmt --check`
- `crash_matrix_per_shard_aof` + `crash_matrix_per_shard_bgrewriteaof` (`--ignored`): 5 pass

Summary by CodeRabbit
Bug Fixes
New Features
Tests
Chores