Bound concurrent snapshot capture under fault storms#456
Open
bburda wants to merge 9 commits into
Open
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR hard-bounds snapshot/rosbag capture concurrency in ros2_medkit_fault_manager by replacing the previous “thread-per-confirmed-fault” model with a bounded CaptureThreadPool, preventing unbounded thread/memory growth during fault storms (issue #441).
Changes:
- Add
CaptureThreadPool(fixed-size worker pool + bounded FIFO queue) and newsnapshots.*parameters to configure pool size, queue depth, and full-queue policy. - Route fault-confirmation capture through the pool, including cooldown + enqueue + cooldown bookkeeping under one mutex, and reorder teardown to join workers before rosbag shutdown.
- Add unit/integration tests and update documentation/config examples to reflect bounded storm behavior.
Reviewed changes
Copilot reviewed 14 out of 14 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| src/ros2_medkit_gateway/README.md | Documents new/previously-undocumented snapshot parameters. |
| src/ros2_medkit_fault_manager/src/fault_manager_node.cpp | Enqueues capture work into the bounded pool; adds new parameters; joins pool before rosbag stop. |
| src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp | Adds pool configuration members and test accessors; replaces thread vector with pool handle. |
| src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/capture_thread_pool.hpp | Introduces pool/queue policy API and observability for drops/pending size. |
| src/ros2_medkit_fault_manager/src/capture_thread_pool.cpp | Implements worker loop, bounded enqueue semantics, and shutdown/join behavior. |
| src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp | Adds focused unit tests for concurrency bounds and queue-full policies. |
| src/ros2_medkit_fault_manager/test/test_fault_manager.cpp | Adds parameter parsing/clamping tests for new pool settings. |
| src/ros2_medkit_fault_manager/test/test_integration.test.py | Adds integration coverage for storm responsiveness and capture liveness. |
| src/ros2_medkit_fault_manager/CMakeLists.txt | Builds pool implementation into the library and adds the new GTest target. |
| src/ros2_medkit_fault_manager/README.md | Documents bounded storm behavior and new parameters. |
| src/ros2_medkit_fault_manager/design/index.rst | Updates design diagram and component description to include the pool. |
| src/ros2_medkit_fault_manager/config/fault_manager.yaml | Adds commented config knobs for the new pool/queue controls. |
| docs/tutorials/snapshots.rst | Documents new snapshot pool parameters in the tutorial. |
| docs/config/fault-manager.rst | Documents new snapshot pool parameters in config reference. |
Comment on lines
+76
to
+84
| for (auto & worker : workers_) { | ||
| if (worker.joinable()) { | ||
| try { | ||
| worker.join(); | ||
| } catch (const std::system_error & e) { | ||
| RCLCPP_ERROR(logger_, "CaptureThreadPool worker join failed: %s", e.what()); | ||
| } | ||
| } | ||
| } |
Comment on lines
+24
to
+36
| CaptureThreadPool::CaptureThreadPool(std::size_t pool_size, std::size_t queue_depth, QueueFullPolicy full_policy, | ||
| rclcpp::Logger logger, std::function<void(const std::string &)> capture_fn) | ||
| : queue_depth_(queue_depth) | ||
| , full_policy_(full_policy) | ||
| , logger_(std::move(logger)) | ||
| , capture_fn_(std::move(capture_fn)) { | ||
| workers_.reserve(pool_size); | ||
| for (std::size_t i = 0; i < pool_size; ++i) { | ||
| workers_.emplace_back([this] { | ||
| worker_loop(); | ||
| }); | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
Replace the unbounded thread-per-fault snapshot/rosbag capture in the fault manager with a bounded worker pool draining a bounded queue, so peak memory and thread count have a hard, configurable upper bound under a fault storm.
Previously, each fault confirmation spawned one
std::threadto run snapshot + rosbag capture, with no ceiling on concurrent captures across different faults: N simultaneous faults cost N threads and N capture buffers (measured peak memory grew from about +0.5 MiB at N=1 to about +5.8 MiB at N=16, and above roughly N=4 the process did not return to its pre-fault footprint within 20 s). Thecapture_threads_vector also accumulated finished thread handles for the node's entire lifetime.What changed
CaptureThreadPool- a fixed pool of N workers draining a bounded FIFO queue of fault codes. Hard concurrency bound = pool size; queued items are cheap strings. Full-queue policy is configurable:reject_newest(drop the incoming capture) ordrop_oldest(evict the oldest pending capture). Drops are logged (throttled) and counted.snapshots.:capture_pool_size(default2),capture_queue_depth(default16),capture_queue_full_policy(defaultreject_newest).RosbagCapture::stop().stop()is not a barrier against an in-flighton_fault_confirmed()(it clears subscriptions/buffers), so joining workers first removes a latent use-after-free window.Behavior note
Under a sustained storm that exceeds
pool_size + queue_depth, some captures are now dropped per the configured policy (the intended bound). Defaults are sized to absorb the documented N<=16 storm without dropping. This is not an API change; existing snapshot/rosbag behavior is unchanged below the bound.Issue
Type
Testing
CaptureThreadPool: bounded-and-reaches concurrency, both full-queue policies with exact drop counts and eviction order, FIFO ordering,pool_size=1serialization, callback-exception isolation, enqueue-after-shutdown, shutdown discards-pending while completing in-flight, idempotent shutdown, and deterministiccapture_fnrelease after join. All gates are mutex/condition-variable based (no sleeps), C++17.< 1-> 1) and policy parsing (including thedrop_oldestpath).ros2_medkit_fault_managerunit + integration + linter suites pass; clang-tidy is clean on the changed files.Reviewers can verify with
colcon test --packages-select ros2_medkit_fault_manager(unit + linters) and inspect the bound via theCaptureThreadPoolunit tests.Checklist
🤖 Generated with Claude Code