Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions lib/syskit/interface/commands.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def deployments
command :deployments,
"returns information about running deployments"

def system_definitions; end
command :system_definitions,
"transmits definition of orogen models and typelib types"

# Return incremental update about deployments
#
# @return [Protocol::Deployment]
Expand Down
2 changes: 2 additions & 0 deletions lib/syskit/telemetry/async.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# frozen_string_literal: true

require "syskit/telemetry/async/main_thread_restrictions"

require "syskit/telemetry/async/name_service"
require "syskit/telemetry/async/task_context"
require "syskit/telemetry/async/interface_object"
Expand Down
4 changes: 4 additions & 0 deletions lib/syskit/telemetry/async/interface_object.rb
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ def initialize(task_context, name, type)
@type = type
end

def to_s
"#{task_context}.#{name}<#{object_id},#{type.name}>"
end

def eql?(other)
other.task_context.eql?(task_context) &&
other.name == name &&
Expand Down
34 changes: 34 additions & 0 deletions lib/syskit/telemetry/async/main_thread_restrictions.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# frozen_string_literal: true

module Syskit
module Telemetry
module Async
# Module providing common functionality to validate that some methods are
# called within the main thread
Comment on lines +6 to +7

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would add a characterization for the "main thread". Something like

[...] The main thread is the thread upon the instance was created from

module MainThreadRestrictions
# Make a thread the main thread
#
# Usually called in an object's constructor
#
# @param [Thread] thread the thread that will become 'main', by default
# the current thread
def update_main_thread(thread = Thread.current)
@__main_thread = thread
end

class InvalidThread < RuntimeError; end

# Validate that the current thread is the main thread
#
# Called in non-thread-safe methods to make sure they aren't called
# outside of the main thread
def ensure_in_main_thread
return if Thread.current == @__main_thread

raise InvalidThread,
"calling #{caller(1, 1).first} outside of main thread"
end
end
end
end
end
55 changes: 38 additions & 17 deletions lib/syskit/telemetry/async/name_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ module Telemetry
module Async
# In-process name service
#
# It is exclusively filled using information that comes from the async
# It is exclusively filled based on information that comes from the async
# {Client}
class NameService < Orocos::NameServiceBase
include MainThreadRestrictions

# A new NameService instance
#
# @param [Hash<String,Orocos::TaskContext>] tasks The tasks which are
Expand All @@ -22,6 +24,8 @@ def initialize(
)
super()

update_main_thread

@iors = Concurrent::AtomicReference.new({})
@registered_tasks = Concurrent::Hash.new
@task_added_callbacks = Concurrent::Array.new
Expand Down Expand Up @@ -54,10 +58,14 @@ def include?(name)
# After this call, any task not in the tasks parameter will have been
# removed from the name server
#
# @param [#ior,#name] list of IOR and name of remote tasks to resolve
# @param [#ior,#name] list of IOR and name of remote tasks to resolve.
# This list is complete, that is it contains all the tasks that the
# name server should know about
# @return [Array<String>] list of task names that are either known, or
# that are being discovered
def async_update_tasks(tasks)
ensure_in_main_thread

iors = tasks.each_with_object({}) { |t, h| h[t.name] = t.ior }
@iors.set(iors)

Expand Down Expand Up @@ -102,16 +110,20 @@ class AsyncDiscoveryError < RuntimeError; end
AsyncDiscovery = Struct.new(
:task, :future, :ior, :async_task, keyword_init: true
) do
def update_from_result
fulfilled, (ior, async_task), reason = future.result
def update_from_result(port_read_manager:)
fulfilled, (ior, discovered), reason = future.result
unless fulfilled
raise AsyncDiscoveryError,
"unexpected error during asynchronous " \
"task discovery: #{reason}"
end

self.ior = ior
self.async_task = async_task
return unless discovered

self.async_task = TaskContext.from_async_discovery(
discovered, port_read_manager: port_read_manager
)
end

def wait
Expand All @@ -127,19 +139,29 @@ def resolved?
#
# Create a future that discovers a remote task
def async_discover_task(task)
ensure_in_main_thread

future = Concurrent::Promises.future_on(@discovery_executor) do
ior = @iors.get[task.name]

# ior will be nil if the task has been removed from the task
# set while the future was pending
discover_task(task.name, ior, task.orogen_model_name) if ior
end
@discovery[task.name] = AsyncDiscovery.new(task: task, future: future)
end

def wait_and_resolve_all_pending_discoveries
@discovery.each_value { _1.future.wait }
resolve_discovered_tasks
end

# @api private
#
# Process the tasks that have been (asynchronously) discovered
def resolve_discovered_tasks
ensure_in_main_thread

while (async_discovery = pop_discovered_task)
register(
async_discovery.async_task,
Expand All @@ -165,10 +187,12 @@ def wait_for_task_discovery
# @return [AsyncDiscovery,nil] a valid resolved task or nil if there are
# none so far
def pop_discovered_task
ensure_in_main_thread

loop do
return unless (async_discovery = pop_finished_discovery)
next unless finished_discovery_validate_ior(async_discovery)
next unless async_discovery.async_task
next unless async_discovery.async_task # error during resolution

return async_discovery
end
Expand All @@ -183,11 +207,15 @@ def pop_discovered_task
#
# @return [AsyncDiscovery]
def pop_finished_discovery
ensure_in_main_thread

async_discovery = @discovery.each_value.find(&:resolved?)
return unless async_discovery

@discovery.delete(async_discovery.task.name)
async_discovery.update_from_result
async_discovery.update_from_result(
port_read_manager: @port_read_manager
)
async_discovery
end

Expand Down Expand Up @@ -230,17 +258,10 @@ def finished_discovery_validate_ior(async_discovery)
# resolve the task, and the async taskcontext that represents it. The
# task is nil if the resolution failed
def discover_task(name, ior, orogen_model_name)
task = Orocos::TaskContext.new(
ior,
name: name,
model: orogen_model_from_name(orogen_model_name)
)

async_task = TaskContext.discover(
task, port_read_manager: @port_read_manager
)
orogen_model = orogen_model_from_name(orogen_model_name)
discovered = TaskContext.async_discovery(name, ior, orogen_model)

[ior, async_task]
[ior, discovered]
rescue StandardError => e
warn "Failed discovery of task #{name}: #{e.message}"
e.backtrace.each do |line|
Expand Down
4 changes: 4 additions & 0 deletions lib/syskit/telemetry/async/output_reader.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ def initialize(port, policy, connect_on:, disconnect_on:)
end
end

def to_s
"#{port}.reader<#{object_id}>"
end

def raw_reader
@reader.get
end
Expand Down
66 changes: 47 additions & 19 deletions lib/syskit/telemetry/async/port_read_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ module Async
#
# Central class that manages data readers for ports
class PortReadManager
include MainThreadRestrictions

def initialize(
connection_executor: self.class.default_connection_executor,
disconnection_executor: self.class.default_disconnection_executor,
read_executor: self.class.default_read_executor
)
update_main_thread

@callbacks = {}
@pollers = Concurrent::AtomicReference.new({})

Expand All @@ -20,6 +24,18 @@ def initialize(
@read_executor = read_executor
end

# @api private
#
# Internal access method for the callback map
#
# ONLY use this method to access @callbacks, it validates that it is
# being accessed from the main thread
def callbacks
ensure_in_main_thread

@callbacks
end

CONNECTION_DEFAULT_THREADS = 20
DISCONNECTION_DEFAULT_THREADS = 20
READ_DEFAULT_THREADS = 5
Expand Down Expand Up @@ -87,7 +103,7 @@ def to_s(relative_to: PortReadManager.monotonic_time)
"poller %<name>s: connected=%<connected>s " \
"scheduled=%<scheduled>s " \
"next_time=%<next_time>.3f (in %<next_time_delta_ms>i ms)",
name: port.full_name,
name: port.to_s,
next_time: next_time || 0,
next_time_delta_ms: next_time_delta_ms || 0,
connected: connected? ? "yes" : "no",
Expand All @@ -98,7 +114,8 @@ def to_s(relative_to: PortReadManager.monotonic_time)
def schedule_read_if_needed(now, executor)
return if next_time && next_time > now

self.read_future = reader.raw_read_new(executor)
self.read_future =
reader.raw_read_with_result(executor, nil, false)
end

def prepare_next_read(now)
Expand Down Expand Up @@ -140,7 +157,7 @@ def register_callback(port, callback, period:, buffer_size:, init: false)
needs_last_received_value: true
)

(@callbacks[port] ||= []) << callback
(callbacks[port] ||= []) << callback
ensure_reader_uptodate(port)
propagate_last_received_value(port)
Roby.disposable do
Expand All @@ -160,7 +177,7 @@ def propagate_last_received_value(port)
# This is not meant to be called directly. Use the disposable
# returned by {#register_callback} instead.
def deregister_callback(port, callback)
return unless (callbacks = @callbacks[port])
return unless (callbacks = self.callbacks[port])

callbacks.delete(callback)
if callbacks.empty?
Expand Down Expand Up @@ -217,7 +234,7 @@ def polling?(port)
# Update a poller's period to match the callbacks currently listening
# to it
def update_poller_period(poller)
poller.period = @callbacks[poller.port].map(&:period).min
poller.period = callbacks[poller.port].map(&:period).min
end

# Return the Reader for the given port
Expand Down Expand Up @@ -246,17 +263,18 @@ def process_poller_state(poller, now)
return
end

if poller.propagate_last_received_value && poller.last_value &&
!poller.resolved_read?
dispatch_last_received_value(poller)
end

if !poller.scheduled_read?
poller.schedule_read_if_needed(now, @read_executor)
elsif poller.resolved_read?
dispatch_read_result(poller)
poller.prepare_next_read(now)
end

if poller.propagate_last_received_value && poller.last_value
dispatch_last_received_value(poller)
end

poller.propagate_last_received_value = false
end

# Time in seconds returned by CLOCK_MONOTONIC
Expand All @@ -271,29 +289,39 @@ def self.monotonic_time

# Send read data to registered callbacks
def dispatch_read_result(poller)
fulfilled, value, reason = poller.result
if fulfilled
@callbacks[poller.port].each { |c| c.dispatch(value) }
poller.last_value = value
poller.propagate_last_received_value = false
else
fulfilled, read_result, reason = poller.result
unless fulfilled
warn "failed to read #{poller.port}: #{reason}"
return
end

unless read_result # no data
poller.last_value = nil
return
end

flow, value = read_result
return unless flow == Orocos::NEW_DATA

callbacks[poller.port].each { |c| c.dispatch(value) }
poller.last_value = value
poller.propagate_last_received_value = false
end

# Send last received value to the callbacks that require it
def dispatch_last_received_value(poller)
@callbacks[poller.port].each do |c|
c.dispatch(poller.last_value)
if (value = poller.last_value)
callbacks[poller.port].each { _1.dispatch(value) }
end

poller.propagate_last_received_value = false
end

# Return the buffer size needed by all callbacks of a port, in aggregate
#
# @return [Integer]
def required_policy_for(port)
return unless (callbacks = @callbacks[port])
return unless (callbacks = self.callbacks[port])

buffer_size = callbacks.map { _1.buffer_size }.max
init = callbacks.map { _1.init }.inject(&:|)
Expand Down
Loading