From a076b4a879e29cbb4848bce2c6c7c77029802450 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 29 May 2026 17:55:34 -0300 Subject: [PATCH 1/6] chore: improve #to_s in various telemetry/async objects --- lib/syskit/telemetry/async/interface_object.rb | 4 ++++ lib/syskit/telemetry/async/output_reader.rb | 4 ++++ lib/syskit/telemetry/async/port_read_manager.rb | 2 +- lib/syskit/telemetry/async/task_context.rb | 4 ++++ 4 files changed, 13 insertions(+), 1 deletion(-) diff --git a/lib/syskit/telemetry/async/interface_object.rb b/lib/syskit/telemetry/async/interface_object.rb index 4424c0b26..d30e3c79b 100644 --- a/lib/syskit/telemetry/async/interface_object.rb +++ b/lib/syskit/telemetry/async/interface_object.rb @@ -41,6 +41,10 @@ def initialize(task_context, name, type) @type = type end + def to_s + "#{task_context}.#{name}<#{object_id},#{type.name}>" + end + def eql?(other) other.task_context.eql?(task_context) && other.name == name && diff --git a/lib/syskit/telemetry/async/output_reader.rb b/lib/syskit/telemetry/async/output_reader.rb index 9114ea3c4..1cbe438f5 100644 --- a/lib/syskit/telemetry/async/output_reader.rb +++ b/lib/syskit/telemetry/async/output_reader.rb @@ -32,6 +32,10 @@ def initialize(port, policy, connect_on:, disconnect_on:) end end + def to_s + "#{port}.reader<#{object_id}>" + end + def raw_reader @reader.get end diff --git a/lib/syskit/telemetry/async/port_read_manager.rb b/lib/syskit/telemetry/async/port_read_manager.rb index c9ea8e85d..9426f8389 100644 --- a/lib/syskit/telemetry/async/port_read_manager.rb +++ b/lib/syskit/telemetry/async/port_read_manager.rb @@ -87,7 +87,7 @@ def to_s(relative_to: PortReadManager.monotonic_time) "poller %s: connected=%s " \ "scheduled=%s " \ "next_time=%.3f (in %i ms)", - name: port.full_name, + name: port.to_s, next_time: next_time || 0, next_time_delta_ms: next_time_delta_ms || 0, connected: connected? ? "yes" : "no", diff --git a/lib/syskit/telemetry/async/task_context.rb b/lib/syskit/telemetry/async/task_context.rb index 4e2fe148c..be3f1b6f9 100644 --- a/lib/syskit/telemetry/async/task_context.rb +++ b/lib/syskit/telemetry/async/task_context.rb @@ -130,6 +130,10 @@ def initialize( @current_state = nil end + def to_s + "TaskContext:#{name}<#{object_id}>" + end + @dummy_orogen_models = Concurrent::Hash.new def self.dummy_orogen_model(name) From 8e7123cbae9f89b90371da21739e0a5979d6f39e Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 29 May 2026 17:56:50 -0300 Subject: [PATCH 2/6] fix: add thread-safety sanity checks --- lib/syskit/telemetry/async.rb | 2 ++ .../async/main_thread_restrictions.rb | 34 +++++++++++++++++++ lib/syskit/telemetry/async/name_service.rb | 20 +++++++++-- .../telemetry/async/port_read_manager.rb | 24 ++++++++++--- 4 files changed, 74 insertions(+), 6 deletions(-) create mode 100644 lib/syskit/telemetry/async/main_thread_restrictions.rb diff --git a/lib/syskit/telemetry/async.rb b/lib/syskit/telemetry/async.rb index b7dacc731..6156cb18f 100644 --- a/lib/syskit/telemetry/async.rb +++ b/lib/syskit/telemetry/async.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "syskit/telemetry/async/main_thread_restrictions" + require "syskit/telemetry/async/name_service" require "syskit/telemetry/async/task_context" require "syskit/telemetry/async/interface_object" diff --git a/lib/syskit/telemetry/async/main_thread_restrictions.rb b/lib/syskit/telemetry/async/main_thread_restrictions.rb new file mode 100644 index 000000000..2c1b362ef --- /dev/null +++ b/lib/syskit/telemetry/async/main_thread_restrictions.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module Syskit + module Telemetry + module Async + # Module providing common functionality to validate that some methods are + # called within the main thread + module MainThreadRestrictions + # Make a thread the main thread + # + # Usually called in an object's constructor + # + # @param [Thread] thread the thread that will become 'main', by default + # the current thread + def update_main_thread(thread = Thread.current) + @__main_thread = thread + end + + class InvalidThread < RuntimeError; end + + # Validate that the current thread is the main thread + # + # Called in non-thread-safe methods to make sure they aren't called + # outside of the main thread + def ensure_in_main_thread + return if Thread.current == @__main_thread + + raise InvalidThread, + "calling #{caller(1, 1).first} outside of main thread" + end + end + end + end +end diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index ba3f7f997..ed6fc7389 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -7,9 +7,11 @@ module Telemetry module Async # In-process name service # - # It is exclusively filled using information that comes from the async + # It is exclusively filled based on information that comes from the async # {Client} class NameService < Orocos::NameServiceBase + include MainThreadRestrictions + # A new NameService instance # # @param [Hash] tasks The tasks which are @@ -22,6 +24,8 @@ def initialize( ) super() + update_main_thread + @iors = Concurrent::AtomicReference.new({}) @registered_tasks = Concurrent::Hash.new @task_added_callbacks = Concurrent::Array.new @@ -54,10 +58,14 @@ def include?(name) # After this call, any task not in the tasks parameter will have been # removed from the name server # - # @param [#ior,#name] list of IOR and name of remote tasks to resolve + # @param [#ior,#name] list of IOR and name of remote tasks to resolve. + # This list is complete, that is it contains all the tasks that the + # name server should know about # @return [Array] list of task names that are either known, or # that are being discovered def async_update_tasks(tasks) + ensure_in_main_thread + iors = tasks.each_with_object({}) { |t, h| h[t.name] = t.ior } @iors.set(iors) @@ -127,6 +135,8 @@ def resolved? # # Create a future that discovers a remote task def async_discover_task(task) + ensure_in_main_thread + future = Concurrent::Promises.future_on(@discovery_executor) do ior = @iors.get[task.name] # ior will be nil if the task has been removed from the task @@ -140,6 +150,8 @@ def async_discover_task(task) # # Process the tasks that have been (asynchronously) discovered def resolve_discovered_tasks + ensure_in_main_thread + while (async_discovery = pop_discovered_task) register( async_discovery.async_task, @@ -165,6 +177,8 @@ def wait_for_task_discovery # @return [AsyncDiscovery,nil] a valid resolved task or nil if there are # none so far def pop_discovered_task + ensure_in_main_thread + loop do return unless (async_discovery = pop_finished_discovery) next unless finished_discovery_validate_ior(async_discovery) @@ -183,6 +197,8 @@ def pop_discovered_task # # @return [AsyncDiscovery] def pop_finished_discovery + ensure_in_main_thread + async_discovery = @discovery.each_value.find(&:resolved?) return unless async_discovery diff --git a/lib/syskit/telemetry/async/port_read_manager.rb b/lib/syskit/telemetry/async/port_read_manager.rb index 9426f8389..d9c4796a2 100644 --- a/lib/syskit/telemetry/async/port_read_manager.rb +++ b/lib/syskit/telemetry/async/port_read_manager.rb @@ -7,11 +7,15 @@ module Async # # Central class that manages data readers for ports class PortReadManager + include MainThreadRestrictions + def initialize( connection_executor: self.class.default_connection_executor, disconnection_executor: self.class.default_disconnection_executor, read_executor: self.class.default_read_executor ) + update_main_thread + @callbacks = {} @pollers = Concurrent::AtomicReference.new({}) @@ -20,6 +24,18 @@ def initialize( @read_executor = read_executor end + # @api private + # + # Internal access method for the callback map + # + # ONLY use this method to access @callbacks, it validates that it is + # being accessed from the main thread + def callbacks + ensure_in_main_thread + + @callbacks + end + CONNECTION_DEFAULT_THREADS = 20 DISCONNECTION_DEFAULT_THREADS = 20 READ_DEFAULT_THREADS = 5 @@ -140,7 +156,7 @@ def register_callback(port, callback, period:, buffer_size:, init: false) needs_last_received_value: true ) - (@callbacks[port] ||= []) << callback + (callbacks[port] ||= []) << callback ensure_reader_uptodate(port) propagate_last_received_value(port) Roby.disposable do @@ -160,7 +176,7 @@ def propagate_last_received_value(port) # This is not meant to be called directly. Use the disposable # returned by {#register_callback} instead. def deregister_callback(port, callback) - return unless (callbacks = @callbacks[port]) + return unless (callbacks = self.callbacks[port]) callbacks.delete(callback) if callbacks.empty? @@ -217,7 +233,7 @@ def polling?(port) # Update a poller's period to match the callbacks currently listening # to it def update_poller_period(poller) - poller.period = @callbacks[poller.port].map(&:period).min + poller.period = callbacks[poller.port].map(&:period).min end # Return the Reader for the given port @@ -293,7 +309,7 @@ def dispatch_last_received_value(poller) # # @return [Integer] def required_policy_for(port) - return unless (callbacks = @callbacks[port]) + return unless (callbacks = self.callbacks[port]) buffer_size = callbacks.map { _1.buffer_size }.max init = callbacks.map { _1.init }.inject(&:|) From ca5cca7ac891ea6c3a4b97f7b5de72d51b2c8f44 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 29 May 2026 17:57:59 -0300 Subject: [PATCH 3/6] fix: thread safety during task discovery The current discovery code would do remote calls (good !) but also register the async objects (bad !) in separate threads. Move the second part to the main thread --- lib/syskit/telemetry/async/name_service.rb | 30 ++++----- lib/syskit/telemetry/async/task_context.rb | 71 +++++++++------------- test/telemetry/async/test_task_context.rb | 7 ++- 3 files changed, 49 insertions(+), 59 deletions(-) diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index ed6fc7389..2b57754d8 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -110,8 +110,8 @@ class AsyncDiscoveryError < RuntimeError; end AsyncDiscovery = Struct.new( :task, :future, :ior, :async_task, keyword_init: true ) do - def update_from_result - fulfilled, (ior, async_task), reason = future.result + def update_from_result(port_read_manager:) + fulfilled, (ior, discovered), reason = future.result unless fulfilled raise AsyncDiscoveryError, "unexpected error during asynchronous " \ @@ -119,7 +119,11 @@ def update_from_result end self.ior = ior - self.async_task = async_task + return unless discovered + + self.async_task = TaskContext.from_async_discovery( + discovered, port_read_manager: port_read_manager + ) end def wait @@ -139,6 +143,7 @@ def async_discover_task(task) future = Concurrent::Promises.future_on(@discovery_executor) do ior = @iors.get[task.name] + # ior will be nil if the task has been removed from the task # set while the future was pending discover_task(task.name, ior, task.orogen_model_name) if ior @@ -182,7 +187,7 @@ def pop_discovered_task loop do return unless (async_discovery = pop_finished_discovery) next unless finished_discovery_validate_ior(async_discovery) - next unless async_discovery.async_task + next unless async_discovery.async_task # error during resolution return async_discovery end @@ -203,7 +208,9 @@ def pop_finished_discovery return unless async_discovery @discovery.delete(async_discovery.task.name) - async_discovery.update_from_result + async_discovery.update_from_result( + port_read_manager: @port_read_manager + ) async_discovery end @@ -246,17 +253,10 @@ def finished_discovery_validate_ior(async_discovery) # resolve the task, and the async taskcontext that represents it. The # task is nil if the resolution failed def discover_task(name, ior, orogen_model_name) - task = Orocos::TaskContext.new( - ior, - name: name, - model: orogen_model_from_name(orogen_model_name) - ) - - async_task = TaskContext.discover( - task, port_read_manager: @port_read_manager - ) + orogen_model = orogen_model_from_name(orogen_model_name) + discovered = TaskContext.async_discovery(name, ior, orogen_model) - [ior, async_task] + [ior, discovered] rescue StandardError => e warn "Failed discovery of task #{name}: #{e.message}" e.backtrace.each do |line| diff --git a/lib/syskit/telemetry/async/task_context.rb b/lib/syskit/telemetry/async/task_context.rb index be3f1b6f9..e092c5ef6 100644 --- a/lib/syskit/telemetry/async/task_context.rb +++ b/lib/syskit/telemetry/async/task_context.rb @@ -64,54 +64,43 @@ def states_index_to_symbols @states_index_to_symbols end - # Discover information about a Orocos::TaskContext and create the - # corresponding {TaskContext} - # - # This is meant to be called in a separate thread - def self.discover(task, port_read_manager:) - async_task = TaskContext.new( - task.name, port_read_manager: port_read_manager - ) - - # Already do an initial discovery of all the task's interface objects - discover_attributes(async_task, task) - discover_properties(async_task, task) - discover_ports(async_task, task) - - # We can do this here ONLY BECAUSE we're populating an initial - # state. Further updates need to call the `discover_` methods in - # the main thread - async_task.reachable!(task) - async_task - end + Discovered = Struct.new( + :task, :orogen_model, :attributes, :properties, :ports, + keyword_init: true + ) - # @api private + # Discover information about a Orocos::TaskContext # - # Discover a remote task's attributes - def self.discover_attributes(async_task, task) - raw_attributes = task.attribute_names.map { task.attribute(_1) } - async_task.discover_attributes(raw_attributes) - end - - # @api private + # This is meant to be called in a separate thread # - # Discover a remote task's properties - def self.discover_properties(async_task, task) - raw_properties = task.property_names.map { task.property(_1) } - async_task.discover_properties(raw_properties) + # @return [Discovered] + def self.async_discovery(name, ior, orogen_model) + task = Orocos::TaskContext.new(ior, name: name, model: orogen_model) + + Discovered.new( + task: task, + orogen_model: orogen_model, + attributes: task.attribute_names.map { task.attribute(_1) }, + properties: task.property_names.map { task.property(_1) }, + ports: task.port_names.map { task.port(_1) } + ) end - # @api private - # - # Discover a remote task's ports - def self.discover_ports(async_task, task) - raw_ports = task.port_names.map { task.port(_1) } - async_task.discover_ports(raw_ports) + # Create a TaskContext object from + def self.from_async_discovery(discovered, port_read_manager:) + async_task = new( + discovered.task.name, + model: discovered.orogen_model, + port_read_manager: port_read_manager + ) + async_task.discover_attributes(discovered.attributes) + async_task.discover_properties(discovered.properties) + async_task.discover_ports(discovered.ports) + async_task.reachable!(discovered.task) + async_task end - def initialize( - name, port_read_manager:, model: self.class.dummy_orogen_model(name) - ) + def initialize(name, port_read_manager:, model:) super() @name = name diff --git a/test/telemetry/async/test_task_context.rb b/test/telemetry/async/test_task_context.rb index 1585b81bb..ed31c6b1a 100644 --- a/test/telemetry/async/test_task_context.rb +++ b/test/telemetry/async/test_task_context.rb @@ -330,9 +330,10 @@ def make_async_task(name) end def discover_task(task) - Orocos.allow_blocking_calls do - TaskContext.discover(task, port_read_manager: @port_read_manager) - end + TaskContext.discover( + task.name, task.ior, task.model, + port_read_manager: @port_read_manager + ) end def assert_polling_eventually(period: 0.01, timeout: 2, &block) From 4385533f541bc7178f920210b5c4c9d2dea70e02 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 29 May 2026 18:00:04 -0300 Subject: [PATCH 4/6] fix: handling of NoData in telemetry/async The code would only call read_new and update the last known value with the return value unconditionally (that is, with nil if there was no updates). Use read_with_result to have proper flow information and propagate that information. --- lib/syskit/telemetry/async/name_service.rb | 5 ++ .../telemetry/async/port_read_manager.rb | 40 +++++++---- lib/syskit/telemetry/async/task_context.rb | 18 ++++- test/telemetry/async/test_name_service.rb | 11 +-- .../async/test_output_port_subfield.rb | 9 +-- test/telemetry/async/test_output_reader.rb | 3 +- .../telemetry/async/test_port_read_manager.rb | 70 ++++++++++++++++++- 7 files changed, 126 insertions(+), 30 deletions(-) diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index 2b57754d8..39d12a8d4 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -151,6 +151,11 @@ def async_discover_task(task) @discovery[task.name] = AsyncDiscovery.new(task: task, future: future) end + def wait_and_resolve_all_pending_discoveries + @discovery.each_value { _1.future.wait } + resolve_discovered_tasks + end + # @api private # # Process the tasks that have been (asynchronously) discovered diff --git a/lib/syskit/telemetry/async/port_read_manager.rb b/lib/syskit/telemetry/async/port_read_manager.rb index d9c4796a2..5f304e9fb 100644 --- a/lib/syskit/telemetry/async/port_read_manager.rb +++ b/lib/syskit/telemetry/async/port_read_manager.rb @@ -114,7 +114,8 @@ def to_s(relative_to: PortReadManager.monotonic_time) def schedule_read_if_needed(now, executor) return if next_time && next_time > now - self.read_future = reader.raw_read_new(executor) + self.read_future = + reader.raw_read_with_result(executor, nil, false) end def prepare_next_read(now) @@ -262,17 +263,18 @@ def process_poller_state(poller, now) return end - if poller.propagate_last_received_value && poller.last_value && - !poller.resolved_read? - dispatch_last_received_value(poller) - end - if !poller.scheduled_read? poller.schedule_read_if_needed(now, @read_executor) elsif poller.resolved_read? dispatch_read_result(poller) poller.prepare_next_read(now) end + + if poller.propagate_last_received_value && poller.last_value + dispatch_last_received_value(poller) + end + + poller.propagate_last_received_value = false end # Time in seconds returned by CLOCK_MONOTONIC @@ -287,21 +289,31 @@ def self.monotonic_time # Send read data to registered callbacks def dispatch_read_result(poller) - fulfilled, value, reason = poller.result - if fulfilled - @callbacks[poller.port].each { |c| c.dispatch(value) } - poller.last_value = value - poller.propagate_last_received_value = false - else + fulfilled, read_result, reason = poller.result + unless fulfilled warn "failed to read #{poller.port}: #{reason}" + return + end + + unless read_result # no data + poller.last_value = nil + return end + + flow, value = read_result + return unless flow == Orocos::NEW_DATA + + callbacks[poller.port].each { |c| c.dispatch(value) } + poller.last_value = value + poller.propagate_last_received_value = false end # Send last received value to the callbacks that require it def dispatch_last_received_value(poller) - @callbacks[poller.port].each do |c| - c.dispatch(poller.last_value) + if (value = poller.last_value) + callbacks[poller.port].each { _1.dispatch(value) } end + poller.propagate_last_received_value = false end diff --git a/lib/syskit/telemetry/async/task_context.rb b/lib/syskit/telemetry/async/task_context.rb index e092c5ef6..c55fe76a3 100644 --- a/lib/syskit/telemetry/async/task_context.rb +++ b/lib/syskit/telemetry/async/task_context.rb @@ -86,7 +86,8 @@ def self.async_discovery(name, ior, orogen_model) ) end - # Create a TaskContext object from + # Create a TaskContext object from the information returned by + # {.async_discovery} def self.from_async_discovery(discovered, port_read_manager:) async_task = new( discovered.task.name, @@ -100,6 +101,21 @@ def self.from_async_discovery(discovered, port_read_manager:) async_task end + # Synchronously discover remote task info and return the corresponding + # {TaskContext} object + # + # This must be called within the main application thread, and must be + # wrapped in a Orocos.allow_blocking_calls call. This is meant as a helper + # for specific cases (essentially unit tests and scripts) + def self.discover(name, ior, orogen_model, port_read_manager:) + discovered = Orocos.allow_blocking_calls do + TaskContext.async_discovery(name, ior, orogen_model) + end + TaskContext.from_async_discovery( + discovered, port_read_manager: port_read_manager + ) + end + def initialize(name, port_read_manager:, model:) super() diff --git a/test/telemetry/async/test_name_service.rb b/test/telemetry/async/test_name_service.rb index 1f94d9ae0..605038c66 100644 --- a/test/telemetry/async/test_name_service.rb +++ b/test/telemetry/async/test_name_service.rb @@ -243,14 +243,9 @@ module Async describe "#ior" do it "returns the IOR of a registered task" do - _, task = make_deployed_task("test", "some") - async_task = Orocos.allow_blocking_calls do - TaskContext.discover( - task, port_read_manager: PortReadManager.new - ) - end - assert_equal "test", async_task.name - @ns.register(async_task) + deployed_task, task = make_deployed_task("test", "some") + @ns.async_update_tasks([deployed_task]) + @ns.wait_and_resolve_all_pending_discoveries assert_equal task.ior, @ns.ior("test") end diff --git a/test/telemetry/async/test_output_port_subfield.rb b/test/telemetry/async/test_output_port_subfield.rb index a321f5419..bf35ed503 100644 --- a/test/telemetry/async/test_output_port_subfield.rb +++ b/test/telemetry/async/test_output_port_subfield.rb @@ -113,11 +113,12 @@ def make_ruby_task(name) end def make_async_task(name) - t = make_ruby_task name - async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @port_read_manager) + task = make_ruby_task(name) + async_task = Orocos.allow_blocking_calls do + TaskContext.discover(task, task.ior, task.model, + port_read_manager: @port_read_manager) end - [t, async] + [task, async_task] end def assert_polling_eventually(period: 0.01, timeout: 2, &block) diff --git a/test/telemetry/async/test_output_reader.rb b/test/telemetry/async/test_output_reader.rb index 6050bb6a3..2a9636e4b 100644 --- a/test/telemetry/async/test_output_reader.rb +++ b/test/telemetry/async/test_output_reader.rb @@ -206,7 +206,8 @@ def make_ruby_task(name) def make_async_task(name) t = make_ruby_task name async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @port_read_manager) + TaskContext.discover(name, t.ior, t.model, + port_read_manager: @port_read_manager) end [t, async] end diff --git a/test/telemetry/async/test_port_read_manager.rb b/test/telemetry/async/test_port_read_manager.rb index 6645d38a6..1c9cca55d 100644 --- a/test/telemetry/async/test_port_read_manager.rb +++ b/test/telemetry/async/test_port_read_manager.rb @@ -152,12 +152,66 @@ module Async Orocos.allow_blocking_calls { @task.out.write 42 } execute_all(@read_executor) - assert_equal 42, @poller.read_future.value + assert_equal [1, 42], @poller.read_future.value @manager.poll assert_equal [42], @received_samples end + it "propagates a new received sample" do + execute_all(@connection_executor) + @manager.poll + + assert @poller.reader.connected? + assert @poller.read_future + + Orocos.allow_blocking_calls { @task.out.write 42 } + execute_all(@read_executor) + assert_equal [1, 42], @poller.read_future.value + @manager.poll + + assert_equal 42, @poller.last_value + assert_equal [42], @received_samples + end + + it "propagates a new value received after an existing one" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + assert_equal 42, @poller.last_value + + Orocos.allow_blocking_calls { @task.out.write 21 } + read_next_sample + assert_equal 21, @poller.last_value + assert_equal [42, 21], @received_samples + end + + it "does not propagate anything if a new read indicates " \ + "no new values" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + read_next_sample + assert_equal 42, @poller.last_value + assert_equal [42], @received_samples + end + + it "resets the last value to nil if the reader is disconnected" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + assert_equal 42, @poller.last_value + Orocos.allow_blocking_calls { @task.out.disconnect_all } + read_next_sample + assert_nil @poller.last_value + end + it "reschedules the second read based on the end of the first" do execute_all(@connection_executor) @manager.poll @@ -210,6 +264,17 @@ module Async assert @poller.read_future end + def read_next_sample + unless @poller.read_future + freeze_monotonic_time(@poller.next_time + 1e-6) + @manager.poll + end + + execute_all(@read_executor) + @poller.read_future.wait + @manager.poll + end + def freeze_monotonic_time(time = @manager.monotonic_time) @frozen_time = time flexmock(@manager) @@ -235,7 +300,8 @@ def make_ruby_task(name) def make_async_task(name) t = make_ruby_task name async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @manager) + TaskContext.discover(name, t.ior, t.model, + port_read_manager: @manager) end [t, async] end From 670416cb81981646c7afb365b5a66c2e1cff7559 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Fri, 5 Jun 2026 17:50:56 -0300 Subject: [PATCH 5/6] chore: improve documentation of MainThreadRestrictions --- .../telemetry/async/main_thread_restrictions.rb | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/lib/syskit/telemetry/async/main_thread_restrictions.rb b/lib/syskit/telemetry/async/main_thread_restrictions.rb index 2c1b362ef..415fd8c3d 100644 --- a/lib/syskit/telemetry/async/main_thread_restrictions.rb +++ b/lib/syskit/telemetry/async/main_thread_restrictions.rb @@ -3,15 +3,24 @@ module Syskit module Telemetry module Async - # Module providing common functionality to validate that some methods are - # called within the main thread + # Module providing common functionality to validate the thread some methods + # are called from + # + # Set the "main" thread, that is the thread we want these methods to be called + # from, by calling {#update_main_thread} and then call + # {#ensure_in_main_thread} to enforce the restriction. + # + # This is usually meant to be used in an event-loop kind of application that + # spawns futures/promises, to make sure thread-unsafe methods are not called + # in a separate thread. module MainThreadRestrictions # Make a thread the main thread # # Usually called in an object's constructor # # @param [Thread] thread the thread that will become 'main', by default - # the current thread + # the current thread. When running on a UI, it will usually be called + # from within the thread of the UI event loop def update_main_thread(thread = Thread.current) @__main_thread = thread end From aae4ceaa4d62e17a97a443b63517b001465c1d78 Mon Sep 17 00:00:00 2001 From: Sylvain Joyeux Date: Tue, 9 Jun 2026 10:01:09 -0300 Subject: [PATCH 6/6] chore: improve naming --- lib/syskit/telemetry/async/name_service.rb | 4 ++-- lib/syskit/telemetry/async/task_context.rb | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index 39d12a8d4..16611ec75 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -121,7 +121,7 @@ def update_from_result(port_read_manager:) self.ior = ior return unless discovered - self.async_task = TaskContext.from_async_discovery( + self.async_task = TaskContext.from_discovered_interface( discovered, port_read_manager: port_read_manager ) end @@ -259,7 +259,7 @@ def finished_discovery_validate_ior(async_discovery) # task is nil if the resolution failed def discover_task(name, ior, orogen_model_name) orogen_model = orogen_model_from_name(orogen_model_name) - discovered = TaskContext.async_discovery(name, ior, orogen_model) + discovered = TaskContext.discover_interface(name, ior, orogen_model) [ior, discovered] rescue StandardError => e diff --git a/lib/syskit/telemetry/async/task_context.rb b/lib/syskit/telemetry/async/task_context.rb index c55fe76a3..6d27fd0a7 100644 --- a/lib/syskit/telemetry/async/task_context.rb +++ b/lib/syskit/telemetry/async/task_context.rb @@ -64,7 +64,7 @@ def states_index_to_symbols @states_index_to_symbols end - Discovered = Struct.new( + DiscoveredInterface = Struct.new( :task, :orogen_model, :attributes, :properties, :ports, keyword_init: true ) @@ -73,11 +73,11 @@ def states_index_to_symbols # # This is meant to be called in a separate thread # - # @return [Discovered] - def self.async_discovery(name, ior, orogen_model) + # @return [DiscoveredInterface] + def self.discover_interface(name, ior, orogen_model) task = Orocos::TaskContext.new(ior, name: name, model: orogen_model) - Discovered.new( + DiscoveredInterface.new( task: task, orogen_model: orogen_model, attributes: task.attribute_names.map { task.attribute(_1) }, @@ -87,8 +87,8 @@ def self.async_discovery(name, ior, orogen_model) end # Create a TaskContext object from the information returned by - # {.async_discovery} - def self.from_async_discovery(discovered, port_read_manager:) + # {.discover} + def self.from_discovered_interface(discovered, port_read_manager:) async_task = new( discovered.task.name, model: discovered.orogen_model, @@ -109,9 +109,9 @@ def self.from_async_discovery(discovered, port_read_manager:) # for specific cases (essentially unit tests and scripts) def self.discover(name, ior, orogen_model, port_read_manager:) discovered = Orocos.allow_blocking_calls do - TaskContext.async_discovery(name, ior, orogen_model) + TaskContext.discover_interface(name, ior, orogen_model) end - TaskContext.from_async_discovery( + TaskContext.from_discovered_interface( discovered, port_read_manager: port_read_manager ) end