67 changes: 63 additions & 4 deletions docs/config/server.rst
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,65 @@ Lower values shorten the worst-case recovery window if a graph event is missed
but increase idle CPU. The default rarely fires on a stable graph because the
graph-event poll handles node up/down events directly.

Thread Pools
------------

The gateway runs two thread pools. By default both are bounded to a small fixed
size instead of scaling with the host CPU count, so the gateway's thread
footprint is the same on a 4-core SBC and a 64-core server. Both values are
clamped to a minimum of 1.

.. list-table::
:header-rows: 1
:widths: 30 10 12 48

* - Parameter
- Type
- Default
- Description
* - ``server.http_thread_pool_size``
- int
- ``3``
- Worker threads in the HTTP request pool (cpp-httplib). Replaces the
library default of ``max(8, cores - 1)``. Kept at or above
``sse.max_clients`` so SSE streams cannot starve every worker (see note
below). Clamped to ``[1, 1024]``.
* - ``server.executor_threads``
- int
- ``2``
- Threads in the main rclcpp ``MultiThreadedExecutor``. Replaces rclcpp's
default (host cores, minimum 2). Clamped to ``[1, 256]``.

**HTTP pool and SSE.** Each active SSE stream (fault dashboard, cyclic
subscriptions, trigger events - see `SSE (Server-Sent Events)`_) holds one HTTP
pool thread for its entire lifetime, and the data cold-wait path can block up to
``data_provider.cold_wait_cap`` additional threads. To stop SSE from starving
ordinary requests, the default ``sse.max_clients`` (2) is kept below the default
``http_thread_pool_size`` (3). If you raise ``sse.max_clients`` for more
concurrent streams, raise ``http_thread_pool_size`` to match - a safe target is
``sse.max_clients + cold_wait_cap`` plus headroom for regular requests.
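
The sizing rule above can be sketched as plain arithmetic. This is a minimal illustration, not gateway code: both helper names are hypothetical, and the ``cold_wait_cap`` value and the headroom are whatever the operator has configured (the documentation does not state a default for ``data_provider.cold_wait_cap`` here).

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of the gateway): applies the rule of thumb
//   sse.max_clients + data_provider.cold_wait_cap + headroom for requests.
constexpr std::size_t suggested_http_pool_size(std::size_t sse_max_clients,
                                               std::size_t cold_wait_cap,
                                               std::size_t request_headroom) {
  return sse_max_clients + cold_wait_cap + request_headroom;
}

// Hypothetical helper: workers left for ordinary requests when every SSE
// slot is occupied. Returns 0 when SSE alone can exhaust the pool.
constexpr std::size_t free_workers_under_full_sse(std::size_t pool_size,
                                                  std::size_t sse_max_clients) {
  return pool_size > sse_max_clients ? pool_size - sse_max_clients : 0;
}
```

With the shipped defaults (pool of 3, ``sse.max_clients`` of 2), one worker stays free even when both SSE slots are taken. Raising ``sse.max_clients`` to 10 without touching the pool would leave none, which is the starvation case the note warns about.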

**Executor threads.** The main executor only delivers the gateway node's own
callbacks (timers, graph events, log and fault subscriptions) and the fast
service-response callbacks for operation/action RPCs. The blocking wait for an
RPC runs on the cpp-httplib pool thread (a separate server thread), not on an
executor thread, and the fault transport uses its own private executor - so a
small main executor cannot starve or deadlock blocking RPC handlers. Increase
this only if the node's own callback load grows (for example very frequent
graph churn).

Example (more SSE clients need a larger pool and a matching ``sse.max_clients``):

.. code-block:: yaml

ros2_medkit_gateway:
ros__parameters:
server:
http_thread_pool_size: 16 # workers for heavy SSE + request load
executor_threads: 4
sse:
max_clients: 10 # raised together with the HTTP pool

Bulk Data Storage
-----------------

Expand Down Expand Up @@ -329,8 +388,8 @@ Configure limits for SSE-based streaming (fault events and cyclic subscriptions)
- Description
* - ``sse.max_clients``
- int
- ``10``
- Maximum number of concurrent SSE connections (fault stream, cyclic subscription streams, and trigger event streams combined).
- ``2``
- Maximum number of concurrent SSE connections (fault stream, cyclic subscription streams, and trigger event streams combined). Each connection pins one ``server.http_thread_pool_size`` worker for its lifetime, so the default is kept at or below that pool's size; raise both together for more concurrent streams.
* - ``sse.max_subscriptions``
- int
- ``100``
Expand All @@ -347,7 +406,7 @@ Example:
ros2_medkit_gateway:
ros__parameters:
sse:
max_clients: 10
max_clients: 2
max_subscriptions: 100
max_duration_sec: 3600

Expand Down Expand Up @@ -643,7 +702,7 @@ Complete Example
categories: ["calibration", "firmware"]

sse:
max_clients: 10
max_clients: 2
max_subscriptions: 100
max_duration_sec: 3600

Expand Down
4 changes: 4 additions & 0 deletions src/ros2_medkit_gateway/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ if(BUILD_TESTING)
ament_add_gtest(test_tls_config test/test_tls_config.cpp)
target_link_libraries(test_tls_config gateway_ros2)

# Add HTTP server thread-pool bound tests (issue #440)
ament_add_gtest(test_http_server_thread_pool test/test_http_server_thread_pool.cpp)
target_link_libraries(test_http_server_thread_pool gateway_ros2)

# Add FaultManager tests
ament_add_gtest(test_fault_manager test/test_fault_manager.cpp)
target_link_libraries(test_fault_manager gateway_ros2)
Expand Down
33 changes: 30 additions & 3 deletions src/ros2_medkit_gateway/config/gateway_params.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,30 @@ ros2_medkit_gateway:
# Valid range: 1024-65535
port: 8080

# Number of worker threads in the HTTP request pool (cpp-httplib).
# Bounded to a small fixed size instead of scaling with the host core
# count. Clamped to [1, 1024].
#
# IMPORTANT: each active SSE stream (fault dashboard, cyclic
# subscriptions, trigger events) holds ONE pool thread for its entire
# lifetime, and the data cold-wait path can block up to
# data_provider.cold_wait_cap more. The default sse.max_clients (2) is kept
# at or below this pool so SSE streams cannot starve every worker. If you
# raise sse.max_clients for more concurrent streams, raise this to match
# (rule of thumb: sse.max_clients + data_provider.cold_wait_cap plus
# headroom for ordinary requests).
# Default: 3
http_thread_pool_size: 3

# Number of threads in the main rclcpp MultiThreadedExecutor. This
# executor only dispatches the gateway node's own callbacks (timers,
# graph events, log/fault subscriptions) and the fast service-response
# callbacks for operation/action RPCs. The blocking wait for an RPC runs
# on the cpp-httplib pool thread and the fault transport uses its own
# private executor, so a small pool here is safe. Bounded to a small
# fixed size instead of std::thread::hardware_concurrency().
# Clamped to [1, 256].
# Default: 2
executor_threads: 2

# TLS/HTTPS Configuration
# Enables encrypted communication using OpenSSL
tls:
Expand Down Expand Up @@ -136,9 +160,12 @@ ros2_medkit_gateway:
# SSE (Server-Sent Events) Configuration
sse:
# Maximum number of concurrent SSE connections
# Applies to fault streams and cyclic subscription streams combined
# Default: 10
max_clients: 10
# Applies to fault streams, cyclic subscription streams, and trigger event
# streams combined.
# Each connection pins one server.http_thread_pool_size worker for its
# lifetime, so this defaults at or below that pool (3) to leave a worker
# for ordinary requests. Raise both together for more concurrent streams.
# Default: 2
max_clients: 2

# Maximum number of active cyclic subscriptions across all entities
# Returns HTTP 503 when this limit is reached
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <httplib.h>

#include <cstddef>
#include <memory>
#include <string>

Expand All @@ -36,9 +37,13 @@ class HttpServerManager {
/**
* @brief Construct HTTP server manager
* @param tls_config TLS configuration (if enabled, creates SSLServer)
* @param thread_pool_size Number of worker threads for the cpp-httplib request
* pool. When > 0, the server's task queue is bounded to a fixed-size
* ThreadPool of this many threads (issue #440). When 0, the cpp-httplib
* default (max(8, hardware_concurrency - 1)) is left untouched.
* @throws std::runtime_error if TLS is requested but SSL server creation fails
*/
explicit HttpServerManager(const TlsConfig & tls_config);
explicit HttpServerManager(const TlsConfig & tls_config, std::size_t thread_pool_size = 0);

~HttpServerManager() = default;

Expand Down Expand Up @@ -87,8 +92,20 @@ class HttpServerManager {
*/
void configure_tls();

/**
* @brief Bound the server's request thread pool to thread_pool_size_ workers.
*
* No-op when thread_pool_size_ is 0 (keeps the cpp-httplib default). Must be
* called before listen(), as new_task_queue is consumed when the server
* starts accepting connections.
*/
void apply_thread_pool(httplib::Server & srv) const;

TlsConfig tls_config_;

// Fixed cpp-httplib worker-pool size (0 = leave the library default).
std::size_t thread_pool_size_;

// HTTP server (used when TLS is disabled)
std::unique_ptr<httplib::Server> server_;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2026 bburda
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <algorithm>
#include <cstddef>
#include <cstdint>

namespace ros2_medkit_gateway {

// Resolve a ROS-parameter thread-count into a usable pool size (issue #440).
//
// Both the rclcpp executor and the cpp-httplib request pool are sized from an
// int64 ROS parameter that an operator may mis-set. This clamps the value to a
// closed [min_threads, max_threads] range so a typo can neither drop the pool to
// zero (queuing HTTP requests forever, or meaning "all cores" for rclcpp) nor spawn a
// pathological thread count. The range is two-sided to match the established
// clamp pattern used by the subscription_executor.* / data_provider.* knobs.
//
// Pre-condition: 1 <= min_threads <= max_threads.
inline std::size_t clamp_thread_count(int64_t requested, int64_t min_threads, int64_t max_threads) {
return static_cast<std::size_t>(std::clamp<int64_t>(requested, min_threads, max_threads));
}

} // namespace ros2_medkit_gateway
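
A short sketch of how the clamp behaves at the two call sites in this change. The `clamp_thread_count` body mirrors the header above; the two `resolve_*` wrappers are hypothetical names added only for illustration, using the `[1, 1024]` and `[1, 256]` ranges that `rest_server.cpp` and `main.cpp` pass in.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Same logic as the header above, repeated so the sketch is self-contained.
// Pre-condition: 1 <= min_threads <= max_threads.
inline std::size_t clamp_thread_count(std::int64_t requested, std::int64_t min_threads,
                                      std::int64_t max_threads) {
  return static_cast<std::size_t>(std::clamp<std::int64_t>(requested, min_threads, max_threads));
}

// Hypothetical wrapper: range used for server.http_thread_pool_size.
inline std::size_t resolve_http_pool_size(std::int64_t requested) {
  return clamp_thread_count(requested, 1, 1024);
}

// Hypothetical wrapper: range used for server.executor_threads.
inline std::size_t resolve_executor_threads(std::int64_t requested) {
  return clamp_thread_count(requested, 1, 256);
}
```

Because the clamp runs on the signed `int64_t` before the cast to `std::size_t`, a negative parameter value resolves to the lower bound instead of wrapping around to a huge unsigned count.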
18 changes: 17 additions & 1 deletion src/ros2_medkit_gateway/src/gateway_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,19 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki
// Declare parameters with defaults
declare_parameter("server.host", "127.0.0.1");
declare_parameter("server.port", 8080);

// Thread-pool bounds (issue #440). Both pools default to a small fixed size
// instead of scaling with the host core count, so the footprint stays the
// same on a 4-core SBC and a 64-core server. The main executor only delivers
// the node's own callbacks (timers, graph events, subscriptions, and fast RPC
// response callbacks); the blocking wait for an RPC happens on the cpp-httplib
// pool thread, not an executor thread, so a small executor cannot starve it.
// Each active SSE stream holds one HTTP pool thread for its lifetime, so
// sse.max_clients is kept at or below http_thread_pool_size - raise both
// together for more concurrent SSE. Both are clamped to a bounded range on
// read (see clamp_thread_count).
declare_parameter("server.executor_threads", 2);
declare_parameter("server.http_thread_pool_size", 3);
// Safety-backstop refresh interval. Primary discovery refresh is driven
// by rclcpp graph events; this only controls the periodic forced
// refresh used when a graph event would otherwise be missed.
Expand All @@ -158,7 +171,10 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki
declare_parameter("cors.max_age_seconds", 86400);

// SSE (Server-Sent Events) parameters
declare_parameter("sse.max_clients", 10); // Limit concurrent SSE connections to prevent resource exhaustion
// Concurrent SSE connections. Each one pins an HTTP pool worker for its
// lifetime, so this defaults at or below server.http_thread_pool_size (3) to
// leave a worker for ordinary requests; raise both together for more SSE.
declare_parameter("sse.max_clients", 2);
declare_parameter("sse.max_subscriptions", 100); // Maximum active cyclic subscriptions across all entities
declare_parameter("sse.max_duration_sec", 3600); // Maximum subscription duration in seconds (1 hour default)

Expand Down
20 changes: 19 additions & 1 deletion src/ros2_medkit_gateway/src/http/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

namespace ros2_medkit_gateway {

HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_(tls_config) {
HttpServerManager::HttpServerManager(const TlsConfig & tls_config, std::size_t thread_pool_size)
: tls_config_(tls_config), thread_pool_size_(thread_pool_size) {
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
if (tls_config_.enabled) {
// Create SSL server with certificate and key
Expand All @@ -33,12 +34,14 @@ HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_

// Configure additional TLS settings
configure_tls();
apply_thread_pool(*ssl_server_);

RCLCPP_INFO(rclcpp::get_logger("http_server"), "TLS/HTTPS enabled - cert: %s, min_version: %s",
tls_config_.cert_file.c_str(), tls_config_.min_version.c_str());
// Note: key_file path intentionally not logged for security reasons
} else {
server_ = std::make_unique<httplib::Server>();
apply_thread_pool(*server_);
RCLCPP_DEBUG(rclcpp::get_logger("http_server"), "TLS/HTTPS disabled - using plain HTTP");
}
#else
Expand All @@ -48,9 +51,24 @@ HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_
"Ensure CPPHTTPLIB_OPENSSL_SUPPORT is defined.");
}
server_ = std::make_unique<httplib::Server>();
apply_thread_pool(*server_);
#endif
}

void HttpServerManager::apply_thread_pool(httplib::Server & srv) const {
if (thread_pool_size_ == 0) {
return; // Keep the cpp-httplib default task queue.
}
// new_task_queue is invoked once when the server starts accepting connections.
// Returning a fixed-size ThreadPool bounds the request worker count regardless
// of host CPU count (issue #440). cpp-httplib owns and deletes the returned
// TaskQueue.
const std::size_t n = thread_pool_size_;
srv.new_task_queue = [n] {
return new httplib::ThreadPool(n);
};
}

httplib::Server * HttpServerManager::get_server() {
#ifdef CPPHTTPLIB_OPENSSL_SUPPORT
if (tls_config_.enabled && ssl_server_) {
Expand Down
15 changes: 13 additions & 2 deletions src/ros2_medkit_gateway/src/http/rest_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "ros2_medkit_gateway/core/auth/auth_middleware.hpp"
#include "ros2_medkit_gateway/core/http/error_codes.hpp"
#include "ros2_medkit_gateway/core/http/http_utils.hpp"
#include "ros2_medkit_gateway/core/thread_pool_config.hpp"
#include "ros2_medkit_gateway/gateway_node.hpp"

#include "../openapi/route_registry.hpp"
Expand Down Expand Up @@ -81,8 +82,18 @@ RESTServer::RESTServer(GatewayNode * node, const std::string & host, int port, c
, cors_config_(cors_config)
, auth_config_(auth_config)
, tls_config_(tls_config) {
// Create HTTP/HTTPS server manager
http_server_ = std::make_unique<HttpServerManager>(tls_config_);
// Create HTTP/HTTPS server manager with a bounded request thread pool
// (issue #440). clamp_thread_count keeps it in [1, 1024]: a pool of 0 would
// queue every request forever, and a typo'd huge value would spawn that many
// OS threads. Each active SSE stream holds one worker for its lifetime, so the
// default pool is kept at or above sse.max_clients; if you raise
// sse.max_clients, raise server.http_thread_pool_size to match.
const auto http_thread_pool_size =
clamp_thread_count(node_->get_parameter("server.http_thread_pool_size").as_int(), 1, 1024);
http_server_ = std::make_unique<HttpServerManager>(tls_config_, http_thread_pool_size);
RCLCPP_INFO(rclcpp::get_logger("rest_server"),
"HTTP request thread pool bounded to %zu workers (each active SSE stream holds one)",
http_thread_pool_size);

// Set maximum payload size for uploads (cpp-httplib default is 8MB)
auto * srv = http_server_->get_server();
Expand Down
24 changes: 18 additions & 6 deletions src/ros2_medkit_gateway/src/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@

#include <algorithm>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>

#include <rclcpp/rclcpp.hpp>

#include "ros2_medkit_gateway/core/thread_pool_config.hpp"
#include "ros2_medkit_gateway/data/ros2_topic_data_provider.hpp"
#include "ros2_medkit_gateway/gateway_node.hpp"
#include "ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp"
Expand Down Expand Up @@ -69,13 +71,23 @@ int main(int argc, char ** argv) {

auto node = std::make_shared<ros2_medkit_gateway::GatewayNode>();

// MultiThreadedExecutor for the gateway node - HTTP handlers run on several
// threads, so the main executor must dispatch callbacks in parallel to avoid
// starving slow handlers. The Ros2SubscriptionExecutor built below owns its
// own internal single-threaded executor (spun from its worker thread); the
// subscription node is intentionally not added here.
rclcpp::executors::MultiThreadedExecutor executor;
// MultiThreadedExecutor for the gateway node. Issue #440: the thread count
// is bounded by the server.executor_threads parameter (default 2) instead of
// rclcpp's default (host cores, minimum 2), so the footprint does not grow
// with the host core count. The executor only delivers the gateway node's
// own callbacks - timers, graph events, log/fault subscriptions, and the
// (fast) service-response callbacks for operation/action RPCs. The blocking
// wait for those RPCs happens on the cpp-httplib pool thread (a separate
// server_thread_), not on an executor thread, and the fault transport runs
// on its own private executor - so a small executor here cannot starve or
// deadlock blocking RPC handlers. The Ros2SubscriptionExecutor built below
// owns its own internal single-threaded executor (spun from its worker
// thread); the subscription node is intentionally not added here.
const auto executor_threads =
ros2_medkit_gateway::clamp_thread_count(node->get_parameter("server.executor_threads").as_int(), 1, 256);
rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), executor_threads);
executor.add_node(node);
RCLCPP_INFO(node->get_logger(), "Main executor bounded to %zu threads", executor_threads);

// Stand up the ROS 2 subscription executor + topic data provider.
// Issue #375: all subscription create/destroy calls are funneled through the
Expand Down