From cfcff43ce9e24a65667f0073d3431c4c77338200 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Tue, 16 Jun 2026 16:05:45 +0530 Subject: [PATCH 1/2] Rust::com Rust Example app Updated version * Sync APIs added on this --- score/mw/com/example/com-api-example/BUILD | 24 ++-- score/mw/com/example/com-api-example/main.rs | 103 ++++++++++++++++++ .../example/com-api-example/src/consumer.rs | 64 +++++++++++ .../mw/com/example/com-api-example/src/lib.rs | 21 ++++ .../example/com-api-example/src/producer.rs | 40 +++++++ .../com-api-example/src/vehicle_monitor.rs | 33 ++++++ 6 files changed, 278 insertions(+), 7 deletions(-) create mode 100644 score/mw/com/example/com-api-example/main.rs create mode 100644 score/mw/com/example/com-api-example/src/consumer.rs create mode 100644 score/mw/com/example/com-api-example/src/lib.rs create mode 100644 score/mw/com/example/com-api-example/src/producer.rs create mode 100644 score/mw/com/example/com-api-example/src/vehicle_monitor.rs diff --git a/score/mw/com/example/com-api-example/BUILD b/score/mw/com/example/com-api-example/BUILD index 91bfb1091..a8d24116c 100644 --- a/score/mw/com/example/com-api-example/BUILD +++ b/score/mw/com/example/com-api-example/BUILD @@ -11,18 +11,33 @@ # SPDX-License-Identifier: Apache-2.0 # ******************************************************************************* -load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_test") +load("@rules_rust//rust:defs.bzl", "rust_binary", "rust_library", "rust_test") + +rust_library( + name = "com-api-example-lib", + srcs = glob(["src/**/*.rs"]), + crate_name = "com_api_example", + features = ["link_std_cpp_lib"], + visibility = ["//visibility:public"], + deps = [ + "//score/mw/com/example/com-api-example/com-api-gen", + "//score/mw/com/impl/rust/com-api/com-api", + ], +) rust_binary( name = "com-api-example", - srcs = ["basic-consumer-producer.rs"], + srcs = ["main.rs"], data = ["etc/mw_com_config.json"], features = ["link_std_cpp_lib"], visibility = ["//visibility:public"], deps = [ + ":com-api-example-lib", "//score/mw/com/example/com-api-example/com-api-gen", "//score/mw/com/impl/rust/com-api/com-api", "@score_communication_crate_index//:clap", + "@score_communication_crate_index//:futures", + "@score_communication_crate_index//:tokio", ], ) @@ -31,9 +46,4 @@ rust_test( crate = ":com-api-example", data = ["etc/mw_com_config.json"], tags = ["manual"], - deps = [ - ":com-api-example", - "@score_communication_crate_index//:futures", - "@score_communication_crate_index//:tokio", - ], ) diff --git a/score/mw/com/example/com-api-example/main.rs b/score/mw/com/example/com-api-example/main.rs new file mode 100644 index 000000000..0bd19868d --- /dev/null +++ b/score/mw/com/example/com-api-example/main.rs @@ -0,0 +1,103 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +// This demo app writing and reading tire pressure data using producer and consumer respectively. +// It is demonstrating the composition of consumer and producer in one struct, +// but they can be used separately as well. +// The example is using Lola runtime, but it can be used with any runtime by changing the runtime initialization part. +// Note: The example is using unwrap and panic in some places for simplicity, +// but it is recommended to handle errors properly in production code. + +use clap::Parser; +use std::path::PathBuf; + +use com_api::{ + Builder, InstanceSpecifier, LolaRuntimeBuilderImpl, OfferedProducer, Runtime, RuntimeBuilder, + Subscription, +}; + +use com_api_example::{VehicleMonitorConsumer, VehicleMonitorProducer}; + +use com_api_gen::Tire; + +#[derive(Parser)] +struct Arguments { + #[arg( + short, + long, + default_value = "./score/mw/com/example/com-api-example/etc/mw_com_config.json" + )] + service_instance_manifest: PathBuf, +} + +// Run the example with the specified runtime +fn run_with_runtime(name: &str, runtime: &R) { + println!("\n=== Running with {name} runtime ==="); + + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let producer_monitor = VehicleMonitorProducer::new(runtime, service_id.clone()); + let consumer_monitor = VehicleMonitorConsumer::new(runtime, service_id); + + let tire_pressure = 5.0; + println!("Setting tire pressure to {tire_pressure}"); + for i in 0..5 { + match producer_monitor.write_tire_data(Tire { + pressure: tire_pressure + i as f32, + }) { + Ok(_) => (), + Err(e) => { + eprintln!("Failed to write tire data: {:?}", e); + continue; + } + } + let tire_data = match consumer_monitor.read_tire_data() { + Ok(data) => data, + Err(e) => { + eprintln!("Failed to read tire data: {:?}", e); + continue; + } + }; + println!("{tire_data}"); + } + //unoffer returns producer back, so if needed it can be used further + match producer_monitor.producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + //UnSubscribe the event + let _ = consumer_monitor.tire_subscriber.unsubscribe(); + println!("=== {name} runtime completed ===\n"); +} + +// Initialize Lola runtime builder with configuration +fn init_lola_runtime_builder(config_path: &std::path::Path) -> LolaRuntimeBuilderImpl { + let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); + if config_path.exists() { + lola_runtime_builder.load_config(config_path); + } else { + eprintln!( + "Provided config path does not exist: {}", + config_path.display() + ); + } + lola_runtime_builder +} + +fn main() { + let args = Arguments::parse(); + let lola_runtime_builder = init_lola_runtime_builder(&args.service_instance_manifest); + let lola_runtime = lola_runtime_builder.build().unwrap(); + run_with_runtime("Lola", &lola_runtime); +} diff --git a/score/mw/com/example/com-api-example/src/consumer.rs b/score/mw/com/example/com-api-example/src/consumer.rs new file mode 100644 index 000000000..3ea8d6060 --- /dev/null +++ b/score/mw/com/example/com-api-example/src/consumer.rs @@ -0,0 +1,64 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use com_api::{ + Builder, FindServiceSpecifier, InstanceSpecifier, Result, Runtime, SampleContainer, + ServiceDiscovery, Subscriber, Subscription, +}; +use com_api_gen::VehicleInterface; + +use crate::vehicle_monitor::VehicleMonitorConsumer; + +impl VehicleMonitorConsumer { + /// Create a new VehicleMonitorConsumer + pub fn new(runtime: &R, service_id: InstanceSpecifier) -> Self { + let consumer_discovery = + runtime.find_service::(FindServiceSpecifier::Specific(service_id)); + let available_service_instances = consumer_discovery + .get_available_instances() + .expect("Failed to get available service instances"); + + // Select service instance at specific handle_index + let handle_index = 0; // or any index you need from vector of instances + let consumer_builder = available_service_instances + .into_iter() + .nth(handle_index) + .expect("Failed to get consumer builder at specified handle index"); + + let consumer = consumer_builder + .build() + .expect("Failed to build consumer instance"); + + let tire_subscriber = consumer.left_tire.subscribe(3).unwrap(); + let exhaust_subscriber = consumer.exhaust.subscribe(3).unwrap(); + + Self { + tire_subscriber, + _exhaust_subscriber: exhaust_subscriber, + } + } + + /// Monitor tire data from the consumer + pub fn read_tire_data(&self) -> Result { + let mut sample_buf = SampleContainer::new(3); + + match self.tire_subscriber.try_receive(&mut sample_buf, 1) { + Ok(0) => Ok("No tire data received".to_string()), + Ok(x) => { + let sample = sample_buf.pop_front().unwrap(); + Ok(format!("{} samples received: sample[0] = {:?}", x, *sample)) + } + Err(e) => Err(e), + } + } +} diff --git a/score/mw/com/example/com-api-example/src/lib.rs b/score/mw/com/example/com-api-example/src/lib.rs new file mode 100644 index 000000000..e752c129b --- /dev/null +++ b/score/mw/com/example/com-api-example/src/lib.rs @@ -0,0 +1,21 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +pub mod consumer; +pub mod producer; +pub mod vehicle_monitor; + +// Re-export commonly used items +pub use vehicle_monitor::{ + VehicleConsumer, VehicleMonitorConsumer, VehicleMonitorProducer, VehicleOfferedProducer, +}; diff --git a/score/mw/com/example/com-api-example/src/producer.rs b/score/mw/com/example/com-api-example/src/producer.rs new file mode 100644 index 000000000..091930a66 --- /dev/null +++ b/score/mw/com/example/com-api-example/src/producer.rs @@ -0,0 +1,40 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use com_api::{ + Builder, InstanceSpecifier, Producer, Publisher, Result, Runtime, SampleMaybeUninit, SampleMut, +}; +use com_api_gen::{Tire, VehicleInterface}; + +use crate::vehicle_monitor::VehicleMonitorProducer; + +impl VehicleMonitorProducer { + /// Create a new VehicleMonitorProducer + pub fn new(runtime: &R, service_id: InstanceSpecifier) -> Self { + let producer_builder = runtime.producer_builder::(service_id); + let producer = producer_builder + .build() + .expect("Failed to build producer instance"); + let producer = producer.offer().expect("Failed to offer producer instance"); + Self { producer } + } + + /// Write tire data using the producer + pub fn write_tire_data(&self, tire: Tire) -> Result<()> { + let uninit_sample = self.producer.left_tire.allocate()?; + let sample = uninit_sample.write(tire); + sample.send()?; + println!("Tire data sent"); + Ok(()) + } +} diff --git a/score/mw/com/example/com-api-example/src/vehicle_monitor.rs b/score/mw/com/example/com-api-example/src/vehicle_monitor.rs new file mode 100644 index 000000000..d110eb508 --- /dev/null +++ b/score/mw/com/example/com-api-example/src/vehicle_monitor.rs @@ -0,0 +1,33 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +use com_api::{Interface, Producer, Runtime, Subscriber}; +use com_api_gen::{Exhaust, Tire, VehicleInterface}; + +// Type aliases for generated consumer and offered producer types for the Vehicle interface +// VehicleConsumer is the consumer type generated for the Vehicle interface, parameterized by the runtime R +pub type VehicleConsumer = ::Consumer; +// VehicleOfferedProducer is the offered producer type generated for the Vehicle interface, parameterized by the runtime R +pub type VehicleOfferedProducer = + <::Producer as Producer>::OfferedProducer; + +// Example struct demonstrating composition with VehicleConsumer +pub struct VehicleMonitorProducer { + pub producer: VehicleOfferedProducer, +} + +pub struct VehicleMonitorConsumer { + pub tire_subscriber: <::Subscriber as Subscriber>::Subscription, + pub _exhaust_subscriber: + <::Subscriber as Subscriber>::Subscription, +} From e9fb42d8292afc278b69012645a343b90ca58b71 Mon Sep 17 00:00:00 2001 From: bharatgoswami Date: Tue, 16 Jun 2026 22:01:41 +0530 Subject: [PATCH 2/2] Rust::com Added Asyncronous APIs And Test for example * Updated example binary with proper structure * Kept only async test cases using tokio --- score/mw/com/example/com-api-example/BUILD | 14 +- score/mw/com/example/com-api-example/USAGE.md | 129 ++++ .../basic-consumer-producer.rs | 554 ------------------ score/mw/com/example/com-api-example/main.rs | 162 +++-- .../example/com-api-example/src/consumer.rs | 161 ++++- .../mw/com/example/com-api-example/src/lib.rs | 18 +- .../example/com-api-example/src/producer.rs | 37 +- .../com-api-example/src/vehicle_monitor.rs | 7 +- .../tests_using_tokio_runtime.rs | 299 ++++++++++ 9 files changed, 737 insertions(+), 644 deletions(-) create mode 100644 score/mw/com/example/com-api-example/USAGE.md delete mode 100644 score/mw/com/example/com-api-example/basic-consumer-producer.rs create mode 100644 score/mw/com/example/com-api-example/tests_using_tokio_runtime.rs diff --git a/score/mw/com/example/com-api-example/BUILD b/score/mw/com/example/com-api-example/BUILD index a8d24116c..a6cfb8dec 100644 --- a/score/mw/com/example/com-api-example/BUILD +++ b/score/mw/com/example/com-api-example/BUILD @@ -22,6 +22,8 @@ rust_library( deps = [ "//score/mw/com/example/com-api-example/com-api-gen", "//score/mw/com/impl/rust/com-api/com-api", + "@score_communication_crate_index//:clap", + "@score_communication_crate_index//:futures", ], ) @@ -37,13 +39,19 @@ rust_binary( "//score/mw/com/impl/rust/com-api/com-api", "@score_communication_crate_index//:clap", "@score_communication_crate_index//:futures", - "@score_communication_crate_index//:tokio", ], ) rust_test( - name = "com-api-example-test", - crate = ":com-api-example", + name = "com-api-example-tokio-integration-test", + srcs = ["tests_using_tokio_runtime.rs"], data = ["etc/mw_com_config.json"], + features = ["link_std_cpp_lib"], tags = ["manual"], + deps = [ + "//score/mw/com/example/com-api-example/com-api-gen", + "//score/mw/com/impl/rust/com-api/com-api", + "@score_communication_crate_index//:futures", + "@score_communication_crate_index//:tokio", + ], ) diff --git a/score/mw/com/example/com-api-example/USAGE.md b/score/mw/com/example/com-api-example/USAGE.md new file mode 100644 index 000000000..7137bc038 --- /dev/null +++ b/score/mw/com/example/com-api-example/USAGE.md @@ -0,0 +1,129 @@ + + +# COM API Example - Vehicle Monitor Application + +## Overview + +This example application demonstrates the COM (Communication Middleware) API capabilities using a Vehicle Monitor scenario. It showcases producer-consumer communication patterns with the Lola runtime, featuring both synchronous and asynchronous operations. + +## Use Case + +The application simulates a **Vehicle Monitor System** that tracks two types of events: +- **Tire Pressure Data**: Monitoring tire pressure readings +- **Exhaust Data**: Monitoring exhaust system information + +The producer publishes vehicle data, while the consumer subscribes to and processes these events. + +## Library Components + +The `com-api-example-lib` library is organized into the following modules: + +1. **Producer Module** (`producer.rs`) + - Creates and offers vehicle data services + +2. **Consumer Module** (`consumer.rs`) + - Subscribes to vehicle events and handles them via both synchronous and asynchronous APIs + +3. **Shared Enum** (`lib.rs`) + - Defines `ExampleType`, a shared enum used for CLI parsing, consumer instantiation, and receive-mode dispatch + +4. **Vehicle Monitor Module** (`vehicle_monitor.rs`) + - Type definitions for `VehicleMonitorProducer` and `VehicleMonitorConsumer` + + +## Main Application + +The main application (`main.rs`) demonstrates: + +### Multi-threaded Architecture +- **Producer Thread**: Sends vehicle data at regular intervals +- **Consumer Thread**: Receives and processes data independently + +### Async Operations +- Uses `futures::executor::block_on()` for async operations +- Supports multiple execution modes via command-line arguments + +## Command-Line Options + +### Main Application + +```bash +bazel run //score/mw/com/example/com-api-example:com-api-example -- [OPTIONS] + +Options: + -r, --run-mode Which roles to start [default: both] + [possible values: both, producer, consumer] + + -e, --example-type Execution mode [default: sync] + [possible values: sync, async-service-discovery, + async-receive, async-receive-with-timeout, streaming] + + -s, --service-instance-manifest + Path to service configuration JSON + [default: ./score/mw/com/example/com-api-example/etc/mw_com_config.json] +``` + +### Examples + +```bash +# Run with default sync mode (both producer and consumer) +bazel run //score/mw/com/example/com-api-example:com-api-example + +# Run only the producer +bazel run //score/mw/com/example/com-api-example:com-api-example -- -r producer + +# Run only the consumer +bazel run //score/mw/com/example/com-api-example:com-api-example -- -r consumer + +# Test async receive with timeout/cancellation +bazel run //score/mw/com/example/com-api-example:com-api-example -- -e async-receive-with-timeout + +# Run only consumer with async receive with timeout feature +bazel run //score/mw/com/example/com-api-example:com-api-example -- -r consumer -e async-receive-with-timeout + +# Test streaming with custom config +bazel run //score/mw/com/example/com-api-example:com-api-example -- \ + -e streaming \ + -s ./path/to/custom/config.json +``` + +## Integration Tests + +### Tokio crate based Integration Tests + +A separate test target (`tests_using_tokio_runtime.rs`) provides comprehensive integration tests using the **Tokio async runtime**. These tests are independent of the main application and use the `com-api-gen` generated types together with the `com_api` runtime, service discovery, and subscription APIs directly. + +**Why Separate?** +- Leverages Tokio's multi-threaded runtime to test concurrent scenarios and verify that subscription and other async APIs can be safely spawned on independent tasks or threads. +- Validates async COM API behavior under a real async runtime + +### Running Tests + +```bash +# Run all integration tests +bazel test //score/mw/com/example/com-api-example:com-api-example-tokio-integration-test + +# Run specific test +bazel test //score/mw/com/example/com-api-example:com-api-example-tokio-integration-test \ + --test_filter=receive_and_send_using_multi_thread + +# Run with verbose output for debugging +bazel test //score/mw/com/example/com-api-example:com-api-example-tokio-integration-test \ + --test_output=all --test_arg=--nocapture +``` + +## Further Reading +- [COM API Documentation](../../impl/rust/com-api/README.md) + +--- diff --git a/score/mw/com/example/com-api-example/basic-consumer-producer.rs b/score/mw/com/example/com-api-example/basic-consumer-producer.rs deleted file mode 100644 index 90079e61f..000000000 --- a/score/mw/com/example/com-api-example/basic-consumer-producer.rs +++ /dev/null @@ -1,554 +0,0 @@ -/******************************************************************************** - * Copyright (c) 2025 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Apache License Version 2.0 which is available at - * https://www.apache.org/licenses/LICENSE-2.0 - * - * SPDX-License-Identifier: Apache-2.0 - ********************************************************************************/ - -// This demo app writing and reading tire pressure data using producer and consumer respectively. -// It is demonstrating the composition of consumer and producer in one struct, -// but they can be used separately as well. -// The example is using Lola runtime, but it can be used with any runtime by changing the runtime initialization part. -// Note: The example is using unwrap and panic in some places for simplicity, -// but it is recommended to handle errors properly in production code. - -use clap::Parser; -use std::path::PathBuf; - -use com_api::{ - Builder, FindServiceSpecifier, InstanceSpecifier, Interface, LolaRuntimeBuilderImpl, - OfferedProducer, Producer, Publisher, Result, Runtime, RuntimeBuilder, SampleContainer, - SampleMaybeUninit, SampleMut, ServiceDiscovery, Subscriber, Subscription, -}; - -use com_api_gen::{Exhaust, Tire, VehicleInterface}; - -#[derive(Parser)] -struct Arguments { - #[arg( - short, - long, - default_value = "./score/mw/com/example/com-api-example/etc/mw_com_config.json" - )] - service_instance_manifest: PathBuf, -} - -// Type aliases for generated consumer and offered producer types for the Vehicle interface -// VehicleConsumer is the consumer type generated for the Vehicle interface, parameterized by the runtime R -type VehicleConsumer = ::Consumer; -// VehicleOfferedProducer is the offered producer type generated for the Vehicle interface, parameterized by the runtime R -type VehicleOfferedProducer = - <::Producer as Producer>::OfferedProducer; - -// Example struct demonstrating composition with VehicleConsumer -pub struct VehicleMonitor { - producer: VehicleOfferedProducer, - tire_subscriber: <::Subscriber as Subscriber>::Subscription, - _exhaust_subscriber: - <::Subscriber as Subscriber>::Subscription, -} - -impl VehicleMonitor { - /// Create a new `VehicleMonitor` with a consumer - /// - /// # Returns - /// - /// A `Result` containing the monitor on success or an error if subscription fails. - /// - /// # Errors - /// - /// Returns an error if subscribing to the tire data fails. - /// - /// # Panics - /// - /// Panics if unwrapping the subscription fails. - pub fn new(consumer: VehicleConsumer, producer: VehicleOfferedProducer) -> Result { - let tire_subscriber = consumer.left_tire.subscribe(3).unwrap(); - let exhaust_subscriber = consumer.exhaust.subscribe(3).unwrap(); - Ok(Self { - producer, - tire_subscriber, - _exhaust_subscriber: exhaust_subscriber, - }) - } - - /// Monitor tire data from the consumer - /// - /// # Returns - /// - /// A `Result` containing tire data as a `String` on success or an error if reading fails. - /// - /// # Errors - /// - /// Returns an error if no data is received or if there is a failure in receiving data - /// - /// # Panics - /// - /// Panics if unwrapping the sample from the buffer fails. - pub fn read_tire_data(&self) -> Result { - let mut sample_buf = SampleContainer::new(3); - - match self.tire_subscriber.try_receive(&mut sample_buf, 1) { - Ok(0) => Ok("No tire data received".to_string()), - Ok(x) => { - let sample = sample_buf.pop_front().unwrap(); - Ok(format!("{} samples received: sample[0] = {:?}", x, *sample)) - } - Err(e) => Err(e), - } - } - - /// Write tire data using the producer - /// # Returns - /// - /// A `Result` indicating success or failure of the write operation. - /// - /// # Errors - /// - /// Returns an error if allocation or sending of the sample fails. - pub fn write_tire_data(&self, tire: Tire) -> Result<()> { - // Allocate API of lola is not calling at the moment - let uninit_sample = self.producer.left_tire.allocate()?; - let sample = uninit_sample.write(tire); - sample.send()?; - println!("Tire data sent"); - Ok(()) - } -} - -// Create a consumer for the specified service identifier -fn create_consumer(runtime: &R, service_id: InstanceSpecifier) -> VehicleConsumer { - let consumer_discovery = - runtime.find_service::(FindServiceSpecifier::Specific(service_id)); - let available_service_instances = consumer_discovery - .get_available_instances() - .expect("Failed to get available service instances"); - - // Select service instance at specific handle_index - let handle_index = 0; // or any index you need from vector of instances - let consumer_builder = available_service_instances - .into_iter() - .nth(handle_index) - .expect("Failed to get consumer builder at specified handle index"); - - consumer_builder - .build() - .expect("Failed to build consumer instance") -} - -#[allow(dead_code)] -//it is used in async test, but to avoid unused code warning in main example, it is marked as allow(dead_code) -async fn create_consumer_async( - runtime: &R, - service_id: InstanceSpecifier, -) -> VehicleConsumer { - let consumer_discovery = - runtime.find_service::(FindServiceSpecifier::Specific(service_id)); - let available_service_instances = consumer_discovery - .get_available_instances_async() - .await - .expect("Failed to get available service instances asynchronously"); - - // Select service instance at specific handle_index - let handle_index = 0; // or any index you need from vector of instances - let consumer_builder = available_service_instances - .into_iter() - .nth(handle_index) - .expect("Failed to get consumer builder at specified handle index"); - - consumer_builder - .build() - .expect("Failed to build consumer instance") -} - -// Create a producer for the specified service identifier -fn create_producer( - runtime: &R, - service_id: InstanceSpecifier, -) -> VehicleOfferedProducer { - let producer_builder = runtime.producer_builder::(service_id); - let producer = producer_builder - .build() - .expect("Failed to build producer instance"); - producer.offer().expect("Failed to offer producer instance") -} - -// Run the example with the specified runtime -fn run_with_runtime(name: &str, runtime: &R) { - println!("\n=== Running with {name} runtime ==="); - - let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") - .expect("Failed to create InstanceSpecifier"); - let producer = create_producer(runtime, service_id.clone()); - let consumer = create_consumer(runtime, service_id); - let monitor = VehicleMonitor::new(consumer, producer).unwrap(); - let tire_pressure = 5.0; - println!("Setting tire pressure to {tire_pressure}"); - for i in 0..5 { - match monitor.write_tire_data(Tire { - pressure: tire_pressure + i as f32, - }) { - Ok(_) => (), - Err(e) => { - eprintln!("Failed to write tire data: {:?}", e); - continue; - } - } - let tire_data = match monitor.read_tire_data() { - Ok(data) => data, - Err(e) => { - eprintln!("Failed to read tire data: {:?}", e); - continue; - } - }; - println!("{tire_data}"); - } - //unoffer returns producer back, so if needed it can be used further - match monitor.producer.unoffer() { - Ok(_) => println!("Successfully unoffered the service"), - Err(e) => eprintln!("Failed to unoffer: {:?}", e), - } - - //UnSubscribe the event - let _ = monitor.tire_subscriber.unsubscribe(); - println!("=== {name} runtime completed ===\n"); -} - -// Initialize Lola runtime builder with configuration -fn init_lola_runtime_builder(config_path: &std::path::Path) -> LolaRuntimeBuilderImpl { - let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); - if config_path.exists() { - lola_runtime_builder.load_config(config_path); - } else { - eprintln!( - "Provided config path does not exist: {}", - config_path.display() - ); - } - lola_runtime_builder -} - -fn main() { - let args = Arguments::parse(); - let lola_runtime_builder = init_lola_runtime_builder(&args.service_instance_manifest); - let lola_runtime = lola_runtime_builder.build().unwrap(); - run_with_runtime("Lola", &lola_runtime); -} - -#[cfg(test)] -mod test { - use super::*; - use futures::stream::StreamExt; - use std::sync::OnceLock; - use std::thread; - use std::time::Duration; - const TEST_CONFIG_PATH: &str = "./score/mw/com/example/com-api-example/etc/mw_com_config.json"; - - static LOLA_RUNTIME: OnceLock = OnceLock::new(); - // it will create a singleton instance of LolaRuntime for testing, - // and it will be shared across different test cases, - // so that the runtime initialization is done only once for all tests, - // and re-initialization of backend is avoided, - // as it prints warning if backend is initialized more than once. - fn get_test_runtime() -> &'static com_api::LolaRuntimeImpl { - LOLA_RUNTIME.get_or_init(|| { - let lola_runtime_builder = - init_lola_runtime_builder(std::path::Path::new(TEST_CONFIG_PATH)); - lola_runtime_builder.build().unwrap() - }) - } - - #[test] - fn integration_test() { - println!("Starting integration test with Lola runtime"); - run_with_runtime("Lola", get_test_runtime()); - } - - // Test case: Async sender and receiver on separate threads - // Demonstrates true concurrent async operations on different threads - // Each thread gets its own runtime instance - // It use futures-based blocking executor to run async code in each thread, - #[test] - fn test_async_sender_receiver_threads() { - println!("=== Starting async sender and receiver on separate threads ==="); - - let service_id = InstanceSpecifier::new("/Vehicle/Service2/Instance") - .expect("Failed to create InstanceSpecifier"); - - // Sender thread - let service_id_sender = service_id.clone(); - let sender_handle = std::thread::spawn(move || { - let lola_runtime = get_test_runtime(); - - let producer = create_producer(lola_runtime, service_id_sender); - - println!("[SENDER] Thread started: {:?}", thread::current().id()); - - futures::executor::block_on(async { - for i in 0..5 { - let tire = Tire { - pressure: 1.0 + (i as f32 * 0.5), - }; - println!("[SENDER] Sending sample {}: {:.2} psi", i, tire.pressure); - - let uninit_sample = producer.left_tire.allocate().unwrap(); - let sample = uninit_sample.write(tire); - sample.send().unwrap(); - - // Simulate async work delay - std::thread::sleep(Duration::from_millis(500)); - } - println!("[SENDER] All samples sent"); - }); - }); - - let service_id_receiver = service_id.clone(); - // Receiver thread - let receiver_handle = std::thread::spawn(move || { - // Ensure sender starts first - std::thread::sleep(Duration::from_millis(500)); - // Each thread creates its own runtime instance - let lola_runtime = get_test_runtime(); - - let consumer = create_consumer(lola_runtime, service_id_receiver); - let subscribed = consumer.left_tire.subscribe(5).unwrap(); - - println!("[RECEIVER] Thread started: {:?}", thread::current().id()); - - futures::executor::block_on(async { - let mut sample_buf = SampleContainer::new(5); - let mut total_received = 0; - const MAX_ATTEMPTS: usize = 5; - - for attempt in 0..MAX_ATTEMPTS { - println!("[RECEIVER] Attempt {}", attempt); - - // Destructure tuple - container is always returned even on error - let (returned_buf, result) = subscribed.receive(sample_buf, 1, 3).await; - sample_buf = { - let count = returned_buf.sample_count(); - if let Err(e) = result { - println!("[RECEIVER] Error on attempt {}: {:?}", attempt, e); - } else if count > 0 { - total_received += count; - println!( - "[RECEIVER] Received {} samples (total: {})", - count, total_received - ); - } - // Drain printed samples - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buf - }; - } - - println!("[RECEIVER] Total received: {}", total_received); - }); - }); - - // Wait for both threads - sender_handle.join().expect("Sender thread panicked"); - receiver_handle.join().expect("Receiver thread panicked"); - println!("=== Async sender and receiver threads test completed ==="); - } - - //sender will send data in each 1 second - async fn async_data_sender_fn( - offered_producer: VehicleOfferedProducer, - ) -> VehicleOfferedProducer { - for i in 0..6 { - let uninit_sample = match offered_producer.left_tire.allocate() { - Ok(sample) => sample, - Err(e) => { - eprintln!("[SENDER] Failed to allocate sample: {:?}", e); - continue; - } - }; - let sample = uninit_sample.write(Tire { - pressure: 1.0 + i as f32, - }); - match sample.send() { - Ok(_) => (), - Err(e) => eprintln!("[SENDER] Failed to send sample: {:?}", e), - } - println!("[SENDER] Sent sample with pressure: {}", 1.0 + i as f32); - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - } - offered_producer - } - - //receiver function which use async receive to get data, it waits for new data and process it once it arrives, - //it will receive data 10 times and print the received samples - async fn async_data_processor_fn( - subscribed: impl Subscription, - is_timeout: bool, - ) { - println!("[RECEIVER] Async data processor started"); - let mut buffer = SampleContainer::new(5); - for _ in 0..3 { - let (returned_buf, result) = if is_timeout { - let timeout = tokio::time::sleep(Duration::from_millis(1000)); - subscribed.cancellable_receive(buffer, 2, 3, timeout).await - } else { - subscribed.receive(buffer, 2, 3).await - }; - match result { - Ok(count) => println!("[RECEIVER] Received {} samples", count), - Err(e) => eprintln!("[RECEIVER] Failed to receive samples: {:?}", e), - } - let mut buf = returned_buf; - while let Some(sample) = buf.pop_front() { - println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); - } - buffer = buf; - } - } - - async fn stream_processor_fn(mut subscribed: impl Subscription) { - let mut stream = subscribed.to_stream(); - let mut cnt = 5usize; - println!("[RECEIVER] Stream processor started"); - while cnt > 0 { - // Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription - match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await { - Ok(Some(Ok(sample))) => { - println!( - "[RECEIVER] Stream received sample: {:.2} psi", - sample.pressure - ) - } - Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e), - Ok(None) => break, - Err(_) => { - eprintln!("[RECEIVER] Timeout while waiting for stream sample"); - break; - } - } - cnt -= 1; - } - } - - // Test case: Async subscription and sending on multi-threaded runtime - // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads - #[tokio::test(flavor = "multi_thread")] - async fn receive_and_send_using_multi_thread() { - println!("Starting async subscription test with Lola runtime"); - let service_id = InstanceSpecifier::new("/Vehicle/Service3/Instance") - .expect("Failed to create InstanceSpecifier"); - let service_id_clone = service_id.clone(); - //consumer create - let consumer_runtime = get_test_runtime(); - //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result - let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); - //simulate some delay before producer offer service, so that consumer is waiting for discovery - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - //Producer create - let producer_runtime = get_test_runtime(); - let producer = create_producer(producer_runtime, service_id_clone); - // Spawn async data sender - let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); - // Await consumer creation and subscribe to events - let consumer = consumer.await.expect("Failed to create consumer"); - // Subscribe to one event - let subscribed = consumer.left_tire.subscribe(5).unwrap(); - - let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, false)); - processor_join_handle - .await - .expect("Error returned from task"); - let producer = sender_join_handle.await.expect("Error returned from task"); - - match producer.unoffer() { - Ok(_) => println!("Successfully unoffered the service"), - Err(e) => eprintln!("Failed to unoffer: {:?}", e), - } - - println!("=== Async subscription test with Lola runtime completed ===\n"); - } - - #[tokio::test(flavor = "multi_thread")] - #[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"] - async fn receive_with_timeout_and_send_using_multi_thread() { - println!("Starting async subscription test with Lola runtime"); - //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. - let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") - .expect("Failed to create InstanceSpecifier"); - let service_id_clone = service_id.clone(); - //consumer create - let consumer_runtime = get_test_runtime(); - //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result - let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); - //simulate some delay before producer offer service, so that consumer is waiting for discovery - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - //Producer create - let producer_runtime = get_test_runtime(); - let producer = create_producer(producer_runtime, service_id_clone); - // Spawn async data sender - let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); - // Await consumer creation and subscribe to events - let consumer = consumer.await.expect("Failed to create consumer"); - // Subscribe to one event - let subscribed = consumer.left_tire.subscribe(5).unwrap(); - - let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, true)); - processor_join_handle - .await - .expect("Error returned from task"); - - let producer = sender_join_handle.await.expect("Error returned from task"); - - match producer.unoffer() { - Ok(_) => println!("Successfully unoffered the service"), - Err(e) => eprintln!("Failed to unoffer: {:?}", e), - } - - println!("=== Async subscription test with Lola runtime completed ===\n"); - } - - // Test case: Async subscription and sending on multi-threaded runtime - // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads - #[tokio::test(flavor = "multi_thread")] - async fn stream_and_send_using_multi_thread() { - println!("Starting async subscription test with Lola runtime"); - let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") - .expect("Failed to create InstanceSpecifier"); - let service_id_clone = service_id.clone(); - //consumer create - let consumer_runtime = get_test_runtime(); - //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result - let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); - //simulate some delay before producer offer service, so that consumer is waiting for discovery - tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; - //Producer create - let producer_runtime = get_test_runtime(); - let producer = create_producer(producer_runtime, service_id_clone); - // Spawn async data sender - let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); - // Await consumer creation and subscribe to events - let consumer = consumer.await.expect("Failed to create consumer"); - // Subscribe to one event - let subscribed = consumer.left_tire.subscribe(5).unwrap(); - let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed)); - stream_processor_join_handle - .await - .expect("Error returned from stream processor task"); - - let producer = sender_join_handle.await.expect("Error returned from task"); - - match producer.unoffer() { - Ok(_) => println!("Successfully unoffered the service"), - Err(e) => eprintln!("Failed to unoffer: {:?}", e), - } - - println!("=== Async subscription test with Lola runtime completed ===\n"); - } -} diff --git a/score/mw/com/example/com-api-example/main.rs b/score/mw/com/example/com-api-example/main.rs index 0bd19868d..39995d87e 100644 --- a/score/mw/com/example/com-api-example/main.rs +++ b/score/mw/com/example/com-api-example/main.rs @@ -15,23 +15,38 @@ // It is demonstrating the composition of consumer and producer in one struct, // but they can be used separately as well. // The example is using Lola runtime, but it can be used with any runtime by changing the runtime initialization part. -// Note: The example is using unwrap and panic in some places for simplicity, +// Note: The example is may using unwrap and panic in some places for simplicity, // but it is recommended to handle errors properly in production code. use clap::Parser; use std::path::PathBuf; +use std::sync::Arc; +use std::thread; -use com_api::{ - Builder, InstanceSpecifier, LolaRuntimeBuilderImpl, OfferedProducer, Runtime, RuntimeBuilder, - Subscription, -}; +use com_api::{Builder, InstanceSpecifier, LolaRuntimeBuilderImpl, Runtime, RuntimeBuilder}; -use com_api_example::{VehicleMonitorConsumer, VehicleMonitorProducer}; +use com_api_example::{ExampleType, VehicleMonitorConsumer, VehicleMonitorProducer}; -use com_api_gen::Tire; +/// Controls which roles are started in this process. +#[derive(clap::ValueEnum, Clone, Debug, Default)] +enum RunMode { + /// Start both the producer and consumer (default). + #[default] + Both, + /// Start only the producer. + Producer, + /// Start only the consumer. + Consumer, +} #[derive(Parser)] struct Arguments { + #[arg(short, long, value_enum, default_value = "both")] + run_mode: RunMode, + + #[arg(short, long, value_enum, default_value = "sync")] + example_type: ExampleType, + #[arg( short, long, @@ -40,64 +55,103 @@ struct Arguments { service_instance_manifest: PathBuf, } -// Run the example with the specified runtime -fn run_with_runtime(name: &str, runtime: &R) { - println!("\n=== Running with {name} runtime ==="); +/// Number of samples to publish / receive in each example run. +const SAMPLE_COUNT: usize = 5; +/// Starting tire pressure value for the producer publish loop. +const INITIAL_TIRE_PRESSURE: f32 = 5.0; +/// Milliseconds the sync consumer waits before starting service discovery. +const SYNC_CONSUMER_STARTUP_DELAY_MS: u64 = 500; +/// Milliseconds the producer waits in async examples to let the consumer connect first. +const ASYNC_PRODUCER_STARTUP_DELAY_MS: u64 = 2000; +// Run the consumer with the specified runtime and example type +fn run_consumer(name: &str, example_type: &ExampleType, runtime: &R) { + println!("\n=== Running with {name} runtime ==="); let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") .expect("Failed to create InstanceSpecifier"); - let producer_monitor = VehicleMonitorProducer::new(runtime, service_id.clone()); - let consumer_monitor = VehicleMonitorConsumer::new(runtime, service_id); - - let tire_pressure = 5.0; - println!("Setting tire pressure to {tire_pressure}"); - for i in 0..5 { - match producer_monitor.write_tire_data(Tire { - pressure: tire_pressure + i as f32, - }) { - Ok(_) => (), - Err(e) => { - eprintln!("Failed to write tire data: {:?}", e); - continue; - } - } - let tire_data = match consumer_monitor.read_tire_data() { - Ok(data) => data, - Err(e) => { - eprintln!("Failed to read tire data: {:?}", e); - continue; - } - }; - println!("{tire_data}"); + if matches!(example_type, ExampleType::Sync) { + std::thread::sleep(std::time::Duration::from_millis( + SYNC_CONSUMER_STARTUP_DELAY_MS, + )); } - //unoffer returns producer back, so if needed it can be used further - match producer_monitor.producer.unoffer() { - Ok(_) => println!("Successfully unoffered the service"), - Err(e) => eprintln!("Failed to unoffer: {:?}", e), - } - - //UnSubscribe the event - let _ = consumer_monitor.tire_subscriber.unsubscribe(); + let mut consumer_monitor = futures::executor::block_on(VehicleMonitorConsumer::new( + runtime, + service_id, + example_type, + )) + .expect("Failed to create consumer"); + futures::executor::block_on(consumer_monitor.run_receive(example_type, SAMPLE_COUNT)); + consumer_monitor.unsubscribe(); println!("=== {name} runtime completed ===\n"); } -// Initialize Lola runtime builder with configuration -fn init_lola_runtime_builder(config_path: &std::path::Path) -> LolaRuntimeBuilderImpl { - let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); - if config_path.exists() { - lola_runtime_builder.load_config(config_path); - } else { - eprintln!( - "Provided config path does not exist: {}", - config_path.display() - ); +// Run the producer with the specified runtime and example type +fn run_producer(name: &str, example_type: &ExampleType, runtime: &R) { + println!("\n=== Running with {name} runtime ==="); + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + // In async examples, we will not wait before offering the service to simulate async discovery. + if !matches!(example_type, ExampleType::Sync) { + std::thread::sleep(std::time::Duration::from_millis( + ASYNC_PRODUCER_STARTUP_DELAY_MS, + )); } + let producer_monitor = + VehicleMonitorProducer::new(runtime, service_id).expect("Failed to create producer"); + println!("Producer created and offered successfully"); + producer_monitor.run_publish_loop(INITIAL_TIRE_PRESSURE, SAMPLE_COUNT); + producer_monitor.unoffer(); +} + +// Create and configure a Lola runtime builder from the given config path. +fn create_lola_runtime_builder(config_path: &std::path::Path) -> LolaRuntimeBuilderImpl { + assert!( + config_path.exists(), + "Config file not found: {}. Provide a valid path via --service-instance-manifest.", + config_path.display() + ); + let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); + lola_runtime_builder.load_config(config_path); lola_runtime_builder } fn main() { let args = Arguments::parse(); - let lola_runtime_builder = init_lola_runtime_builder(&args.service_instance_manifest); - let lola_runtime = lola_runtime_builder.build().unwrap(); - run_with_runtime("Lola", &lola_runtime); + let lola_runtime_builder = create_lola_runtime_builder(&args.service_instance_manifest); + let lola_runtime = Arc::new( + lola_runtime_builder + .build() + .expect("Failed to build Lola runtime"), + ); + + let example_type = args.example_type.clone(); + let runtime_producer = Arc::clone(&lola_runtime); + let runtime_consumer = Arc::clone(&lola_runtime); + + // Spawn producer thread + let producer_handle = match args.run_mode { + RunMode::Both | RunMode::Producer => { + let producer_example_type = example_type.clone(); + Some(thread::spawn(move || { + run_producer("Lola", &producer_example_type, runtime_producer.as_ref()); + })) + } + RunMode::Consumer => None, + }; + + // Spawn consumer thread + let consumer_handle = match args.run_mode { + RunMode::Both | RunMode::Consumer => Some(thread::spawn(move || { + run_consumer("Lola", &example_type, runtime_consumer.as_ref()); + })), + RunMode::Producer => None, + }; + + // Wait for both threads to complete + if let Some(h) = producer_handle { + h.join().expect("Producer thread panicked"); + } + if let Some(h) = consumer_handle { + h.join().expect("Consumer thread panicked"); + } } diff --git a/score/mw/com/example/com-api-example/src/consumer.rs b/score/mw/com/example/com-api-example/src/consumer.rs index 3ea8d6060..2b92126d5 100644 --- a/score/mw/com/example/com-api-example/src/consumer.rs +++ b/score/mw/com/example/com-api-example/src/consumer.rs @@ -12,25 +12,31 @@ ********************************************************************************/ use com_api::{ - Builder, FindServiceSpecifier, InstanceSpecifier, Result, Runtime, SampleContainer, + ConsumerBuilder, FindServiceSpecifier, InstanceSpecifier, Result, Runtime, SampleContainer, ServiceDiscovery, Subscriber, Subscription, }; use com_api_gen::VehicleInterface; +use futures::channel::oneshot; +use futures::{FutureExt, StreamExt}; +use std::thread; +use std::time::Duration; use crate::vehicle_monitor::VehicleMonitorConsumer; +use crate::ExampleType; -impl VehicleMonitorConsumer { - /// Create a new VehicleMonitorConsumer - pub fn new(runtime: &R, service_id: InstanceSpecifier) -> Self { - let consumer_discovery = - runtime.find_service::(FindServiceSpecifier::Specific(service_id)); - let available_service_instances = consumer_discovery - .get_available_instances() - .expect("Failed to get available service instances"); +/// Timeout used when performing a cancellable async receive. +const RECEIVE_TIMEOUT_MS: u64 = 500; +/// Polling interval between iterations in synchronous receive loops. +const SYNC_RECEIVE_POLL_INTERVAL_MS: u64 = 1000; - // Select service instance at specific handle_index +impl VehicleMonitorConsumer { + /// Shared constructor logic: picks the first available service instance and subscribes. + fn from_service_instances(instances: impl IntoIterator) -> Result + where + B: ConsumerBuilder, + { let handle_index = 0; // or any index you need from vector of instances - let consumer_builder = available_service_instances + let consumer_builder = instances .into_iter() .nth(handle_index) .expect("Failed to get consumer builder at specified handle index"); @@ -39,26 +45,135 @@ impl VehicleMonitorConsumer { .build() .expect("Failed to build consumer instance"); - let tire_subscriber = consumer.left_tire.subscribe(3).unwrap(); - let exhaust_subscriber = consumer.exhaust.subscribe(3).unwrap(); + let tire_subscriber = consumer.left_tire.subscribe(3)?; + let exhaust_subscriber = consumer.exhaust.subscribe(3)?; - Self { + Ok(Self { tire_subscriber, _exhaust_subscriber: exhaust_subscriber, + }) + } + + /// Create a new VehicleMonitorConsumer. + /// + /// Uses synchronous service discovery for `ExampleType::Sync` and + /// asynchronous service discovery for all other modes. + pub async fn new( + runtime: &R, + service_id: InstanceSpecifier, + mode: &ExampleType, + ) -> Result { + let consumer_discovery = + runtime.find_service::(FindServiceSpecifier::Specific(service_id)); + let instances = match mode { + ExampleType::Sync => consumer_discovery + .get_available_instances() + .expect("Failed to get available service instances"), + _ => { + let instances = consumer_discovery + .get_available_instances_async() + .await + .expect("Failed to get available service instances asynchronously"); + println!("Available service instances received asynchronously:"); + instances + } + }; + Self::from_service_instances(instances) + } + + /// Read tire data sample using the strategy defined by `mode`. + /// + /// Takes `&self` to support all receive modes including streaming, + /// which requires exclusive access to the subscriber. + pub async fn read_tire_data(&self, mode: &ExampleType) -> Result { + match mode { + ExampleType::Sync | ExampleType::AsyncServiceDiscovery => { + let mut sample_buf = SampleContainer::new(3); + let result = self.tire_subscriber.try_receive(&mut sample_buf, 1); + match result { + Ok(0) => Ok("No tire data received".to_string()), + Ok(x) => { + let sample = sample_buf.pop_front().unwrap(); + Ok(format!("{x} samples received: sample[0] = {:?}", *sample)) + } + Err(e) => Err(e), + } + } + ExampleType::AsyncReceive => { + println!("Performing asynchronous receive without timeout"); + let sample_buf = SampleContainer::new(3); + let (mut sample_buf, result) = self.tire_subscriber.receive(sample_buf, 1, 3).await; + match result { + Ok(0) => Ok("No tire data received".to_string()), + Ok(x) => { + let sample = sample_buf.pop_front().unwrap(); + Ok(format!("{x} samples received: sample[0] = {:?}", *sample)) + } + Err(e) => Err(e), + } + } + ExampleType::AsyncReceiveWithTimeout => { + println!("Performing asynchronous receive with timeout"); + let sample_buf = SampleContainer::new(3); + let (tx, rx) = oneshot::channel(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(RECEIVE_TIMEOUT_MS)); + let _ = tx.send(()); // Send signal after timeout + }); + let timeout_future = rx.map(|_| ()); + let (mut sample_buf, result) = self + .tire_subscriber + .cancellable_receive(sample_buf, 1, 3, timeout_future) + .await; + match result { + Ok(0) => Ok("No tire data received".to_string()), + Ok(x) => { + let sample = sample_buf.pop_front().unwrap(); + Ok(format!("{x} samples received: sample[0] = {:?}", *sample)) + } + Err(e) => Err(e), + } + } + _ => unreachable!( + "Streaming mode should use run_receive with stream processing, not read_tire_data" + ), } } - /// Monitor tire data from the consumer - pub fn read_tire_data(&self) -> Result { - let mut sample_buf = SampleContainer::new(3); + /// Receive `count` tire data samples, printing each result. + /// + /// Sync modes add a polling delay between iterations; async/stream modes + /// block until data arrives. + pub async fn run_receive(&mut self, mode: &ExampleType, count: usize) { + if matches!(mode, ExampleType::Streaming) { + let mut stream = self.tire_subscriber.to_stream(); + for _ in 0..count { + match stream.next().await { + Some(Ok(sample)) => println!("Received tire data: {:?}", *sample), + Some(Err(e)) => eprintln!("Failed to receive tire data: {:?}", e), + None => { + println!("Stream ended"); + break; + } + } + } + return; + } - match self.tire_subscriber.try_receive(&mut sample_buf, 1) { - Ok(0) => Ok("No tire data received".to_string()), - Ok(x) => { - let sample = sample_buf.pop_front().unwrap(); - Ok(format!("{} samples received: sample[0] = {:?}", x, *sample)) + for _ in 0..count { + match self.read_tire_data(mode).await { + Ok(data) => println!("{data}"), + Err(e) => eprintln!("Failed to receive tire data: {:?}", e), + } + if matches!(mode, ExampleType::Sync | ExampleType::AsyncServiceDiscovery) { + thread::sleep(Duration::from_millis(SYNC_RECEIVE_POLL_INTERVAL_MS)); } - Err(e) => Err(e), } } + + /// Unsubscribe from all events and release the consumer. + pub fn unsubscribe(self) { + let _ = self.tire_subscriber.unsubscribe(); + let _ = self._exhaust_subscriber.unsubscribe(); + } } diff --git a/score/mw/com/example/com-api-example/src/lib.rs b/score/mw/com/example/com-api-example/src/lib.rs index e752c129b..601ffa9c2 100644 --- a/score/mw/com/example/com-api-example/src/lib.rs +++ b/score/mw/com/example/com-api-example/src/lib.rs @@ -14,8 +14,22 @@ pub mod consumer; pub mod producer; pub mod vehicle_monitor; - -// Re-export commonly used items pub use vehicle_monitor::{ VehicleConsumer, VehicleMonitorConsumer, VehicleMonitorProducer, VehicleOfferedProducer, }; + +/// All execution modes supported by the example application. +/// Used by clap for CLI argument parsing and by the consumer/producer to select behaviour. +#[derive(clap::ValueEnum, Clone, Debug)] +pub enum ExampleType { + /// Synchronous service discovery and synchronous receive. + Sync, + /// Asynchronous service discovery, then synchronous receive. + AsyncServiceDiscovery, + /// Asynchronous service discovery and asynchronous receive (no timeout). + AsyncReceive, + /// Asynchronous service discovery and asynchronous receive with a cancellation timeout. + AsyncReceiveWithTimeout, + /// Asynchronous service discovery and stream-based receive. + Streaming, +} diff --git a/score/mw/com/example/com-api-example/src/producer.rs b/score/mw/com/example/com-api-example/src/producer.rs index 091930a66..e03ed6106 100644 --- a/score/mw/com/example/com-api-example/src/producer.rs +++ b/score/mw/com/example/com-api-example/src/producer.rs @@ -12,29 +12,56 @@ ********************************************************************************/ use com_api::{ - Builder, InstanceSpecifier, Producer, Publisher, Result, Runtime, SampleMaybeUninit, SampleMut, + Builder, InstanceSpecifier, OfferedProducer, Producer, Publisher, Result, Runtime, + SampleMaybeUninit, SampleMut, }; use com_api_gen::{Tire, VehicleInterface}; +use std::thread; +use std::time::Duration; use crate::vehicle_monitor::VehicleMonitorProducer; +// Delay between sample data sending in the producer loop. +const PPRODUCER_SEND_INTERVAL_MS: u64 = 1000; + impl VehicleMonitorProducer { /// Create a new VehicleMonitorProducer - pub fn new(runtime: &R, service_id: InstanceSpecifier) -> Self { + pub fn new(runtime: &R, service_id: InstanceSpecifier) -> Result { let producer_builder = runtime.producer_builder::(service_id); let producer = producer_builder .build() .expect("Failed to build producer instance"); let producer = producer.offer().expect("Failed to offer producer instance"); - Self { producer } + Ok(Self { producer }) } - /// Write tire data using the producer - pub fn write_tire_data(&self, tire: Tire) -> Result<()> { + /// Publish tire data using the producer + pub fn publish_tire_data(&self, tire: Tire) -> Result<()> { let uninit_sample = self.producer.left_tire.allocate()?; let sample = uninit_sample.write(tire); sample.send()?; println!("Tire data sent"); Ok(()) } + + /// Publish tire data in a loop, incrementing pressure each iteration + pub fn run_publish_loop(&self, initial_pressure: f32, count: usize) { + for i in 0..count { + match self.publish_tire_data(Tire { + pressure: initial_pressure + i as f32, + }) { + Ok(_) => (), + Err(e) => eprintln!("Failed to publish tire data: {:?}", e), + } + thread::sleep(Duration::from_millis(PPRODUCER_SEND_INTERVAL_MS)); + } + } + + /// Stop offering the service and release the producer. + pub fn unoffer(self) { + match self.producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + } } diff --git a/score/mw/com/example/com-api-example/src/vehicle_monitor.rs b/score/mw/com/example/com-api-example/src/vehicle_monitor.rs index d110eb508..f6f1e2cfe 100644 --- a/score/mw/com/example/com-api-example/src/vehicle_monitor.rs +++ b/score/mw/com/example/com-api-example/src/vehicle_monitor.rs @@ -23,11 +23,12 @@ pub type VehicleOfferedProducer = // Example struct demonstrating composition with VehicleConsumer pub struct VehicleMonitorProducer { - pub producer: VehicleOfferedProducer, + pub(crate) producer: VehicleOfferedProducer, } pub struct VehicleMonitorConsumer { - pub tire_subscriber: <::Subscriber as Subscriber>::Subscription, - pub _exhaust_subscriber: + pub(crate) tire_subscriber: + <::Subscriber as Subscriber>::Subscription, + pub(crate) _exhaust_subscriber: <::Subscriber as Subscriber>::Subscription, } diff --git a/score/mw/com/example/com-api-example/tests_using_tokio_runtime.rs b/score/mw/com/example/com-api-example/tests_using_tokio_runtime.rs new file mode 100644 index 000000000..2cffb3744 --- /dev/null +++ b/score/mw/com/example/com-api-example/tests_using_tokio_runtime.rs @@ -0,0 +1,299 @@ +/******************************************************************************** + * Copyright (c) 2026 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * This program and the accompanying materials are made available under the + * terms of the Apache License Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + ********************************************************************************/ + +//! Integration tests for the Vehicle Monitor API example using Tokio runtime. +//! +//! This file contains comprehensive integration tests demonstrating: +//! - Multi-threaded async producer/consumer patterns +//! - Tokio runtime integration for async operations +//! - Asynchronous receive with and without timeout +//! - Stream-based data processing +//! - Concurrent service discovery patterns +//! - Spawn the async operation using tokio::spawn to run them concurrently on separate task/threads. +//! +//! To run these tests: +//! ```bash +//! bazel test //score/mw/com/example/com-api-example:com-api-example-tokio-integration-test +//! ``` +//! +//! Note: These tests are tagged as 'manual' and use a shared runtime instance +//! to avoid re-initialization warnings. + +#[cfg(test)] +mod test { + use com_api::{ + Builder, FindServiceSpecifier, InstanceSpecifier, LolaRuntimeBuilderImpl, OfferedProducer, + Producer, Publisher, Runtime, RuntimeBuilder, SampleContainer, SampleMaybeUninit, + SampleMut, ServiceDiscovery, Subscriber, Subscription, + }; + use com_api_gen::{Tire, VehicleInterface, VehicleOfferedProducer}; + + use futures::stream::StreamExt; + use std::sync::OnceLock; + use std::time::Duration; + const TEST_CONFIG_PATH: &str = "./score/mw/com/example/com-api-example/etc/mw_com_config.json"; + + static LOLA_RUNTIME: OnceLock = OnceLock::new(); + // it will create a singleton instance of LolaRuntime for testing, + // and it will be shared across different test cases, + // so that the runtime initialization is done only once for all tests, + // and re-initialization of backend is avoided, + // as it prints warning if backend is initialized more than once. + fn get_test_runtime() -> &'static com_api::LolaRuntimeImpl { + LOLA_RUNTIME.get_or_init(|| { + let lola_runtime_builder = + init_lola_runtime_builder(std::path::Path::new(TEST_CONFIG_PATH)); + lola_runtime_builder.build().unwrap() + }) + } + + // Initialize Lola runtime builder with configuration + fn init_lola_runtime_builder(config_path: &std::path::Path) -> LolaRuntimeBuilderImpl { + let mut lola_runtime_builder = LolaRuntimeBuilderImpl::new(); + if config_path.exists() { + lola_runtime_builder.load_config(config_path); + } else { + eprintln!( + "Provided config path does not exist: {}", + config_path.display() + ); + } + lola_runtime_builder + } + + // Create a producer for testing + fn create_producer( + runtime: &R, + service_id: InstanceSpecifier, + ) -> com_api_gen::VehicleOfferedProducer { + let producer_builder = runtime.producer_builder::(service_id); + let producer = producer_builder + .build() + .expect("Failed to build producer instance"); + producer.offer().expect("Failed to offer producer instance") + } + + // Create a consumer asynchronously for testing + async fn create_consumer_async( + runtime: &R, + service_id: InstanceSpecifier, + ) -> com_api_gen::VehicleConsumer { + let consumer_discovery = + runtime.find_service::(FindServiceSpecifier::Specific(service_id)); + let available_service_instances = consumer_discovery + .get_available_instances_async() + .await + .expect("Failed to get available service instances asynchronously"); + + let handle_index = 0; + let consumer_builder = available_service_instances + .into_iter() + .nth(handle_index) + .expect("Failed to get consumer builder at specified handle index"); + + consumer_builder + .build() + .expect("Failed to build consumer instance") + } + + //sender will send data in each 1 second + async fn async_data_sender_fn( + offered_producer: VehicleOfferedProducer, + ) -> VehicleOfferedProducer { + for i in 0..6 { + let uninit_sample = match offered_producer.left_tire.allocate() { + Ok(sample) => sample, + Err(e) => { + eprintln!("[SENDER] Failed to allocate sample: {:?}", e); + continue; + } + }; + let sample = uninit_sample.write(Tire { + pressure: 1.0 + i as f32, + }); + match sample.send() { + Ok(_) => (), + Err(e) => eprintln!("[SENDER] Failed to send sample: {:?}", e), + } + println!("[SENDER] Sent sample with pressure: {}", 1.0 + i as f32); + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + } + offered_producer + } + + //receiver function which use async receive to get data, it waits for new data and process it once it arrives, + //it will receive data 10 times and print the received samples + async fn async_data_processor_fn( + subscribed: impl Subscription, + is_timeout: bool, + ) { + println!("[RECEIVER] Async data processor started"); + let mut buffer = SampleContainer::new(5); + for _ in 0..3 { + let (returned_buf, result) = if is_timeout { + let timeout = tokio::time::sleep(Duration::from_millis(1000)); + subscribed.cancellable_receive(buffer, 2, 3, timeout).await + } else { + subscribed.receive(buffer, 2, 3).await + }; + match result { + Ok(count) => println!("[RECEIVER] Received {} samples", count), + Err(e) => eprintln!("[RECEIVER] Failed to receive samples: {:?}", e), + } + let mut buf = returned_buf; + while let Some(sample) = buf.pop_front() { + println!("[RECEIVER] Sample: {:.2} psi", sample.pressure); + } + buffer = buf; + } + } + + async fn stream_processor_fn(mut subscribed: impl Subscription) { + let mut stream = subscribed.to_stream(); + let mut cnt = 5usize; + println!("[RECEIVER] Stream processor started"); + while cnt > 0 { + // Use timeout to avoid waiting indefinitely in case of issues with the producer or subscription + match tokio::time::timeout(tokio::time::Duration::from_secs(3), stream.next()).await { + Ok(Some(Ok(sample))) => { + println!( + "[RECEIVER] Stream received sample: {:.2} psi", + sample.pressure + ) + } + Ok(Some(Err(e))) => eprintln!("[RECEIVER] Stream error: {:?}", e), + Ok(None) => break, + Err(_) => { + eprintln!("[RECEIVER] Timeout while waiting for stream sample"); + break; + } + } + cnt -= 1; + } + } + + // Test case: Async subscription and sending on multi-threaded runtime + // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads + #[tokio::test(flavor = "multi_thread")] + async fn receive_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + let service_id = InstanceSpecifier::new("/Vehicle/Service3/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, false)); + processor_join_handle + .await + .expect("Error returned from task"); + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + #[tokio::test(flavor = "multi_thread")] + #[ignore = "This test demonstrates async receive with timeout, it can run individually if wanted to test timeout behavior"] + async fn receive_with_timeout_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + //Intentionally using service instance of test1, if you face issue add new service instance in config file and use it here. + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + + let processor_join_handle = tokio::spawn(async_data_processor_fn(subscribed, true)); + processor_join_handle + .await + .expect("Error returned from task"); + + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } + + // Test case: Async subscription and sending on multi-threaded runtime + // Use the tokio multi-threaded runtime to run async sender and receiver concurrently on separate threads + #[tokio::test(flavor = "multi_thread")] + async fn stream_and_send_using_multi_thread() { + println!("Starting async subscription test with Lola runtime"); + let service_id = InstanceSpecifier::new("/Vehicle/Service1/Instance") + .expect("Failed to create InstanceSpecifier"); + let service_id_clone = service_id.clone(); + //consumer create + let consumer_runtime = get_test_runtime(); + //starting service discovery in async way, so that it can be discovered when producer offer service after some delay, and consumer is waiting for discovery result + let consumer = tokio::spawn(create_consumer_async(consumer_runtime, service_id)); + //simulate some delay before producer offer service, so that consumer is waiting for discovery + tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await; + //Producer create + let producer_runtime = get_test_runtime(); + let producer = create_producer(producer_runtime, service_id_clone); + // Spawn async data sender + let sender_join_handle = tokio::spawn(async_data_sender_fn(producer)); + // Await consumer creation and subscribe to events + let consumer = consumer.await.expect("Failed to create consumer"); + // Subscribe to one event + let subscribed = consumer.left_tire.subscribe(5).unwrap(); + let stream_processor_join_handle = tokio::spawn(stream_processor_fn(subscribed)); + stream_processor_join_handle + .await + .expect("Error returned from stream processor task"); + + let producer = sender_join_handle.await.expect("Error returned from task"); + + match producer.unoffer() { + Ok(_) => println!("Successfully unoffered the service"), + Err(e) => eprintln!("Failed to unoffer: {:?}", e), + } + + println!("=== Async subscription test with Lola runtime completed ===\n"); + } +}