feat(modules): async modules#1920
Conversation
Greptile SummaryThis PR introduces a first-class async programming model for modules: an
Confidence Score: 2/5Not safe to merge: _logging_task_factory is broken on Python ≥ 3.10, making the entire async task system non-functional. A single P0 finding caps the score at 2. The asyncio.Task(loop=loop) call in _logging_task_factory raises TypeError on any Python ≥ 3.10, which is the actively supported range. This factory is set on every module's event loop via get_loop(), so every coroutine scheduled by spawn(), process_observable(), or _start_main() would fail to create a task. dimos/core/module.py — specifically _logging_task_factory at line 718 Important Files Changed
Sequence DiagramsequenceDiagram
participant C as Sync Caller
participant W as arpc Wrapper
participant EL as Module Event Loop
participant P as AsyncSpecProxy
participant TP as ThreadPoolExecutor
participant RL as Remote Module Loop
C->>W: handle_x(msg) from RPC dispatcher
W->>W: get_running_loop raises, running=None
W->>EL: run_coroutine_threadsafe(coro)
W->>W: future.result() blocks caller
EL->>EL: await coro
EL-->>C: result returned
EL->>P: await self._ref.remote_method(x)
P->>TP: run_in_executor(None, RpcCall)
TP->>RL: run_coroutine_threadsafe(coro)
RL-->>TP: result
TP-->>EL: Future resolved
C->>EL: start - run_coroutine_threadsafe gen anext
EL->>EL: setup code then yield
EL-->>C: setup done, start returns
C->>EL: stop - run_coroutine_threadsafe gen anext
EL->>EL: teardown then StopAsyncIteration
EL-->>C: teardown done, stop returns
Reviews (4): Last reviewed commit: "feat(modules): async modules" | Re-trigger Greptile |
| def _log_async_handler_error(self, fut: Any) -> None: | ||
| try: | ||
| fut.result() | ||
| except (asyncio.CancelledError, RuntimeError): | ||
| pass # loop stopped or task cancelled during shutdown | ||
| except BaseException as e: | ||
| # Include exception type+message in the event string so it is | ||
| # visible on consoles whose formatters strip exc_info/traceback. | ||
| logger.exception( | ||
| f"Unhandled error in async task on {type(self).__name__}._loop: " | ||
| f"{type(e).__name__}: {e}" | ||
| ) |
There was a problem hiding this comment.
RuntimeError catch swallows user exceptions from spawned tasks
The handler silently discards any RuntimeError raised inside user-written coroutines scheduled via spawn() or process_observable(). RuntimeError is extremely common in Python (e.g., StopIteration propagation, generator misuse, and countless library errors). The intent — catching "event loop is closed" — is better served by narrowing the RuntimeError arm:
import concurrent.futures
def _log_async_handler_error(self, fut: Any) -> None:
try:
fut.result()
except (asyncio.CancelledError, concurrent.futures.CancelledError):
pass # task cancelled during shutdown
except RuntimeError as e:
if "event loop" in str(e).lower() or "loop is closed" in str(e).lower():
pass # loop shut down before task completed
else:
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)
except BaseException as e:
logger.exception(
f"Unhandled error in async task on {type(self).__name__}._loop: "
f"{type(e).__name__}: {e}"
)As written, any RuntimeError in a spawned task is swallowed, defeating the whole purpose of spawn() over bare run_coroutine_threadsafe.
02b6e09 to
079c62a
Compare
|
Want your agent to iterate on Greptile's feedback? Try greploops. |
079c62a to
c87457c
Compare
c87457c to
c9a4d56
Compare
| Adds a done callback to log unhandled exceptions from any task created on | ||
| the loop. | ||
| """ | ||
| task = asyncio.Task(coro, loop=loop, **kwargs) |
There was a problem hiding this comment.
asyncio.Task(loop=...) removed in Python 3.10+
asyncio.Task.__init__ dropped the loop keyword argument in Python 3.10 (deprecated in 3.8). On Python ≥ 3.10, every coroutine scheduled on a module's loop will immediately raise TypeError: __init__() got an unexpected keyword argument 'loop', silently breaking the entire async task system. Inside a task factory the loop is already the running loop, so the fix is simply to drop the kwarg:
| task = asyncio.Task(coro, loop=loop, **kwargs) | |
| task = asyncio.Task(coro, **kwargs) |
| return fn | ||
|
|
||
|
|
||
| def arpc(fn: Callable[..., Coroutine[Any, Any, Any]]) -> Callable[..., Any]: |
There was a problem hiding this comment.
you can autodetect if function is async, so we can have a same @rpc decorator for both
| ) -> "DisposableBase": | ||
| """Subscribe `async_cb` (an async function) to `observable`, dispatching | ||
| each emitted value onto self._loop. Invocations are serialized through a | ||
| per-subscription dispatcher task with LATEST coalescing. The subscription |
There was a problem hiding this comment.
this means we basically drop msgs if consumer is slow? that's good just making sure
|
|
||
| ### Auto-bound input handlers | ||
|
|
||
| For every declared `x: In[T]`, if the module defines `async def handle_x(self, msg: T)`, the handler is automatically subscribed at `start()` and dispatched onto `self._loop`. Subscriptions are cleaned up at `stop()`. |
| # No lock needed. `_teleop_active` is only mutated on `self._loop`. | ||
| self._teleop_active = False | ||
|
|
||
| async def handle_clicked_point(self, msg: PointStamped) -> None: |
There was a problem hiding this comment.
this is confusing to me, should't this module have
clicked_point: In[PointStamped] defined?
Problem
Having so many threads in modules makes code hard to write.
Closes DIM-812
Solution
self._loopdaemon-thread asyncio loop, removing the need for locks on per-instance state.async def handle_xfor everyx: In[T]; each handler gets a dedicated dispatcher task with LATEST-only semantics (drops intermediate messages, guarantees the most recent value is eventually processed).@arpcdecorator: async RPC body interchangeable with@rpc(same discovery + RPC machinery).self.spawn(coro)replaces barerun_coroutine_threadsafeand routes unhandled exceptions to the module logger instead of swallowing them in an unread Future.self.process_observable(obs, async_handler)bridges rxpy observables ontoself._loop.async def main(self)async-generator-with-one-yieldcollapses pairedstart()/stop()resource setup/teardown into a single visually-adjacent block.Breaking Changes
None.
How to Test
Contributor License Agreement