diff --git a/score/mw/com/test/generic_skeleton/BUILD.bazel b/score/mw/com/test/generic_skeleton/BUILD.bazel index cac1ad1ad..ac3e950f6 100644 --- a/score/mw/com/test/generic_skeleton/BUILD.bazel +++ b/score/mw/com/test/generic_skeleton/BUILD.bazel @@ -23,7 +23,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -35,7 +35,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -47,7 +47,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -59,7 +59,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -87,7 +87,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -99,7 +99,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -111,7 +111,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) @@ -123,7 +123,7 @@ cc_binary( deps = [ "//score/mw/com", "//score/mw/com/test/common_test_resources:sample_sender_receiver", - "//score/mw/com/test/common_test_resources:sctf_test_runner", + "//score/mw/com/test/common_test_resources:stop_token_sig_term_handler", ], ) diff --git a/score/mw/com/test/generic_skeleton/generic_generic_interaction_app.cpp b/score/mw/com/test/generic_skeleton/generic_generic_interaction_app.cpp index 028f3aa00..b3775f938 100644 --- a/score/mw/com/test/generic_skeleton/generic_generic_interaction_app.cpp +++ b/score/mw/com/test/generic_skeleton/generic_generic_interaction_app.cpp @@ -3,7 +3,9 @@ #include "score/mw/com/impl/instance_specifier.h" #include "score/mw/com/runtime.h" #include "score/mw/com/runtime_configuration.h" +#include "score/mw/com/test/common_test_resources/stop_token_sig_term_handler.h" #include "score/mw/log/logging.h" +#include #include #include @@ -45,7 +47,7 @@ constexpr std::string_view kEventName = "Event8Byte"; #error "Unsupported payload size configured." #endif -int run_provider() +int run_provider(score::cpp::stop_token stop_token) { const auto instance_specifier = score::mw::com::impl::InstanceSpecifier::Create(kInstanceSpecifier).value(); std::cout << "[PROVIDER] Instance specifier created." << std::endl; @@ -94,7 +96,8 @@ int run_provider() std::this_thread::sleep_for(std::chrono::seconds(5)); std::cout << "[PROVIDER] Finished initial 5s sleep." << std::endl; - for (int i = 0; i < kSamplesToProcess; ++i) + uint64_t i = 0; + while (!stop_token.stop_requested()) { auto sample_res = generic_event.Allocate(); if (!sample_res.has_value()) @@ -111,11 +114,10 @@ int run_provider() generic_event.Send(std::move(sample_res.value())); std::cout << "[PROVIDER] " << PAYLOAD_SIZE << "-byte Event Sent sample: " << i << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(10)); + i++; } - std::cout << "[PROVIDER] All samples sent." << std::endl; - std::this_thread::sleep_for(std::chrono::seconds(15)); - std::cout << "[PROVIDER] Finished post-send 15s sleep. Calling StopOfferService()..." << std::endl; + std::cout << "[PROVIDER] Received termination signal. Calling StopOfferService()..." << std::endl; skeleton.StopOfferService(); std::cout << "[PROVIDER] StopOfferService() completed." << std::endl; return 0; @@ -154,6 +156,7 @@ int run_consumer() uint64_t expected = 0; uint64_t received = 0; int data_mismatches = 0; + bool is_first_sample = true; while (received < kSamplesToProcess) { @@ -161,6 +164,13 @@ int run_consumer() generic_event.GetNewSamples( [&](auto sample) { auto* typed_sample = static_cast(sample.get()); + + if (is_first_sample) + { + expected = typed_sample->counter; + is_first_sample = false; + } + if (typed_sample->counter != expected) { std::cerr << "[CONSUMER] " << PAYLOAD_SIZE << "-byte Data mismatch! Expected: " << expected @@ -194,8 +204,12 @@ int main(int argc, const char* argv[]) if (std::string(argv[i]) == "--mode" && i + 1 < argc) mode = argv[++i]; score::mw::com::runtime::InitializeRuntime(score::mw::com::runtime::RuntimeConfiguration(argc, argv)); + + score::cpp::stop_source stop_source; + score::mw::com::SetupStopTokenSigTermHandler(stop_source); + if (mode == "provider") - return run_provider(); + return run_provider(stop_source.get_token()); if (mode == "consumer") return run_consumer(); return 1; diff --git a/score/mw/com/test/generic_skeleton/generic_typed_interaction_app.cpp b/score/mw/com/test/generic_skeleton/generic_typed_interaction_app.cpp index fee3d2a4a..cbad489a8 100644 --- a/score/mw/com/test/generic_skeleton/generic_typed_interaction_app.cpp +++ b/score/mw/com/test/generic_skeleton/generic_typed_interaction_app.cpp @@ -16,7 +16,9 @@ #include "score/mw/com/impl/traits.h" #include "score/mw/com/runtime.h" #include "score/mw/com/runtime_configuration.h" +#include "score/mw/com/test/common_test_resources/stop_token_sig_term_handler.h" #include "score/mw/log/logging.h" +#include #include #include @@ -57,7 +59,7 @@ constexpr std::string_view kEventName = "Event8Byte"; #error "Unsupported payload size configured." #endif -int run_provider() +int run_provider(score::cpp::stop_token stop_token) { const auto instance_specifier = score::mw::com::impl::InstanceSpecifier::Create(std::string{kInstanceSpecifier}).value(); @@ -94,7 +96,8 @@ int run_provider() << PAYLOAD_SIZE << "-byte - Waiting 5s for consumer to subscribe..."; std::this_thread::sleep_for(std::chrono::seconds(5)); - for (int i = 0; i < kSamplesToProcess; ++i) + uint64_t i = 0; + while (!stop_token.stop_requested()) { auto sample_res = generic_event.Allocate(); if (!sample_res.has_value()) @@ -108,9 +111,9 @@ int run_provider() score::mw::log::LogInfo("GenericSkeletonProvider") << PAYLOAD_SIZE << "-byte Event Sent sample: " << i; std::this_thread::sleep_for(std::chrono::milliseconds(10)); + i++; } - std::this_thread::sleep_for(std::chrono::seconds(15)); skeleton.StopOfferService(); return 0; } @@ -159,12 +162,19 @@ int run_consumer() uint64_t received = 0; uint64_t expected = 0; int data_mismatches = 0; + bool is_first_sample = true; proxy.event_.Subscribe(kSamplesToSubscribe); while (received < kSamplesToProcess) { proxy.event_.GetNewSamples( [&](score::mw::com::SamplePtr sample) { + if (is_first_sample) + { + expected = sample->counter; + is_first_sample = false; + } + if (sample->counter != expected) { score::mw::log::LogFatal("TypedProxyConsumer") @@ -208,9 +218,12 @@ int main(int argc, const char* argv[]) score::mw::com::runtime::InitializeRuntime(score::mw::com::runtime::RuntimeConfiguration(argc, argv)); + score::cpp::stop_source stop_source; + score::mw::com::SetupStopTokenSigTermHandler(stop_source); + if (mode == "provider") { - return run_provider(); + return run_provider(stop_source.get_token()); } else if (mode == "consumer") { diff --git a/score/mw/com/test/generic_skeleton/integration_test/test_generic_generic_interaction.py b/score/mw/com/test/generic_skeleton/integration_test/test_generic_generic_interaction.py index 4a76314c0..3d1ceb5ae 100644 --- a/score/mw/com/test/generic_skeleton/integration_test/test_generic_generic_interaction.py +++ b/score/mw/com/test/generic_skeleton/integration_test/test_generic_generic_interaction.py @@ -31,8 +31,7 @@ def test_generic_generic_interaction_64_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - # ADDED enforce_clean_shutdown=False and disabled LSAN here - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) # Give provider a moment to initialize logger.info(f"Starting consumer: {app_bin} in {app_root}") @@ -50,8 +49,7 @@ def test_generic_generic_interaction_32_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - # ADDED enforce_clean_shutdown=False and disabled LSAN here - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) # Give provider a moment to initialize logger.info(f"Starting consumer: {app_bin} in {app_root}") @@ -69,8 +67,7 @@ def test_generic_generic_interaction_16_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - # ADDED enforce_clean_shutdown=False and disabled LSAN here - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) # Give provider a moment to initialize logger.info(f"Starting consumer: {app_bin} in {app_root}") @@ -88,8 +85,7 @@ def test_generic_generic_interaction_8_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - # ADDED enforce_clean_shutdown=False and disabled LSAN here - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) # Give provider a moment to initialize logger.info(f"Starting consumer: {app_bin} in {app_root}") diff --git a/score/mw/com/test/generic_skeleton/integration_test/test_generic_typed_interaction.py b/score/mw/com/test/generic_skeleton/integration_test/test_generic_typed_interaction.py index 2f8c7b828..aaa5962f9 100644 --- a/score/mw/com/test/generic_skeleton/integration_test/test_generic_typed_interaction.py +++ b/score/mw/com/test/generic_skeleton/integration_test/test_generic_typed_interaction.py @@ -30,8 +30,7 @@ def test_generic_typed_interaction_64_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - # Added enforce_clean_shutdown=False and disabled LSAN so forceful shutdown doesn't fail the test - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): # Give the provider a moment to initialize and offer the service # to prevent a race condition where the consumer starts too quickly. time.sleep(2) @@ -51,7 +50,7 @@ def test_generic_typed_interaction_32_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) logger.info(f"Starting consumer: {app_bin} in {app_root}") @@ -68,7 +67,7 @@ def test_generic_typed_interaction_16_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) logger.info(f"Starting consumer: {app_bin} in {app_root}") @@ -85,7 +84,7 @@ def test_generic_typed_interaction_8_byte(target): config_path = "./etc/mw_com_config.json" logger.info(f"Starting provider: {app_bin} in {app_root}") - with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root, enforce_clean_shutdown=False, env={"ASAN_OPTIONS": "detect_leaks=0"}): + with run_interaction_app(target, app_bin, "provider", config_path, cwd=app_root): time.sleep(2) logger.info(f"Starting consumer: {app_bin} in {app_root}")