Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions score/mw/com/test/generic_skeleton/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -35,7 +35,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -47,7 +47,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -59,7 +59,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand Down Expand Up @@ -87,7 +87,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -99,7 +99,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -111,7 +111,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand All @@ -123,7 +123,7 @@ cc_binary(
deps = [
"//score/mw/com",
"//score/mw/com/test/common_test_resources:sample_sender_receiver",
"//score/mw/com/test/common_test_resources:sctf_test_runner",
"//score/mw/com/test/common_test_resources:stop_token_sig_term_handler",
],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
#include "score/mw/com/impl/instance_specifier.h"
#include "score/mw/com/runtime.h"
#include "score/mw/com/runtime_configuration.h"
#include "score/mw/com/test/common_test_resources/stop_token_sig_term_handler.h"
#include "score/mw/log/logging.h"
#include <score/stop_token.hpp>

#include <chrono>
#include <cstdlib>
Expand Down Expand Up @@ -45,7 +47,7 @@ constexpr std::string_view kEventName = "Event8Byte";
#error "Unsupported payload size configured."
#endif

int run_provider()
int run_provider(score::cpp::stop_token stop_token)
{
const auto instance_specifier = score::mw::com::impl::InstanceSpecifier::Create(kInstanceSpecifier).value();
std::cout << "[PROVIDER] Instance specifier created." << std::endl;
Expand Down Expand Up @@ -94,7 +96,8 @@ int run_provider()
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "[PROVIDER] Finished initial 5s sleep." << std::endl;

for (int i = 0; i < kSamplesToProcess; ++i)
uint64_t i = 0;
while (!stop_token.stop_requested())
{
auto sample_res = generic_event.Allocate();
if (!sample_res.has_value())
Expand All @@ -111,11 +114,10 @@ int run_provider()
generic_event.Send(std::move(sample_res.value()));
std::cout << "[PROVIDER] " << PAYLOAD_SIZE << "-byte Event Sent sample: " << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
i++;
}

std::cout << "[PROVIDER] All samples sent." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(15));
std::cout << "[PROVIDER] Finished post-send 15s sleep. Calling StopOfferService()..." << std::endl;
std::cout << "[PROVIDER] Received termination signal. Calling StopOfferService()..." << std::endl;
skeleton.StopOfferService();
std::cout << "[PROVIDER] StopOfferService() completed." << std::endl;
return 0;
Expand Down Expand Up @@ -154,13 +156,21 @@ int run_consumer()
uint64_t expected = 0;
uint64_t received = 0;
int data_mismatches = 0;
bool is_first_sample = true;

while (received < kSamplesToProcess)
{
// The receiver callback operates on type-erased memory (SamplePtr<const void>)
generic_event.GetNewSamples(
[&](auto sample) {
auto* typed_sample = static_cast<const MyEventData*>(sample.get());

if (is_first_sample)
{
expected = typed_sample->counter;
is_first_sample = false;
}

if (typed_sample->counter != expected)
{
std::cerr << "[CONSUMER] " << PAYLOAD_SIZE << "-byte Data mismatch! Expected: " << expected
Expand Down Expand Up @@ -194,8 +204,12 @@ int main(int argc, const char* argv[])
if (std::string(argv[i]) == "--mode" && i + 1 < argc)
mode = argv[++i];
score::mw::com::runtime::InitializeRuntime(score::mw::com::runtime::RuntimeConfiguration(argc, argv));

score::cpp::stop_source stop_source;
score::mw::com::SetupStopTokenSigTermHandler(stop_source);

if (mode == "provider")
return run_provider();
return run_provider(stop_source.get_token());
if (mode == "consumer")
return run_consumer();
return 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@
#include "score/mw/com/impl/traits.h"
#include "score/mw/com/runtime.h"
#include "score/mw/com/runtime_configuration.h"
#include "score/mw/com/test/common_test_resources/stop_token_sig_term_handler.h"
#include "score/mw/log/logging.h"
#include <score/stop_token.hpp>

#include <chrono>
#include <cstdlib>
Expand Down Expand Up @@ -57,7 +59,7 @@ constexpr std::string_view kEventName = "Event8Byte";
#error "Unsupported payload size configured."
#endif

int run_provider()
int run_provider(score::cpp::stop_token stop_token)
{
const auto instance_specifier =
score::mw::com::impl::InstanceSpecifier::Create(std::string{kInstanceSpecifier}).value();
Expand Down Expand Up @@ -94,7 +96,8 @@ int run_provider()
<< PAYLOAD_SIZE << "-byte - Waiting 5s for consumer to subscribe...";
std::this_thread::sleep_for(std::chrono::seconds(5));

for (int i = 0; i < kSamplesToProcess; ++i)
uint64_t i = 0;
while (!stop_token.stop_requested())
{
auto sample_res = generic_event.Allocate();
if (!sample_res.has_value())
Expand All @@ -108,9 +111,9 @@ int run_provider()

score::mw::log::LogInfo("GenericSkeletonProvider") << PAYLOAD_SIZE << "-byte Event Sent sample: " << i;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
i++;
}

std::this_thread::sleep_for(std::chrono::seconds(15));
skeleton.StopOfferService();
return 0;
}
Expand Down Expand Up @@ -159,12 +162,19 @@ int run_consumer()
uint64_t received = 0;
uint64_t expected = 0;
int data_mismatches = 0;
bool is_first_sample = true;
proxy.event_.Subscribe(kSamplesToSubscribe);

while (received < kSamplesToProcess)
{
proxy.event_.GetNewSamples(
[&](score::mw::com::SamplePtr<MyEventData> sample) {
if (is_first_sample)
{
expected = sample->counter;
is_first_sample = false;
}

if (sample->counter != expected)
{
score::mw::log::LogFatal("TypedProxyConsumer")
Expand Down Expand Up @@ -208,9 +218,12 @@ int main(int argc, const char* argv[])

score::mw::com::runtime::InitializeRuntime(score::mw::com::runtime::RuntimeConfiguration(argc, argv));

score::cpp::stop_source stop_source;
score::mw::com::SetupStopTokenSigTermHandler(stop_source);

if (mode == "provider")
{
return run_provider();
return run_provider(stop_source.get_token());
}
else if (mode == "consumer")
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def test_generic_generic_interaction_64_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
# ADDED enforce_clean_shutdown=False and disabled LSAN here
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2) # Give provider a moment to initialize

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand All @@ -50,8 +49,7 @@ def test_generic_generic_interaction_32_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
# ADDED enforce_clean_shutdown=False and disabled LSAN here
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2) # Give provider a moment to initialize

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand All @@ -69,8 +67,7 @@ def test_generic_generic_interaction_16_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
# ADDED enforce_clean_shutdown=False and disabled LSAN here
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2) # Give provider a moment to initialize

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand All @@ -88,8 +85,7 @@ def test_generic_generic_interaction_8_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
# ADDED enforce_clean_shutdown=False and disabled LSAN here
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2) # Give provider a moment to initialize

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ def test_generic_typed_interaction_64_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
# Added enforce_clean_shutdown=False and disabled LSAN so forceful shutdown doesn't fail the test
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
# Give the provider a moment to initialize and offer the service
# to prevent a race condition where the consumer starts too quickly.
time.sleep(2)
Expand All @@ -51,7 +50,7 @@ def test_generic_typed_interaction_32_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2)

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand All @@ -68,7 +67,7 @@ def test_generic_typed_interaction_16_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2)

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand All @@ -85,7 +84,7 @@ def test_generic_typed_interaction_8_byte(target):
config_path = "./etc/mw_com_config.json"

logger.info(f"Starting provider: {app_bin} in {app_root}")
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}):
with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root):
time.sleep(2)

logger.info(f"Starting consumer: {app_bin} in {app_root}")
Expand Down
Loading