diff --git a/lib/syskit/interface/commands.rb b/lib/syskit/interface/commands.rb index a2bf5336c..754825d1d 100644 --- a/lib/syskit/interface/commands.rb +++ b/lib/syskit/interface/commands.rb @@ -16,6 +16,10 @@ def deployments command :deployments, "returns information about running deployments" + def system_definitions; end + command :system_definitions, + "transmits definition of orogen models and typelib types" + # Return incremental update about deployments # # @return [Protocol::Deployment] diff --git a/lib/syskit/telemetry/async.rb b/lib/syskit/telemetry/async.rb index b7dacc731..6156cb18f 100644 --- a/lib/syskit/telemetry/async.rb +++ b/lib/syskit/telemetry/async.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require "syskit/telemetry/async/main_thread_restrictions" + require "syskit/telemetry/async/name_service" require "syskit/telemetry/async/task_context" require "syskit/telemetry/async/interface_object" diff --git a/lib/syskit/telemetry/async/interface_object.rb b/lib/syskit/telemetry/async/interface_object.rb index 4424c0b26..d30e3c79b 100644 --- a/lib/syskit/telemetry/async/interface_object.rb +++ b/lib/syskit/telemetry/async/interface_object.rb @@ -41,6 +41,10 @@ def initialize(task_context, name, type) @type = type end + def to_s + "#{task_context}.#{name}<#{object_id},#{type.name}>" + end + def eql?(other) other.task_context.eql?(task_context) && other.name == name && diff --git a/lib/syskit/telemetry/async/main_thread_restrictions.rb b/lib/syskit/telemetry/async/main_thread_restrictions.rb new file mode 100644 index 000000000..2c1b362ef --- /dev/null +++ b/lib/syskit/telemetry/async/main_thread_restrictions.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module Syskit + module Telemetry + module Async + # Module providing common functionality to validate that some methods are + # called within the main thread + module MainThreadRestrictions + # Make a thread the main thread + # + # Usually called in an object's constructor + # + # @param [Thread] thread the thread that will become 'main', by default + # the current thread + def update_main_thread(thread = Thread.current) + @__main_thread = thread + end + + class InvalidThread < RuntimeError; end + + # Validate that the current thread is the main thread + # + # Called in non-thread-safe methods to make sure they aren't called + # outside of the main thread + def ensure_in_main_thread + return if Thread.current == @__main_thread + + raise InvalidThread, + "calling #{caller(1, 1).first} outside of main thread" + end + end + end + end +end diff --git a/lib/syskit/telemetry/async/name_service.rb b/lib/syskit/telemetry/async/name_service.rb index ba3f7f997..39d12a8d4 100644 --- a/lib/syskit/telemetry/async/name_service.rb +++ b/lib/syskit/telemetry/async/name_service.rb @@ -7,9 +7,11 @@ module Telemetry module Async # In-process name service # - # It is exclusively filled using information that comes from the async + # It is exclusively filled based on information that comes from the async # {Client} class NameService < Orocos::NameServiceBase + include MainThreadRestrictions + # A new NameService instance # # @param [Hash] tasks The tasks which are @@ -22,6 +24,8 @@ def initialize( ) super() + update_main_thread + @iors = Concurrent::AtomicReference.new({}) @registered_tasks = Concurrent::Hash.new @task_added_callbacks = Concurrent::Array.new @@ -54,10 +58,14 @@ def include?(name) # After this call, any task not in the tasks parameter will have been # removed from the name server # - # @param [#ior,#name] list of IOR and name of remote tasks to resolve + # @param [#ior,#name] list of IOR and name of remote tasks to resolve. + # This list is complete, that is it contains all the tasks that the + # name server should know about # @return [Array] list of task names that are either known, or # that are being discovered def async_update_tasks(tasks) + ensure_in_main_thread + iors = tasks.each_with_object({}) { |t, h| h[t.name] = t.ior } @iors.set(iors) @@ -102,8 +110,8 @@ class AsyncDiscoveryError < RuntimeError; end AsyncDiscovery = Struct.new( :task, :future, :ior, :async_task, keyword_init: true ) do - def update_from_result - fulfilled, (ior, async_task), reason = future.result + def update_from_result(port_read_manager:) + fulfilled, (ior, discovered), reason = future.result unless fulfilled raise AsyncDiscoveryError, "unexpected error during asynchronous " \ @@ -111,7 +119,11 @@ def update_from_result end self.ior = ior - self.async_task = async_task + return unless discovered + + self.async_task = TaskContext.from_async_discovery( + discovered, port_read_manager: port_read_manager + ) end def wait @@ -127,8 +139,11 @@ def resolved? # # Create a future that discovers a remote task def async_discover_task(task) + ensure_in_main_thread + future = Concurrent::Promises.future_on(@discovery_executor) do ior = @iors.get[task.name] + # ior will be nil if the task has been removed from the task # set while the future was pending discover_task(task.name, ior, task.orogen_model_name) if ior @@ -136,10 +151,17 @@ def async_discover_task(task) @discovery[task.name] = AsyncDiscovery.new(task: task, future: future) end + def wait_and_resolve_all_pending_discoveries + @discovery.each_value { _1.future.wait } + resolve_discovered_tasks + end + # @api private # # Process the tasks that have been (asynchronously) discovered def resolve_discovered_tasks + ensure_in_main_thread + while (async_discovery = pop_discovered_task) register( async_discovery.async_task, @@ -165,10 +187,12 @@ def wait_for_task_discovery # @return [AsyncDiscovery,nil] a valid resolved task or nil if there are # none so far def pop_discovered_task + ensure_in_main_thread + loop do return unless (async_discovery = pop_finished_discovery) next unless finished_discovery_validate_ior(async_discovery) - next unless async_discovery.async_task + next unless async_discovery.async_task # error during resolution return async_discovery end @@ -183,11 +207,15 @@ def pop_discovered_task # # @return [AsyncDiscovery] def pop_finished_discovery + ensure_in_main_thread + async_discovery = @discovery.each_value.find(&:resolved?) return unless async_discovery @discovery.delete(async_discovery.task.name) - async_discovery.update_from_result + async_discovery.update_from_result( + port_read_manager: @port_read_manager + ) async_discovery end @@ -230,17 +258,10 @@ def finished_discovery_validate_ior(async_discovery) # resolve the task, and the async taskcontext that represents it. The # task is nil if the resolution failed def discover_task(name, ior, orogen_model_name) - task = Orocos::TaskContext.new( - ior, - name: name, - model: orogen_model_from_name(orogen_model_name) - ) - - async_task = TaskContext.discover( - task, port_read_manager: @port_read_manager - ) + orogen_model = orogen_model_from_name(orogen_model_name) + discovered = TaskContext.async_discovery(name, ior, orogen_model) - [ior, async_task] + [ior, discovered] rescue StandardError => e warn "Failed discovery of task #{name}: #{e.message}" e.backtrace.each do |line| diff --git a/lib/syskit/telemetry/async/output_reader.rb b/lib/syskit/telemetry/async/output_reader.rb index 9114ea3c4..1cbe438f5 100644 --- a/lib/syskit/telemetry/async/output_reader.rb +++ b/lib/syskit/telemetry/async/output_reader.rb @@ -32,6 +32,10 @@ def initialize(port, policy, connect_on:, disconnect_on:) end end + def to_s + "#{port}.reader<#{object_id}>" + end + def raw_reader @reader.get end diff --git a/lib/syskit/telemetry/async/port_read_manager.rb b/lib/syskit/telemetry/async/port_read_manager.rb index c9ea8e85d..5f304e9fb 100644 --- a/lib/syskit/telemetry/async/port_read_manager.rb +++ b/lib/syskit/telemetry/async/port_read_manager.rb @@ -7,11 +7,15 @@ module Async # # Central class that manages data readers for ports class PortReadManager + include MainThreadRestrictions + def initialize( connection_executor: self.class.default_connection_executor, disconnection_executor: self.class.default_disconnection_executor, read_executor: self.class.default_read_executor ) + update_main_thread + @callbacks = {} @pollers = Concurrent::AtomicReference.new({}) @@ -20,6 +24,18 @@ def initialize( @read_executor = read_executor end + # @api private + # + # Internal access method for the callback map + # + # ONLY use this method to access @callbacks, it validates that it is + # being accessed from the main thread + def callbacks + ensure_in_main_thread + + @callbacks + end + CONNECTION_DEFAULT_THREADS = 20 DISCONNECTION_DEFAULT_THREADS = 20 READ_DEFAULT_THREADS = 5 @@ -87,7 +103,7 @@ def to_s(relative_to: PortReadManager.monotonic_time) "poller %s: connected=%s " \ "scheduled=%s " \ "next_time=%.3f (in %i ms)", - name: port.full_name, + name: port.to_s, next_time: next_time || 0, next_time_delta_ms: next_time_delta_ms || 0, connected: connected? ? "yes" : "no", @@ -98,7 +114,8 @@ def to_s(relative_to: PortReadManager.monotonic_time) def schedule_read_if_needed(now, executor) return if next_time && next_time > now - self.read_future = reader.raw_read_new(executor) + self.read_future = + reader.raw_read_with_result(executor, nil, false) end def prepare_next_read(now) @@ -140,7 +157,7 @@ def register_callback(port, callback, period:, buffer_size:, init: false) needs_last_received_value: true ) - (@callbacks[port] ||= []) << callback + (callbacks[port] ||= []) << callback ensure_reader_uptodate(port) propagate_last_received_value(port) Roby.disposable do @@ -160,7 +177,7 @@ def propagate_last_received_value(port) # This is not meant to be called directly. Use the disposable # returned by {#register_callback} instead. def deregister_callback(port, callback) - return unless (callbacks = @callbacks[port]) + return unless (callbacks = self.callbacks[port]) callbacks.delete(callback) if callbacks.empty? @@ -217,7 +234,7 @@ def polling?(port) # Update a poller's period to match the callbacks currently listening # to it def update_poller_period(poller) - poller.period = @callbacks[poller.port].map(&:period).min + poller.period = callbacks[poller.port].map(&:period).min end # Return the Reader for the given port @@ -246,17 +263,18 @@ def process_poller_state(poller, now) return end - if poller.propagate_last_received_value && poller.last_value && - !poller.resolved_read? - dispatch_last_received_value(poller) - end - if !poller.scheduled_read? poller.schedule_read_if_needed(now, @read_executor) elsif poller.resolved_read? dispatch_read_result(poller) poller.prepare_next_read(now) end + + if poller.propagate_last_received_value && poller.last_value + dispatch_last_received_value(poller) + end + + poller.propagate_last_received_value = false end # Time in seconds returned by CLOCK_MONOTONIC @@ -271,21 +289,31 @@ def self.monotonic_time # Send read data to registered callbacks def dispatch_read_result(poller) - fulfilled, value, reason = poller.result - if fulfilled - @callbacks[poller.port].each { |c| c.dispatch(value) } - poller.last_value = value - poller.propagate_last_received_value = false - else + fulfilled, read_result, reason = poller.result + unless fulfilled warn "failed to read #{poller.port}: #{reason}" + return + end + + unless read_result # no data + poller.last_value = nil + return end + + flow, value = read_result + return unless flow == Orocos::NEW_DATA + + callbacks[poller.port].each { |c| c.dispatch(value) } + poller.last_value = value + poller.propagate_last_received_value = false end # Send last received value to the callbacks that require it def dispatch_last_received_value(poller) - @callbacks[poller.port].each do |c| - c.dispatch(poller.last_value) + if (value = poller.last_value) + callbacks[poller.port].each { _1.dispatch(value) } end + poller.propagate_last_received_value = false end @@ -293,7 +321,7 @@ def dispatch_last_received_value(poller) # # @return [Integer] def required_policy_for(port) - return unless (callbacks = @callbacks[port]) + return unless (callbacks = self.callbacks[port]) buffer_size = callbacks.map { _1.buffer_size }.max init = callbacks.map { _1.init }.inject(&:|) diff --git a/lib/syskit/telemetry/async/task_context.rb b/lib/syskit/telemetry/async/task_context.rb index 4e2fe148c..c55fe76a3 100644 --- a/lib/syskit/telemetry/async/task_context.rb +++ b/lib/syskit/telemetry/async/task_context.rb @@ -64,54 +64,59 @@ def states_index_to_symbols @states_index_to_symbols end - # Discover information about a Orocos::TaskContext and create the - # corresponding {TaskContext} + Discovered = Struct.new( + :task, :orogen_model, :attributes, :properties, :ports, + keyword_init: true + ) + + # Discover information about a Orocos::TaskContext # # This is meant to be called in a separate thread - def self.discover(task, port_read_manager:) - async_task = TaskContext.new( - task.name, port_read_manager: port_read_manager - ) - - # Already do an initial discovery of all the task's interface objects - discover_attributes(async_task, task) - discover_properties(async_task, task) - discover_ports(async_task, task) - - # We can do this here ONLY BECAUSE we're populating an initial - # state. Further updates need to call the `discover_` methods in - # the main thread - async_task.reachable!(task) - async_task - end - - # @api private # - # Discover a remote task's attributes - def self.discover_attributes(async_task, task) - raw_attributes = task.attribute_names.map { task.attribute(_1) } - async_task.discover_attributes(raw_attributes) + # @return [Discovered] + def self.async_discovery(name, ior, orogen_model) + task = Orocos::TaskContext.new(ior, name: name, model: orogen_model) + + Discovered.new( + task: task, + orogen_model: orogen_model, + attributes: task.attribute_names.map { task.attribute(_1) }, + properties: task.property_names.map { task.property(_1) }, + ports: task.port_names.map { task.port(_1) } + ) end - # @api private - # - # Discover a remote task's properties - def self.discover_properties(async_task, task) - raw_properties = task.property_names.map { task.property(_1) } - async_task.discover_properties(raw_properties) + # Create a TaskContext object from the information returned by + # {.async_discovery} + def self.from_async_discovery(discovered, port_read_manager:) + async_task = new( + discovered.task.name, + model: discovered.orogen_model, + port_read_manager: port_read_manager + ) + async_task.discover_attributes(discovered.attributes) + async_task.discover_properties(discovered.properties) + async_task.discover_ports(discovered.ports) + async_task.reachable!(discovered.task) + async_task end - # @api private + # Synchronously discover remote task info and return the corresponding + # {TaskContext} object # - # Discover a remote task's ports - def self.discover_ports(async_task, task) - raw_ports = task.port_names.map { task.port(_1) } - async_task.discover_ports(raw_ports) + # This must be called within the main application thread, and must be + # wrapped in a Orocos.allow_blocking_calls call. This is meant as a helper + # for specific cases (essentially unit tests and scripts) + def self.discover(name, ior, orogen_model, port_read_manager:) + discovered = Orocos.allow_blocking_calls do + TaskContext.async_discovery(name, ior, orogen_model) + end + TaskContext.from_async_discovery( + discovered, port_read_manager: port_read_manager + ) end - def initialize( - name, port_read_manager:, model: self.class.dummy_orogen_model(name) - ) + def initialize(name, port_read_manager:, model:) super() @name = name @@ -130,6 +135,10 @@ def initialize( @current_state = nil end + def to_s + "TaskContext:#{name}<#{object_id}>" + end + @dummy_orogen_models = Concurrent::Hash.new def self.dummy_orogen_model(name) diff --git a/test/telemetry/async/test_name_service.rb b/test/telemetry/async/test_name_service.rb index 1f94d9ae0..605038c66 100644 --- a/test/telemetry/async/test_name_service.rb +++ b/test/telemetry/async/test_name_service.rb @@ -243,14 +243,9 @@ module Async describe "#ior" do it "returns the IOR of a registered task" do - _, task = make_deployed_task("test", "some") - async_task = Orocos.allow_blocking_calls do - TaskContext.discover( - task, port_read_manager: PortReadManager.new - ) - end - assert_equal "test", async_task.name - @ns.register(async_task) + deployed_task, task = make_deployed_task("test", "some") + @ns.async_update_tasks([deployed_task]) + @ns.wait_and_resolve_all_pending_discoveries assert_equal task.ior, @ns.ior("test") end diff --git a/test/telemetry/async/test_output_port_subfield.rb b/test/telemetry/async/test_output_port_subfield.rb index a321f5419..bf35ed503 100644 --- a/test/telemetry/async/test_output_port_subfield.rb +++ b/test/telemetry/async/test_output_port_subfield.rb @@ -113,11 +113,12 @@ def make_ruby_task(name) end def make_async_task(name) - t = make_ruby_task name - async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @port_read_manager) + task = make_ruby_task(name) + async_task = Orocos.allow_blocking_calls do + TaskContext.discover(task, task.ior, task.model, + port_read_manager: @port_read_manager) end - [t, async] + [task, async_task] end def assert_polling_eventually(period: 0.01, timeout: 2, &block) diff --git a/test/telemetry/async/test_output_reader.rb b/test/telemetry/async/test_output_reader.rb index 6050bb6a3..2a9636e4b 100644 --- a/test/telemetry/async/test_output_reader.rb +++ b/test/telemetry/async/test_output_reader.rb @@ -206,7 +206,8 @@ def make_ruby_task(name) def make_async_task(name) t = make_ruby_task name async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @port_read_manager) + TaskContext.discover(name, t.ior, t.model, + port_read_manager: @port_read_manager) end [t, async] end diff --git a/test/telemetry/async/test_port_read_manager.rb b/test/telemetry/async/test_port_read_manager.rb index 6645d38a6..1c9cca55d 100644 --- a/test/telemetry/async/test_port_read_manager.rb +++ b/test/telemetry/async/test_port_read_manager.rb @@ -152,12 +152,66 @@ module Async Orocos.allow_blocking_calls { @task.out.write 42 } execute_all(@read_executor) - assert_equal 42, @poller.read_future.value + assert_equal [1, 42], @poller.read_future.value @manager.poll assert_equal [42], @received_samples end + it "propagates a new received sample" do + execute_all(@connection_executor) + @manager.poll + + assert @poller.reader.connected? + assert @poller.read_future + + Orocos.allow_blocking_calls { @task.out.write 42 } + execute_all(@read_executor) + assert_equal [1, 42], @poller.read_future.value + @manager.poll + + assert_equal 42, @poller.last_value + assert_equal [42], @received_samples + end + + it "propagates a new value received after an existing one" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + assert_equal 42, @poller.last_value + + Orocos.allow_blocking_calls { @task.out.write 21 } + read_next_sample + assert_equal 21, @poller.last_value + assert_equal [42, 21], @received_samples + end + + it "does not propagate anything if a new read indicates " \ + "no new values" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + read_next_sample + assert_equal 42, @poller.last_value + assert_equal [42], @received_samples + end + + it "resets the last value to nil if the reader is disconnected" do + execute_all(@connection_executor) + @manager.poll + + Orocos.allow_blocking_calls { @task.out.write 42 } + read_next_sample + assert_equal 42, @poller.last_value + Orocos.allow_blocking_calls { @task.out.disconnect_all } + read_next_sample + assert_nil @poller.last_value + end + it "reschedules the second read based on the end of the first" do execute_all(@connection_executor) @manager.poll @@ -210,6 +264,17 @@ module Async assert @poller.read_future end + def read_next_sample + unless @poller.read_future + freeze_monotonic_time(@poller.next_time + 1e-6) + @manager.poll + end + + execute_all(@read_executor) + @poller.read_future.wait + @manager.poll + end + def freeze_monotonic_time(time = @manager.monotonic_time) @frozen_time = time flexmock(@manager) @@ -235,7 +300,8 @@ def make_ruby_task(name) def make_async_task(name) t = make_ruby_task name async = Orocos.allow_blocking_calls do - TaskContext.discover(t, port_read_manager: @manager) + TaskContext.discover(name, t.ior, t.model, + port_read_manager: @manager) end [t, async] end