diff --git a/docs/config/fault-manager.rst b/docs/config/fault-manager.rst index 7a8bae0dd..b1fc39c94 100644 --- a/docs/config/fault-manager.rst +++ b/docs/config/fault-manager.rst @@ -168,6 +168,9 @@ Basic Snapshot Settings config_file: "" # Path to YAML config file recapture_cooldown_sec: 60.0 # Min seconds between snapshot captures per fault max_per_fault: 10 # Max snapshots stored per fault code (0 = unlimited) + capture_pool_size: 2 # Max concurrent capture threads (>= 1) + capture_queue_depth: 16 # Max pending captures before policy applies (>= 1) + capture_queue_full_policy: reject_newest # reject_newest | drop_oldest .. list-table:: :header-rows: 1 @@ -202,6 +205,15 @@ Basic Snapshot Settings - ``10`` - Maximum number of snapshots stored per fault code. When the limit is reached, new snapshots for that fault are rejected. Set to 0 for unlimited. + * - ``snapshots.capture_pool_size`` + - ``2`` + - Max concurrent capture threads under a fault storm (>= 1). + * - ``snapshots.capture_queue_depth`` + - ``16`` + - Max pending captures before the full-queue policy applies (>= 1). + * - ``snapshots.capture_queue_full_policy`` + - ``reject_newest`` + - Policy when the queue is full: ``reject_newest`` or ``drop_oldest``. Rosbag Recording ~~~~~~~~~~~~~~~~ diff --git a/docs/tutorials/snapshots.rst b/docs/tutorials/snapshots.rst index c66cc1d85..f9216ff67 100644 --- a/docs/tutorials/snapshots.rst +++ b/docs/tutorials/snapshots.rst @@ -90,6 +90,15 @@ Configure snapshot capture via fault manager parameters: - ``10`` - Maximum number of snapshots stored per fault code. When the limit is reached, new snapshots for that fault are rejected. Set to 0 for unlimited. + * - ``snapshots.capture_pool_size`` + - ``2`` + - Max concurrent capture threads under a fault storm (>= 1). + * - ``snapshots.capture_queue_depth`` + - ``16`` + - Max pending captures before the full-queue policy applies (>= 1). + * - ``snapshots.capture_queue_full_policy`` + - ``reject_newest`` + - Policy when the queue is full: ``reject_newest`` or ``drop_oldest``. Advanced Configuration ---------------------- diff --git a/src/ros2_medkit_fault_manager/CMakeLists.txt b/src/ros2_medkit_fault_manager/CMakeLists.txt index 1f9fb6956..23c4924c9 100644 --- a/src/ros2_medkit_fault_manager/CMakeLists.txt +++ b/src/ros2_medkit_fault_manager/CMakeLists.txt @@ -51,6 +51,7 @@ medkit_detect_compat_defs() # Library target (shared between executable and tests) add_library(fault_manager_lib STATIC + src/capture_thread_pool.cpp src/fault_manager_node.cpp src/fault_storage.cpp src/sqlite_fault_storage.cpp @@ -224,6 +225,12 @@ if(BUILD_TESTING) TIMEOUT 120 ) set_tests_properties(test_rosbag_entity_scope PROPERTIES LABELS "integration") + + # CaptureThreadPool unit tests (no ROS node, no domain id needed) + ament_add_gtest(test_capture_thread_pool test/test_capture_thread_pool.cpp) + target_link_libraries(test_capture_thread_pool fault_manager_lib) + medkit_target_dependencies(test_capture_thread_pool rclcpp) + ros2_medkit_relax_vendor_warnings() endif() diff --git a/src/ros2_medkit_fault_manager/README.md b/src/ros2_medkit_fault_manager/README.md index bb8f2cd28..c954d1a81 100644 --- a/src/ros2_medkit_fault_manager/README.md +++ b/src/ros2_medkit_fault_manager/README.md @@ -70,6 +70,8 @@ ros2 service call /fault_manager/clear_fault ros2_medkit_msgs/srv/ClearFault \ Snapshots capture topic data when faults are confirmed for post-mortem debugging. +Under a fault storm, captures are bounded by a worker pool (`capture_pool_size`) draining a bounded queue (`capture_queue_depth`); excess captures are dropped per `capture_queue_full_policy` and logged (throttled). + | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `snapshots.enabled` | bool | `true` | Enable/disable snapshot capture | @@ -78,6 +80,11 @@ Snapshots capture topic data when faults are confirmed for post-mortem debugging | `snapshots.max_message_size` | int | `65536` | Maximum message size in bytes (larger messages skipped) | | `snapshots.default_topics` | string[] | `[]` | Topics to capture for all faults | | `snapshots.config_file` | string | `""` | Path to YAML config for `fault_specific` and `patterns` | +| `snapshots.recapture_cooldown_sec` | double | `60.0` | Min seconds between captures for the same fault code. | +| `snapshots.max_per_fault` | int | `10` | Max snapshots retained per fault. | +| `snapshots.capture_pool_size` | int | `2` | Max concurrent capture threads under a fault storm (>= 1). | +| `snapshots.capture_queue_depth` | int | `16` | Max pending captures before the full-queue policy applies (>= 1). | +| `snapshots.capture_queue_full_policy` | string | `reject_newest` | Policy when the queue is full: `reject_newest` or `drop_oldest`. | **Topic Resolution Priority:** 1. `fault_specific` - Exact match for fault code (configured via YAML config file) diff --git a/src/ros2_medkit_fault_manager/config/fault_manager.yaml b/src/ros2_medkit_fault_manager/config/fault_manager.yaml index e62f03bf4..0b65fe3d6 100644 --- a/src/ros2_medkit_fault_manager/config/fault_manager.yaml +++ b/src/ros2_medkit_fault_manager/config/fault_manager.yaml @@ -19,3 +19,8 @@ fault_manager: # to entity-scoped capture and is crash-safe (falls back / self-disables if no # storage backend is usable). snapshots.rosbag.enabled: false + + # Capture concurrency bound under fault storms (issue #441): + # snapshots.capture_pool_size: 2 # max concurrent capture threads (>= 1) + # snapshots.capture_queue_depth: 16 # max pending captures (>= 1) + # snapshots.capture_queue_full_policy: reject_newest # reject_newest | drop_oldest diff --git a/src/ros2_medkit_fault_manager/design/index.rst b/src/ros2_medkit_fault_manager/design/index.rst index a95ab6fcd..e4c103568 100644 --- a/src/ros2_medkit_fault_manager/design/index.rst +++ b/src/ros2_medkit_fault_manager/design/index.rst @@ -60,6 +60,19 @@ The following diagram shows the relationships between the main components of the + get_storage(): FaultStorage& } + class CaptureThreadPool { + +enqueue(fault_code) : EnqueueOutcome + +shutdown() + +dropped_captures() : uint64_t + -workers_ : vector + -queue_ : deque + } + + enum QueueFullPolicy { + kRejectNewest + kDropOldest + } + abstract class FaultStorage <> { + {abstract} report_fault(): bool + {abstract} list_faults(): vector @@ -91,6 +104,7 @@ The following diagram shows the relationships between the main components of the ' Composition FaultManagerNode *-down-> InMemoryFaultStorage : owns + FaultManagerNode --> CaptureThreadPool : enqueues capture jobs ' InMemoryFaultStorage contains FaultStates InMemoryFaultStorage o-right-> FaultState : contains many @@ -131,7 +145,13 @@ Main Components - Maps directly to ``ros2_medkit_msgs::msg::Fault`` via ``to_msg()`` - Uses ``std::set`` for reporting_sources to ensure uniqueness - Tracks first and last occurrence timestamps - - Manages fault status lifecycle with debounce (PREFAILED → CONFIRMED → CLEARED) + - Manages fault status lifecycle with debounce (PREFAILED -> CONFIRMED -> CLEARED) + +5. **CaptureThreadPool** - Bounded worker pool for snapshot capture + Under a fault storm the node enqueues capture jobs into a bounded ``CaptureThreadPool`` + (``pool_size`` workers draining a ``queue_depth``-bounded queue) rather than spawning one + thread per fault. The full-queue policy (``reject_newest``/``drop_oldest``) bounds memory, + and the destructor joins the pool before rosbag teardown. Services -------- diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/capture_thread_pool.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/capture_thread_pool.hpp new file mode 100644 index 000000000..7cff1d63d --- /dev/null +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/capture_thread_pool.hpp @@ -0,0 +1,106 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/logger.hpp" + +namespace ros2_medkit_fault_manager { + +/// Policy applied when the bounded pending queue is full. +enum class QueueFullPolicy { + kRejectNewest, ///< Drop the incoming job. + kDropOldest ///< Evict the oldest pending job, queue the incoming one. +}; + +/// Result of an enqueue attempt. +enum class EnqueueResult { + kAccepted, ///< Queued; will run. + kDroppedNewest, ///< Queue full (reject_newest): incoming dropped. + kEvictedOldest, ///< Queue full (drop_oldest): oldest evicted, incoming queued. + kRejectedShuttingDown ///< Pool is shutting down: not queued, will not run. +}; + +struct EnqueueOutcome { + EnqueueResult result; + std::optional evicted_code; ///< Set only for kEvictedOldest. +}; + +/// Bounded worker pool that runs fault-capture jobs off the service thread. +/// +/// Caps concurrent in-flight captures at pool_size and bounds pending work at +/// queue_depth with a configurable full-queue policy, giving peak memory and +/// thread count a hard upper bound under a fault storm. See issue #441. +class CaptureThreadPool { + public: + /// @param pool_size Number of worker threads (clamped to >= 1). + /// @param queue_depth Max pending jobs (clamped to >= 1). + /// @param full_policy Behavior when the pending queue is full. + /// @param logger Logger for worker-side capture failures. + /// @param capture_fn Invoked per job on a worker thread. Must be thread-safe + /// for pool_size concurrent calls. Exceptions are caught and logged. + CaptureThreadPool(std::size_t pool_size, std::size_t queue_depth, QueueFullPolicy full_policy, rclcpp::Logger logger, + std::function capture_fn); + ~CaptureThreadPool(); + + CaptureThreadPool(const CaptureThreadPool &) = delete; + CaptureThreadPool & operator=(const CaptureThreadPool &) = delete; + CaptureThreadPool(CaptureThreadPool &&) = delete; + CaptureThreadPool & operator=(CaptureThreadPool &&) = delete; + + /// Enqueue a capture job. Non-blocking. Thread-safe. + EnqueueOutcome enqueue(const std::string & fault_code); + + /// Stop accepting work, let in-flight jobs finish, discard pending, join all + /// workers. Idempotent and noexcept. Called by the destructor. + void shutdown() noexcept; + + /// Lifetime total of storm drops (kDroppedNewest + kEvictedOldest). + uint64_t dropped_captures() const { + return dropped_captures_.load(); + } + + /// Current number of pending (not yet started) jobs. For tests/observability. + std::size_t pending_size() const; + + private: + void worker_loop(); + + const std::size_t queue_depth_; + const QueueFullPolicy full_policy_; + rclcpp::Logger logger_; + std::function capture_fn_; + + mutable std::mutex queue_mutex_; + std::condition_variable cv_; + std::deque queue_; + bool stop_{false}; + + std::atomic dropped_captures_{0}; + std::vector workers_; +}; + +} // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp index d946256e8..9af908a74 100644 --- a/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp +++ b/src/ros2_medkit_fault_manager/include/ros2_medkit_fault_manager/fault_manager_node.hpp @@ -20,6 +20,7 @@ #include #include "rclcpp/rclcpp.hpp" +#include "ros2_medkit_fault_manager/capture_thread_pool.hpp" #include "ros2_medkit_fault_manager/correlation/correlation_engine.hpp" #include "ros2_medkit_fault_manager/entity_threshold_resolver.hpp" #include "ros2_medkit_fault_manager/fault_storage.hpp" @@ -65,6 +66,16 @@ class FaultManagerNode : public rclcpp::Node { return storage_type_; } + int capture_pool_size_for_test() const { + return capture_pool_size_; + } + int capture_queue_depth_for_test() const { + return capture_queue_depth_; + } + QueueFullPolicy capture_queue_full_policy_for_test() const { + return capture_queue_full_policy_; + } + /// Check if entity matches any reporting source /// @param reporting_sources List of reporting sources from fault /// @param entity_id Entity ID to match (exact match or as suffix of FQN) @@ -144,6 +155,9 @@ class FaultManagerNode : public rclcpp::Node { int32_t healing_threshold_{3}; double auto_confirm_after_sec_{0.0}; double snapshot_recapture_cooldown_sec_{60.0}; + int capture_pool_size_{2}; + int capture_queue_depth_{16}; + QueueFullPolicy capture_queue_full_policy_{QueueFullPolicy::kRejectNewest}; DebounceConfig global_config_; ///< Global debounce config (built from ROS params) std::unique_ptr storage_; std::unique_ptr threshold_resolver_; ///< Per-entity threshold overrides @@ -165,19 +179,19 @@ class FaultManagerNode : public rclcpp::Node { rclcpp::Publisher::SharedPtr event_publisher_; /// Snapshot capture for capturing topic data on fault confirmation. - /// shared_ptr to allow safe capture-by-value in detached capture threads. + /// shared_ptr to allow safe capture-by-value in the pool's capture jobs. std::shared_ptr snapshot_capture_; /// Rosbag capture for time-window recording (nullptr if disabled). - /// shared_ptr to allow safe capture-by-value in detached capture threads. + /// shared_ptr to allow safe capture-by-value in the pool's capture jobs. std::shared_ptr rosbag_capture_; /// Correlation engine for fault correlation/muting (nullptr if disabled) std::unique_ptr correlation_engine_; - /// Tracks active capture threads for clean shutdown (join before destruction) - std::mutex capture_threads_mutex_; - std::vector capture_threads_; + /// Bounded pool that runs capture jobs off the service thread (issue #441). + /// Declared after snapshot_capture_/rosbag_capture_ so it is destroyed first. + std::unique_ptr capture_pool_; /// Per-fault cooldown tracking for snapshot recapture std::mutex last_capture_mutex_; diff --git a/src/ros2_medkit_fault_manager/src/capture_thread_pool.cpp b/src/ros2_medkit_fault_manager/src/capture_thread_pool.cpp new file mode 100644 index 000000000..3efe8b8fa --- /dev/null +++ b/src/ros2_medkit_fault_manager/src/capture_thread_pool.cpp @@ -0,0 +1,130 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "ros2_medkit_fault_manager/capture_thread_pool.hpp" + +#include +#include + +#include "rclcpp/logging.hpp" + +namespace ros2_medkit_fault_manager { + +CaptureThreadPool::CaptureThreadPool(std::size_t pool_size, std::size_t queue_depth, QueueFullPolicy full_policy, + rclcpp::Logger logger, std::function capture_fn) + : queue_depth_(queue_depth == 0 ? 1 : queue_depth) + , full_policy_(full_policy) + , logger_(std::move(logger)) + , capture_fn_(std::move(capture_fn)) { + // Defensive clamp: this is a public class. Callers (the node) validate to >= 1, + // but a stray 0 must not UB. With 0 workers nothing drains the queue; with + // queue_depth 0 + kDropOldest, enqueue() would call front() on an empty deque. + // queue_depth_ is clamped in the initializer above. + const std::size_t worker_count = pool_size == 0 ? 1 : pool_size; + workers_.reserve(worker_count); + for (std::size_t i = 0; i < worker_count; ++i) { + workers_.emplace_back([this] { + worker_loop(); + }); + } +} + +CaptureThreadPool::~CaptureThreadPool() { + shutdown(); +} + +EnqueueOutcome CaptureThreadPool::enqueue(const std::string & fault_code) { + std::lock_guard lock(queue_mutex_); + if (stop_) { + return {EnqueueResult::kRejectedShuttingDown, std::nullopt}; + } + if (queue_.size() < queue_depth_) { + queue_.push_back(fault_code); + cv_.notify_one(); + return {EnqueueResult::kAccepted, std::nullopt}; + } + // Queue full. + if (full_policy_ == QueueFullPolicy::kRejectNewest) { + dropped_captures_.fetch_add(1, std::memory_order_relaxed); + return {EnqueueResult::kDroppedNewest, std::nullopt}; + } + // kDropOldest: evict the oldest pending job. + std::string evicted = std::move(queue_.front()); + queue_.pop_front(); + queue_.push_back(fault_code); + dropped_captures_.fetch_add(1, std::memory_order_relaxed); + cv_.notify_one(); + return {EnqueueResult::kEvictedOldest, std::move(evicted)}; +} + +void CaptureThreadPool::shutdown() noexcept { + { + std::lock_guard lock(queue_mutex_); + if (stop_) { + return; + } + stop_ = true; + queue_.clear(); // discard pending jobs atomically with setting stop_ + } + cv_.notify_all(); + for (auto & worker : workers_) { + if (worker.joinable()) { + // join() only throws std::system_error, and only for a self-join + // (resource_deadlock_would_occur). shutdown() runs on the owner/destructor + // thread, never on a worker, so that cannot happen here. The catch keeps + // shutdown() noexcept and, in the impossible-in-practice case that one join + // fails, still lets us join the remaining workers instead of terminating. + try { + worker.join(); + } catch (const std::system_error & e) { + RCLCPP_ERROR(logger_, "CaptureThreadPool worker join failed: %s", e.what()); + } + } + } + // Release captured shared_ptrs deterministically, before owning objects vanish. + capture_fn_ = nullptr; +} + +std::size_t CaptureThreadPool::pending_size() const { + std::lock_guard lock(queue_mutex_); + return queue_.size(); +} + +void CaptureThreadPool::worker_loop() { + for (;;) { + std::string job; + { + std::unique_lock lock(queue_mutex_); + cv_.wait(lock, [this] { + return stop_ || !queue_.empty(); + }); + if (stop_) { + return; // check stop BEFORE dispatch -> pending discarded, not drained + } + job = std::move(queue_.front()); + queue_.pop_front(); + } // release lock BEFORE running the (multi-second) job + try { + if (capture_fn_) { + capture_fn_(job); + } + } catch (const std::exception & e) { + RCLCPP_ERROR(logger_, "Capture job for '%s' threw: %s", job.c_str(), e.what()); + } catch (...) { + RCLCPP_ERROR(logger_, "Capture job for '%s' threw unknown exception", job.c_str()); + } + } +} + +} // namespace ros2_medkit_fault_manager diff --git a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp index 3ad6ab6af..c0fd47482 100644 --- a/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp +++ b/src/ros2_medkit_fault_manager/src/fault_manager_node.cpp @@ -21,7 +21,6 @@ #include #include #include -#include #include "ros2_medkit_fault_manager/correlation/config_parser.hpp" #include "ros2_medkit_fault_manager/sqlite_fault_storage.hpp" @@ -108,6 +107,41 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node(" snapshot_recapture_cooldown_sec_); snapshot_recapture_cooldown_sec_ = 0.0; } + + // Capture concurrency bound (issue #441): cap concurrent capture threads and + // bound the pending queue so a fault storm cannot grow memory without limit. + // declare_parameter returns int64_t on Jazzy; the static_cast keeps -Wconversion quiet. + capture_pool_size_ = static_cast(declare_parameter("snapshots.capture_pool_size", 2)); + if (capture_pool_size_ < 1) { + RCLCPP_WARN(get_logger(), "snapshots.capture_pool_size must be >= 1, got %ld. Clamping to 1", + static_cast(capture_pool_size_)); + capture_pool_size_ = 1; + } + + capture_queue_depth_ = static_cast(declare_parameter("snapshots.capture_queue_depth", 16)); + if (capture_queue_depth_ < 1) { + RCLCPP_WARN(get_logger(), "snapshots.capture_queue_depth must be >= 1, got %ld. Clamping to 1", + static_cast(capture_queue_depth_)); + capture_queue_depth_ = 1; + } + + const std::string policy_str = declare_parameter("snapshots.capture_queue_full_policy", "reject_newest"); + if (policy_str == "reject_newest") { + capture_queue_full_policy_ = QueueFullPolicy::kRejectNewest; + } else if (policy_str == "drop_oldest") { + capture_queue_full_policy_ = QueueFullPolicy::kDropOldest; + } else { + RCLCPP_WARN(get_logger(), + "snapshots.capture_queue_full_policy '%s' is invalid (expected 'reject_newest' or " + "'drop_oldest'). Using 'reject_newest'", + policy_str.c_str()); + capture_queue_full_policy_ = QueueFullPolicy::kRejectNewest; + } + + RCLCPP_INFO(get_logger(), "Capture concurrency: pool_size=%d, queue_depth=%d, policy=%s", capture_pool_size_, + capture_queue_depth_, + capture_queue_full_policy_ == QueueFullPolicy::kRejectNewest ? "reject_newest" : "drop_oldest"); + auto max_snapshots = declare_parameter("snapshots.max_per_fault", 10); if (max_snapshots < 0) { RCLCPP_WARN(get_logger(), "snapshots.max_per_fault should be >= 0, got %ld. Disabling limit.", max_snapshots); @@ -210,6 +244,22 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node(" rosbag_capture_ = std::make_shared(this, storage_.get(), snapshot_config.rosbag, snapshot_config); } + // Drive captures through a bounded pool instead of one thread per fault. + if (snapshot_capture_ || rosbag_capture_) { + auto snap = snapshot_capture_; + auto bag = rosbag_capture_; + capture_pool_ = std::make_unique( + static_cast(capture_pool_size_), static_cast(capture_queue_depth_), + capture_queue_full_policy_, get_logger(), [snap, bag](const std::string & fault_code) { + if (snap) { + snap->capture(fault_code); + } + if (bag) { + bag->on_fault_confirmed(fault_code); + } + }); + } + // Initialize correlation engine (nullptr if disabled or not configured) correlation_engine_ = create_correlation_engine(); @@ -247,23 +297,16 @@ FaultManagerNode::FaultManagerNode(const rclcpp::NodeOptions & options) : Node(" } FaultManagerNode::~FaultManagerNode() { - // Stop rosbag capture first (has active subscriptions and timers) + // Join capture workers FIRST so no worker is mid-capture when rosbag tears + // down. RosbagCapture::stop() is not a barrier against an in-flight + // on_fault_confirmed() (it clears subscriptions/buffers without serializing + // against the capture path). See issue #441. + if (capture_pool_) { + capture_pool_->shutdown(); + } if (rosbag_capture_) { rosbag_capture_->stop(); } - - // Join any active capture threads to prevent use-after-free on node destruction. - // Without this, detached threads holding shared_ptrs to SnapshotCapture/RosbagCapture - // can access the destroyed node_ pointer, causing SIGSEGV. - { - std::lock_guard lock(capture_threads_mutex_); - for (auto & t : capture_threads_) { - if (t.joinable()) { - t.join(); - } - } - capture_threads_.clear(); - } } std::unique_ptr FaultManagerNode::create_storage() { @@ -400,46 +443,70 @@ void FaultManagerNode::handle_report_fault( } // Note: PREFAILED/PREPASSED status changes don't emit events (debounce in progress) - // Capture snapshots and rosbag when fault is confirmed (even if muted). - // Run asynchronously to avoid blocking the service callback for seconds, - // which would prevent other service calls (list_faults, get_fault, etc.) - // from being processed during capture. SnapshotCapture::capture_topic_on_demand - // uses a local callback group + local executor, so it's safe from a separate thread. - if (just_confirmed && (snapshot_capture_ || rosbag_capture_)) { - // Check recapture cooldown - skip if captured recently for this fault - bool should_capture = true; - if (snapshot_recapture_cooldown_sec_ > 0.0) { - std::lock_guard lock(last_capture_mutex_); - auto it = last_capture_times_.find(request->fault_code); + // Capture snapshots/rosbag when a fault confirms. Bounded pool (issue #441): + // the cooldown check, enqueue, and cooldown update are atomic under + // last_capture_mutex_ to prevent a concurrent same-fault double-enqueue. + if (just_confirmed && capture_pool_) { + const std::string fault_code = request->fault_code; + const bool cooldown_enabled = snapshot_recapture_cooldown_sec_ > 0.0; + std::unique_lock cd_lock(last_capture_mutex_, std::defer_lock); + if (cooldown_enabled) { + cd_lock.lock(); + } + + bool on_cooldown = false; + if (cooldown_enabled) { + auto it = last_capture_times_.find(fault_code); if (it != last_capture_times_.end()) { auto elapsed = std::chrono::steady_clock::now() - it->second; - if (elapsed < std::chrono::duration(snapshot_recapture_cooldown_sec_)) { - should_capture = false; - RCLCPP_DEBUG(get_logger(), "Skipping capture for '%s' - cooldown active", request->fault_code.c_str()); - } - } - if (should_capture) { - last_capture_times_[request->fault_code] = std::chrono::steady_clock::now(); + on_cooldown = elapsed < std::chrono::duration(snapshot_recapture_cooldown_sec_); } } - if (should_capture) { - std::string fault_code = request->fault_code; - auto snapshot_cap = snapshot_capture_; - auto rosbag_cap = rosbag_capture_; - std::thread capture_thread([snapshot_cap, rosbag_cap, fault_code]() { - if (snapshot_cap) { - snapshot_cap->capture(fault_code); - } - if (rosbag_cap) { - rosbag_cap->on_fault_confirmed(fault_code); - } - }); - { - std::lock_guard ct_lock(capture_threads_mutex_); - capture_threads_.push_back(std::move(capture_thread)); + if (on_cooldown) { + RCLCPP_DEBUG(get_logger(), "Skipping capture for '%s' - cooldown active", fault_code.c_str()); + } else { + const EnqueueOutcome outcome = capture_pool_->enqueue(fault_code); + const auto now = std::chrono::steady_clock::now(); + // RCLCPP_WARN_THROTTLE needs a non-const Clock lvalue (Humble/Lyrical + // compat); mirror rosbag_capture.cpp's local-copy pattern. Cast the + // uint64_t counter to unsigned long long + %llu to avoid -Wuseless-cast + // (uint64_t == unsigned long on LP64). + rclcpp::Clock throttle_clock(*get_clock()); + switch (outcome.result) { + case EnqueueResult::kAccepted: + if (cooldown_enabled) { + last_capture_times_[fault_code] = now; + } + break; + case EnqueueResult::kEvictedOldest: + if (cooldown_enabled) { + last_capture_times_[fault_code] = now; + if (outcome.evicted_code) { + last_capture_times_.erase(*outcome.evicted_code); // keep evicted fault retriable + } + } + RCLCPP_WARN_THROTTLE(get_logger(), throttle_clock, 2000, + "Capture queue full (drop_oldest): evicted pending '%s' for '%s' " + "(pool=%d, queue=%d, total_dropped=%llu)", + outcome.evicted_code ? outcome.evicted_code->c_str() : "?", fault_code.c_str(), + capture_pool_size_, capture_queue_depth_, + static_cast(capture_pool_->dropped_captures())); + break; + case EnqueueResult::kDroppedNewest: + // Cooldown NOT recorded: capture still possible if the fault later + // heals/clears and re-confirms. + RCLCPP_WARN_THROTTLE(get_logger(), throttle_clock, 2000, + "Capture queue full (reject_newest): dropped capture for '%s' " + "(pool=%d, queue=%d, total_dropped=%llu)", + fault_code.c_str(), capture_pool_size_, capture_queue_depth_, + static_cast(capture_pool_->dropped_captures())); + break; + case EnqueueResult::kRejectedShuttingDown: + RCLCPP_DEBUG(get_logger(), "Capture pool shutting down; skipped capture for '%s'", fault_code.c_str()); + break; } - } // if (should_capture) + } } // Handle PREFAILED state for lazy_start rosbag capture diff --git a/src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp b/src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp new file mode 100644 index 000000000..29e182608 --- /dev/null +++ b/src/ros2_medkit_fault_manager/test/test_capture_thread_pool.cpp @@ -0,0 +1,311 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/rclcpp.hpp" +#include "ros2_medkit_fault_manager/capture_thread_pool.hpp" + +using ros2_medkit_fault_manager::CaptureThreadPool; +using ros2_medkit_fault_manager::EnqueueResult; +using ros2_medkit_fault_manager::QueueFullPolicy; + +namespace { + +rclcpp::Logger test_logger() { + return rclcpp::get_logger("test_capture_thread_pool"); +} + +// Capture_fn that records every invocation and blocks each job until released, +// so tests can deterministically hold workers busy and inspect concurrency. +class GatedCallback { + public: + std::function fn() { + return [this](const std::string & code) { + run(code); + }; + } + + void run(const std::string & code) { + { + std::lock_guard lock(m_); + ++active_; + max_active_ = std::max(max_active_, active_); + started_.push_back(code); + executed_.insert(code); + } + entered_cv_.notify_all(); + { + std::unique_lock lock(m_); + release_cv_.wait(lock, [this] { + return released_; + }); + --active_; + } + } + + void wait_until_active(std::size_t n) { + std::unique_lock lock(m_); + entered_cv_.wait(lock, [&] { + return active_ >= n; + }); + } + + void wait_until_started(std::size_t n) { + std::unique_lock lock(m_); + entered_cv_.wait(lock, [&] { + return started_.size() >= n; + }); + } + + void release() { + { + std::lock_guard lock(m_); + released_ = true; + } + release_cv_.notify_all(); + } + + std::size_t max_active() { + std::lock_guard lock(m_); + return max_active_; + } + std::vector started() { + std::lock_guard lock(m_); + return started_; + } + std::set executed() { + std::lock_guard lock(m_); + return executed_; + } + + private: + std::mutex m_; + std::condition_variable entered_cv_; + std::condition_variable release_cv_; + std::size_t active_{0}; + std::size_t max_active_{0}; + bool released_{false}; + std::vector started_; + std::set executed_; +}; + +class CaptureThreadPoolTest : public ::testing::Test { + protected: + static void SetUpTestSuite() { + if (!rclcpp::ok()) { + rclcpp::init(0, nullptr); + } + } +}; + +} // namespace + +// Concurrency reaches AND never exceeds pool_size. +TEST_F(CaptureThreadPoolTest, BoundsAndReachesPoolSize) { + GatedCallback gc; + CaptureThreadPool pool(2, 8, QueueFullPolicy::kRejectNewest, test_logger(), gc.fn()); + for (int i = 0; i < 6; ++i) { + pool.enqueue("j" + std::to_string(i)); + } + gc.wait_until_active(2); // prove 2 ran concurrently + EXPECT_EQ(gc.max_active(), 2u); // never more than pool_size + gc.release(); + pool.shutdown(); + EXPECT_EQ(gc.max_active(), 2u); +} + +TEST_F(CaptureThreadPoolTest, PoolSizeOneSerializes) { + GatedCallback gc; + CaptureThreadPool pool(1, 8, QueueFullPolicy::kRejectNewest, test_logger(), gc.fn()); + pool.enqueue("a"); + pool.enqueue("b"); + gc.wait_until_active(1); + EXPECT_EQ(gc.max_active(), 1u); + gc.release(); + pool.shutdown(); + EXPECT_EQ(gc.max_active(), 1u); +} + +// Defensive clamp: pool_size/queue_depth of 0 must not UB. In particular +// queue_depth 0 + kDropOldest would call front() on an empty deque without the +// clamp. With both clamped to 1, a worker exists and drop_oldest evicts safely. +TEST_F(CaptureThreadPoolTest, ZeroSizesClampToOne) { + GatedCallback gc; + CaptureThreadPool pool(0, 0, QueueFullPolicy::kDropOldest, test_logger(), gc.fn()); + EXPECT_EQ(pool.enqueue("a").result, EnqueueResult::kAccepted); + gc.wait_until_active(1); // a worker exists (pool_size clamped to 1) + EXPECT_EQ(pool.enqueue("b").result, EnqueueResult::kAccepted); // queue_depth clamped to 1 + auto outcome = pool.enqueue("c"); // full -> evict oldest, no UB + EXPECT_EQ(outcome.result, EnqueueResult::kEvictedOldest); + ASSERT_TRUE(outcome.evicted_code.has_value()); + EXPECT_EQ(*outcome.evicted_code, "b"); + gc.release(); + pool.shutdown(); +} + +TEST_F(CaptureThreadPoolTest, RejectNewestDropsExcessExactly) { + GatedCallback gc; + CaptureThreadPool pool(2, 8, QueueFullPolicy::kRejectNewest, test_logger(), gc.fn()); + pool.enqueue("blk0"); + pool.enqueue("blk1"); + gc.wait_until_active(2); // both workers busy; queue empties + for (int i = 0; i < 8; ++i) { // fill pending queue (queue_depth) + EXPECT_EQ(pool.enqueue("f" + std::to_string(i)).result, EnqueueResult::kAccepted); + } + for (int i = 0; i < 5; ++i) { // overflow + EXPECT_EQ(pool.enqueue("x" + std::to_string(i)).result, EnqueueResult::kDroppedNewest); + } + EXPECT_EQ(pool.dropped_captures(), 5u); + gc.release(); + pool.shutdown(); + for (int i = 0; i < 5; ++i) { // dropped jobs never ran + EXPECT_EQ(gc.executed().count("x" + std::to_string(i)), 0u); + } +} + +TEST_F(CaptureThreadPoolTest, DropOldestEvictsOldestPending) { + GatedCallback gc; + CaptureThreadPool pool(2, 8, QueueFullPolicy::kDropOldest, test_logger(), gc.fn()); + pool.enqueue("blk0"); + pool.enqueue("blk1"); + gc.wait_until_active(2); + for (int i = 0; i < 8; ++i) { + EXPECT_EQ(pool.enqueue("f" + std::to_string(i)).result, EnqueueResult::kAccepted); + } + for (int i = 0; i < 5; ++i) { // each overflow evicts current oldest f0..f4 + auto outcome = pool.enqueue("x" + std::to_string(i)); + EXPECT_EQ(outcome.result, EnqueueResult::kEvictedOldest); + ASSERT_TRUE(outcome.evicted_code.has_value()); + EXPECT_EQ(*outcome.evicted_code, "f" + std::to_string(i)); + } + EXPECT_EQ(pool.dropped_captures(), 5u); + gc.release(); + gc.wait_until_started(10); // blk0, blk1 + f5,f6,f7 + x0..x4 (evicted f0..f4 never queued) + pool.shutdown(); + auto ran = gc.executed(); + for (int i = 0; i < 5; ++i) { // evicted absent + EXPECT_EQ(ran.count("f" + std::to_string(i)), 0u); + } + for (int i = 0; i < 5; ++i) { // newest present + EXPECT_EQ(ran.count("x" + std::to_string(i)), 1u); + } +} + +TEST_F(CaptureThreadPoolTest, FifoOrder) { + GatedCallback gc; + CaptureThreadPool pool(1, 8, QueueFullPolicy::kRejectNewest, test_logger(), gc.fn()); + pool.enqueue("A"); + gc.wait_until_active(1); + pool.enqueue("B"); + pool.enqueue("C"); + gc.release(); + gc.wait_until_started(3); // A, B, C must all run before shutdown discards queue + pool.shutdown(); + std::vector expected{"A", "B", "C"}; + EXPECT_EQ(gc.started(), expected); +} + +TEST_F(CaptureThreadPoolTest, CallbackThrowKeepsWorkerAlive) { + std::mutex m; + std::condition_variable cv; + int ran = 0; + auto fn = [&](const std::string & code) { + if (code == "boom") { + throw std::runtime_error("boom"); + } + { + std::lock_guard lock(m); + ++ran; + } + cv.notify_all(); + }; + CaptureThreadPool pool(1, 8, QueueFullPolicy::kRejectNewest, test_logger(), fn); + pool.enqueue("boom"); + pool.enqueue("ok1"); + pool.enqueue("ok2"); + { + std::unique_lock lock(m); + cv.wait(lock, [&] { + return ran >= 2; + }); + } + pool.shutdown(); + EXPECT_EQ(ran, 2); +} + +TEST_F(CaptureThreadPoolTest, EnqueueAfterShutdownRejected) { + CaptureThreadPool pool(1, 8, QueueFullPolicy::kRejectNewest, test_logger(), [](const std::string &) {}); + pool.shutdown(); + auto outcome = pool.enqueue("x"); + EXPECT_EQ(outcome.result, EnqueueResult::kRejectedShuttingDown); + EXPECT_EQ(pool.dropped_captures(), 0u); +} + +TEST_F(CaptureThreadPoolTest, ShutdownDiscardsPendingCompletesInFlight) { + GatedCallback gc; + CaptureThreadPool pool(1, 8, QueueFullPolicy::kRejectNewest, test_logger(), gc.fn()); + EXPECT_EQ(pool.enqueue("inflight").result, EnqueueResult::kAccepted); + gc.wait_until_active(1); + EXPECT_EQ(pool.enqueue("pending").result, EnqueueResult::kAccepted); + EXPECT_EQ(pool.pending_size(), 1u); + + std::thread shut([&] { + pool.shutdown(); + }); + while (pool.pending_size() != 0u) { // wait until shutdown set stop_ and cleared the queue + std::this_thread::yield(); + } + gc.release(); // now safe: stop_ is set, worker discards "pending" on re-loop + shut.join(); + + auto ran = gc.executed(); + EXPECT_EQ(ran.count("inflight"), 1u); + EXPECT_EQ(ran.count("pending"), 0u); +} + +TEST_F(CaptureThreadPoolTest, ShutdownIsIdempotent) { + CaptureThreadPool pool(2, 8, QueueFullPolicy::kRejectNewest, test_logger(), [](const std::string &) {}); + pool.shutdown(); + pool.shutdown(); // must not throw, hang, or double-join + SUCCEED(); +} + +// shutdown() releases the captured callback (and the shared_ptrs it holds) after +// joining - the load-bearing invariant behind the #441 destructor reorder. +TEST_F(CaptureThreadPoolTest, ReleasesCaptureFnAfterShutdown) { + auto sentinel = std::make_shared(42); + { + std::weak_ptr weak = sentinel; + CaptureThreadPool pool(2, 8, QueueFullPolicy::kRejectNewest, test_logger(), [held = sentinel](const std::string &) { + (void)held; + }); + sentinel.reset(); // pool's capture_fn now holds the only strong ref + EXPECT_FALSE(weak.expired()); + pool.shutdown(); + EXPECT_TRUE(weak.expired()); // capture_fn released after join -> ref dropped + } +} diff --git a/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp b/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp index 7e1b8366b..90d84af76 100644 --- a/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp +++ b/src/ros2_medkit_fault_manager/test/test_fault_manager.cpp @@ -718,6 +718,32 @@ TEST(FaultManagerNodeParameterTest, AutoConfirmAfterSec) { EXPECT_DOUBLE_EQ(config.auto_confirm_after_sec, 15.0); } +TEST(FaultManagerNodeParameterTest, ClampsInvalidCaptureParams) { + rclcpp::NodeOptions options; + options.parameter_overrides({ + {"storage_type", "memory"}, + {"snapshots.capture_pool_size", -3}, + {"snapshots.capture_queue_depth", 0}, + {"snapshots.capture_queue_full_policy", std::string("bogus")}, + }); + auto node = std::make_shared(options); + + EXPECT_EQ(node->capture_pool_size_for_test(), 1); + EXPECT_EQ(node->capture_queue_depth_for_test(), 1); + EXPECT_EQ(node->capture_queue_full_policy_for_test(), ros2_medkit_fault_manager::QueueFullPolicy::kRejectNewest); +} + +TEST(FaultManagerNodeParameterTest, ParsesDropOldestPolicy) { + rclcpp::NodeOptions options; + options.parameter_overrides({ + {"storage_type", "memory"}, + {"snapshots.capture_queue_full_policy", std::string("drop_oldest")}, + }); + auto node = std::make_shared(options); + + EXPECT_EQ(node->capture_queue_full_policy_for_test(), ros2_medkit_fault_manager::QueueFullPolicy::kDropOldest); +} + // FaultEvent Publishing Tests // // Each test iteration uses a unique namespace to prevent DDS cross-contamination diff --git a/src/ros2_medkit_fault_manager/test/test_integration.test.py b/src/ros2_medkit_fault_manager/test/test_integration.test.py index f2d794215..f9311de1d 100644 --- a/src/ros2_medkit_fault_manager/test/test_integration.test.py +++ b/src/ros2_medkit_fault_manager/test/test_integration.test.py @@ -956,6 +956,78 @@ def test_26_cleared_fault_can_be_reactivated(self): self.assertEqual(len(fault.reporting_sources), 2) print(f'Cleared fault reactivated: occurrence_count={fault.occurrence_count}') + def test_27_capture_pool_survives_fault_storm(self): + """ + Burst of distinct CRITICAL faults; the node must stay responsive (no crash/UAF). + + Exercises concurrent capture through the bounded pool and clean teardown (issue #441). + We assert liveness + that snapshots land for at least some faults. Iterate ALL faults + when reading snapshots (rosbag uses a single ring buffer; only one of N gets rosbag + data, so do not assert all N captured rosbag data). + """ + fault_codes = [f'STORM_FAULT_{i}' for i in range(8)] + + # STORM_FAULT_* codes fall through to default_topics: [/test/default_data]. + # Publish on that topic so the snapshot capture has data to collect. + default_pub = self.node.create_publisher(Temperature, '/test/default_data', 10) + + temp_msg = Temperature() + temp_msg.temperature = 70.0 + temp_msg.variance = 0.1 + + # Establish topic presence before faults are confirmed. + self._publish_and_spin(default_pub, temp_msg, count=20, interval=0.05) + + stop_publishing = threading.Event() + + def keep_publishing(): + while not stop_publishing.is_set(): + default_pub.publish(temp_msg) + time.sleep(0.05) + + pub_thread = threading.Thread(target=keep_publishing) + pub_thread.start() + try: + for code in fault_codes: + request = ReportFault.Request() + request.fault_code = code + request.event_type = ReportFault.Request.EVENT_FAILED + request.severity = Fault.SEVERITY_CRITICAL # immediate confirm + request.description = 'storm' + request.source_id = '/test_node' + response = self._call_service(self.report_fault_client, request) + self.assertTrue(response.accepted) + + time.sleep(5.0) # let the bounded pool drain (snapshot timeout is 3s) + finally: + stop_publishing.set() + pub_thread.join() + + # Node is still responsive after the storm (no crash/UAF). + list_request = ListFaults.Request() + list_request.filter_by_severity = False + list_request.severity = 0 + list_request.statuses = [Fault.STATUS_CONFIRMED] + list_response = self._call_service(self.list_faults_client, list_request) + storm = [f for f in list_response.faults if f.fault_code.startswith('STORM_FAULT_')] + self.assertGreater(len(storm), 0, 'Storm faults not persisted') + + # At least some faults captured topic data through the bounded pool. + # Count only faults with a non-empty topics dict - the meaningful signal + # that the capture path actually ran and received messages on /test/default_data. + captured = 0 + for code in fault_codes: + snap_request = GetSnapshots.Request() + snap_request.fault_code = code + snap_request.topic = '' + snap_response = self._call_service(self.get_snapshots_client, snap_request) + if snap_response.success and len(snap_response.data) > 0: + data = json.loads(snap_response.data) + if data.get('topics'): + captured += 1 + print(f'Storm capture: {captured}/{len(fault_codes)} faults captured topic data') + self.assertGreater(captured, 0, 'No storm fault captured topic data on /test/default_data') + @launch_testing.post_shutdown_test() class TestFaultManagerShutdown(unittest.TestCase): diff --git a/src/ros2_medkit_gateway/README.md b/src/ros2_medkit_gateway/README.md index e056c85cd..572e13cd8 100644 --- a/src/ros2_medkit_gateway/README.md +++ b/src/ros2_medkit_gateway/README.md @@ -930,6 +930,11 @@ Snapshots are configured via FaultManager parameters: | `snapshots.max_message_size` | int | `65536` | Maximum message size in bytes (larger messages skipped) | | `snapshots.default_topics` | string[] | `[]` | Topics to capture for all faults | | `snapshots.config_file` | string | `""` | Path to YAML config file for `fault_specific` and `patterns` | +| `snapshots.recapture_cooldown_sec` | double | `60.0` | Min seconds between captures for the same fault code. | +| `snapshots.max_per_fault` | int | `10` | Max snapshots retained per fault. | +| `snapshots.capture_pool_size` | int | `2` | Max concurrent capture threads under a fault storm (>= 1). | +| `snapshots.capture_queue_depth` | int | `16` | Max pending captures before the full-queue policy applies (>= 1). | +| `snapshots.capture_queue_full_policy` | string | `reject_newest` | Policy when the queue is full: `reject_newest` or `drop_oldest`. | **Topic Resolution Priority:** 1. `fault_specific` - Exact match for fault code (configured via YAML config file)