feat(connectors): add AsyncDBConnectorProtocol; implement on PostgreSQL and SQLite#145
Conversation
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
| python_classes = Test* | ||
| python_functions = test_* | ||
| addopts = -v | ||
| asyncio_default_fixture_loop_scope = function |
There was a problem hiding this comment.
`asyncio_default_fixture_loop_scope = function` tells pytest-asyncio to create a fresh event loop for each test function. Without it, pytest-asyncio ≥ 0.23 emits a `DeprecationWarning` and will error in a future release. The `function` scope is the strictest setting — it prevents any async state from leaking between tests.
| from .spiraldb_connector import SpiralDBConnector | ||
| from .sqlite_connector import SQLiteConnector | ||
| from .postgresql_connector import PostgreSQLConnector | ||
| from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol |
There was a problem hiding this comment.
This should not be exported as part of this package -- rather have the protocol sub package just export this protocol
There was a problem hiding this comment.
Removed. AsyncDBConnectorProtocol is no longer exported from orcapod.databases — it lives exclusively in orcapod.protocols. Also added DBConnectorProtocol to protocols/__init__ so both connector protocols are exported from a single place (commit 8a1fdae).
| Returns: | ||
| Dict mapping column name to Arrow DataType. | ||
| """ | ||
| if re.search(r"\bJOIN\b", query, re.IGNORECASE): |
There was a problem hiding this comment.
If there is much shared logic between sync and async version and there is NOT much async used actually, then extract and consolidate
There was a problem hiding this comment.
Extracted the shared parts (commit 8a1fdae):
- SQL strings —
_SQL_TABLE_NAMES,_SQL_PK_COLUMNS,_SQL_COLUMN_INFOare now module-level constants. Sync and async schema methods each reference the same string; no duplication. - Query-parsing logic — the FROM-clause parsing block that was copy-pasted verbatim between
_resolve_column_type_lookupand_async_resolve_column_type_lookupis now a standalone_parse_table_from_query(query) -> str | Nonehelper. Each lookup function is now 3 lines: call the helper, early-return{}onNone, then make the single connector call (sync vs async, using their respective connections).
Sync and async connections are kept separate — the async methods continue to use _async_conn (psycopg3 AsyncConnection) rather than delegating to asyncio.to_thread(sync_method).
There was a problem hiding this comment.
Pull request overview
This PR introduces an async connector interface for relational DB connectors and implements it for the existing PostgreSQL and SQLite connectors, enabling native use in asyncio-based pipelines without requiring callers to manage thread executors themselves.
Changes:
- Added a standalone
AsyncDBConnectorProtocoland exported it viaorcapod.protocolsandorcapod.databases. - Implemented async lifecycle, async schema introspection, and async batch iteration for
PostgreSQLConnector(native psycopg3 async) andSQLiteConnector(wrapping sync methods viaasyncio.to_thread()). - Added unit tests for async methods (SQLite + PostgreSQL) and PostgreSQL async integration tests; updated pytest asyncio loop-scope configuration.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/test_databases/test_sqlite_connector.py | Adds async-method unit tests against an in-memory SQLite database. |
| tests/test_databases/test_postgresql_connector.py | Adds unit tests for PostgreSQL async lifecycle, async schema introspection, and async batch iteration (mocked psycopg). |
| tests/test_databases/test_postgresql_connector_integration.py | Adds PostgreSQL async integration tests for lifecycle, schema introspection, and async batch iteration. |
| superpowers/specs/2026-05-23-async-db-connector-protocol-design.md | Adds an approved design spec describing the async protocol and connector behavior. |
| superpowers/plans/2026-05-23-async-db-connector-protocol.md | Adds an implementation plan documenting approach and testing steps. |
| src/orcapod/protocols/async_db_connector_protocol.py | Introduces AsyncDBConnectorProtocol definition. |
| src/orcapod/protocols/init.py | Re-exports AsyncDBConnectorProtocol from orcapod.protocols. |
| src/orcapod/databases/sqlite_connector.py | Implements async methods using asyncio.to_thread() wrappers over sync behavior. |
| src/orcapod/databases/postgresql_connector.py | Implements async lifecycle, async schema introspection, and async_iter_batches using psycopg3 async APIs. |
| src/orcapod/databases/init.py | Re-exports AsyncDBConnectorProtocol from orcapod.databases. |
| pytest.ini | Sets asyncio_default_fixture_loop_scope = function for pytest-asyncio behavior. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| async def async_close(self) -> None: | ||
| """Close async and sync connections. | ||
|
|
||
| Implementations must be idempotent — calling this multiple times must | ||
| not raise. | ||
| """ | ||
| if self._async_conn is not None: | ||
| await self._async_conn.close() | ||
| self._async_conn = None | ||
| self.close() |
There was a problem hiding this comment.
Fixed. Changed self.close() to await asyncio.to_thread(self.close) so the sync shutdown path (lock acquisition + psycopg3 network disconnect) runs in a worker thread instead of on the event loop. Also added import asyncio to the module imports. Fixed in commit 6734c26.
| self._require_async_open() # guard: raise if context manager not entered | ||
|
|
||
| with self._lock: | ||
| self._require_open() # guard: raise if sync connector is closed | ||
| dsn = self._dsn |
There was a problem hiding this comment.
Fixed. Removed the with self._lock: wrapper. _dsn is assigned once in __init__ and never reassigned, so it's safe to read from any thread without a lock. _require_open() is a simple if self._conn is None null-check — even without the lock the worst-case race is an already-closed connector slipping past the guard, which is a programmer error regardless. Both reads are now lock-free with a comment noting _dsn's immutability. Fixed in commit 6734c26.
Review response — commit 6734c26Fix 1:
|
| from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol | ||
|
|
||
| __all__ = [ | ||
| "AsyncDBConnectorProtocol", | ||
| "DataExecutionLoggerProtocol", | ||
| "ExecutionObserverProtocol", | ||
| ] |
There was a problem hiding this comment.
Fixed. DBConnectorProtocol is now imported and exported from orcapod.protocols.__init__ alongside AsyncDBConnectorProtocol (commit 8a1fdae).
| await cur.close() | ||
| try: | ||
| await read_conn.rollback() | ||
| except Exception: | ||
| pass | ||
| await read_conn.close() | ||
|
|
There was a problem hiding this comment.
Improved. Each cleanup step (cur.close, read_conn.rollback, read_conn.close) is now wrapped in its own try/except Exception block so one failure cannot skip the others (commit 8a1fdae). A comment notes the remaining CancelledError limitation: full asyncio.shield() protection is deferred as out-of-scope for this PR.
Review response — commit 8a1fdaeFix 1:
|
Replace NotImplementedError stubs for async_get_table_names, async_get_pk_columns, and async_get_column_info with real psycopg3 async implementations; add 7 unit tests.
…eanup assertions (PLT-1453)
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
async_close() closes both async and sync connections, so sync schema methods called after 'async with connector:' exits would raise. Move the three get_table_names/get_pk_columns/get_column_info sync calls to before the async context in the integration test suite. Also update the Protocol signature snippet in the spec and plan docs: async_iter_batches is declared as 'def' (not 'async def') in the Protocol body so that async-generator concrete implementations satisfy it statically. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…e first yield The SQLite async_iter_batches section said batches are "yielded lazily" but the implementation collects list(self.iter_batches(...)) in a worker thread before yielding any batch. Update wording to reflect that the full result set is materialised before the first yield and that batch_size does not enable true streaming. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…ter_batches async_close() was calling self.close() directly, which acquires a threading.RLock and performs blocking psycopg3 network I/O on the event loop thread. Fixed by using await asyncio.to_thread(self.close) so the sync shutdown runs in a worker thread. async_iter_batches() was wrapping the _dsn read and _require_open() guard in `with self._lock:`, which could stall the event loop if a sync thread held the lock. _dsn is immutable after __init__ and _require_open() is a simple null-check, so neither needs lock protection from async context. Removed the lock and annotated _dsn read accordingly. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…, fix exports - Extract _SQL_TABLE_NAMES, _SQL_PK_COLUMNS, _SQL_COLUMN_INFO module-level constants; sync and async schema methods now share a single copy of each SQL string (no duplication) - Extract _parse_table_from_query() helper from the two _resolve_column_type_lookup functions, which were identical except for the final DB call; each lookup function is now ~3 lines - Remove AsyncDBConnectorProtocol from orcapod.databases — protocols belong in orcapod.protocols (eywalker review) - Add DBConnectorProtocol to orcapod.protocols.__init__ alongside AsyncDBConnectorProtocol for a consistent export surface (Copilot review) - Wrap each async_iter_batches cleanup step (cur.close, rollback, read_conn.close) in its own try/except so one failure does not prevent the others from running Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
8a1fdae to
9d3ddbc
Compare
Summary
AsyncDBConnectorProtocol— a standalone@runtime_checkableProtocol with 7 async methods (__aenter__,__aexit__,async_close,async_get_table_names,async_get_pk_columns,async_get_column_info,async_iter_batches)PostgreSQLConnector: native async via psycopg3AsyncConnection/AsyncServerCursor; dedicated connection perasync_iter_batchescall for portal isolationSQLiteConnector:asyncio.to_thread()wrappers over existing sync methods (thread-safe:check_same_thread=False+threading.RLockalready in place)SpiralDBConnectordeferred to PLT-1456 (lacks thread-safety prerequisite)Test plan
uv run pytest tests/test_databases/test_postgresql_connector.py -v— PASSuv run pytest tests/test_databases/test_sqlite_connector.py -v— PASSuv run pytest tests/test_databases/test_postgresql_connector_integration.py -m postgres -v— PASS (CI)Closes PLT-1453