diff --git a/docs/config/server.rst b/docs/config/server.rst index 427371a9..e1f9182d 100644 --- a/docs/config/server.rst +++ b/docs/config/server.rst @@ -264,6 +264,65 @@ Lower values shorten the worst-case recovery window if a graph event is missed but increase idle CPU. The default rarely fires on a stable graph because the graph-event poll handles node up/down events directly. +Thread Pools +------------ + +The gateway runs two thread pools. By default both are bounded to a small fixed +size instead of scaling with the host CPU count, so the gateway's thread +footprint is the same on a 4-core SBC and a 64-core server. Both values are +clamped to a minimum of 1. + +.. list-table:: + :header-rows: 1 + :widths: 30 10 12 48 + + * - Parameter + - Type + - Default + - Description + * - ``server.http_thread_pool_size`` + - int + - ``3`` + - Worker threads in the HTTP request pool (cpp-httplib). Replaces the + library default of ``max(8, cores - 1)``. Kept at or above + ``sse.max_clients`` so SSE streams cannot starve every worker (see note + below). Clamped to ``[1, 1024]``. + * - ``server.executor_threads`` + - int + - ``2`` + - Threads in the main rclcpp ``MultiThreadedExecutor``. Replaces rclcpp's + default (host cores, minimum 2). Clamped to ``[1, 256]``. + +**HTTP pool and SSE.** Each active SSE stream (fault dashboard, cyclic +subscriptions, trigger events - see `SSE (Server-Sent Events)`_) holds one HTTP +pool thread for its entire lifetime, and the data cold-wait path can block up to +``data_provider.cold_wait_cap`` additional threads. To stop SSE from starving +ordinary requests, ``sse.max_clients`` defaults (2) at or below +``http_thread_pool_size`` (3). If you raise ``sse.max_clients`` for more +concurrent streams, raise ``http_thread_pool_size`` to match - a safe target is +``sse.max_clients + cold_wait_cap`` plus headroom for regular requests. + +**Executor threads.** The main executor only delivers the gateway node's own +callbacks (timers, graph events, log and fault subscriptions) and the fast +service-response callbacks for operation/action RPCs. The blocking wait for an +RPC runs on the cpp-httplib pool thread (a separate server thread), not on an +executor thread, and the fault transport uses its own private executor - so a +small main executor cannot starve or deadlock blocking RPC handlers. Increase +this only if the node's own callback load grows (for example very frequent +graph churn). + +Example (more SSE clients needs a larger pool and matching ``sse.max_clients``): + +.. code-block:: yaml + + ros2_medkit_gateway: + ros__parameters: + server: + http_thread_pool_size: 16 # workers for heavy SSE + request load + executor_threads: 4 + sse: + max_clients: 10 # raised together with the HTTP pool + Bulk Data Storage ----------------- @@ -329,8 +388,8 @@ Configure limits for SSE-based streaming (fault events and cyclic subscriptions) - Description * - ``sse.max_clients`` - int - - ``10`` - - Maximum number of concurrent SSE connections (fault stream, cyclic subscription streams, and trigger event streams combined). + - ``2`` + - Maximum number of concurrent SSE connections (fault stream, cyclic subscription streams, and trigger event streams combined). Each connection pins one ``server.http_thread_pool_size`` worker for its lifetime, so this defaults at or below that pool; raise both together for more concurrent streams. * - ``sse.max_subscriptions`` - int - ``100`` @@ -347,7 +406,7 @@ Example: ros2_medkit_gateway: ros__parameters: sse: - max_clients: 10 + max_clients: 2 max_subscriptions: 100 max_duration_sec: 3600 @@ -643,7 +702,7 @@ Complete Example categories: ["calibration", "firmware"] sse: - max_clients: 10 + max_clients: 2 max_subscriptions: 100 max_duration_sec: 3600 diff --git a/src/ros2_medkit_gateway/CMakeLists.txt b/src/ros2_medkit_gateway/CMakeLists.txt index 470febc6..dabb3332 100644 --- a/src/ros2_medkit_gateway/CMakeLists.txt +++ b/src/ros2_medkit_gateway/CMakeLists.txt @@ -564,6 +564,10 @@ if(BUILD_TESTING) ament_add_gtest(test_tls_config test/test_tls_config.cpp) target_link_libraries(test_tls_config gateway_ros2) + # Add HTTP server thread-pool bound tests (issue #440) + ament_add_gtest(test_http_server_thread_pool test/test_http_server_thread_pool.cpp) + target_link_libraries(test_http_server_thread_pool gateway_ros2) + # Add FaultManager tests ament_add_gtest(test_fault_manager test/test_fault_manager.cpp) target_link_libraries(test_fault_manager gateway_ros2) diff --git a/src/ros2_medkit_gateway/config/gateway_params.yaml b/src/ros2_medkit_gateway/config/gateway_params.yaml index 515c6ddb..f3e872e2 100644 --- a/src/ros2_medkit_gateway/config/gateway_params.yaml +++ b/src/ros2_medkit_gateway/config/gateway_params.yaml @@ -20,6 +20,30 @@ ros2_medkit_gateway: # Valid range: 1024-65535 port: 8080 + # Number of worker threads in the HTTP request pool (cpp-httplib). + # Bounded to a small fixed size instead of scaling with the host core + # count. Clamped to >= 1. + # + # IMPORTANT: each active SSE stream (fault dashboard, cyclic + # subscriptions, trigger events) holds ONE pool thread for its entire + # lifetime, and the data cold-wait path can block up to + # data_provider.cold_wait_cap more. The default sse.max_clients (2) is kept + # at or below this pool so SSE streams cannot starve every worker. If you + # raise sse.max_clients for more concurrent streams, raise this to match + # (rule of thumb: sse.max_clients + data_provider.cold_wait_cap plus + # headroom for ordinary requests). + # Default: 3 + http_thread_pool_size: 3 + + # Number of threads in the main rclcpp MultiThreadedExecutor. This + # executor only dispatches the gateway node's own callbacks (timers, + # graph events, log/fault subscriptions); HTTP handlers that issue ROS + # service calls use their own private executors, so a small pool here is + # safe. Bounded to a small fixed size instead of std::thread:: + # hardware_concurrency(). Clamped to >= 1. + # Default: 2 + executor_threads: 2 + # TLS/HTTPS Configuration # Enables encrypted communication using OpenSSL tls: @@ -136,9 +160,12 @@ ros2_medkit_gateway: # SSE (Server-Sent Events) Configuration sse: # Maximum number of concurrent SSE connections - # Applies to fault streams and cyclic subscription streams combined - # Default: 10 - max_clients: 10 + # Applies to fault streams and cyclic subscription streams combined. + # Each connection pins one server.http_thread_pool_size worker for its + # lifetime, so this defaults at or below that pool (3) to leave a worker + # for ordinary requests. Raise both together for more concurrent streams. + # Default: 2 + max_clients: 2 # Maximum number of active cyclic subscriptions across all entities # Returns HTTP 503 when this limit is reached diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/http/http_server.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/http/http_server.hpp index 0dbbd61e..0d1d5da5 100644 --- a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/http/http_server.hpp +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/http/http_server.hpp @@ -16,6 +16,7 @@ #include +#include #include #include @@ -36,9 +37,13 @@ class HttpServerManager { /** * @brief Construct HTTP server manager * @param tls_config TLS configuration (if enabled, creates SSLServer) + * @param thread_pool_size Number of worker threads for the cpp-httplib request + * pool. When > 0, the server's task queue is bounded to a fixed-size + * ThreadPool of this many threads (issue #440). When 0, the cpp-httplib + * default (max(8, hardware_concurrency - 1)) is left untouched. * @throws std::runtime_error if TLS is requested but SSL server creation fails */ - explicit HttpServerManager(const TlsConfig & tls_config); + explicit HttpServerManager(const TlsConfig & tls_config, std::size_t thread_pool_size = 0); ~HttpServerManager() = default; @@ -87,8 +92,20 @@ class HttpServerManager { */ void configure_tls(); + /** + * @brief Bound the server's request thread pool to thread_pool_size_ workers. + * + * No-op when thread_pool_size_ is 0 (keeps the cpp-httplib default). Must be + * called before listen(), as new_task_queue is consumed when the server + * starts accepting connections. + */ + void apply_thread_pool(httplib::Server & srv) const; + TlsConfig tls_config_; + // Fixed cpp-httplib worker-pool size (0 = leave the library default). + std::size_t thread_pool_size_; + // HTTP server (used when TLS is disabled) std::unique_ptr server_; diff --git a/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/thread_pool_config.hpp b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/thread_pool_config.hpp new file mode 100644 index 00000000..f4ab67d9 --- /dev/null +++ b/src/ros2_medkit_gateway/include/ros2_medkit_gateway/core/thread_pool_config.hpp @@ -0,0 +1,37 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace ros2_medkit_gateway { + +// Resolve a ROS-parameter thread-count into a usable pool size (issue #440). +// +// Both the rclcpp executor and the cpp-httplib request pool are sized from an +// int64 ROS parameter that an operator may mis-set. This clamps the value to a +// closed [min_threads, max_threads] range so a typo can neither drop the pool to +// zero (which would queue requests forever / mean "all cores") nor spawn a +// pathological thread count. The range is two-sided to match the established +// clamp pattern used by the subscription_executor.* / data_provider.* knobs. +// +// Pre-condition: 1 <= min_threads <= max_threads. +inline std::size_t clamp_thread_count(int64_t requested, int64_t min_threads, int64_t max_threads) { + return static_cast(std::clamp(requested, min_threads, max_threads)); +} + +} // namespace ros2_medkit_gateway diff --git a/src/ros2_medkit_gateway/src/gateway_node.cpp b/src/ros2_medkit_gateway/src/gateway_node.cpp index 4e3a58d7..75114075 100644 --- a/src/ros2_medkit_gateway/src/gateway_node.cpp +++ b/src/ros2_medkit_gateway/src/gateway_node.cpp @@ -145,6 +145,19 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki // Declare parameters with defaults declare_parameter("server.host", "127.0.0.1"); declare_parameter("server.port", 8080); + + // Thread-pool bounds (issue #440). Both pools default to a small fixed size + // instead of scaling with the host core count, so the footprint stays the + // same on a 4-core SBC and a 64-core server. The main executor only delivers + // the node's own callbacks (timers, graph events, subscriptions, and fast RPC + // response callbacks); the blocking wait for an RPC happens on the cpp-httplib + // pool thread, not an executor thread, so a small executor cannot starve it. + // Each active SSE stream holds one HTTP pool thread for its lifetime, so + // sse.max_clients is kept at or below http_thread_pool_size - raise both + // together for more concurrent SSE. Both are clamped to a bounded range on + // read (see clamp_thread_count). + declare_parameter("server.executor_threads", 2); + declare_parameter("server.http_thread_pool_size", 3); // Safety-backstop refresh interval. Primary discovery refresh is driven // by rclcpp graph events; this only controls the periodic forced // refresh used when a graph event would otherwise be missed. @@ -158,7 +171,10 @@ GatewayNode::GatewayNode(const rclcpp::NodeOptions & options) : Node("ros2_medki declare_parameter("cors.max_age_seconds", 86400); // SSE (Server-Sent Events) parameters - declare_parameter("sse.max_clients", 10); // Limit concurrent SSE connections to prevent resource exhaustion + // Concurrent SSE connections. Each one pins an HTTP pool worker for its + // lifetime, so this defaults at or below server.http_thread_pool_size (3) to + // leave a worker for ordinary requests; raise both together for more SSE. + declare_parameter("sse.max_clients", 2); declare_parameter("sse.max_subscriptions", 100); // Maximum active cyclic subscriptions across all entities declare_parameter("sse.max_duration_sec", 3600); // Maximum subscription duration in seconds (1 hour default) diff --git a/src/ros2_medkit_gateway/src/http/http_server.cpp b/src/ros2_medkit_gateway/src/http/http_server.cpp index 716c2f8d..de49d32d 100644 --- a/src/ros2_medkit_gateway/src/http/http_server.cpp +++ b/src/ros2_medkit_gateway/src/http/http_server.cpp @@ -19,7 +19,8 @@ namespace ros2_medkit_gateway { -HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_(tls_config) { +HttpServerManager::HttpServerManager(const TlsConfig & tls_config, std::size_t thread_pool_size) + : tls_config_(tls_config), thread_pool_size_(thread_pool_size) { #ifdef CPPHTTPLIB_OPENSSL_SUPPORT if (tls_config_.enabled) { // Create SSL server with certificate and key @@ -33,12 +34,14 @@ HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_ // Configure additional TLS settings configure_tls(); + apply_thread_pool(*ssl_server_); RCLCPP_INFO(rclcpp::get_logger("http_server"), "TLS/HTTPS enabled - cert: %s, min_version: %s", tls_config_.cert_file.c_str(), tls_config_.min_version.c_str()); // Note: key_file path intentionally not logged for security reasons } else { server_ = std::make_unique(); + apply_thread_pool(*server_); RCLCPP_DEBUG(rclcpp::get_logger("http_server"), "TLS/HTTPS disabled - using plain HTTP"); } #else @@ -48,9 +51,24 @@ HttpServerManager::HttpServerManager(const TlsConfig & tls_config) : tls_config_ "Ensure CPPHTTPLIB_OPENSSL_SUPPORT is defined."); } server_ = std::make_unique(); + apply_thread_pool(*server_); #endif } +void HttpServerManager::apply_thread_pool(httplib::Server & srv) const { + if (thread_pool_size_ == 0) { + return; // Keep the cpp-httplib default task queue. + } + // new_task_queue is invoked once when the server starts accepting connections. + // Returning a fixed-size ThreadPool bounds the request worker count regardless + // of host CPU count (issue #440). cpp-httplib owns and deletes the returned + // TaskQueue. + const std::size_t n = thread_pool_size_; + srv.new_task_queue = [n] { + return new httplib::ThreadPool(n); + }; +} + httplib::Server * HttpServerManager::get_server() { #ifdef CPPHTTPLIB_OPENSSL_SUPPORT if (tls_config_.enabled && ssl_server_) { diff --git a/src/ros2_medkit_gateway/src/http/rest_server.cpp b/src/ros2_medkit_gateway/src/http/rest_server.cpp index 73777237..cd7d7ee4 100644 --- a/src/ros2_medkit_gateway/src/http/rest_server.cpp +++ b/src/ros2_medkit_gateway/src/http/rest_server.cpp @@ -22,6 +22,7 @@ #include "ros2_medkit_gateway/core/auth/auth_middleware.hpp" #include "ros2_medkit_gateway/core/http/error_codes.hpp" #include "ros2_medkit_gateway/core/http/http_utils.hpp" +#include "ros2_medkit_gateway/core/thread_pool_config.hpp" #include "ros2_medkit_gateway/gateway_node.hpp" #include "../openapi/route_registry.hpp" @@ -81,8 +82,18 @@ RESTServer::RESTServer(GatewayNode * node, const std::string & host, int port, c , cors_config_(cors_config) , auth_config_(auth_config) , tls_config_(tls_config) { - // Create HTTP/HTTPS server manager - http_server_ = std::make_unique(tls_config_); + // Create HTTP/HTTPS server manager with a bounded request thread pool + // (issue #440). clamp_thread_count keeps it in [1, 1024]: a pool of 0 would + // queue every request forever, and a typo'd huge value would spawn that many + // OS threads. Each active SSE stream holds one worker for its lifetime, so the + // default pool is kept at or above sse.max_clients; if you raise + // sse.max_clients, raise server.http_thread_pool_size to match. + const auto http_thread_pool_size = + clamp_thread_count(node_->get_parameter("server.http_thread_pool_size").as_int(), 1, 1024); + http_server_ = std::make_unique(tls_config_, http_thread_pool_size); + RCLCPP_INFO(rclcpp::get_logger("rest_server"), + "HTTP request thread pool bounded to %zu workers (each active SSE stream holds one)", + http_thread_pool_size); // Set maximum payload size for uploads (cpp-httplib default is 8MB) auto * srv = http_server_->get_server(); diff --git a/src/ros2_medkit_gateway/src/main.cpp b/src/ros2_medkit_gateway/src/main.cpp index b28f68b5..99825f00 100644 --- a/src/ros2_medkit_gateway/src/main.cpp +++ b/src/ros2_medkit_gateway/src/main.cpp @@ -14,11 +14,13 @@ #include #include +#include #include #include #include +#include "ros2_medkit_gateway/core/thread_pool_config.hpp" #include "ros2_medkit_gateway/data/ros2_topic_data_provider.hpp" #include "ros2_medkit_gateway/gateway_node.hpp" #include "ros2_medkit_gateway/ros2_common/ros2_subscription_executor.hpp" @@ -69,13 +71,23 @@ int main(int argc, char ** argv) { auto node = std::make_shared(); - // MultiThreadedExecutor for the gateway node - HTTP handlers run on several - // threads, so the main executor must dispatch callbacks in parallel to avoid - // starving slow handlers. The Ros2SubscriptionExecutor built below owns its - // own internal single-threaded executor (spun from its worker thread); the - // subscription node is intentionally not added here. - rclcpp::executors::MultiThreadedExecutor executor; + // MultiThreadedExecutor for the gateway node. Issue #440: the thread count + // is bounded by the server.executor_threads parameter (default 2) instead of + // rclcpp's default (host cores, minimum 2), so the footprint does not grow + // with the host core count. The executor only delivers the gateway node's + // own callbacks - timers, graph events, log/fault subscriptions, and the + // (fast) service-response callbacks for operation/action RPCs. The blocking + // wait for those RPCs happens on the cpp-httplib pool thread (a separate + // server_thread_), not on an executor thread, and the fault transport runs + // on its own private executor - so a small executor here cannot starve or + // deadlock blocking RPC handlers. The Ros2SubscriptionExecutor built below + // owns its own internal single-threaded executor (spun from its worker + // thread); the subscription node is intentionally not added here. + const auto executor_threads = + ros2_medkit_gateway::clamp_thread_count(node->get_parameter("server.executor_threads").as_int(), 1, 256); + rclcpp::executors::MultiThreadedExecutor executor(rclcpp::ExecutorOptions(), executor_threads); executor.add_node(node); + RCLCPP_INFO(node->get_logger(), "Main executor bounded to %zu threads", executor_threads); // Stand up the ROS 2 subscription executor + topic data provider. // Issue #375: all subscription create/destroy calls are funneled through the diff --git a/src/ros2_medkit_gateway/test/test_http_server_thread_pool.cpp b/src/ros2_medkit_gateway/test/test_http_server_thread_pool.cpp new file mode 100644 index 00000000..0efcacb7 --- /dev/null +++ b/src/ros2_medkit_gateway/test/test_http_server_thread_pool.cpp @@ -0,0 +1,152 @@ +// Copyright 2026 bburda +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Issue #440: the HTTP request pool must be bounded to a fixed worker count +// instead of scaling with the host CPU count. These tests start a real +// loopback server with a small pool and prove that no more than `pool_size` +// request handlers ever run concurrently, regardless of how many clients +// connect at once. + +#include + +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "ros2_medkit_gateway/core/config.hpp" +#include "ros2_medkit_gateway/core/http/http_server.hpp" +#include "ros2_medkit_gateway/core/thread_pool_config.hpp" + +using namespace ros2_medkit_gateway; +using namespace std::chrono_literals; + +namespace { + +// Observed peak concurrency of request handlers when `clients` clients hit a +// blocking endpoint at once, served by a pool bounded to `pool_size` workers. +// Returns the maximum number of handlers seen running simultaneously. +int observe_peak_concurrency(std::size_t pool_size, int clients) { + HttpServerManager manager(TlsConfig{}, pool_size); + httplib::Server * srv = manager.get_server(); + EXPECT_NE(srv, nullptr); + + std::atomic active{0}; + std::atomic peak{0}; + std::atomic started{0}; + std::mutex m; + std::condition_variable cv; + bool release = false; + + srv->Get("/block", [&](const httplib::Request &, httplib::Response & res) { + const int now = ++active; + started.fetch_add(1); + // Record the running peak. + int prev = peak.load(); + while (now > prev && !peak.compare_exchange_weak(prev, now)) { + // prev refreshed by compare_exchange_weak on failure. + } + { + std::unique_lock lk(m); + // Bounded wait so a wedged test fails fast instead of hanging forever. + cv.wait_for(lk, 10s, [&] { + return release; + }); + } + --active; + res.set_content("ok", "text/plain"); + }); + + const int port = srv->bind_to_any_port("127.0.0.1"); + EXPECT_GT(port, 0); + std::thread server_thread([srv] { + srv->listen_after_bind(); + }); + srv->wait_until_ready(); + + std::vector client_threads; + client_threads.reserve(static_cast(clients)); + for (int i = 0; i < clients; ++i) { + client_threads.emplace_back([port] { + httplib::Client cli("127.0.0.1", port); + cli.set_connection_timeout(10s); + cli.set_read_timeout(15s); + cli.Get("/block"); + }); + } + + // Wait until the pool is saturated: exactly pool_size handlers should be able + // to start; the rest queue. Generous timeout to avoid scheduler flakiness. + const auto deadline = std::chrono::steady_clock::now() + 5s; + while (started.load() < static_cast(pool_size) && std::chrono::steady_clock::now() < deadline) { + std::this_thread::sleep_for(5ms); + } + + // Give any (incorrectly unbounded) extra workers a moment to also start, so + // an over-large pool would be caught as peak > pool_size. + std::this_thread::sleep_for(200ms); + + const int observed_peak = peak.load(); + + // Release all handlers and let the queued requests drain. + { + std::lock_guard lk(m); + release = true; + } + cv.notify_all(); + + for (auto & t : client_threads) { + t.join(); + } + manager.stop(); + if (server_thread.joinable()) { + server_thread.join(); + } + return observed_peak; +} + +} // namespace + +// A pool of 1 worker serializes requests: peak concurrency is exactly 1 even +// when several clients connect simultaneously. +TEST(HttpServerThreadPoolTest, single_worker_serializes_requests) { + EXPECT_EQ(observe_peak_concurrency(/*pool_size=*/1, /*clients=*/3), 1); +} + +// A pool of 2 workers caps concurrency at 2: with 4 simultaneous clients, +// exactly 2 handlers run at once and the other 2 queue. +TEST(HttpServerThreadPoolTest, pool_caps_concurrency_at_size) { + EXPECT_EQ(observe_peak_concurrency(/*pool_size=*/2, /*clients=*/4), 2); +} + +// clamp_thread_count is the shared resolver for both the executor and HTTP pool +// sizes (issue #440). It must coerce mis-set ROS parameters into the bounded +// range: zero/negative values floor to min_threads, oversized values cap to +// max_threads, and in-range values pass through unchanged. +TEST(ThreadPoolConfigTest, clamp_thread_count_bounds_the_value) { + using ros2_medkit_gateway::clamp_thread_count; + // executor range [1, 256] + EXPECT_EQ(clamp_thread_count(0, 1, 256), 1u); // zero -> floor + EXPECT_EQ(clamp_thread_count(-5, 1, 256), 1u); // negative -> floor + EXPECT_EQ(clamp_thread_count(2, 1, 256), 2u); // in range -> unchanged + EXPECT_EQ(clamp_thread_count(100000, 1, 256), 256u); // oversized -> cap + // HTTP pool range [1, 1024] + EXPECT_EQ(clamp_thread_count(3, 1, 1024), 3u); // in range -> unchanged + EXPECT_EQ(clamp_thread_count(1u << 20, 1, 1024), 1024u); // oversized -> cap +} diff --git a/src/ros2_medkit_integration_tests/test/features/test_thread_pool_bounds.test.py b/src/ros2_medkit_integration_tests/test/features/test_thread_pool_bounds.test.py new file mode 100644 index 00000000..9366d2c8 --- /dev/null +++ b/src/ros2_medkit_integration_tests/test/features/test_thread_pool_bounds.test.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python3 +# Copyright 2026 bburda +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Integration tests for the bounded thread pools (issue #440). + +The gateway is launched with explicit, small overrides for both thread pools: + +- ``server.http_thread_pool_size`` bounds the cpp-httplib request workers. +- ``server.executor_threads`` bounds the main rclcpp MultiThreadedExecutor. + +These tests verify that the gateway is fully functional with the bounded pools +(discovery, data sampling, SSE), and - crucially - that regular requests are +still served while an SSE stream holds one HTTP pool thread for its lifetime. +That last check is the practical lower bound on ``http_thread_pool_size``: +it must leave headroom above the number of concurrent SSE streams. +""" + +import concurrent.futures +import threading +import unittest + +import launch_testing +import requests + +from ros2_medkit_test_utils.constants import ALLOWED_EXIT_CODES +from ros2_medkit_test_utils.gateway_test_case import GatewayTestCase +from ros2_medkit_test_utils.launch_helpers import create_test_launch + +# Bounded pool sizes under test. Both are set to values DISTINCT from the +# shipped defaults (http=3, executor=2) so that a regression which ignored the +# parameters would change observable behaviour. The HTTP pool is larger than the +# single SSE stream opened below so there is headroom for regular requests. +HTTP_THREAD_POOL_SIZE = 5 +EXECUTOR_THREADS = 3 + + +def generate_test_description(): + return create_test_launch( + demo_nodes=['temp_sensor'], + fault_manager=False, + gateway_params={ + 'server.http_thread_pool_size': HTTP_THREAD_POOL_SIZE, + 'server.executor_threads': EXECUTOR_THREADS, + }, + ) + + +class TestThreadPoolBounds(GatewayTestCase): + """Gateway behaves correctly with both thread pools bounded small.""" + + MIN_EXPECTED_APPS = 1 + REQUIRED_APPS = {'temp_sensor'} + + def test_gateway_functional_with_bounded_pools(self): + """Discovery and data sampling work with bounded executor and HTTP pools. + + Reaching this test already proves the gateway started, served /health, + and completed discovery (graph events processed by the bounded main + executor). Here we additionally sample live data, exercising the full + request path through the bounded HTTP pool. + """ + apps = self.get_json('/apps').get('items', []) + app_ids = {a.get('id') for a in apps} + self.assertIn('temp_sensor', app_ids) + + # A data sample exercises the HTTP handler -> data provider path. + data = self.poll_endpoint('/apps/temp_sensor/data', timeout=15.0) + self.assertIn('items', data) + + def test_requests_served_while_sse_stream_open(self): + """Regular requests succeed while an SSE stream holds a pool thread. + + Each active SSE stream consumes one HTTP pool worker for its entire + lifetime. With the pool bounded to HTTP_THREAD_POOL_SIZE, an open stream + must still leave workers free for ordinary traffic. We open one fault + stream, then fire several concurrent requests and require all of them + to succeed. + """ + stop_event = threading.Event() + connected = threading.Event() + stream_errors = [] + + def read_stream(): + try: + resp = requests.get( + f'{self.BASE_URL}/faults/stream', + stream=True, + timeout=30, + ) + self.assertEqual(resp.status_code, 200) + connected.set() + for _ in resp.iter_lines(decode_unicode=True): + if stop_event.is_set(): + break + resp.close() + except requests.exceptions.Timeout: + pass + except Exception as exc: # noqa: BLE001 - surfaced via assertion + stream_errors.append(str(exc)) + finally: + connected.set() + + stream_thread = threading.Thread(target=read_stream, daemon=True) + stream_thread.start() + try: + self.assertTrue( + connected.wait(timeout=5), + 'SSE stream failed to connect within 5s', + ) + self.assertEqual(stream_errors, [], f'SSE stream error: {stream_errors}') + + # Fire more concurrent requests than the free workers (pool minus the + # one SSE thread); they must all complete, not deadlock behind the + # open stream. + def hit(path): + r = requests.get(f'{self.BASE_URL}{path}', timeout=10) + return r.status_code + + paths = ['/health', '/apps', '/areas', '/functions', '/health', '/apps'] + with concurrent.futures.ThreadPoolExecutor(max_workers=len(paths)) as pool: + statuses = list(pool.map(hit, paths)) + + # The stream must still be open (holding a pool worker) at this point, + # otherwise "served while an SSE stream is open" was not actually + # tested - the worker would have been freed before the burst ran. + self.assertTrue( + stream_thread.is_alive(), + 'SSE stream closed before the request burst - it was not holding a pool worker', + ) + self.assertTrue( + all(s == 200 for s in statuses), + f'Some requests were not served while an SSE stream was open: {statuses}', + ) + finally: + stop_event.set() + stream_thread.join(timeout=3) + + +@launch_testing.post_shutdown_test() +class TestShutdown(unittest.TestCase): + + def test_exit_codes(self, proc_info): + """All processes exit cleanly (SIGTERM allowed after closing SSE).""" + for info in proc_info: + self.assertIn( + info.returncode, ALLOWED_EXIT_CODES, + f'{info.process_name} exited with code {info.returncode}', + )