Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 64 additions & 0 deletions score/message_passing/test/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# *******************************************************************************
# Copyright (c) 2026 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************

load("@rules_cc//cc:defs.bzl", "cc_binary")
load("@score_baselibs//score/language/safecpp:toolchain_features.bzl", "COMPILER_WARNING_FEATURES")
load("//score/mw/com/test:pkg_application.bzl", "pkg_application")

cc_binary(
name = "client",
srcs = [
"client.cpp",
"common.h",
],
features = COMPILER_WARNING_FEATURES + [
"aborts_upon_exception",
],
visibility = ["//score/message_passing/test:__subpackages__"],
deps = [
"@score_communication//score/message_passing",
],
)

cc_binary(
name = "server",
srcs = [
"common.h",
"server.cpp",
],
features = COMPILER_WARNING_FEATURES + [
"aborts_upon_exception",
],
visibility = ["//score/message_passing/test:__subpackages__"],
deps = [
"@score_communication//score/message_passing",
],
)

pkg_application(
name = "client-pkg",
app_name = "client",
bin = [":client"],
visibility = [
"//score/message_passing/test:__subpackages__",
],
)

pkg_application(
name = "server-pkg",
app_name = "server",
bin = [":server"],
visibility = [
"//score/message_passing/test:__subpackages__",
],
)
127 changes: 127 additions & 0 deletions score/message_passing/test/client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@

#include "score/message_passing/test/common.h"

#include "score/message_passing/client_connection.h"
#include "score/message_passing/client_factory.h"

#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <thread>
#include <vector>

using namespace score::message_passing::test;

constexpr std::uint32_t kStateTryAttempts{32U};

constexpr std::chrono::milliseconds kStateRetryDelay{50};
constexpr std::chrono::milliseconds kSendCycleDelay{20};

static std::atomic<bool> g_running{true};

static void SignalHandler(int /*signum*/)
{
g_running.store(false, std::memory_order_relaxed);
}

int main(int argc, char** argv)
{
if (argc < 2)
{
std::cerr << "Usage: " << argv[0] << " <number_of_threads>" << std::endl;
return EXIT_FAILURE;
}

const auto num_threads = static_cast<std::uint32_t>(std::strtoul(argv[1], nullptr, 10));
if (num_threads == 0U)
{
std::cerr << "Number of threads must be greater than 0" << std::endl;
return EXIT_FAILURE;
}

std::signal(SIGINT, SignalHandler);
std::signal(SIGTERM, SignalHandler);

const score::message_passing::ServiceProtocolConfig protocol_config{
kServiceIdentifier, kMaxSendSize, kMaxReplySize, kMaxNotifySize};
const score::message_passing::IClientFactory::ClientConfig client_config{0U, 20U, false, true, false};
score::message_passing::ClientFactory client_factory;
auto client_connection = client_factory.Create(protocol_config, client_config);

client_connection->Start(score::message_passing::IClientConnection::StateCallback{},
score::message_passing::IClientConnection::NotifyCallback{});
for (std::uint32_t try_attempt{0U}; try_attempt < kStateTryAttempts; ++try_attempt)
{
const auto state = client_connection->GetState();
if (state == score::message_passing::IClientConnection::State::kReady)
{
break;
}
if (state != score::message_passing::IClientConnection::State::kStarting)
{
std::cerr << "Connection for " << kServiceIdentifier << " has failed to create, the reason is "
<< static_cast<std::uint32_t>(score::cpp::to_underlying(client_connection->GetStopReason()))
<< std::endl;
return EXIT_FAILURE;
}
std::this_thread::sleep_for(kStateRetryDelay);
}

std::cout << "Client connection is ready, sending messages with " << num_threads << " threads..." << std::endl;

std::vector<std::thread> threads;
threads.reserve(num_threads);

for (std::uint32_t thread_id = 0U; thread_id < num_threads; ++thread_id)
{
threads.emplace_back([&client_connection, thread_id]() {
while (g_running.load(std::memory_order_relaxed))
{
std::array<std::uint8_t, kMaxSendSize> send_buffer{};
std::array<std::uint8_t, kMaxReplySize> reply_buffer{};

// Encode the thread-unique ID into the send buffer
std::memcpy(send_buffer.data(), &thread_id, sizeof(thread_id));

auto result = client_connection->SendWaitReply(send_buffer, reply_buffer);
if (!result.has_value())
{
std::cerr << "Thread " << thread_id << ": SendWaitReply failed with error code "
<< result.error().GetOsDependentErrorCode() << std::endl;
std::exit(EXIT_FAILURE);
}

const auto reply_span = result.value();
if (reply_span.size() < sizeof(thread_id))
{
std::cerr << "Thread " << thread_id << ": Reply too short (expected at least "
<< sizeof(thread_id) << " bytes, got " << reply_span.size() << ")" << std::endl;
std::exit(EXIT_FAILURE);
}

std::uint32_t reply_id{};
std::memcpy(&reply_id, reply_span.data(), sizeof(reply_id));
if (reply_id != thread_id)
{
std::cerr << "Thread " << thread_id << ": Reply mismatch! Expected " << thread_id
<< " but got " << reply_id << std::endl;
std::exit(EXIT_FAILURE);
}

std::this_thread::sleep_for(kSendCycleDelay);
}
});
}

for (auto& t : threads)
{
t.join();
}

client_connection->Stop();
std::cout << "All threads stopped. Exiting." << std::endl;
return EXIT_SUCCESS;
}
28 changes: 28 additions & 0 deletions score/message_passing/test/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/********************************************************************************
* Copyright (c) 2026 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Apache License Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
********************************************************************************/
#ifndef SCORE_MESSAGE_PASSING_TEST_COMMON_H
#define SCORE_MESSAGE_PASSING_TEST_COMMON_H

#include <cstdint>

namespace score::message_passing::test
{

constexpr char kServiceIdentifier[]{"TestService"};
constexpr std::uint32_t kMaxSendSize{32U};
constexpr std::uint32_t kMaxReplySize{32U};
constexpr std::uint32_t kMaxNotifySize{32U};

} // namespace score::message_passing::test

#endif // SCORE_MESSAGE_PASSING_TEST_COMMON_H
31 changes: 31 additions & 0 deletions score/message_passing/test/integration_test/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# *******************************************************************************
# Copyright (c) 2026 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************
load("@rules_pkg//pkg:mappings.bzl", "pkg_filegroup")
load("//quality/integration_testing:integration_testing.bzl", "integration_test")

pkg_filegroup(
name = "filesystem",
srcs = [
"//score/message_passing/test:client-pkg",
"//score/message_passing/test:server-pkg",
],
)

integration_test(
name = "test_stress",
timeout = "moderate",
srcs = [
"stress_test.py",
],
filesystem = ":filesystem",
)
30 changes: 30 additions & 0 deletions score/message_passing/test/integration_test/stress_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# *******************************************************************************
# Copyright (c) 2026 Contributors to the Eclipse Foundation
#
# See the NOTICE file(s) distributed with this work for additional
# information regarding copyright ownership.
#
# This program and the accompanying materials are made available under the
# terms of the Apache License Version 2.0 which is available at
# https://www.apache.org/licenses/LICENSE-2.0
#
# SPDX-License-Identifier: Apache-2.0
# *******************************************************************************


def client(target, **kwargs):
args = ["10"]
return target.wrap_exec("bin/client", args, cwd="/opt/client", wait_on_exit=True, **kwargs)


def service(target, **kwargs):
args = []
return target.wrap_exec("bin/server", args, cwd="/opt/server", **kwargs)


def test_stress(target):
"""Test field initial value exchange between service and client."""
with service(target):
with client(target):
pass

86 changes: 86 additions & 0 deletions score/message_passing/test/server.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@

#include "score/message_passing/test/common.h"

#include "score/message_passing/i_server_connection.h"
#include "score/message_passing/server_factory.h"

#include <atomic>
#include <csignal>
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <thread>

using namespace score::message_passing::test;

static std::atomic<bool> g_running{true};

static void SignalHandler(int /*signum*/)
{
g_running.store(false, std::memory_order_relaxed);
}

int main(int /*argc*/, char** /*argv*/)
{
std::signal(SIGINT, SignalHandler);
std::signal(SIGTERM, SignalHandler);

const score::message_passing::ServiceProtocolConfig protocol_config{
kServiceIdentifier, kMaxSendSize, kMaxReplySize, kMaxNotifySize};
const score::message_passing::IServerFactory::ServerConfig server_config{10U, 0U, 0U};

score::message_passing::ServerFactory server_factory;
auto server = server_factory.Create(protocol_config, server_config);
if (!server)
{
std::cerr << "Failed to create server for " << kServiceIdentifier << std::endl;
return EXIT_FAILURE;
}

auto connect_callback = [](score::message_passing::IServerConnection& /*connection*/) -> void* {
return nullptr;
};

auto disconnect_callback = [](score::message_passing::IServerConnection& /*connection*/) {
};

auto sent_callback = [](score::message_passing::IServerConnection& /*connection*/,
score::cpp::span<const std::uint8_t> /*message*/)
-> score::cpp::expected_blank<score::os::Error> {
return {};
};

auto sent_with_reply_callback = [](score::message_passing::IServerConnection& connection,
score::cpp::span<const std::uint8_t> message)
-> score::cpp::expected_blank<score::os::Error> {
const auto reply_result = connection.Reply(message);
if (!reply_result.has_value())
{
std::cerr << "Failed to send reply: error code "
<< reply_result.error().GetOsDependentErrorCode() << std::endl;
std::exit(EXIT_FAILURE);
}
return {};
};

const auto listen_result =
server->StartListening(connect_callback, disconnect_callback, sent_callback, sent_with_reply_callback);
if (!listen_result.has_value())
{
std::cerr << "Failed to start listening on " << kServiceIdentifier
<< ": error code " << listen_result.error().GetOsDependentErrorCode() << std::endl;
return EXIT_FAILURE;
}

std::cout << "Server is listening on " << kServiceIdentifier << std::endl;

while (g_running.load(std::memory_order_relaxed))
{
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

server->StopListening();
std::cout << "Server stopped." << std::endl;
return EXIT_SUCCESS;
}

Loading