Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions docs/config/fault-manager.rst
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ Basic Snapshot Settings
config_file: "" # Path to YAML config file
recapture_cooldown_sec: 60.0 # Min seconds between snapshot captures per fault
max_per_fault: 10 # Max snapshots stored per fault code (0 = unlimited)
capture_pool_size: 2 # Max concurrent capture threads (>= 1)
capture_queue_depth: 16 # Max pending captures before policy applies (>= 1)
capture_queue_full_policy: reject_newest # reject_newest | drop_oldest

.. list-table::
:header-rows: 1
Expand Down Expand Up @@ -202,6 +205,15 @@ Basic Snapshot Settings
- ``10``
- Maximum number of snapshots stored per fault code. When the limit is reached,
new snapshots for that fault are rejected. Set to 0 for unlimited.
* - ``snapshots.capture_pool_size``
- ``2``
- Max concurrent capture threads under a fault storm (>= 1).
* - ``snapshots.capture_queue_depth``
- ``16``
- Max pending captures before the full-queue policy applies (>= 1).
* - ``snapshots.capture_queue_full_policy``
- ``reject_newest``
- Policy when the queue is full: ``reject_newest`` or ``drop_oldest``.

Rosbag Recording
~~~~~~~~~~~~~~~~
Expand Down
9 changes: 9 additions & 0 deletions docs/tutorials/snapshots.rst
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ Configure snapshot capture via fault manager parameters:
- ``10``
- Maximum number of snapshots stored per fault code. When the limit is reached,
new snapshots for that fault are rejected. Set to 0 for unlimited.
* - ``snapshots.capture_pool_size``
- ``2``
- Max concurrent capture threads under a fault storm (>= 1).
* - ``snapshots.capture_queue_depth``
- ``16``
- Max pending captures before the full-queue policy applies (>= 1).
* - ``snapshots.capture_queue_full_policy``
- ``reject_newest``
- Policy when the queue is full: ``reject_newest`` or ``drop_oldest``.

Advanced Configuration
----------------------
Expand Down
7 changes: 7 additions & 0 deletions src/ros2_medkit_fault_manager/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ medkit_detect_compat_defs()

# Library target (shared between executable and tests)
add_library(fault_manager_lib STATIC
src/capture_thread_pool.cpp
src/fault_manager_node.cpp
src/fault_storage.cpp
src/sqlite_fault_storage.cpp
Expand Down Expand Up @@ -224,6 +225,12 @@ if(BUILD_TESTING)
TIMEOUT 120
)
set_tests_properties(test_rosbag_entity_scope PROPERTIES LABELS "integration")

# CaptureThreadPool unit tests (no ROS node, no domain id needed)
ament_add_gtest(test_capture_thread_pool test/test_capture_thread_pool.cpp)
target_link_libraries(test_capture_thread_pool fault_manager_lib)
medkit_target_dependencies(test_capture_thread_pool rclcpp)

ros2_medkit_relax_vendor_warnings()
endif()

Expand Down
7 changes: 7 additions & 0 deletions src/ros2_medkit_fault_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,8 @@ ros2 service call /fault_manager/clear_fault ros2_medkit_msgs/srv/ClearFault \

Snapshots capture topic data when faults are confirmed for post-mortem debugging.

Under a fault storm, captures are bounded by a worker pool (`capture_pool_size`) draining a bounded queue (`capture_queue_depth`); excess captures are dropped per `capture_queue_full_policy` and logged (throttled).

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `snapshots.enabled` | bool | `true` | Enable/disable snapshot capture |
Expand All @@ -78,6 +80,11 @@ Snapshots capture topic data when faults are confirmed for post-mortem debugging
| `snapshots.max_message_size` | int | `65536` | Maximum message size in bytes (larger messages skipped) |
| `snapshots.default_topics` | string[] | `[]` | Topics to capture for all faults |
| `snapshots.config_file` | string | `""` | Path to YAML config for `fault_specific` and `patterns` |
| `snapshots.recapture_cooldown_sec` | double | `60.0` | Min seconds between captures for the same fault code. |
| `snapshots.max_per_fault` | int | `10` | Max snapshots retained per fault. |
| `snapshots.capture_pool_size` | int | `2` | Max concurrent capture threads under a fault storm (>= 1). |
| `snapshots.capture_queue_depth` | int | `16` | Max pending captures before the full-queue policy applies (>= 1). |
| `snapshots.capture_queue_full_policy` | string | `reject_newest` | Policy when the queue is full: `reject_newest` or `drop_oldest`. |

**Topic Resolution Priority:**
1. `fault_specific` - Exact match for fault code (configured via YAML config file)
Expand Down
5 changes: 5 additions & 0 deletions src/ros2_medkit_fault_manager/config/fault_manager.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,8 @@ fault_manager:
# to entity-scoped capture and is crash-safe (falls back / self-disables if no
# storage backend is usable).
snapshots.rosbag.enabled: false

# Capture concurrency bound under fault storms (issue #441):
# snapshots.capture_pool_size: 2 # max concurrent capture threads (>= 1)
# snapshots.capture_queue_depth: 16 # max pending captures (>= 1)
# snapshots.capture_queue_full_policy: reject_newest # reject_newest | drop_oldest
22 changes: 21 additions & 1 deletion src/ros2_medkit_fault_manager/design/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ The following diagram shows the relationships between the main components of the
+ get_storage(): FaultStorage&
}

class CaptureThreadPool {
+enqueue(fault_code) : EnqueueOutcome
+shutdown()
+dropped_captures() : uint64_t
-workers_ : vector<thread>
-queue_ : deque<string>
}

enum QueueFullPolicy {
kRejectNewest
kDropOldest
}

abstract class FaultStorage <<interface>> {
+ {abstract} report_fault(): bool
+ {abstract} list_faults(): vector<Fault>
Expand Down Expand Up @@ -91,6 +104,7 @@ The following diagram shows the relationships between the main components of the

' Composition
FaultManagerNode *-down-> InMemoryFaultStorage : owns
FaultManagerNode --> CaptureThreadPool : enqueues capture jobs

' InMemoryFaultStorage contains FaultStates
InMemoryFaultStorage o-right-> FaultState : contains many
Expand Down Expand Up @@ -131,7 +145,13 @@ Main Components
- Maps directly to ``ros2_medkit_msgs::msg::Fault`` via ``to_msg()``
- Uses ``std::set`` for reporting_sources to ensure uniqueness
- Tracks first and last occurrence timestamps
- Manages fault status lifecycle with debounce (PREFAILED → CONFIRMED → CLEARED)
- Manages fault status lifecycle with debounce (PREFAILED -> CONFIRMED -> CLEARED)

5. **CaptureThreadPool** - Bounded worker pool for snapshot capture
Under a fault storm the node enqueues capture jobs into a bounded ``CaptureThreadPool``
(``pool_size`` workers draining a ``queue_depth``-bounded queue) rather than spawning one
thread per fault. The full-queue policy (``reject_newest``/``drop_oldest``) bounds memory,
and the destructor joins the pool before rosbag teardown.

Services
--------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// Copyright 2026 bburda
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

#include "rclcpp/logger.hpp"

namespace ros2_medkit_fault_manager {

/// Policy applied when the bounded pending queue is full.
enum class QueueFullPolicy {
kRejectNewest, ///< Drop the incoming job.
kDropOldest ///< Evict the oldest pending job, queue the incoming one.
};

/// Result of an enqueue attempt.
enum class EnqueueResult {
kAccepted, ///< Queued; will run.
kDroppedNewest, ///< Queue full (reject_newest): incoming dropped.
kEvictedOldest, ///< Queue full (drop_oldest): oldest evicted, incoming queued.
kRejectedShuttingDown ///< Pool is shutting down: not queued, will not run.
};

struct EnqueueOutcome {
EnqueueResult result;
std::optional<std::string> evicted_code; ///< Set only for kEvictedOldest.
};

/// Bounded worker pool that runs fault-capture jobs off the service thread.
///
/// Caps concurrent in-flight captures at pool_size and bounds pending work at
/// queue_depth with a configurable full-queue policy, giving peak memory and
/// thread count a hard upper bound under a fault storm. See issue #441.
class CaptureThreadPool {
public:
/// @param pool_size Number of worker threads (clamped to >= 1).
/// @param queue_depth Max pending jobs (clamped to >= 1).
/// @param full_policy Behavior when the pending queue is full.
/// @param logger Logger for worker-side capture failures.
/// @param capture_fn Invoked per job on a worker thread. Must be thread-safe
/// for pool_size concurrent calls. Exceptions are caught and logged.
CaptureThreadPool(std::size_t pool_size, std::size_t queue_depth, QueueFullPolicy full_policy, rclcpp::Logger logger,
std::function<void(const std::string &)> capture_fn);
~CaptureThreadPool();

CaptureThreadPool(const CaptureThreadPool &) = delete;
CaptureThreadPool & operator=(const CaptureThreadPool &) = delete;
CaptureThreadPool(CaptureThreadPool &&) = delete;
CaptureThreadPool & operator=(CaptureThreadPool &&) = delete;

/// Enqueue a capture job. Non-blocking. Thread-safe.
EnqueueOutcome enqueue(const std::string & fault_code);

/// Stop accepting work, let in-flight jobs finish, discard pending, join all
/// workers. Idempotent and noexcept. Called by the destructor.
void shutdown() noexcept;

/// Lifetime total of storm drops (kDroppedNewest + kEvictedOldest).
uint64_t dropped_captures() const {
return dropped_captures_.load();
}

/// Current number of pending (not yet started) jobs. For tests/observability.
std::size_t pending_size() const;

private:
void worker_loop();

const std::size_t queue_depth_;
const QueueFullPolicy full_policy_;
rclcpp::Logger logger_;
std::function<void(const std::string &)> capture_fn_;

mutable std::mutex queue_mutex_;
std::condition_variable cv_;
std::deque<std::string> queue_;
bool stop_{false};

std::atomic<uint64_t> dropped_captures_{0};
std::vector<std::thread> workers_;
};

} // namespace ros2_medkit_fault_manager
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <unordered_map>

#include "rclcpp/rclcpp.hpp"
#include "ros2_medkit_fault_manager/capture_thread_pool.hpp"
#include "ros2_medkit_fault_manager/correlation/correlation_engine.hpp"
#include "ros2_medkit_fault_manager/entity_threshold_resolver.hpp"
#include "ros2_medkit_fault_manager/fault_storage.hpp"
Expand Down Expand Up @@ -65,6 +66,16 @@ class FaultManagerNode : public rclcpp::Node {
return storage_type_;
}

int capture_pool_size_for_test() const {
return capture_pool_size_;
}
int capture_queue_depth_for_test() const {
return capture_queue_depth_;
}
QueueFullPolicy capture_queue_full_policy_for_test() const {
return capture_queue_full_policy_;
}

/// Check if entity matches any reporting source
/// @param reporting_sources List of reporting sources from fault
/// @param entity_id Entity ID to match (exact match or as suffix of FQN)
Expand Down Expand Up @@ -144,6 +155,9 @@ class FaultManagerNode : public rclcpp::Node {
int32_t healing_threshold_{3};
double auto_confirm_after_sec_{0.0};
double snapshot_recapture_cooldown_sec_{60.0};
int capture_pool_size_{2};
int capture_queue_depth_{16};
QueueFullPolicy capture_queue_full_policy_{QueueFullPolicy::kRejectNewest};
DebounceConfig global_config_; ///< Global debounce config (built from ROS params)
std::unique_ptr<FaultStorage> storage_;
std::unique_ptr<EntityThresholdResolver> threshold_resolver_; ///< Per-entity threshold overrides
Expand All @@ -165,19 +179,19 @@ class FaultManagerNode : public rclcpp::Node {
rclcpp::Publisher<ros2_medkit_msgs::msg::FaultEvent>::SharedPtr event_publisher_;

/// Snapshot capture for capturing topic data on fault confirmation.
/// shared_ptr to allow safe capture-by-value in detached capture threads.
/// shared_ptr to allow safe capture-by-value in the pool's capture jobs.
std::shared_ptr<SnapshotCapture> snapshot_capture_;

/// Rosbag capture for time-window recording (nullptr if disabled).
/// shared_ptr to allow safe capture-by-value in detached capture threads.
/// shared_ptr to allow safe capture-by-value in the pool's capture jobs.
std::shared_ptr<RosbagCapture> rosbag_capture_;

/// Correlation engine for fault correlation/muting (nullptr if disabled)
std::unique_ptr<correlation::CorrelationEngine> correlation_engine_;

/// Tracks active capture threads for clean shutdown (join before destruction)
std::mutex capture_threads_mutex_;
std::vector<std::thread> capture_threads_;
/// Bounded pool that runs capture jobs off the service thread (issue #441).
/// Declared after snapshot_capture_/rosbag_capture_ so it is destroyed first.
std::unique_ptr<CaptureThreadPool> capture_pool_;

/// Per-fault cooldown tracking for snapshot recapture
std::mutex last_capture_mutex_;
Expand Down
Loading
Loading