Skip to content

Bound concurrent snapshot capture under fault storms#456

Open
bburda wants to merge 9 commits into
mainfrom
feat/issue-441-bound-capture-concurrency
Open

Bound concurrent snapshot capture under fault storms#456
bburda wants to merge 9 commits into
mainfrom
feat/issue-441-bound-capture-concurrency

Conversation

@bburda

@bburda bburda commented Jun 21, 2026

Copy link
Copy Markdown
Collaborator

Summary

Replace the unbounded thread-per-fault snapshot/rosbag capture in the fault manager with a bounded worker pool draining a bounded queue, so peak memory and thread count have a hard, configurable upper bound under a fault storm.

Previously, each fault confirmation spawned one std::thread to run snapshot + rosbag capture, with no ceiling on concurrent captures across different faults: N simultaneous faults cost N threads and N capture buffers (measured peak memory grew from about +0.5 MiB at N=1 to about +5.8 MiB at N=16, and above roughly N=4 the process did not return to its pre-fault footprint within 20 s). The capture_threads_ vector also accumulated finished thread handles for the node's entire lifetime.

What changed

  • CaptureThreadPool - a fixed pool of N workers draining a bounded FIFO queue of fault codes. Hard concurrency bound = pool size; queued items are cheap strings. Full-queue policy is configurable: reject_newest (drop the incoming capture) or drop_oldest (evict the oldest pending capture). Drops are logged (throttled) and counted.
  • Three new parameters under snapshots.: capture_pool_size (default 2), capture_queue_depth (default 16), capture_queue_full_policy (default reject_newest).
  • Confirm path enqueues into the pool instead of spawning a thread; the cooldown check, enqueue, and cooldown stamp/erase are atomic under one lock, and a dropped capture is not silently put on cooldown.
  • Teardown fix: the destructor now joins the pool before RosbagCapture::stop(). stop() is not a barrier against an in-flight on_fault_confirmed() (it clears subscriptions/buffers), so joining workers first removes a latent use-after-free window.

Behavior note

Under a sustained storm that exceeds pool_size + queue_depth, some captures are now dropped per the configured policy (the intended bound). Defaults are sized to absorb the documented N<=16 storm without dropping. This is not an API change; existing snapshot/rosbag behavior is unchanged below the bound.


Issue


Type

  • Bug fix
  • New feature or tests
  • Breaking change
  • Documentation only

Testing

  • Unit tests for CaptureThreadPool: bounded-and-reaches concurrency, both full-queue policies with exact drop counts and eviction order, FIFO ordering, pool_size=1 serialization, callback-exception isolation, enqueue-after-shutdown, shutdown discards-pending while completing in-flight, idempotent shutdown, and deterministic capture_fn release after join. All gates are mutex/condition-variable based (no sleeps), C++17.
  • Parameter-validation tests: clamping (< 1 -> 1) and policy parsing (including the drop_oldest path).
  • Integration test: confirms a burst of 8 distinct faults and verifies the node stays responsive (no crash/UAF) and snapshots are still captured through the bounded pool.
  • Full ros2_medkit_fault_manager unit + integration + linter suites pass; clang-tidy is clean on the changed files.

Reviewers can verify with colcon test --packages-select ros2_medkit_fault_manager (unit + linters) and inspect the bound via the CaptureThreadPool unit tests.


Checklist

  • Breaking changes are clearly described (and announced in docs / changelog if needed)
  • Tests were added or updated if needed
  • Docs were updated if behavior or public API changed

🤖 Generated with Claude Code

Copilot AI review requested due to automatic review settings June 21, 2026 12:50

Copilot AI left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR hard-bounds snapshot/rosbag capture concurrency in ros2_medkit_fault_manager by replacing the previous “thread-per-confirmed-fault” model with a bounded CaptureThreadPool, preventing unbounded thread/memory growth during fault storms (issue #441).

Changes:

  • Add CaptureThreadPool (fixed-size worker pool + bounded FIFO queue) and new snapshots.* parameters to configure pool size, queue depth, and full-queue policy.
  • Route fault-confirmation capture through the pool, including cooldown + enqueue + cooldown bookkeeping under one mutex, and reorder teardown to join workers before rosbag shutdown.
  • Add unit/integration tests and update documentation/config examples to reflect bounded storm behavior.

Reviewed changes

Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.

Show a summary per file
File Description
src/ros2_medkit_gateway/README.md Documents new/previously-undocumented snapshot parameters.
src/ros2_medkit_fault_manager/src/fault_manager_node.cpp Enqueues capture work into the bounded pool; adds new parameters; joins pool before rosbag stop.
src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp Adds pool configuration members and test accessors; replaces thread vector with pool handle.
src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/capture_thread_pool.hpp Introduces pool/queue policy API and observability for drops/pending size.
src/ros2_medkit_fault_manager/src/capture_thread_pool.cpp Implements worker loop, bounded enqueue semantics, and shutdown/join behavior.
src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp Adds focused unit tests for concurrency bounds and queue-full policies.
src/ros2_medkit_fault_manager/test/test_fault_manager.cpp Adds parameter parsing/clamping tests for new pool settings.
src/ros2_medkit_fault_manager/test/test_integration.test.py Adds integration coverage for storm responsiveness and capture liveness.
src/ros2_medkit_fault_manager/CMakeLists.txt Builds pool implementation into the library and adds the new GTest target.
src/ros2_medkit_fault_manager/README.md Documents bounded storm behavior and new parameters.
src/ros2_medkit_fault_manager/design/index.rst Updates design diagram and component description to include the pool.
src/ros2_medkit_fault_manager/config/fault_manager.yaml Adds commented config knobs for the new pool/queue controls.
docs/tutorials/snapshots.rst Documents new snapshot pool parameters in the tutorial.
docs/config/fault-manager.rst Documents new snapshot pool parameters in config reference.

Comment thread src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp
Comment on lines +76 to +84
for (auto & worker : workers_) {
if (worker.joinable()) {
try {
worker.join();
} catch (const std::system_error & e) {
RCLCPP_ERROR(logger_, "CaptureThreadPool worker join failed: %s", e.what());
}
}
}
Comment on lines +24 to +36
CaptureThreadPool::CaptureThreadPool(std::size_t pool_size, std::size_t queue_depth, QueueFullPolicy full_policy,
rclcpp::Logger logger, std::function<void(const std::string &)> capture_fn)
: queue_depth_(queue_depth)
, full_policy_(full_policy)
, logger_(std::move(logger))
, capture_fn_(std::move(capture_fn)) {
workers_.reserve(pool_size);
for (std::size_t i = 0; i < pool_size; ++i) {
workers_.emplace_back([this] {
worker_loop();
});
}
}
@bburda bburda self-assigned this Jun 21, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bound concurrent snapshot capture under fault storms

2 participants