From ea2bdf6da2dd3c6dcda6aeaf1ff7ca8ac8041e14 Mon Sep 17 00:00:00 2001 From: manuelfehlhammer Date: Fri, 12 Jun 2026 14:30:31 +0200 Subject: [PATCH] message_passing: Add stress test Added stress test for SendWaitReply --- score/message_passing/test/BUILD | 64 +++++++++ score/message_passing/test/client.cpp | 127 ++++++++++++++++++ score/message_passing/test/common.h | 28 ++++ .../test/integration_test/BUILD | 31 +++++ .../test/integration_test/stress_test.py | 30 +++++ score/message_passing/test/server.cpp | 86 ++++++++++++ 6 files changed, 366 insertions(+) create mode 100644 score/message_passing/test/BUILD create mode 100644 score/message_passing/test/client.cpp create mode 100644 score/message_passing/test/common.h create mode 100644 score/message_passing/test/integration_test/BUILD create mode 100644 score/message_passing/test/integration_test/stress_test.py create mode 100644 score/message_passing/test/server.cpp diff --git a/score/message_passing/test/BUILD b/score/message_passing/test/BUILD new file mode 100644 index 000000000..37d427d03 --- /dev/null +++ b/score/message_passing/test/BUILD @@ -0,0 +1,64 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + +load("@rules_cc//cc:defs.bzl", "cc_binary") +load("@score_baselibs//score/language/safecpp:toolchain_features.bzl", "COMPILER_WARNING_FEATURES") +load("//score/mw/com/test:pkg_application.bzl", "pkg_application") + +cc_binary( + name = "client", + srcs = [ + "client.cpp", + "common.h", + ], + features = COMPILER_WARNING_FEATURES + [ + "aborts_upon_exception", + ], + visibility = ["//score/message_passing/test:__subpackages__"], + deps = [ + "@score_communication//score/message_passing", + ], +) + +cc_binary( + name = "server", + srcs = [ + "common.h", + "server.cpp", + ], + features = COMPILER_WARNING_FEATURES + [ + "aborts_upon_exception", + ], + visibility = ["//score/message_passing/test:__subpackages__"], + deps = [ + "@score_communication//score/message_passing", + ], +) + +pkg_application( + name = "client-pkg", + app_name = "client", + bin = [":client"], + visibility = [ + "//score/message_passing/test:__subpackages__", + ], +) + +pkg_application( + name = "server-pkg", + app_name = "server", + bin = [":server"], + visibility = [ + "//score/message_passing/test:__subpackages__", + ], +) diff --git a/score/message_passing/test/client.cpp b/score/message_passing/test/client.cpp new file mode 100644 index 000000000..f3ca6ad86 --- /dev/null +++ b/score/message_passing/test/client.cpp @@ -0,0 +1,127 @@ + +#include "score/message_passing/test/common.h" + +#include "score/message_passing/client_connection.h" +#include "score/message_passing/client_factory.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace score::message_passing::test; + +constexpr std::uint32_t kStateTryAttempts{32U}; + +constexpr std::chrono::milliseconds kStateRetryDelay{50}; +constexpr std::chrono::milliseconds kSendCycleDelay{20}; + +static std::atomic g_running{true}; + +static void SignalHandler(int /*signum*/) +{ + g_running.store(false, std::memory_order_relaxed); +} + +int main(int argc, char** argv) +{ + if (argc < 2) + { + std::cerr << "Usage: " << argv[0] << " " << std::endl; + return EXIT_FAILURE; + } + + const auto num_threads = static_cast(std::strtoul(argv[1], nullptr, 10)); + if (num_threads == 0U) + { + std::cerr << "Number of threads must be greater than 0" << std::endl; + return EXIT_FAILURE; + } + + std::signal(SIGINT, SignalHandler); + std::signal(SIGTERM, SignalHandler); + + const score::message_passing::ServiceProtocolConfig protocol_config{ + kServiceIdentifier, kMaxSendSize, kMaxReplySize, kMaxNotifySize}; + const score::message_passing::IClientFactory::ClientConfig client_config{0U, 20U, false, true, false}; + score::message_passing::ClientFactory client_factory; + auto client_connection = client_factory.Create(protocol_config, client_config); + + client_connection->Start(score::message_passing::IClientConnection::StateCallback{}, + score::message_passing::IClientConnection::NotifyCallback{}); + for (std::uint32_t try_attempt{0U}; try_attempt < kStateTryAttempts; ++try_attempt) + { + const auto state = client_connection->GetState(); + if (state == score::message_passing::IClientConnection::State::kReady) + { + break; + } + if (state != score::message_passing::IClientConnection::State::kStarting) + { + std::cerr << "Connection for " << kServiceIdentifier << " has failed to create, the reason is " + << static_cast(score::cpp::to_underlying(client_connection->GetStopReason())) + << std::endl; + return EXIT_FAILURE; + } + std::this_thread::sleep_for(kStateRetryDelay); + } + + std::cout << "Client connection is ready, sending messages with " << num_threads << " threads..." << std::endl; + + std::vector threads; + threads.reserve(num_threads); + + for (std::uint32_t thread_id = 0U; thread_id < num_threads; ++thread_id) + { + threads.emplace_back([&client_connection, thread_id]() { + while (g_running.load(std::memory_order_relaxed)) + { + std::array send_buffer{}; + std::array reply_buffer{}; + + // Encode the thread-unique ID into the send buffer + std::memcpy(send_buffer.data(), &thread_id, sizeof(thread_id)); + + auto result = client_connection->SendWaitReply(send_buffer, reply_buffer); + if (!result.has_value()) + { + std::cerr << "Thread " << thread_id << ": SendWaitReply failed with error code " + << result.error().GetOsDependentErrorCode() << std::endl; + std::exit(EXIT_FAILURE); + } + + const auto reply_span = result.value(); + if (reply_span.size() < sizeof(thread_id)) + { + std::cerr << "Thread " << thread_id << ": Reply too short (expected at least " + << sizeof(thread_id) << " bytes, got " << reply_span.size() << ")" << std::endl; + std::exit(EXIT_FAILURE); + } + + std::uint32_t reply_id{}; + std::memcpy(&reply_id, reply_span.data(), sizeof(reply_id)); + if (reply_id != thread_id) + { + std::cerr << "Thread " << thread_id << ": Reply mismatch! Expected " << thread_id + << " but got " << reply_id << std::endl; + std::exit(EXIT_FAILURE); + } + + std::this_thread::sleep_for(kSendCycleDelay); + } + }); + } + + for (auto& t : threads) + { + t.join(); + } + + client_connection->Stop(); + std::cout << "All threads stopped. Exiting." << std::endl; + return EXIT_SUCCESS; +} diff --git a/score/message_passing/test/common.h b/score/message_passing/test/common.h new file mode 100644 index 000000000..ea953a419 --- /dev/null +++ b/score/message_passing/test/common.h @@ -0,0 +1,28 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ +#ifndef SCORE_MESSAGE_PASSING_TEST_COMMON_H +#define SCORE_MESSAGE_PASSING_TEST_COMMON_H + +#include + +namespace score::message_passing::test +{ + +constexpr char kServiceIdentifier[]{"TestService"}; +constexpr std::uint32_t kMaxSendSize{32U}; +constexpr std::uint32_t kMaxReplySize{32U}; +constexpr std::uint32_t kMaxNotifySize{32U}; + +} // namespace score::message_passing::test + +#endif // SCORE_MESSAGE_PASSING_TEST_COMMON_H diff --git a/score/message_passing/test/integration_test/BUILD b/score/message_passing/test/integration_test/BUILD new file mode 100644 index 000000000..718298954 --- /dev/null +++ b/score/message_passing/test/integration_test/BUILD @@ -0,0 +1,31 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* +load("@rules_pkg//pkg:mappings.bzl", "pkg_filegroup") +load("//quality/integration_testing:integration_testing.bzl", "integration_test") + +pkg_filegroup( + name = "filesystem", + srcs = [ + "//score/message_passing/test:client-pkg", + "//score/message_passing/test:server-pkg", + ], +) + +integration_test( + name = "test_stress", + timeout = "moderate", + srcs = [ + "stress_test.py", + ], + filesystem = ":filesystem", +) diff --git a/score/message_passing/test/integration_test/stress_test.py b/score/message_passing/test/integration_test/stress_test.py new file mode 100644 index 000000000..07e7bed11 --- /dev/null +++ b/score/message_passing/test/integration_test/stress_test.py @@ -0,0 +1,30 @@ +# ******************************************************************************* +# Copyright (c) 2026 Contributors to the Eclipse Foundation +# +# See the NOTICE file(s) distributed with this work for additional +# information regarding copyright ownership. +# +# This program and the accompanying materials are made available under the +# terms of the Apache License Version 2.0 which is available at +# https://www.apache.org/licenses/LICENSE-2.0 +# +# SPDX-License-Identifier: Apache-2.0 +# ******************************************************************************* + + +def client(target, **kwargs): + args = ["10"] + return target.wrap_exec("bin/client", args, cwd="/opt/client", wait_on_exit=True, **kwargs) + + +def service(target, **kwargs): + args = [] + return target.wrap_exec("bin/server", args, cwd="/opt/server", **kwargs) + + +def test_stress(target): + """Test field initial value exchange between service and client.""" + with service(target): + with client(target): + pass + diff --git a/score/message_passing/test/server.cpp b/score/message_passing/test/server.cpp new file mode 100644 index 000000000..22060df3e --- /dev/null +++ b/score/message_passing/test/server.cpp @@ -0,0 +1,86 @@ + +#include "score/message_passing/test/common.h" + +#include "score/message_passing/i_server_connection.h" +#include "score/message_passing/server_factory.h" + +#include +#include +#include +#include +#include +#include + +using namespace score::message_passing::test; + +static std::atomic g_running{true}; + +static void SignalHandler(int /*signum*/) +{ + g_running.store(false, std::memory_order_relaxed); +} + +int main(int /*argc*/, char** /*argv*/) +{ + std::signal(SIGINT, SignalHandler); + std::signal(SIGTERM, SignalHandler); + + const score::message_passing::ServiceProtocolConfig protocol_config{ + kServiceIdentifier, kMaxSendSize, kMaxReplySize, kMaxNotifySize}; + const score::message_passing::IServerFactory::ServerConfig server_config{10U, 0U, 0U}; + + score::message_passing::ServerFactory server_factory; + auto server = server_factory.Create(protocol_config, server_config); + if (!server) + { + std::cerr << "Failed to create server for " << kServiceIdentifier << std::endl; + return EXIT_FAILURE; + } + + auto connect_callback = [](score::message_passing::IServerConnection& /*connection*/) -> void* { + return nullptr; + }; + + auto disconnect_callback = [](score::message_passing::IServerConnection& /*connection*/) { + }; + + auto sent_callback = [](score::message_passing::IServerConnection& /*connection*/, + score::cpp::span /*message*/) + -> score::cpp::expected_blank { + return {}; + }; + + auto sent_with_reply_callback = [](score::message_passing::IServerConnection& connection, + score::cpp::span message) + -> score::cpp::expected_blank { + const auto reply_result = connection.Reply(message); + if (!reply_result.has_value()) + { + std::cerr << "Failed to send reply: error code " + << reply_result.error().GetOsDependentErrorCode() << std::endl; + std::exit(EXIT_FAILURE); + } + return {}; + }; + + const auto listen_result = + server->StartListening(connect_callback, disconnect_callback, sent_callback, sent_with_reply_callback); + if (!listen_result.has_value()) + { + std::cerr << "Failed to start listening on " << kServiceIdentifier + << ": error code " << listen_result.error().GetOsDependentErrorCode() << std::endl; + return EXIT_FAILURE; + } + + std::cout << "Server is listening on " << kServiceIdentifier << std::endl; + + while (g_running.load(std::memory_order_relaxed)) + { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + server->StopListening(); + std::cout << "Server stopped." << std::endl; + return EXIT_SUCCESS; +} +