From a6be5941e6431bbc59188d254be59c99d798e3d6 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 04:49:48 +0000 Subject: [PATCH 01/19] docs(specs): add async DB connector protocol design spec for PLT-1453 --- ...5-23-async-db-connector-protocol-design.md | 302 ++++++++++++++++++ 1 file changed, 302 insertions(+) create mode 100644 superpowers/specs/2026-05-23-async-db-connector-protocol-design.md diff --git a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md new file mode 100644 index 00000000..78b51cbe --- /dev/null +++ b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md @@ -0,0 +1,302 @@ +# Async DB Connector Protocol Design + +**Date:** 2026-05-23 +**Linear issue:** PLT-1453 +**Status:** Approved + +--- + +## Overview + +`DBConnectorProtocol` currently exposes only synchronous read and write methods. This spec +describes adding a parallel async interface — `AsyncDBConnectorProtocol` — and implementing +it across `PostgreSQLConnector`, `SQLiteConnector`, and `SpiralDBConnector`. + +The async interface enables connectors to be used natively in async pipelines (e.g., +`DynamicSourceProtocol` implementations) without forcing callers to wrap blocking I/O in +thread executors. + +--- + +## Goals + +- Define `AsyncDBConnectorProtocol` as a standalone protocol with 7 async methods. +- Implement all 7 methods in `PostgreSQLConnector` using psycopg3's native async API. +- Implement all 7 methods in `SQLiteConnector` and `SpiralDBConnector` using + `asyncio.to_thread()` wrappers (neither library exposes a native async API). +- Provide unit test coverage for all async paths on all three connectors. +- Provide async integration tests for `PostgreSQLConnector` (container fixture exists). + +--- + +## Out of scope + +- Async write methods (`create_table_if_not_exists`, `upsert_records`) — sync write path + is sufficient for current use cases. +- MySQL connector — a dedicated follow-on issue. +- Removing or deprecating existing sync methods — the sync interface is unchanged. +- Changes to callers of `DBConnectorProtocol` (`DBTableSource`, `ConnectorArrowDatabase`). +- Connection pooling (`psycopg_pool.AsyncConnectionPool`) — not required for the current + use cases and would add a dependency. + +--- + +## Protocol design + +### `AsyncDBConnectorProtocol` — standalone + +A new `@runtime_checkable` Protocol defined in +`src/orcapod/protocols/async_db_connector_protocol.py`. + +**It does not inherit from `DBConnectorProtocol`.** The two protocols are related but +independent structural contracts. A class satisfies both by implementing all their methods; +Python's structural typing handles this naturally. Callers that need both can check +`isinstance(obj, DBConnectorProtocol) and isinstance(obj, AsyncDBConnectorProtocol)`. + +#### Methods + +| Method | Return type | Notes | +|---|---|---| +| `__aenter__()` | `AsyncDBConnectorProtocol` | Opens async resources; returns `self` | +| `__aexit__(*args)` | `None` | Calls `async_close()` | +| `async_close()` | `None` | Full shutdown: closes async + sync connections | +| `async_get_table_names()` | `list[str]` | Async schema introspection | +| `async_get_pk_columns(table_name)` | `list[str]` | Async schema introspection | +| `async_get_column_info(table_name)` | `list[ColumnInfo]` | Async schema introspection | +| `async_iter_batches(query, params, batch_size)` | `AsyncIterator[pa.RecordBatch]` | Async read | + +`async_iter_batches` signature mirrors `iter_batches` exactly: + +```python +async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, +) -> AsyncIterator[pa.RecordBatch]: ... +``` + +#### Lifecycle contract + +The intended usage pattern is the `async with` context manager: + +```python +async with PostgreSQLConnector(dsn) as connector: + async for batch in connector.async_iter_batches('SELECT * FROM "t"'): + process(batch) +``` + +`__aenter__` opens async resources. `__aexit__` calls `async_close()`, which performs a +full shutdown of both async and sync connections. After `__aexit__`, the connector must not +be used further. + +#### Export surface + +`AsyncDBConnectorProtocol` is exported from: +- `orcapod.protocols` (alongside `DBConnectorProtocol`) +- `orcapod.databases` (alongside the connector classes) + +--- + +## `PostgreSQLConnector` implementation + +### New private state + +```python +self._async_conn: Any = None # psycopg.AsyncConnection, set by __aenter__ +``` + +`_async_conn` is set exactly once in `__aenter__` before any async method is callable, so +there is no initialization race. Concurrent async operations on `_async_conn` are serialized +by psycopg3's internal `asyncio.Lock` — no application-level lock is required. + +### `__aenter__` + +Opens a psycopg3 `AsyncConnection` and stores it in `_async_conn`: + +```python +async def __aenter__(self) -> PostgreSQLConnector: + self._async_conn = await psycopg.AsyncConnection.connect( + self._dsn, autocommit=False + ) + return self +``` + +### `_require_async_open()` + +Private guard used by schema introspection methods: + +```python +def _require_async_open(self) -> Any: + if self._async_conn is None: + raise RuntimeError( + "PostgreSQLConnector: enter the async context manager before " + "calling async methods" + ) + return self._async_conn +``` + +### `async_close()` + +Closes `_async_conn` (if open) then calls the sync `close()` for a full shutdown: + +```python +async def async_close(self) -> None: + if self._async_conn is not None: + await self._async_conn.close() + self._async_conn = None + self.close() +``` + +### `__aexit__` + +```python +async def __aexit__(self, *args: Any) -> None: + await self.async_close() +``` + +### Schema introspection (`async_get_table_names`, `async_get_pk_columns`, `async_get_column_info`) + +Use `_async_conn` (obtained via `_require_async_open()`). Structurally identical to their +sync counterparts — same SQL, same result mapping — but using `async with conn.cursor()` +and `await cur.execute()` / `await cur.fetchall()`. + +No application-level lock is needed: psycopg3's `AsyncConnection` carries an internal +`asyncio.Lock` that serializes concurrent async operations on the same connection, so +concurrent calls to these methods (e.g. via `asyncio.gather`) are handled safely by the +driver itself. + +### `async_iter_batches` + +Opens a **dedicated** `AsyncConnection` per call, using psycopg3's `AsyncServerCursor` +for server-side streaming. This mirrors the sync `iter_batches` pattern exactly: +a dedicated connection prevents other operations on `_async_conn` from invalidating the +open cursor portal. + +``` +read_conn = await psycopg.AsyncConnection.connect(self._dsn, autocommit=False) +cursor_name = f"orcapod_{next(self._cursor_seq)}" +cur = read_conn.cursor(name=cursor_name) # AsyncServerCursor +try: + await cur.execute(query, params) + ... fetch batches with await cur.fetchmany(batch_size) ... + yield batch +finally: + await cur.close() + await read_conn.close() +``` + +A module-level coroutine `_async_resolve_column_type_lookup(query, connector)` mirrors the +sync `_resolve_column_type_lookup` but calls `await connector.async_get_column_info(table)` +for Arrow type mapping. + +--- + +## `SQLiteConnector` implementation + +stdlib `sqlite3` is synchronous. All async methods are thin `asyncio.to_thread()` wrappers +over the existing sync methods. + +### `__aenter__` / `__aexit__` + +```python +async def __aenter__(self) -> SQLiteConnector: + return self # sync connection already open in __init__ + +async def __aexit__(self, *args: Any) -> None: + await self.async_close() +``` + +### `async_close` + +```python +async def async_close(self) -> None: + await asyncio.to_thread(self.close) +``` + +### Schema introspection + +```python +async def async_get_table_names(self) -> list[str]: + return await asyncio.to_thread(self.get_table_names) + +async def async_get_pk_columns(self, table_name: str) -> list[str]: + return await asyncio.to_thread(self.get_pk_columns, table_name) + +async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + return await asyncio.to_thread(self.get_column_info, table_name) +``` + +### `async_iter_batches` + +The entire sync iteration runs in the thread pool (blocking I/O stays off the event loop), +then batches are yielded lazily from the async generator: + +```python +async def async_iter_batches(self, query, params=None, batch_size=1000): + batches = await asyncio.to_thread( + lambda: list(self.iter_batches(query, params, batch_size)) + ) + for batch in batches: + yield batch +``` + +--- + +## `SpiralDBConnector` implementation + +pyspiral (v0.11.7) is entirely synchronous — confirmed by walking all submodules; the only +async constructs are internal gRPC pagination utilities in `spiral.grpc_`, not part of the +public API. All async methods follow the same `asyncio.to_thread()` pattern as +`SQLiteConnector`. + +`__aenter__`, `__aexit__`, `async_close`, `async_get_table_names`, `async_get_pk_columns`, +`async_get_column_info`, and `async_iter_batches` are implemented identically to +`SQLiteConnector` in structure, wrapping their sync counterparts. + +--- + +## Testing + +### Unit tests + +Added to each connector's existing test file as a new `TestAsyncMethods` class. + +| Connector | Approach | +|---|---| +| `PostgreSQLConnector` | Mock `psycopg.AsyncConnection` and `AsyncServerCursor`; verify `__aenter__` sets `_async_conn`, `_require_async_open` guards, `async_iter_batches` opens a dedicated connection per call, `async_close` tears down async then sync | +| `SQLiteConnector` | Real in-memory `:memory:` DB (no mocks needed); verify each async method returns the same results as its sync counterpart | +| `SpiralDBConnector` | Mock `spiral` module (existing pattern); verify `asyncio.to_thread` is called for each method | + +All async test methods use `@pytest.mark.asyncio`. + +### Integration tests + +Added to `test_postgresql_connector_integration.py`, marked `@pytest.mark.postgres`: + +- `TestAsyncLifecycle` — `__aenter__`/`__aexit__` open and close correctly; `async_close` + is idempotent; `_require_async_open` raises outside context manager +- `TestAsyncSchemaIntrospection` — `async_get_table_names`, `async_get_pk_columns`, + `async_get_column_info` return correct results against a live PostgreSQL instance +- `TestAsyncIterBatches` — correct rows, correct Arrow types, batch size respected, + empty result, early generator abandonment closes server-side cursor + +No SQLite integration tests (unit tests cover the full path with a real `:memory:` DB). +No SpiralDB integration tests (no container fixture). + +--- + +## File changes summary + +| File | Change | +|---|---| +| `src/orcapod/protocols/async_db_connector_protocol.py` | **New** — `AsyncDBConnectorProtocol` | +| `src/orcapod/protocols/__init__.py` | Export `AsyncDBConnectorProtocol` | +| `src/orcapod/databases/__init__.py` | Export `AsyncDBConnectorProtocol` | +| `src/orcapod/databases/postgresql_connector.py` | Add `_async_conn`, `__aenter__`, `__aexit__`, `async_close`, `_require_async_open`, async schema methods, `async_iter_batches`, `_async_resolve_column_type_lookup` | +| `src/orcapod/databases/sqlite_connector.py` | Add `__aenter__`, `__aexit__`, `async_close`, async schema methods, `async_iter_batches` | +| `src/orcapod/databases/spiraldb_connector.py` | Add `__aenter__`, `__aexit__`, `async_close`, async schema methods, `async_iter_batches` | +| `tests/test_databases/test_postgresql_connector.py` | Add `TestAsyncMethods` | +| `tests/test_databases/test_sqlite_connector.py` | Add `TestAsyncMethods` | +| `tests/test_databases/test_spiraldb_connector.py` | Add `TestAsyncMethods` | +| `tests/test_databases/test_postgresql_connector_integration.py` | Add `TestAsyncLifecycle`, `TestAsyncSchemaIntrospection`, `TestAsyncIterBatches` | From e48eef7c9c29f4a926427a21dd5b544b2614366e Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Sat, 23 May 2026 05:02:45 +0000 Subject: [PATCH 02/19] docs(specs): defer SpiralDB async to PLT-1456; narrow PLT-1453 scope --- ...5-23-async-db-connector-protocol-design.md | 29 ++++++------------- 1 file changed, 9 insertions(+), 20 deletions(-) diff --git a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md index 78b51cbe..85af6142 100644 --- a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md +++ b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md @@ -22,9 +22,9 @@ thread executors. - Define `AsyncDBConnectorProtocol` as a standalone protocol with 7 async methods. - Implement all 7 methods in `PostgreSQLConnector` using psycopg3's native async API. -- Implement all 7 methods in `SQLiteConnector` and `SpiralDBConnector` using - `asyncio.to_thread()` wrappers (neither library exposes a native async API). -- Provide unit test coverage for all async paths on all three connectors. +- Implement all 7 methods in `SQLiteConnector` using `asyncio.to_thread()` wrappers + (stdlib `sqlite3` is synchronous). +- Provide unit test coverage for all async paths on both connectors. - Provide async integration tests for `PostgreSQLConnector` (container fixture exists). --- @@ -38,6 +38,10 @@ thread executors. - Changes to callers of `DBConnectorProtocol` (`DBTableSource`, `ConnectorArrowDatabase`). - Connection pooling (`psycopg_pool.AsyncConnectionPool`) — not required for the current use cases and would add a dependency. +- `SpiralDBConnector` async implementation — deferred to PLT-1456. `SpiralDBConnector` + has no internal thread synchronization, making `asyncio.to_thread()` wrappers unsafe + under concurrent async use. PLT-1456 will first add a `threading.Lock` to + `SpiralDBConnector` and then implement the async interface. --- @@ -243,19 +247,6 @@ async def async_iter_batches(self, query, params=None, batch_size=1000): --- -## `SpiralDBConnector` implementation - -pyspiral (v0.11.7) is entirely synchronous — confirmed by walking all submodules; the only -async constructs are internal gRPC pagination utilities in `spiral.grpc_`, not part of the -public API. All async methods follow the same `asyncio.to_thread()` pattern as -`SQLiteConnector`. - -`__aenter__`, `__aexit__`, `async_close`, `async_get_table_names`, `async_get_pk_columns`, -`async_get_column_info`, and `async_iter_batches` are implemented identically to -`SQLiteConnector` in structure, wrapping their sync counterparts. - ---- - ## Testing ### Unit tests @@ -266,7 +257,6 @@ Added to each connector's existing test file as a new `TestAsyncMethods` class. |---|---| | `PostgreSQLConnector` | Mock `psycopg.AsyncConnection` and `AsyncServerCursor`; verify `__aenter__` sets `_async_conn`, `_require_async_open` guards, `async_iter_batches` opens a dedicated connection per call, `async_close` tears down async then sync | | `SQLiteConnector` | Real in-memory `:memory:` DB (no mocks needed); verify each async method returns the same results as its sync counterpart | -| `SpiralDBConnector` | Mock `spiral` module (existing pattern); verify `asyncio.to_thread` is called for each method | All async test methods use `@pytest.mark.asyncio`. @@ -282,7 +272,7 @@ Added to `test_postgresql_connector_integration.py`, marked `@pytest.mark.postgr empty result, early generator abandonment closes server-side cursor No SQLite integration tests (unit tests cover the full path with a real `:memory:` DB). -No SpiralDB integration tests (no container fixture). +`SpiralDBConnector` async implementation and tests are deferred to PLT-1456. --- @@ -295,8 +285,7 @@ No SpiralDB integration tests (no container fixture). | `src/orcapod/databases/__init__.py` | Export `AsyncDBConnectorProtocol` | | `src/orcapod/databases/postgresql_connector.py` | Add `_async_conn`, `__aenter__`, `__aexit__`, `async_close`, `_require_async_open`, async schema methods, `async_iter_batches`, `_async_resolve_column_type_lookup` | | `src/orcapod/databases/sqlite_connector.py` | Add `__aenter__`, `__aexit__`, `async_close`, async schema methods, `async_iter_batches` | -| `src/orcapod/databases/spiraldb_connector.py` | Add `__aenter__`, `__aexit__`, `async_close`, async schema methods, `async_iter_batches` | +| `src/orcapod/databases/spiraldb_connector.py` | No changes — deferred to PLT-1456 | | `tests/test_databases/test_postgresql_connector.py` | Add `TestAsyncMethods` | | `tests/test_databases/test_sqlite_connector.py` | Add `TestAsyncMethods` | -| `tests/test_databases/test_spiraldb_connector.py` | Add `TestAsyncMethods` | | `tests/test_databases/test_postgresql_connector_integration.py` | Add `TestAsyncLifecycle`, `TestAsyncSchemaIntrospection`, `TestAsyncIterBatches` | From 05cfd72b4aa6718790e93e33c68262ec54af9eb4 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:23:43 +0000 Subject: [PATCH 03/19] docs(plans): add async DB connector implementation plan (PLT-1453) --- .../2026-05-23-async-db-connector-protocol.md | 1350 +++++++++++++++++ 1 file changed, 1350 insertions(+) create mode 100644 superpowers/plans/2026-05-23-async-db-connector-protocol.md diff --git a/superpowers/plans/2026-05-23-async-db-connector-protocol.md b/superpowers/plans/2026-05-23-async-db-connector-protocol.md new file mode 100644 index 00000000..bbaddfe5 --- /dev/null +++ b/superpowers/plans/2026-05-23-async-db-connector-protocol.md @@ -0,0 +1,1350 @@ +# Async DB Connector Protocol Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use sensei:subagent-driven-development (recommended) or sensei:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add `AsyncDBConnectorProtocol` and implement its 7 async methods on `PostgreSQLConnector` and `SQLiteConnector`. + +**Architecture:** A standalone `@runtime_checkable` Protocol defines the async interface independently from `DBConnectorProtocol`. `PostgreSQLConnector` uses psycopg3's native async API (`AsyncConnection`, `AsyncServerCursor`); `SQLiteConnector` wraps its existing sync methods in `asyncio.to_thread()`. The async lifecycle is managed via `__aenter__`/`__aexit__`. + +**Tech Stack:** Python 3.12, psycopg3 (`psycopg[binary]>=3.0`), stdlib `asyncio`, `pyarrow>=20`, `pytest-asyncio>=1.3.0` + +--- + +## Prerequisites + +- [ ] **Check out the feature branch** + +```bash +git checkout -b eywalker/plt-1453-add-async-methods-to-dbconnectorprotocol-implement-across +``` + +--- + +## File map + +| File | Action | Purpose | +|---|---|---| +| `src/orcapod/protocols/async_db_connector_protocol.py` | Create | `AsyncDBConnectorProtocol` definition | +| `src/orcapod/protocols/__init__.py` | Modify | Export `AsyncDBConnectorProtocol` | +| `src/orcapod/databases/__init__.py` | Modify | Export `AsyncDBConnectorProtocol` | +| `src/orcapod/databases/postgresql_connector.py` | Modify | Async lifecycle, schema, iter_batches | +| `src/orcapod/databases/sqlite_connector.py` | Modify | asyncio.to_thread() wrappers | +| `tests/test_databases/test_postgresql_connector.py` | Modify | `TestAsyncMethods` unit tests | +| `tests/test_databases/test_sqlite_connector.py` | Modify | `TestAsyncMethods` unit tests | +| `tests/test_databases/test_postgresql_connector_integration.py` | Modify | Async integration tests | + +--- + +## Task 1: Define `AsyncDBConnectorProtocol` and wire up exports + +**Files:** +- Create: `src/orcapod/protocols/async_db_connector_protocol.py` +- Modify: `src/orcapod/protocols/__init__.py` +- Modify: `src/orcapod/databases/__init__.py` + +- [ ] **Step 1: Create the protocol file** + +```python +# src/orcapod/protocols/async_db_connector_protocol.py +"""AsyncDBConnectorProtocol — async counterpart to DBConnectorProtocol. + +Standalone protocol (does not inherit from DBConnectorProtocol). A class +satisfies both protocols by implementing all their methods. + +Intended use:: + + async with PostgreSQLConnector(dsn) as connector: + tables = await connector.async_get_table_names() + async for batch in connector.async_iter_batches('SELECT * FROM "t"'): + process(batch) +""" +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any, Protocol, TYPE_CHECKING, runtime_checkable + +from orcapod.types import ColumnInfo + +if TYPE_CHECKING: + import pyarrow as pa + +__all__ = ["AsyncDBConnectorProtocol"] + + +@runtime_checkable +class AsyncDBConnectorProtocol(Protocol): + """Async interface for an external relational database backend. + + Standalone protocol — does not inherit from ``DBConnectorProtocol``. A + connector class satisfies both protocols by implementing all their methods. + + Lifecycle: use ``async with connector:`` to open async resources. + ``__aexit__`` calls ``async_close()``, which performs a full shutdown of + both async and sync connections. + + Example:: + + async with PostgreSQLConnector(dsn) as connector: + tables = await connector.async_get_table_names() + async for batch in connector.async_iter_batches('SELECT * FROM "t"'): + process(batch) + """ + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + async def __aenter__(self) -> AsyncDBConnectorProtocol: + """Open async resources and return self.""" + ... + + async def __aexit__(self, *args: Any) -> None: + """Close all resources by calling ``async_close()``.""" + ... + + async def async_close(self) -> None: + """Release all async and sync database resources. Idempotent.""" + ... + + # ── Schema introspection ────────────────────────────────────────────────── + + async def async_get_table_names(self) -> list[str]: + """Return all available table names in this database.""" + ... + + async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names for a table, in key-sequence order. + + Returns an empty list if the table has no primary key. + """ + ... + + async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata for a table, with types mapped to Arrow.""" + ... + + # ── Read ────────────────────────────────────────────────────────────────── + + async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, + ) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Args: + query: SQL query string. Table names should be double-quoted + (``SELECT * FROM "my_table"``). + params: Optional query parameters (connector-specific format). + batch_size: Maximum rows per yielded batch. + """ + ... +``` + +- [ ] **Step 2: Export from `orcapod.protocols`** + +Add one import to `src/orcapod/protocols/__init__.py`: + +```python +from orcapod.protocols.observability_protocols import ( + ExecutionObserverProtocol, + DataExecutionLoggerProtocol, +) +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol +``` + +- [ ] **Step 3: Export from `orcapod.databases`** + +In `src/orcapod/databases/__init__.py`, add the import and update `__all__`: + +```python +from .connector_arrow_database import ConnectorArrowDatabase +from .delta_lake_databases import DeltaTableDatabase +from .in_memory_databases import InMemoryArrowDatabase +from .noop_database import NoOpArrowDatabase +from .spiraldb_connector import SpiralDBConnector +from .sqlite_connector import SQLiteConnector +from .postgresql_connector import PostgreSQLConnector +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol + +__all__ = [ + "AsyncDBConnectorProtocol", + "ConnectorArrowDatabase", + "DeltaTableDatabase", + "InMemoryArrowDatabase", + "NoOpArrowDatabase", + "SpiralDBConnector", + "SQLiteConnector", + "PostgreSQLConnector", +] + +# Relational DB connector implementations satisfy DBConnectorProtocol +# (orcapod.protocols.db_connector_protocol) and can be passed to either +# ConnectorArrowDatabase (read+write ArrowDatabaseProtocol) or +# DBTableSource (read-only Source): +# +# SQLiteConnector -- PLT-1076 (stdlib sqlite3, zero extra deps) ✓ +# PostgreSQLConnector -- PLT-1075 (psycopg3) ✓ +# SpiralDBConnector -- PLT-1074 +# +# Connectors that also implement AsyncDBConnectorProtocol: +# +# PostgreSQLConnector -- native psycopg3 async (PLT-1453) ✓ +# SQLiteConnector -- asyncio.to_thread() wrappers (PLT-1453) ✓ +# SpiralDBConnector -- deferred to PLT-1456 +# +# ArrowDatabaseProtocol backends (existing, not connector-based): +# +# DeltaTableDatabase -- Delta Lake (deltalake package) +# InMemoryArrowDatabase -- pure in-memory, for tests +# NoOpArrowDatabase -- no-op, for dry-runs / benchmarks +``` + +- [ ] **Step 4: Verify imports work** + +```bash +uv run python -c " +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol +from orcapod.protocols import AsyncDBConnectorProtocol as P2 +from orcapod.databases import AsyncDBConnectorProtocol as P3 +print('OK', AsyncDBConnectorProtocol, P2, P3) +" +``` + +Expected: prints `OK` with the class three times. No import errors. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/protocols/async_db_connector_protocol.py \ + src/orcapod/protocols/__init__.py \ + src/orcapod/databases/__init__.py +git commit -m "feat(protocols): add AsyncDBConnectorProtocol (PLT-1453)" +``` + +--- + +## Task 2: `PostgreSQLConnector` — async lifecycle + +**Files:** +- Modify: `src/orcapod/databases/postgresql_connector.py` +- Modify: `tests/test_databases/test_postgresql_connector.py` + +- [ ] **Step 1: Write the failing tests** + +Add this class to the end of `tests/test_databases/test_postgresql_connector.py`. +Also add `AsyncMock` to the existing import line at the top: +```python +from unittest.mock import AsyncMock, MagicMock, patch +``` + +And add to the imports: +```python +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol +``` + +Then add the test class: + +```python +class TestAsyncLifecycle: + """Unit tests for PostgreSQLConnector async lifecycle methods.""" + + def _make_connector(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + return PostgreSQLConnector("postgresql://localhost/test") + + def test_isinstance_async_protocol(self) -> None: + connector = self._make_connector() + assert isinstance(connector, AsyncDBConnectorProtocol) + connector._conn = None + + @pytest.mark.asyncio + async def test_aenter_sets_async_conn(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_async_conn): + result = await connector.__aenter__() + assert result is connector + assert connector._async_conn is mock_async_conn + connector._conn = None + + @pytest.mark.asyncio + async def test_aexit_closes_async_and_sync(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + connector._async_conn = mock_async_conn + await connector.__aexit__(None, None, None) + mock_async_conn.close.assert_called_once() + assert connector._async_conn is None + assert connector._conn is None # sync also closed + + @pytest.mark.asyncio + async def test_async_close_idempotent(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + connector._async_conn = mock_async_conn + await connector.async_close() + await connector.async_close() # must not raise + mock_async_conn.close.assert_called_once() + + def test_require_async_open_raises_before_aenter(self) -> None: + connector = self._make_connector() + with pytest.raises(RuntimeError, match="async context manager"): + connector._require_async_open() + connector._conn = None + + def test_require_async_open_returns_conn_after_aenter(self) -> None: + connector = self._make_connector() + mock_async_conn = MagicMock() + connector._async_conn = mock_async_conn + result = connector._require_async_open() + assert result is mock_async_conn + connector._conn = None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncLifecycle -v +``` + +Expected: `ERRORS` or `FAILED` — `AsyncDBConnectorProtocol` import may pass but +`_require_async_open`, `_async_conn`, `__aenter__`, `__aexit__`, `async_close` don't +exist yet on `PostgreSQLConnector`. + +- [ ] **Step 3: Implement async lifecycle in `PostgreSQLConnector`** + +In `src/orcapod/databases/postgresql_connector.py`: + +**a)** Add `_async_conn` to `__init__`. The existing `__init__` ends at line 233: + +```python +def __init__(self, dsn: str) -> None: + self._dsn = dsn + self._conn: Any = psycopg.connect(dsn, autocommit=False) + self._lock = threading.RLock() # RLock required: iter_batches → _resolve_column_type_lookup → get_column_info re-enters the lock + self._cursor_seq = itertools.count() + self._async_conn: Any = None # psycopg.AsyncConnection; set by __aenter__ +``` + +**b)** Add `_require_async_open` after the existing `_require_open` method: + +```python +def _require_async_open(self) -> Any: + """Return the open async connection or raise RuntimeError if not entered. + + Raises: + RuntimeError: If ``__aenter__`` has not been called. + """ + if self._async_conn is None: + raise RuntimeError( + "PostgreSQLConnector: enter the async context manager before " + "calling async methods" + ) + return self._async_conn +``` + +**c)** Add async lifecycle methods in a new `# ── Async lifecycle ──` section, +placed after the existing `# ── Lifecycle ──` section (after `__exit__`): + +```python +# ── Async lifecycle ─────────────────────────────────────────────────────── + +async def __aenter__(self) -> PostgreSQLConnector: + """Open an async psycopg3 connection and return self. + + The async connection is used by ``async_get_table_names``, + ``async_get_pk_columns``, and ``async_get_column_info``. + ``async_iter_batches`` opens its own dedicated connection per call. + """ + self._async_conn = await psycopg.AsyncConnection.connect( + self._dsn, autocommit=False + ) + return self + +async def __aexit__(self, *args: Any) -> None: + await self.async_close() + +async def async_close(self) -> None: + """Close async and sync connections. Idempotent.""" + if self._async_conn is not None: + await self._async_conn.close() + self._async_conn = None + self.close() +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncLifecycle -v +``` + +Expected: all 6 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/postgresql_connector.py \ + tests/test_databases/test_postgresql_connector.py +git commit -m "feat(postgresql): add async lifecycle methods (PLT-1453)" +``` + +--- + +## Task 3: `PostgreSQLConnector` — async schema introspection + +**Files:** +- Modify: `src/orcapod/databases/postgresql_connector.py` +- Modify: `tests/test_databases/test_postgresql_connector.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_postgresql_connector.py`: + +```python +class TestAsyncSchemaIntrospectionUnit: + """Unit tests for async schema introspection — mocked cursor.""" + + def _make_connector_with_async_conn(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + connector._async_conn = AsyncMock() + return connector + + def _mock_cursor_returning(self, connector: PostgreSQLConnector, + rows: list) -> AsyncMock: + """Wire up connector._async_conn.cursor() to return rows from fetchall.""" + mock_cursor = AsyncMock() + mock_cursor.fetchall.return_value = rows + mock_cm = MagicMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_cursor) + mock_cm.__aexit__ = AsyncMock(return_value=False) + connector._async_conn.cursor.return_value = mock_cm + return mock_cursor + + @pytest.mark.asyncio + async def test_async_get_table_names(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [("alpha",), ("beta",)]) + result = await connector.async_get_table_names() + assert result == ["alpha", "beta"] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_single(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [("id",)]) + result = await connector.async_get_pk_columns("my_table") + assert result == ["id"] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_empty(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, []) + result = await connector.async_get_pk_columns("my_table") + assert result == [] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_raises_on_bad_name(self) -> None: + connector = self._make_connector_with_async_conn() + with pytest.raises(ValueError, match="double-quote"): + await connector.async_get_pk_columns('bad"name') + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_column_info(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [ + ("id", "text", "text", "NO"), + ("val", "double precision", "float8", "YES"), + ]) + result = await connector.async_get_column_info("my_table") + assert len(result) == 2 + assert result[0].name == "id" + assert result[0].arrow_type == pa.large_string() + assert result[0].nullable is False + assert result[1].name == "val" + assert result[1].arrow_type == pa.float64() + assert result[1].nullable is True + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_column_info_raises_on_bad_name(self) -> None: + connector = self._make_connector_with_async_conn() + with pytest.raises(ValueError, match="double-quote"): + await connector.async_get_column_info('bad"name') + connector._conn = None + + @pytest.mark.asyncio + async def test_schema_methods_raise_without_aenter(self) -> None: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # _async_conn is None — all methods should raise + with pytest.raises(RuntimeError, match="async context manager"): + await connector.async_get_table_names() + connector._conn = None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncSchemaIntrospectionUnit -v +``` + +Expected: `FAILED` — methods `async_get_table_names`, `async_get_pk_columns`, +`async_get_column_info` don't exist yet. + +- [ ] **Step 3: Implement async schema introspection in `postgresql_connector.py`** + +Add a new `# ── Async schema introspection ──` section immediately after the +`# ── Schema introspection ──` section: + +```python +# ── Async schema introspection ──────────────────────────────────────────── + +async def async_get_table_names(self) -> list[str]: + """Return all user table names in this database (sorted, excludes views).""" + conn = self._require_async_open() + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = current_schema() + AND table_type = 'BASE TABLE' + ORDER BY table_name + """ + ) + return [row[0] for row in await cur.fetchall()] + +async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names in key-sequence order.""" + conn = self._require_async_open() + self._validate_table_name(table_name) + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT kcu.column_name + FROM information_schema.key_column_usage kcu + JOIN information_schema.table_constraints tc + ON kcu.constraint_name = tc.constraint_name + AND kcu.table_schema = tc.table_schema + AND kcu.table_name = tc.table_name + WHERE tc.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = current_schema() + AND kcu.table_name = %s + ORDER BY kcu.ordinal_position + """, + (table_name,), + ) + return [row[0] for row in await cur.fetchall()] + +async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata with Arrow-mapped types.""" + conn = self._require_async_open() + self._validate_table_name(table_name) + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT column_name, data_type, udt_name, is_nullable + FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = %s + ORDER BY ordinal_position + """, + (table_name,), + ) + return [ + ColumnInfo( + name=row[0], + arrow_type=_pg_type_to_arrow(row[1], row[2]), + nullable=(row[3].upper() == "YES"), + ) + for row in await cur.fetchall() + ] +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncSchemaIntrospectionUnit -v +``` + +Expected: all 7 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add src/orcapod/databases/postgresql_connector.py \ + tests/test_databases/test_postgresql_connector.py +git commit -m "feat(postgresql): add async schema introspection methods (PLT-1453)" +``` + +--- + +## Task 4: `PostgreSQLConnector` — `async_iter_batches` + +**Files:** +- Modify: `src/orcapod/databases/postgresql_connector.py` +- Modify: `tests/test_databases/test_postgresql_connector.py` + +- [ ] **Step 1: Write failing tests** + +Add this class to `tests/test_databases/test_postgresql_connector.py`: + +```python +class TestAsyncIterBatchesUnit: + """Unit tests for async_iter_batches — mocked psycopg async connection.""" + + def _make_connector(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # Simulate entered context + connector._async_conn = AsyncMock() + return connector + + def _make_mock_read_conn(self, rows_per_call: list[list]) -> tuple[AsyncMock, AsyncMock]: + """Return (mock_read_conn, mock_cursor). + + rows_per_call: list of row lists returned by successive fetchmany calls. + The last entry must be an empty list to signal end-of-results. + """ + mock_cursor = AsyncMock() + col = MagicMock() + col.name = "id" + mock_cursor.description = [col] + mock_cursor.fetchmany = AsyncMock(side_effect=rows_per_call) + + mock_read_conn = AsyncMock() + mock_read_conn.cursor.return_value = mock_cursor + + return mock_read_conn, mock_cursor + + @pytest.mark.asyncio + async def test_yields_correct_row_count(self) -> None: + connector = self._make_connector() + mock_read_conn, _ = self._make_mock_read_conn( + [[(1,), (2,), (3,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"' + )] + assert sum(b.num_rows for b in batches) == 3 + connector._conn = None + + @pytest.mark.asyncio + async def test_multiple_batches(self) -> None: + connector = self._make_connector() + mock_read_conn, _ = self._make_mock_read_conn( + [[(1,), (2,)], [(3,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"', batch_size=2 + )] + assert len(batches) == 2 + assert batches[0].num_rows == 2 + assert batches[1].num_rows == 1 + connector._conn = None + + @pytest.mark.asyncio + async def test_empty_result(self) -> None: + connector = self._make_connector() + mock_cursor = AsyncMock() + mock_cursor.description = None + mock_read_conn = AsyncMock() + mock_read_conn.cursor.return_value = mock_cursor + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"' + )] + assert batches == [] + connector._conn = None + + @pytest.mark.asyncio + async def test_cursor_closed_on_completion(self) -> None: + connector = self._make_connector() + mock_read_conn, mock_cursor = self._make_mock_read_conn( + [[(1,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + _ = [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + mock_cursor.close.assert_called_once() + mock_read_conn.close.assert_called_once() + connector._conn = None + + @pytest.mark.asyncio + async def test_cursor_closed_on_early_abandonment(self) -> None: + connector = self._make_connector() + mock_read_conn, mock_cursor = self._make_mock_read_conn( + [[(1,)], [(2,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + gen = connector.async_iter_batches('SELECT * FROM "t"', batch_size=1) + await gen.__anext__() # consume one batch + await gen.aclose() # abandon mid-stream + mock_cursor.close.assert_called_once() + mock_read_conn.close.assert_called_once() + connector._conn = None + + @pytest.mark.asyncio + async def test_raises_without_aenter(self) -> None: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # _async_conn is None + with pytest.raises(RuntimeError, match="async context manager"): + async for _ in connector.async_iter_batches('SELECT * FROM "t"'): + pass + connector._conn = None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncIterBatchesUnit -v +``` + +Expected: `FAILED` — `async_iter_batches` not defined yet. + +- [ ] **Step 3: Implement `_async_resolve_column_type_lookup` and `async_iter_batches`** + +**a)** Add the module-level coroutine after `_resolve_column_type_lookup` in +`src/orcapod/databases/postgresql_connector.py`: + +```python +async def _async_resolve_column_type_lookup( + query: str, + connector: "PostgreSQLConnector", +) -> dict[str, pa.DataType]: + """Async version of ``_resolve_column_type_lookup``. + + Parses the FROM clause of query to find the source table, then returns a + column-name → Arrow-type dict by calling ``async_get_column_info``. + + Returns an empty dict if no single unambiguous table can be identified, + causing ``async_iter_batches`` to fall back to ``pa.large_string()`` for + all columns. + + Args: + query: SQL query string. + connector: The connector to call ``async_get_column_info`` on. + + Returns: + Dict mapping column name to Arrow DataType. + """ + if re.search(r"\bJOIN\b", query, re.IGNORECASE): + return {} + + from_pattern = re.compile( + r'\bFROM\b\s+(?:"([^"]+)"|(\w+))', + re.IGNORECASE, + ) + from_matches = list(from_pattern.finditer(query)) + if len(from_matches) != 1: + return {} + + match = from_matches[0] + table_name = match.group(1) or match.group(2) + + from_tail = query[match.end():] + clause_boundary = re.search( + r"\b(WHERE|GROUP\s+BY|ORDER\s+BY|LIMIT|OFFSET|HAVING|UNION|EXCEPT|INTERSECT)\b", + from_tail, + re.IGNORECASE, + ) + if clause_boundary: + from_tail = from_tail[: clause_boundary.start()] + + if "," in from_tail: + return {} + + return {ci.name: ci.arrow_type for ci in await connector.async_get_column_info(table_name)} +``` + +**b)** Add `async_iter_batches` inside `PostgreSQLConnector` in the new +`# ── Async read ──` section, placed after the async schema introspection section: + +```python +# ── Async read ──────────────────────────────────────────────────────────── + +async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, +) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Opens a dedicated ``AsyncConnection`` per call so that the server-side + cursor portal is isolated from operations on the shared ``_async_conn``. + + Args: + query: SQL query string. Table names should be double-quoted. + params: Optional query parameters. + batch_size: Maximum rows per yielded batch. + + Raises: + RuntimeError: If the async context manager has not been entered. + """ + import pyarrow as _pa + from collections.abc import AsyncIterator as _AsyncIterator # noqa: F401 + + self._require_async_open() # guard: raise if context manager not entered + + # Briefly acquire the sync lock only to read self._dsn safely. + with self._lock: + self._require_open() # guard: raise if sync connector is closed + dsn = self._dsn + + read_conn = await psycopg.AsyncConnection.connect(dsn, autocommit=False) + cursor_name = f"orcapod_{next(self._cursor_seq)}" + cur = read_conn.cursor(name=cursor_name) + + try: + await cur.execute(query, params) + if cur.description is None: + return + col_names = [d.name for d in cur.description] + type_lookup = await _async_resolve_column_type_lookup(query, self) + arrow_types = [type_lookup.get(n, _pa.large_string()) for n in col_names] + schema = _pa.schema( + [_pa.field(n, t) for n, t in zip(col_names, arrow_types)] + ) + rows = await cur.fetchmany(batch_size) + + while rows: + arrays = [ + _pa.array([r[i] for r in rows], type=t) + for i, t in enumerate(arrow_types) + ] + yield _pa.RecordBatch.from_arrays(arrays, schema=schema) + rows = await cur.fetchmany(batch_size) + finally: + await cur.close() + try: + await read_conn.rollback() + except Exception: + pass + await read_conn.close() +``` + +Also add `AsyncIterator` to the `collections.abc` import at the top of the file: + +```python +from collections.abc import Iterator, AsyncIterator +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py::TestAsyncIterBatchesUnit -v +``` + +Expected: all 6 tests PASS. + +- [ ] **Step 5: Run all PostgreSQL unit tests** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector.py -v +``` + +Expected: all tests PASS (existing + new). + +- [ ] **Step 6: Commit** + +```bash +git add src/orcapod/databases/postgresql_connector.py \ + tests/test_databases/test_postgresql_connector.py +git commit -m "feat(postgresql): add async_iter_batches (PLT-1453)" +``` + +--- + +## Task 5: `SQLiteConnector` — all async methods + +**Files:** +- Modify: `src/orcapod/databases/sqlite_connector.py` +- Modify: `tests/test_databases/test_sqlite_connector.py` + +- [ ] **Step 1: Write failing tests** + +Add the following class to the end of `tests/test_databases/test_sqlite_connector.py`. +Also add this import at the top of the file: + +```python +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol +``` + +Then the test class: + +```python +class TestAsyncMethods: + """Async method tests using a real in-memory SQLite DB — no mocks needed.""" + + def _setup(self) -> SQLiteConnector: + """Return a connector with a simple table pre-populated.""" + connector = SQLiteConnector(":memory:") + connector._conn.execute( + 'CREATE TABLE "t" (id INTEGER PRIMARY KEY, val TEXT NOT NULL)' + ) + connector._conn.execute( + 'INSERT INTO "t" VALUES (1, "a"), (2, "b"), (3, "c")' + ) + return connector + + def test_isinstance_async_protocol(self) -> None: + connector = SQLiteConnector(":memory:") + assert isinstance(connector, AsyncDBConnectorProtocol) + + @pytest.mark.asyncio + async def test_aenter_returns_self(self) -> None: + connector = SQLiteConnector(":memory:") + result = await connector.__aenter__() + assert result is connector + await connector.async_close() + + @pytest.mark.asyncio + async def test_aexit_closes_connection(self) -> None: + connector = SQLiteConnector(":memory:") + await connector.__aenter__() + await connector.__aexit__(None, None, None) + with pytest.raises(RuntimeError, match="closed"): + connector._require_open() + + @pytest.mark.asyncio + async def test_async_close_idempotent(self) -> None: + connector = SQLiteConnector(":memory:") + await connector.async_close() + await connector.async_close() # must not raise + + @pytest.mark.asyncio + async def test_async_context_manager(self) -> None: + async with SQLiteConnector(":memory:") as connector: + assert connector._conn is not None + with pytest.raises(RuntimeError, match="closed"): + connector._require_open() + + @pytest.mark.asyncio + async def test_async_get_table_names_matches_sync(self) -> None: + connector = self._setup() + assert await connector.async_get_table_names() == connector.get_table_names() + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_get_pk_columns_matches_sync(self) -> None: + connector = self._setup() + assert ( + await connector.async_get_pk_columns("t") == connector.get_pk_columns("t") + ) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_get_column_info_matches_sync(self) -> None: + connector = self._setup() + assert ( + await connector.async_get_column_info("t") + == connector.get_column_info("t") + ) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_returns_correct_rows(self) -> None: + connector = self._setup() + batches = [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + total_rows = sum(b.num_rows for b in batches) + assert total_rows == 3 + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_matches_sync_data(self) -> None: + import pyarrow as pa + connector = self._setup() + + sync_table = pa.Table.from_batches( + list(connector.iter_batches('SELECT * FROM "t"')) + ) + async_table = pa.Table.from_batches( + [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + ) + assert async_table.equals(sync_table) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_empty_result(self) -> None: + connector = self._setup() + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "t" WHERE 1=0' + ) + ] + assert batches == [] + await connector.async_close() +``` + +- [ ] **Step 2: Run tests to verify they fail** + +```bash +uv run pytest tests/test_databases/test_sqlite_connector.py::TestAsyncMethods -v +``` + +Expected: `FAILED` — `async_get_table_names`, `async_iter_batches`, etc. don't exist yet; +`__aenter__`/`__aexit__` don't exist either. + +- [ ] **Step 3: Implement all async methods in `sqlite_connector.py`** + +Add `import asyncio` to the imports at the top of `src/orcapod/databases/sqlite_connector.py` +(alongside the existing `import threading`): + +```python +import asyncio +import logging +import os +import sqlite3 +import threading +``` + +Also add `AsyncIterator` to the `collections.abc` import: + +```python +from collections.abc import AsyncIterator, Iterator +``` + +Then add a new `# ── Async lifecycle ──` section after the existing `# ── Lifecycle ──` +section (after `__exit__`), and a `# ── Async read ──` section after `# ── Write ──`: + +```python +# ── Async lifecycle ─────────────────────────────────────────────────────── + +async def __aenter__(self) -> SQLiteConnector: + """Return self; the sync connection is already open from ``__init__``.""" + return self + +async def __aexit__(self, *args: Any) -> None: + await self.async_close() + +async def async_close(self) -> None: + """Close the database connection. Idempotent.""" + await asyncio.to_thread(self.close) + +# ── Async schema introspection ──────────────────────────────────────────── + +async def async_get_table_names(self) -> list[str]: + """Return all user table names asynchronously.""" + return await asyncio.to_thread(self.get_table_names) + +async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names asynchronously.""" + return await asyncio.to_thread(self.get_pk_columns, table_name) + +async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata asynchronously.""" + return await asyncio.to_thread(self.get_column_info, table_name) + +# ── Async read ──────────────────────────────────────────────────────────── + +async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, +) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Runs the full synchronous iteration in a thread-pool worker (so blocking + I/O does not stall the event loop), then yields each collected batch. + + Args: + query: SQL query string. Table names should be double-quoted. + params: Optional query parameters. + batch_size: Maximum rows per yielded batch. + """ + batches = await asyncio.to_thread( + lambda: list(self.iter_batches(query, params, batch_size)) + ) + for batch in batches: + yield batch +``` + +- [ ] **Step 4: Run tests to verify they pass** + +```bash +uv run pytest tests/test_databases/test_sqlite_connector.py::TestAsyncMethods -v +``` + +Expected: all 11 tests PASS. + +- [ ] **Step 5: Run all SQLite unit tests** + +```bash +uv run pytest tests/test_databases/test_sqlite_connector.py -v +``` + +Expected: all tests PASS (existing + new). + +- [ ] **Step 6: Commit** + +```bash +git add src/orcapod/databases/sqlite_connector.py \ + tests/test_databases/test_sqlite_connector.py +git commit -m "feat(sqlite): add async methods via asyncio.to_thread() (PLT-1453)" +``` + +--- + +## Task 6: PostgreSQL async integration tests + +**Files:** +- Modify: `tests/test_databases/test_postgresql_connector_integration.py` + +These tests require a live PostgreSQL instance. They are marked `@pytest.mark.postgres` +and skipped in normal test runs. Run with: +```bash +uv run pytest tests/test_databases/test_postgresql_connector_integration.py -m postgres -v +``` + +- [ ] **Step 1: Add async imports to the integration test file** + +At the top of `tests/test_databases/test_postgresql_connector_integration.py`, add: + +```python +import asyncio # already stdlib, no new dep +``` + +(The file already imports `pytest`, `psycopg`, `pa`, `PostgreSQLConnector`, `ColumnInfo`.) + +- [ ] **Step 2: Add `TestAsyncLifecycle`** + +Append to the end of the file: + +```python +# --------------------------------------------------------------------------- +# Async lifecycle +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncLifecycle: + @pytest.mark.asyncio + async def test_aenter_opens_async_conn(self, connector: PostgreSQLConnector) -> None: + assert connector._async_conn is None + async with connector: + assert connector._async_conn is not None + + @pytest.mark.asyncio + async def test_async_close_idempotent(self, connector: PostgreSQLConnector) -> None: + async with connector: + pass + await connector.async_close() # second call must not raise + + def test_require_async_open_raises_outside_context( + self, connector: PostgreSQLConnector + ) -> None: + with pytest.raises(RuntimeError, match="async context manager"): + connector._require_async_open() +``` + +- [ ] **Step 3: Add `TestAsyncSchemaIntrospection`** + +```python +# --------------------------------------------------------------------------- +# Async schema introspection +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncSchemaIntrospection: + @pytest.mark.asyncio + async def test_async_get_table_names_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t1" (id INTEGER PRIMARY KEY)') + cur.execute('CREATE TABLE "t2" (id INTEGER PRIMARY KEY)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_table_names() + sync_result = connector.get_table_names() + assert async_result == sync_result + + @pytest.mark.asyncio + async def test_async_get_pk_columns_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t" (id TEXT PRIMARY KEY, val REAL)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_pk_columns("t") + sync_result = connector.get_pk_columns("t") + assert async_result == sync_result + + @pytest.mark.asyncio + async def test_async_get_column_info_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t" (id TEXT NOT NULL, val REAL)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_column_info("t") + sync_result = connector.get_column_info("t") + assert async_result == sync_result +``` + +- [ ] **Step 4: Add `TestAsyncIterBatches`** + +```python +# --------------------------------------------------------------------------- +# Async iter_batches +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncIterBatches: + def _setup_data(self, connector: PostgreSQLConnector) -> None: + cols = [ + ColumnInfo("id", pa.large_string(), nullable=False), + ColumnInfo("score", pa.float64(), nullable=True), + ] + connector.create_table_if_not_exists("data", cols, "id") + records = pa.table({ + "id": pa.array(["a", "b", "c"], type=pa.large_string()), + "score": pa.array([1.5, 2.5, 3.5], type=pa.float64()), + }) + connector.upsert_records("data", records, "id") + + @pytest.mark.asyncio + async def test_returns_all_rows(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + assert sum(b.num_rows for b in batches) == 3 + + @pytest.mark.asyncio + async def test_correct_arrow_types(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + table = pa.Table.from_batches(batches) + assert table.schema.field("id").type == pa.large_string() + assert table.schema.field("score").type == pa.float64() + + @pytest.mark.asyncio + async def test_batch_size_respected(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "data"', batch_size=2 + ) + ] + assert len(batches) == 2 + assert batches[0].num_rows == 2 + assert batches[1].num_rows == 1 + + @pytest.mark.asyncio + async def test_empty_result(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "data" WHERE 1=0' + ) + ] + assert batches == [] + + @pytest.mark.asyncio + async def test_early_abandonment_closes_cursor( + self, connector: PostgreSQLConnector + ) -> None: + self._setup_data(connector) + async with connector: + gen = connector.async_iter_batches('SELECT * FROM "data"', batch_size=1) + await gen.__anext__() # consume one batch + await gen.aclose() # abandon — must not leak server-side cursor + # Subsequent queries must still work + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + assert sum(b.num_rows for b in batches) == 3 +``` + +- [ ] **Step 5: Run integration tests (requires live PostgreSQL)** + +```bash +uv run pytest tests/test_databases/test_postgresql_connector_integration.py -m postgres -v +``` + +Expected: all async test classes PASS against the live database. +If no live PostgreSQL is available locally, these run automatically in CI +(the GitHub Actions workflow starts a PostgreSQL service container). + +- [ ] **Step 6: Commit** + +```bash +git add tests/test_databases/test_postgresql_connector_integration.py +git commit -m "test(postgresql): add async integration tests (PLT-1453)" +``` + +--- + +## Task 7: Final verification and PR + +- [ ] **Step 1: Run all connector unit tests** + +```bash +uv run pytest tests/test_databases/ -v --ignore=tests/test_databases/test_postgresql_connector_integration.py --ignore=tests/test_databases/test_sqlite_connector_integration.py --ignore=tests/test_databases/test_spiraldb_connector_integration.py +``` + +Expected: all tests PASS. + +- [ ] **Step 2: Run the full test suite** + +```bash +uv run pytest tests/ -v --ignore=tests/test_databases/test_postgresql_connector_integration.py --ignore=tests/test_databases/test_sqlite_connector_integration.py --ignore=tests/test_databases/test_spiraldb_connector_integration.py +``` + +Expected: all tests PASS. No regressions. + +- [ ] **Step 3: Open PR against `main`** + +```bash +gh pr create \ + --title "feat(connectors): add AsyncDBConnectorProtocol; implement on PostgreSQL and SQLite" \ + --base main \ + --body "$(cat <<'EOF' +## Summary + +- Adds `AsyncDBConnectorProtocol` — a standalone `@runtime_checkable` Protocol with 7 async methods (`__aenter__`, `__aexit__`, `async_close`, `async_get_table_names`, `async_get_pk_columns`, `async_get_column_info`, `async_iter_batches`) +- `PostgreSQLConnector`: native async via psycopg3 `AsyncConnection` / `AsyncServerCursor`; dedicated connection per `async_iter_batches` call for portal isolation +- `SQLiteConnector`: `asyncio.to_thread()` wrappers over existing sync methods (thread-safe: `check_same_thread=False` + `threading.RLock` already in place) +- `SpiralDBConnector` deferred to PLT-1456 (lacks thread safety prerequisite) +- Unit tests for both connectors; async integration tests for PostgreSQL + +## Test plan + +- [ ] `uv run pytest tests/test_databases/test_postgresql_connector.py -v` — PASS +- [ ] `uv run pytest tests/test_databases/test_sqlite_connector.py -v` — PASS +- [ ] `uv run pytest tests/test_databases/test_postgresql_connector_integration.py -m postgres -v` — PASS (CI) + +Closes PLT-1453 +EOF +)" +``` From f105777be7144b50782ba5dec2c0262699fe5545 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:24:13 +0000 Subject: [PATCH 04/19] feat(protocols): add AsyncDBConnectorProtocol (PLT-1453) --- src/orcapod/databases/__init__.py | 8 ++ src/orcapod/protocols/__init__.py | 2 + .../protocols/async_db_connector_protocol.py | 92 +++++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 src/orcapod/protocols/async_db_connector_protocol.py diff --git a/src/orcapod/databases/__init__.py b/src/orcapod/databases/__init__.py index 8a393dd5..adc4428c 100644 --- a/src/orcapod/databases/__init__.py +++ b/src/orcapod/databases/__init__.py @@ -5,8 +5,10 @@ from .spiraldb_connector import SpiralDBConnector from .sqlite_connector import SQLiteConnector from .postgresql_connector import PostgreSQLConnector +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol __all__ = [ + "AsyncDBConnectorProtocol", "ConnectorArrowDatabase", "DeltaTableDatabase", "InMemoryArrowDatabase", @@ -25,6 +27,12 @@ # PostgreSQLConnector -- PLT-1075 (psycopg3) ✓ # SpiralDBConnector -- PLT-1074 # +# Connectors that also implement AsyncDBConnectorProtocol: +# +# PostgreSQLConnector -- native psycopg3 async (PLT-1453) ✓ +# SQLiteConnector -- asyncio.to_thread() wrappers (PLT-1453) ✓ +# SpiralDBConnector -- deferred to PLT-1456 +# # ArrowDatabaseProtocol backends (existing, not connector-based): # # DeltaTableDatabase -- Delta Lake (deltalake package) diff --git a/src/orcapod/protocols/__init__.py b/src/orcapod/protocols/__init__.py index 35398232..22e35c06 100644 --- a/src/orcapod/protocols/__init__.py +++ b/src/orcapod/protocols/__init__.py @@ -3,8 +3,10 @@ DataExecutionLoggerProtocol, ) from orcapod.protocols.pipeline_protocols import PipelineProtocol +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol __all__ = [ + "AsyncDBConnectorProtocol", "DataExecutionLoggerProtocol", "ExecutionObserverProtocol", "PipelineProtocol", diff --git a/src/orcapod/protocols/async_db_connector_protocol.py b/src/orcapod/protocols/async_db_connector_protocol.py new file mode 100644 index 00000000..e8bc3f71 --- /dev/null +++ b/src/orcapod/protocols/async_db_connector_protocol.py @@ -0,0 +1,92 @@ +"""AsyncDBConnectorProtocol — async counterpart to DBConnectorProtocol. + +Standalone protocol (does not inherit from DBConnectorProtocol). A class +satisfies both protocols by implementing all their methods. + +Intended use:: + + async with PostgreSQLConnector(dsn) as connector: + tables = await connector.async_get_table_names() + async for batch in connector.async_iter_batches('SELECT * FROM "t"'): + process(batch) +""" +from __future__ import annotations + +from collections.abc import AsyncIterator +from typing import Any, Protocol, TYPE_CHECKING, runtime_checkable + +from orcapod.types import ColumnInfo + +if TYPE_CHECKING: + import pyarrow as pa + +__all__ = ["AsyncDBConnectorProtocol"] + + +@runtime_checkable +class AsyncDBConnectorProtocol(Protocol): + """Async interface for an external relational database backend. + + Standalone protocol — does not inherit from ``DBConnectorProtocol``. A + connector class satisfies both protocols by implementing all their methods. + + Lifecycle: use ``async with connector:`` to open async resources. + ``__aexit__`` calls ``async_close()``, which performs a full shutdown of + both async and sync connections. + + Example:: + + async with PostgreSQLConnector(dsn) as connector: + tables = await connector.async_get_table_names() + async for batch in connector.async_iter_batches('SELECT * FROM "t"'): + process(batch) + """ + + # ── Lifecycle ───────────────────────────────────────────────────────────── + + async def __aenter__(self) -> AsyncDBConnectorProtocol: + """Open async resources and return self.""" + ... + + async def __aexit__(self, *args: Any) -> None: + """Close all resources by calling ``async_close()``.""" + ... + + async def async_close(self) -> None: + """Release all async and sync database resources. Idempotent.""" + ... + + # ── Schema introspection ────────────────────────────────────────────────── + + async def async_get_table_names(self) -> list[str]: + """Return all available table names in this database.""" + ... + + async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names for a table, in key-sequence order. + + Returns an empty list if the table has no primary key. + """ + ... + + async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata for a table, with types mapped to Arrow.""" + ... + + # ── Read ────────────────────────────────────────────────────────────────── + + async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, + ) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Args: + query: SQL query string. Table names should be double-quoted + (``SELECT * FROM "my_table"``). + params: Optional query parameters (connector-specific format). + batch_size: Maximum rows per yielded batch. + """ + ... From ac3707388e873d7e8b105067217acbdf286ed580 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:28:49 +0000 Subject: [PATCH 05/19] refactor(protocols): fix AsyncDBConnectorProtocol quality issues (PLT-1453) --- .../protocols/async_db_connector_protocol.py | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/src/orcapod/protocols/async_db_connector_protocol.py b/src/orcapod/protocols/async_db_connector_protocol.py index e8bc3f71..75603b35 100644 --- a/src/orcapod/protocols/async_db_connector_protocol.py +++ b/src/orcapod/protocols/async_db_connector_protocol.py @@ -1,14 +1,6 @@ """AsyncDBConnectorProtocol — async counterpart to DBConnectorProtocol. -Standalone protocol (does not inherit from DBConnectorProtocol). A class -satisfies both protocols by implementing all their methods. - -Intended use:: - - async with PostgreSQLConnector(dsn) as connector: - tables = await connector.async_get_table_names() - async for batch in connector.async_iter_batches('SELECT * FROM "t"'): - process(batch) +Standalone protocol; a class satisfies both by implementing all their methods. """ from __future__ import annotations @@ -49,11 +41,15 @@ async def __aenter__(self) -> AsyncDBConnectorProtocol: ... async def __aexit__(self, *args: Any) -> None: - """Close all resources by calling ``async_close()``.""" + """Close all resources. Implementations must delegate to ``async_close()``.""" ... async def async_close(self) -> None: - """Release all async and sync database resources. Idempotent.""" + """Release all async and sync database resources. + + Implementations must be idempotent — calling this multiple times must + not raise. + """ ... # ── Schema introspection ────────────────────────────────────────────────── @@ -75,7 +71,7 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: # ── Read ────────────────────────────────────────────────────────────────── - async def async_iter_batches( + def async_iter_batches( self, query: str, params: Any = None, @@ -85,7 +81,8 @@ async def async_iter_batches( Args: query: SQL query string. Table names should be double-quoted - (``SELECT * FROM "my_table"``). + (``SELECT * FROM "my_table"``); all connectors must support + ANSI-standard double-quoted identifiers. params: Optional query parameters (connector-specific format). batch_size: Maximum rows per yielded batch. """ From b0105a956e95e0219f511d974ba0f4d148689534 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:33:18 +0000 Subject: [PATCH 06/19] feat(postgresql): add async lifecycle methods (PLT-1453) --- src/orcapod/databases/postgresql_connector.py | 86 ++++++++++++++++++- .../test_postgresql_connector.py | 61 ++++++++++++- 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index 4f98810e..87f720c5 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -16,7 +16,7 @@ import logging import re import threading -from collections.abc import Iterator +from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING, Any from orcapod.types import ColumnInfo @@ -231,6 +231,7 @@ def __init__(self, dsn: str) -> None: self._conn: Any = psycopg.connect(dsn, autocommit=False) self._lock = threading.RLock() # RLock required: iter_batches → _resolve_column_type_lookup → get_column_info re-enters the lock self._cursor_seq = itertools.count() + self._async_conn: Any = None # psycopg.AsyncConnection; set by __aenter__ # ── Internal helpers ────────────────────────────────────────────────────── @@ -240,6 +241,19 @@ def _require_open(self) -> Any: raise RuntimeError("PostgreSQLConnector is closed") return self._conn + def _require_async_open(self) -> Any: + """Return the open async connection or raise RuntimeError if not entered. + + Raises: + RuntimeError: If ``__aenter__`` has not been called. + """ + if self._async_conn is None: + raise RuntimeError( + "PostgreSQLConnector: enter the async context manager before " + "calling async methods" + ) + return self._async_conn + @staticmethod def _validate_table_name(table_name: str) -> None: """Raise ValueError if table_name contains a double-quote character.""" @@ -468,6 +482,76 @@ def __enter__(self) -> PostgreSQLConnector: def __exit__(self, *args: Any) -> None: self.close() + # ── Async lifecycle ─────────────────────────────────────────────────────── + + async def __aenter__(self) -> PostgreSQLConnector: + """Open an async psycopg3 connection and return self. + + The async connection is used by ``async_get_table_names``, + ``async_get_pk_columns``, and ``async_get_column_info``. + ``async_iter_batches`` opens its own dedicated connection per call. + """ + self._async_conn = await psycopg.AsyncConnection.connect( + self._dsn, autocommit=False + ) + return self + + async def __aexit__(self, *args: Any) -> None: + await self.async_close() + + async def async_close(self) -> None: + """Close async and sync connections. + + Implementations must be idempotent — calling this multiple times must + not raise. + """ + if self._async_conn is not None: + await self._async_conn.close() + self._async_conn = None + self.close() + + # ── Async schema introspection ──────────────────────────────────────────── + + async def async_get_table_names(self) -> list[str]: + """Return all user table names in this database (sorted, excludes views). + + Requires the async context manager to have been entered. + """ + raise NotImplementedError("async_get_table_names not yet implemented") + + async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names for a table, in key-sequence order. + + Requires the async context manager to have been entered. + Returns an empty list if the table has no primary key. + """ + raise NotImplementedError("async_get_pk_columns not yet implemented") + + async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata for a table, with types mapped to Arrow. + + Requires the async context manager to have been entered. + """ + raise NotImplementedError("async_get_column_info not yet implemented") + + # ── Async read ──────────────────────────────────────────────────────────── + + def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, + ) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Args: + query: SQL query string. Table names should be double-quoted + (``SELECT * FROM "my_table"``). + params: Optional query parameters. + batch_size: Maximum rows per yielded batch. + """ + raise NotImplementedError("async_iter_batches not yet implemented") + # ── Serialization ───────────────────────────────────────────────────────── def to_config(self) -> dict[str, Any]: diff --git a/tests/test_databases/test_postgresql_connector.py b/tests/test_databases/test_postgresql_connector.py index 95afeeaa..dec466d4 100644 --- a/tests/test_databases/test_postgresql_connector.py +++ b/tests/test_databases/test_postgresql_connector.py @@ -2,7 +2,7 @@ """Unit tests for PostgreSQLConnector — no live database required.""" from __future__ import annotations -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pyarrow as pa import pytest @@ -12,6 +12,7 @@ _arrow_type_to_pg_sql, _pg_type_to_arrow, ) +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol from orcapod.protocols.db_connector_protocol import DBConnectorProtocol @@ -259,3 +260,61 @@ def test_get_pk_columns_no_pk(self) -> None: result = connector.get_pk_columns("my_table") assert result == [] connector._conn = None + + +class TestAsyncLifecycle: + """Unit tests for PostgreSQLConnector async lifecycle methods.""" + + def _make_connector(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + return PostgreSQLConnector("postgresql://localhost/test") + + def test_isinstance_async_protocol(self) -> None: + connector = self._make_connector() + assert isinstance(connector, AsyncDBConnectorProtocol) + connector._conn = None + + @pytest.mark.asyncio + async def test_aenter_sets_async_conn(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_async_conn): + result = await connector.__aenter__() + assert result is connector + assert connector._async_conn is mock_async_conn + connector._conn = None + + @pytest.mark.asyncio + async def test_aexit_closes_async_and_sync(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + connector._async_conn = mock_async_conn + await connector.__aexit__(None, None, None) + mock_async_conn.close.assert_called_once() + assert connector._async_conn is None + assert connector._conn is None # sync also closed + + @pytest.mark.asyncio + async def test_async_close_idempotent(self) -> None: + connector = self._make_connector() + mock_async_conn = AsyncMock() + connector._async_conn = mock_async_conn + await connector.async_close() + await connector.async_close() # must not raise + mock_async_conn.close.assert_called_once() + + def test_require_async_open_raises_before_aenter(self) -> None: + connector = self._make_connector() + with pytest.raises(RuntimeError, match="async context manager"): + connector._require_async_open() + connector._conn = None + + def test_require_async_open_returns_conn_after_aenter(self) -> None: + connector = self._make_connector() + mock_async_conn = MagicMock() + connector._async_conn = mock_async_conn + result = connector._require_async_open() + assert result is mock_async_conn + connector._conn = None From e08cf2a557f3af1e6135641c5756ef98f2c86bae Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:39:02 +0000 Subject: [PATCH 07/19] refactor(postgresql): document async lifecycle; fix pytest asyncio config (PLT-1453) --- pytest.ini | 1 + src/orcapod/databases/postgresql_connector.py | 9 +++++++++ 2 files changed, 10 insertions(+) diff --git a/pytest.ini b/pytest.ini index 88d892c4..fe342395 100644 --- a/pytest.ini +++ b/pytest.ini @@ -4,6 +4,7 @@ python_files = test_*.py python_classes = Test* python_functions = test_* addopts = -v +asyncio_default_fixture_loop_scope = function pythonpath = src markers = postgres: mark test as requiring a live PostgreSQL instance diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index 87f720c5..bf680fdb 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -220,6 +220,11 @@ class PostgreSQLConnector: Uses named server-side cursors in iter_batches so PostgreSQL streams results row-by-row rather than buffering the full result set. + **Async context manager note:** ``async with connector:`` opens an async + psycopg3 connection. Exiting the async context manager calls + ``async_close()``, which closes *both* the async and the sync connections. + After ``async with``, the connector must not be used further. + Args: dsn: libpq connection string. URI form: ``"postgresql://user:pass@host:5432/dbname"`` @@ -536,6 +541,10 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: # ── Async read ──────────────────────────────────────────────────────────── + # Note: declared as plain `def` (not `async def`) because the concrete + # implementation will be an async generator function (`async def … yield`). + # A Protocol method declared as `def` allows both async generators and + # coroutines returning AsyncIterator to satisfy the contract structurally. def async_iter_batches( self, query: str, From eacbe2e576db09b181bb33aaeaca3d9c0c3b3015 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:41:33 +0000 Subject: [PATCH 08/19] feat(postgresql): add async schema introspection methods (PLT-1453) Replace NotImplementedError stubs for async_get_table_names, async_get_pk_columns, and async_get_column_info with real psycopg3 async implementations; add 7 unit tests. --- src/orcapod/databases/postgresql_connector.py | 71 +++++++++++---- .../test_postgresql_connector.py | 90 +++++++++++++++++++ 2 files changed, 145 insertions(+), 16 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index bf680fdb..ce10f168 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -518,26 +518,65 @@ async def async_close(self) -> None: # ── Async schema introspection ──────────────────────────────────────────── async def async_get_table_names(self) -> list[str]: - """Return all user table names in this database (sorted, excludes views). - - Requires the async context manager to have been entered. - """ - raise NotImplementedError("async_get_table_names not yet implemented") + """Return all user table names in this database (sorted, excludes views).""" + conn = self._require_async_open() + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = current_schema() + AND table_type = 'BASE TABLE' + ORDER BY table_name + """ + ) + return [row[0] for row in await cur.fetchall()] async def async_get_pk_columns(self, table_name: str) -> list[str]: - """Return primary-key column names for a table, in key-sequence order. - - Requires the async context manager to have been entered. - Returns an empty list if the table has no primary key. - """ - raise NotImplementedError("async_get_pk_columns not yet implemented") + """Return primary-key column names in key-sequence order.""" + conn = self._require_async_open() + self._validate_table_name(table_name) + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT kcu.column_name + FROM information_schema.key_column_usage kcu + JOIN information_schema.table_constraints tc + ON kcu.constraint_name = tc.constraint_name + AND kcu.table_schema = tc.table_schema + AND kcu.table_name = tc.table_name + WHERE tc.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = current_schema() + AND kcu.table_name = %s + ORDER BY kcu.ordinal_position + """, + (table_name,), + ) + return [row[0] for row in await cur.fetchall()] async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: - """Return column metadata for a table, with types mapped to Arrow. - - Requires the async context manager to have been entered. - """ - raise NotImplementedError("async_get_column_info not yet implemented") + """Return column metadata with Arrow-mapped types.""" + conn = self._require_async_open() + self._validate_table_name(table_name) + async with conn.cursor() as cur: + await cur.execute( + """ + SELECT column_name, data_type, udt_name, is_nullable + FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = %s + ORDER BY ordinal_position + """, + (table_name,), + ) + return [ + ColumnInfo( + name=row[0], + arrow_type=_pg_type_to_arrow(row[1], row[2]), + nullable=(row[3].upper() == "YES"), + ) + for row in await cur.fetchall() + ] # ── Async read ──────────────────────────────────────────────────────────── diff --git a/tests/test_databases/test_postgresql_connector.py b/tests/test_databases/test_postgresql_connector.py index dec466d4..5a5a63e4 100644 --- a/tests/test_databases/test_postgresql_connector.py +++ b/tests/test_databases/test_postgresql_connector.py @@ -318,3 +318,93 @@ def test_require_async_open_returns_conn_after_aenter(self) -> None: result = connector._require_async_open() assert result is mock_async_conn connector._conn = None + + +class TestAsyncSchemaIntrospectionUnit: + """Unit tests for async schema introspection — mocked cursor.""" + + def _make_connector_with_async_conn(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + connector._async_conn = AsyncMock() + return connector + + def _mock_cursor_returning(self, connector: PostgreSQLConnector, + rows: list) -> AsyncMock: + """Wire up connector._async_conn.cursor() to return rows from fetchall.""" + mock_cursor = AsyncMock() + mock_cursor.fetchall.return_value = rows + mock_cm = MagicMock() + mock_cm.__aenter__ = AsyncMock(return_value=mock_cursor) + mock_cm.__aexit__ = AsyncMock(return_value=False) + # cursor() must be a sync call returning an async context manager; + # override the AsyncMock attribute with a plain MagicMock so that + # conn.cursor() returns mock_cm directly (not a coroutine). + connector._async_conn.cursor = MagicMock(return_value=mock_cm) + return mock_cursor + + @pytest.mark.asyncio + async def test_async_get_table_names(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [("alpha",), ("beta",)]) + result = await connector.async_get_table_names() + assert result == ["alpha", "beta"] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_single(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [("id",)]) + result = await connector.async_get_pk_columns("my_table") + assert result == ["id"] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_empty(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, []) + result = await connector.async_get_pk_columns("my_table") + assert result == [] + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_pk_columns_raises_on_bad_name(self) -> None: + connector = self._make_connector_with_async_conn() + with pytest.raises(ValueError, match="double-quote"): + await connector.async_get_pk_columns('bad"name') + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_column_info(self) -> None: + connector = self._make_connector_with_async_conn() + self._mock_cursor_returning(connector, [ + ("id", "text", "text", "NO"), + ("val", "double precision", "float8", "YES"), + ]) + result = await connector.async_get_column_info("my_table") + assert len(result) == 2 + assert result[0].name == "id" + assert result[0].arrow_type == pa.large_string() + assert result[0].nullable is False + assert result[1].name == "val" + assert result[1].arrow_type == pa.float64() + assert result[1].nullable is True + connector._conn = None + + @pytest.mark.asyncio + async def test_async_get_column_info_raises_on_bad_name(self) -> None: + connector = self._make_connector_with_async_conn() + with pytest.raises(ValueError, match="double-quote"): + await connector.async_get_column_info('bad"name') + connector._conn = None + + @pytest.mark.asyncio + async def test_schema_methods_raise_without_aenter(self) -> None: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # _async_conn is None — all methods should raise + with pytest.raises(RuntimeError, match="async context manager"): + await connector.async_get_table_names() + connector._conn = None From 37060cc52d4b756d0b4e2866f7ec7c1731d64236 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:44:11 +0000 Subject: [PATCH 09/19] refactor(postgresql): improve async schema test coverage and docstring (PLT-1453) --- src/orcapod/databases/postgresql_connector.py | 6 +++++- tests/test_databases/test_postgresql_connector.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index ce10f168..6bb348d6 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -533,7 +533,11 @@ async def async_get_table_names(self) -> list[str]: return [row[0] for row in await cur.fetchall()] async def async_get_pk_columns(self, table_name: str) -> list[str]: - """Return primary-key column names in key-sequence order.""" + """Return primary-key column names in key-sequence order. + + Returns: + List of PK column names; empty list if the table has no primary key. + """ conn = self._require_async_open() self._validate_table_name(table_name) async with conn.cursor() as cur: diff --git a/tests/test_databases/test_postgresql_connector.py b/tests/test_databases/test_postgresql_connector.py index 5a5a63e4..8aa4d6d5 100644 --- a/tests/test_databases/test_postgresql_connector.py +++ b/tests/test_databases/test_postgresql_connector.py @@ -404,7 +404,11 @@ async def test_schema_methods_raise_without_aenter(self) -> None: with patch("psycopg.connect") as mock_connect: mock_connect.return_value = MagicMock() connector = PostgreSQLConnector("postgresql://localhost/test") - # _async_conn is None — all methods should raise + # _async_conn is None — all three methods should raise with pytest.raises(RuntimeError, match="async context manager"): await connector.async_get_table_names() + with pytest.raises(RuntimeError, match="async context manager"): + await connector.async_get_pk_columns("t") + with pytest.raises(RuntimeError, match="async context manager"): + await connector.async_get_column_info("t") connector._conn = None From 0546ae2eb83071456ef7d87cce5f0939f10bb1e3 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 00:50:16 +0000 Subject: [PATCH 10/19] feat(postgresql): add async_iter_batches (PLT-1453) --- src/orcapod/databases/postgresql_connector.py | 108 ++++++++++++++- .../test_postgresql_connector.py | 128 ++++++++++++++++++ 2 files changed, 229 insertions(+), 7 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index 6bb348d6..ee37ef19 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -206,6 +206,55 @@ def _resolve_column_type_lookup( return {ci.name: ci.arrow_type for ci in connector.get_column_info(table_name)} +async def _async_resolve_column_type_lookup( + query: str, + connector: "PostgreSQLConnector", +) -> dict[str, pa.DataType]: + """Async version of ``_resolve_column_type_lookup``. + + Parses the FROM clause of query to find the source table, then returns a + column-name → Arrow-type dict by calling ``async_get_column_info``. + + Returns an empty dict if no single unambiguous table can be identified, + causing ``async_iter_batches`` to fall back to ``pa.large_string()`` for + all columns. + + Args: + query: SQL query string. + connector: The connector to call ``async_get_column_info`` on. + + Returns: + Dict mapping column name to Arrow DataType. + """ + if re.search(r"\bJOIN\b", query, re.IGNORECASE): + return {} + + from_pattern = re.compile( + r'\bFROM\b\s+(?:"([^"]+)"|(\w+))', + re.IGNORECASE, + ) + from_matches = list(from_pattern.finditer(query)) + if len(from_matches) != 1: + return {} + + match = from_matches[0] + table_name = match.group(1) or match.group(2) + + from_tail = query[match.end():] + clause_boundary = re.search( + r"\b(WHERE|GROUP\s+BY|ORDER\s+BY|LIMIT|OFFSET|HAVING|UNION|EXCEPT|INTERSECT)\b", + from_tail, + re.IGNORECASE, + ) + if clause_boundary: + from_tail = from_tail[: clause_boundary.start()] + + if "," in from_tail: + return {} + + return {ci.name: ci.arrow_type for ci in await connector.async_get_column_info(table_name)} + + # --------------------------------------------------------------------------- # PostgreSQLConnector # --------------------------------------------------------------------------- @@ -584,11 +633,10 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: # ── Async read ──────────────────────────────────────────────────────────── - # Note: declared as plain `def` (not `async def`) because the concrete - # implementation will be an async generator function (`async def … yield`). - # A Protocol method declared as `def` allows both async generators and - # coroutines returning AsyncIterator to satisfy the contract structurally. - def async_iter_batches( + # Note: declared as plain `def` (not `async def`) but the generator body uses + # `yield`, making it an async generator. The Protocol declares this as `def` + # returning `AsyncIterator` — see async_db_connector_protocol.py for rationale. + async def async_iter_batches( self, query: str, params: Any = None, @@ -596,13 +644,59 @@ def async_iter_batches( ) -> AsyncIterator[pa.RecordBatch]: """Execute a query and yield results as Arrow RecordBatches. + Opens a dedicated ``AsyncConnection`` per call so that the server-side + cursor portal is isolated from operations on the shared ``_async_conn``. + The dedicated connection is closed in the finally block regardless of + how the generator is consumed or abandoned. + Args: query: SQL query string. Table names should be double-quoted - (``SELECT * FROM "my_table"``). + (``SELECT * FROM "my_table"``); all connectors must support + ANSI-standard double-quoted identifiers. params: Optional query parameters. batch_size: Maximum rows per yielded batch. + + Raises: + RuntimeError: If the async context manager has not been entered. """ - raise NotImplementedError("async_iter_batches not yet implemented") + import pyarrow as _pa + + self._require_async_open() # guard: raise if context manager not entered + + with self._lock: + self._require_open() # guard: raise if sync connector is closed + dsn = self._dsn + + read_conn = await psycopg.AsyncConnection.connect(dsn, autocommit=False) + cursor_name = f"orcapod_{next(self._cursor_seq)}" + cur = read_conn.cursor(name=cursor_name) + + try: + await cur.execute(query, params) + if cur.description is None: + return + col_names = [d.name for d in cur.description] + type_lookup = await _async_resolve_column_type_lookup(query, self) + arrow_types = [type_lookup.get(n, _pa.large_string()) for n in col_names] + schema = _pa.schema( + [_pa.field(n, t) for n, t in zip(col_names, arrow_types)] + ) + rows = await cur.fetchmany(batch_size) + + while rows: + arrays = [ + _pa.array([r[i] for r in rows], type=t) + for i, t in enumerate(arrow_types) + ] + yield _pa.RecordBatch.from_arrays(arrays, schema=schema) + rows = await cur.fetchmany(batch_size) + finally: + await cur.close() + try: + await read_conn.rollback() + except Exception: + pass + await read_conn.close() # ── Serialization ───────────────────────────────────────────────────────── diff --git a/tests/test_databases/test_postgresql_connector.py b/tests/test_databases/test_postgresql_connector.py index 8aa4d6d5..be7d553d 100644 --- a/tests/test_databases/test_postgresql_connector.py +++ b/tests/test_databases/test_postgresql_connector.py @@ -412,3 +412,131 @@ async def test_schema_methods_raise_without_aenter(self) -> None: with pytest.raises(RuntimeError, match="async context manager"): await connector.async_get_column_info("t") connector._conn = None + + +class TestAsyncIterBatchesUnit: + """Unit tests for async_iter_batches — mocked psycopg async connection.""" + + def _make_connector(self) -> PostgreSQLConnector: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # Simulate entered context with an async connection whose cursor() + # returns an async context manager. async_get_column_info uses this + # to discover column types — return "id INTEGER NOT NULL" so that + # int test data is accepted by pyarrow without type errors. + inner_cursor = AsyncMock() + inner_cursor.fetchall = AsyncMock( + return_value=[("id", "integer", "int4", "NO")] + ) + mock_cursor_cm = MagicMock() + mock_cursor_cm.__aenter__ = AsyncMock(return_value=inner_cursor) + mock_cursor_cm.__aexit__ = AsyncMock(return_value=False) + mock_async_conn = AsyncMock() + mock_async_conn.cursor = MagicMock(return_value=mock_cursor_cm) + connector._async_conn = mock_async_conn + return connector + + def _make_mock_read_conn(self, rows_per_call: list[list]) -> tuple[AsyncMock, AsyncMock]: + """Return (mock_read_conn, mock_cursor). + + rows_per_call: list of row lists returned by successive fetchmany calls. + The last entry must be an empty list to signal end-of-results. + """ + mock_cursor = AsyncMock() + col = MagicMock() + col.name = "id" + mock_cursor.description = [col] + mock_cursor.fetchmany = AsyncMock(side_effect=rows_per_call) + + mock_read_conn = AsyncMock() + # cursor() is a sync call in psycopg AsyncConnection; use MagicMock so + # that read_conn.cursor(name=...) returns mock_cursor directly. + mock_read_conn.cursor = MagicMock(return_value=mock_cursor) + + return mock_read_conn, mock_cursor + + @pytest.mark.asyncio + async def test_yields_correct_row_count(self) -> None: + connector = self._make_connector() + mock_read_conn, _ = self._make_mock_read_conn( + [[(1,), (2,), (3,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"' + )] + assert sum(b.num_rows for b in batches) == 3 + connector._conn = None + + @pytest.mark.asyncio + async def test_multiple_batches(self) -> None: + connector = self._make_connector() + mock_read_conn, _ = self._make_mock_read_conn( + [[(1,), (2,)], [(3,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"', batch_size=2 + )] + assert len(batches) == 2 + assert batches[0].num_rows == 2 + assert batches[1].num_rows == 1 + connector._conn = None + + @pytest.mark.asyncio + async def test_empty_result(self) -> None: + connector = self._make_connector() + mock_cursor = AsyncMock() + mock_cursor.description = None + mock_read_conn = AsyncMock() + # cursor() is sync in psycopg; use MagicMock so the call is not awaited + mock_read_conn.cursor = MagicMock(return_value=mock_cursor) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + batches = [b async for b in connector.async_iter_batches( + 'SELECT * FROM "t"' + )] + assert batches == [] + connector._conn = None + + @pytest.mark.asyncio + async def test_cursor_closed_on_completion(self) -> None: + connector = self._make_connector() + mock_read_conn, mock_cursor = self._make_mock_read_conn( + [[(1,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + _ = [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + mock_cursor.close.assert_called_once() + mock_read_conn.close.assert_called_once() + connector._conn = None + + @pytest.mark.asyncio + async def test_cursor_closed_on_early_abandonment(self) -> None: + connector = self._make_connector() + mock_read_conn, mock_cursor = self._make_mock_read_conn( + [[(1,)], [(2,)], []] + ) + with patch("psycopg.AsyncConnection.connect", new_callable=AsyncMock, + return_value=mock_read_conn): + gen = connector.async_iter_batches('SELECT * FROM "t"', batch_size=1) + await gen.__anext__() # consume one batch + await gen.aclose() # abandon mid-stream + mock_cursor.close.assert_called_once() + mock_read_conn.close.assert_called_once() + connector._conn = None + + @pytest.mark.asyncio + async def test_raises_without_aenter(self) -> None: + with patch("psycopg.connect") as mock_connect: + mock_connect.return_value = MagicMock() + connector = PostgreSQLConnector("postgresql://localhost/test") + # _async_conn is None + with pytest.raises(RuntimeError, match="async context manager"): + async for _ in connector.async_iter_batches('SELECT * FROM "t"'): + pass + connector._conn = None From 937cea84f0aa540fd549daa1076072a1766647fe Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 01:03:42 +0000 Subject: [PATCH 11/19] refactor(postgresql): fix async_iter_batches comment; improve test cleanup assertions (PLT-1453) --- src/orcapod/databases/postgresql_connector.py | 6 +++--- tests/test_databases/test_postgresql_connector.py | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index ee37ef19..dbc18c22 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -633,9 +633,9 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: # ── Async read ──────────────────────────────────────────────────────────── - # Note: declared as plain `def` (not `async def`) but the generator body uses - # `yield`, making it an async generator. The Protocol declares this as `def` - # returning `AsyncIterator` — see async_db_connector_protocol.py for rationale. + # The Protocol declares this as plain `def` returning `AsyncIterator` (see + # async_db_connector_protocol.py). Here, `async def` + `yield` makes it an + # async generator, which correctly satisfies that declared return type. async def async_iter_batches( self, query: str, diff --git a/tests/test_databases/test_postgresql_connector.py b/tests/test_databases/test_postgresql_connector.py index be7d553d..45d7b7ea 100644 --- a/tests/test_databases/test_postgresql_connector.py +++ b/tests/test_databases/test_postgresql_connector.py @@ -500,6 +500,8 @@ async def test_empty_result(self) -> None: 'SELECT * FROM "t"' )] assert batches == [] + mock_cursor.close.assert_called_once() + mock_read_conn.close.assert_called_once() connector._conn = None @pytest.mark.asyncio @@ -512,6 +514,7 @@ async def test_cursor_closed_on_completion(self) -> None: return_value=mock_read_conn): _ = [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] mock_cursor.close.assert_called_once() + mock_read_conn.rollback.assert_called_once() mock_read_conn.close.assert_called_once() connector._conn = None @@ -527,6 +530,7 @@ async def test_cursor_closed_on_early_abandonment(self) -> None: await gen.__anext__() # consume one batch await gen.aclose() # abandon mid-stream mock_cursor.close.assert_called_once() + mock_read_conn.rollback.assert_called_once() mock_read_conn.close.assert_called_once() connector._conn = None From b08fd3d2f9dbcf3e6106de958b812420dec72774 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 01:05:55 +0000 Subject: [PATCH 12/19] feat(sqlite): add async methods via asyncio.to_thread() (PLT-1453) Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/sqlite_connector.py | 64 ++++++++++- tests/test_databases/test_sqlite_connector.py | 104 ++++++++++++++++++ 2 files changed, 167 insertions(+), 1 deletion(-) diff --git a/src/orcapod/databases/sqlite_connector.py b/src/orcapod/databases/sqlite_connector.py index fe579951..ea28d57e 100644 --- a/src/orcapod/databases/sqlite_connector.py +++ b/src/orcapod/databases/sqlite_connector.py @@ -13,11 +13,12 @@ """ from __future__ import annotations +import asyncio import logging import os import sqlite3 import threading -from collections.abc import Iterator +from collections.abc import AsyncIterator, Iterator from typing import TYPE_CHECKING, Any from orcapod.types import ColumnInfo @@ -372,6 +373,67 @@ def __enter__(self) -> SQLiteConnector: def __exit__(self, *args: Any) -> None: self.close() + # ── Async lifecycle ─────────────────────────────────────────────────────── + + async def __aenter__(self) -> SQLiteConnector: + """Return self; the sync connection is already open from ``__init__``.""" + return self + + async def __aexit__(self, *args: Any) -> None: + await self.async_close() + + async def async_close(self) -> None: + """Close the database connection. + + Implementations must be idempotent — calling this multiple times must + not raise. + """ + await asyncio.to_thread(self.close) + + # ── Async schema introspection ──────────────────────────────────────────── + + async def async_get_table_names(self) -> list[str]: + """Return all user table names asynchronously.""" + return await asyncio.to_thread(self.get_table_names) + + async def async_get_pk_columns(self, table_name: str) -> list[str]: + """Return primary-key column names asynchronously. + + Returns: + List of PK column names; empty list if the table has no primary key. + """ + return await asyncio.to_thread(self.get_pk_columns, table_name) + + async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: + """Return column metadata asynchronously.""" + return await asyncio.to_thread(self.get_column_info, table_name) + + # ── Async read ──────────────────────────────────────────────────────────── + + async def async_iter_batches( + self, + query: str, + params: Any = None, + batch_size: int = 1000, + ) -> AsyncIterator[pa.RecordBatch]: + """Execute a query and yield results as Arrow RecordBatches. + + Runs the full synchronous iteration in a thread-pool worker (so blocking + I/O does not stall the event loop), then yields each collected batch. + + Args: + query: SQL query string. Table names should be double-quoted + (``SELECT * FROM "my_table"``); all connectors must support + ANSI-standard double-quoted identifiers. + params: Optional query parameters. + batch_size: Maximum rows per yielded batch. + """ + batches = await asyncio.to_thread( + lambda: list(self.iter_batches(query, params, batch_size)) + ) + for batch in batches: + yield batch + # ── Serialization ───────────────────────────────────────────────────────── def to_config(self) -> dict[str, Any]: diff --git a/tests/test_databases/test_sqlite_connector.py b/tests/test_databases/test_sqlite_connector.py index d28eb899..6b505d99 100644 --- a/tests/test_databases/test_sqlite_connector.py +++ b/tests/test_databases/test_sqlite_connector.py @@ -13,6 +13,7 @@ _coerce_column, _sqlite_type_to_arrow, ) +from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol from orcapod.protocols.db_connector_protocol import DBConnectorProtocol from orcapod.types import ColumnInfo @@ -426,3 +427,106 @@ def test_declared_rowid_column_keeps_declared_type(self, connector): assert "rowid" in batch.schema.names # Should remain large_string (declared type), not int64 assert batch.schema.field("rowid").type == pa.large_string() + + +class TestAsyncMethods: + """Async method tests using a real in-memory SQLite DB — no mocks needed.""" + + def _setup(self) -> SQLiteConnector: + """Return a connector with a simple table pre-populated.""" + connector = SQLiteConnector(":memory:") + connector._conn.execute( + 'CREATE TABLE "t" (id INTEGER PRIMARY KEY, val TEXT NOT NULL)' + ) + connector._conn.execute( + "INSERT INTO \"t\" VALUES (1, 'a'), (2, 'b'), (3, 'c')" + ) + return connector + + def test_isinstance_async_protocol(self) -> None: + connector = SQLiteConnector(":memory:") + assert isinstance(connector, AsyncDBConnectorProtocol) + + @pytest.mark.asyncio + async def test_aenter_returns_self(self) -> None: + connector = SQLiteConnector(":memory:") + result = await connector.__aenter__() + assert result is connector + await connector.async_close() + + @pytest.mark.asyncio + async def test_aexit_closes_connection(self) -> None: + connector = SQLiteConnector(":memory:") + await connector.__aenter__() + await connector.__aexit__(None, None, None) + with pytest.raises(RuntimeError, match="closed"): + connector._require_open() + + @pytest.mark.asyncio + async def test_async_close_idempotent(self) -> None: + connector = SQLiteConnector(":memory:") + await connector.async_close() + await connector.async_close() # must not raise + + @pytest.mark.asyncio + async def test_async_context_manager(self) -> None: + async with SQLiteConnector(":memory:") as connector: + assert connector._conn is not None + with pytest.raises(RuntimeError, match="closed"): + connector._require_open() + + @pytest.mark.asyncio + async def test_async_get_table_names_matches_sync(self) -> None: + connector = self._setup() + assert await connector.async_get_table_names() == connector.get_table_names() + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_get_pk_columns_matches_sync(self) -> None: + connector = self._setup() + assert ( + await connector.async_get_pk_columns("t") == connector.get_pk_columns("t") + ) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_get_column_info_matches_sync(self) -> None: + connector = self._setup() + assert ( + await connector.async_get_column_info("t") + == connector.get_column_info("t") + ) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_returns_correct_rows(self) -> None: + connector = self._setup() + batches = [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + total_rows = sum(b.num_rows for b in batches) + assert total_rows == 3 + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_matches_sync_data(self) -> None: + import pyarrow as pa + connector = self._setup() + + sync_table = pa.Table.from_batches( + list(connector.iter_batches('SELECT * FROM "t"')) + ) + async_table = pa.Table.from_batches( + [b async for b in connector.async_iter_batches('SELECT * FROM "t"')] + ) + assert async_table.equals(sync_table) + await connector.async_close() + + @pytest.mark.asyncio + async def test_async_iter_batches_empty_result(self) -> None: + connector = self._setup() + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "t" WHERE 1=0' + ) + ] + assert batches == [] + await connector.async_close() From 68ce5dbf22bc1494107d3c3e6f7b1ac32de964b1 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 01:09:27 +0000 Subject: [PATCH 13/19] refactor(sqlite): enrich async method docstrings (PLT-1453) --- src/orcapod/databases/sqlite_connector.py | 28 +++++++++++++++---- tests/test_databases/test_sqlite_connector.py | 1 - 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/src/orcapod/databases/sqlite_connector.py b/src/orcapod/databases/sqlite_connector.py index ea28d57e..5f6b04c9 100644 --- a/src/orcapod/databases/sqlite_connector.py +++ b/src/orcapod/databases/sqlite_connector.py @@ -393,19 +393,34 @@ async def async_close(self) -> None: # ── Async schema introspection ──────────────────────────────────────────── async def async_get_table_names(self) -> list[str]: - """Return all user table names asynchronously.""" + """Return all user table names in this database (excludes views and SQLite internals). + + Returns: + Sorted list of table name strings. + """ return await asyncio.to_thread(self.get_table_names) async def async_get_pk_columns(self, table_name: str) -> list[str]: - """Return primary-key column names asynchronously. + """Return primary-key column names in key-sequence order. + + Args: + table_name: Name of the table to introspect. Returns: - List of PK column names; empty list if the table has no primary key. + List of PK column names; empty list if the table has no primary key + or the table doesn't exist. """ return await asyncio.to_thread(self.get_pk_columns, table_name) async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: - """Return column metadata asynchronously.""" + """Return column metadata with Arrow-mapped types. + + Args: + table_name: Name of the table to introspect. + + Returns: + List of ColumnInfo objects; empty list if table doesn't exist. + """ return await asyncio.to_thread(self.get_column_info, table_name) # ── Async read ──────────────────────────────────────────────────────────── @@ -419,7 +434,10 @@ async def async_iter_batches( """Execute a query and yield results as Arrow RecordBatches. Runs the full synchronous iteration in a thread-pool worker (so blocking - I/O does not stall the event loop), then yields each collected batch. + I/O does not stall the event loop), collects all batches into a list, + then yields each batch from the event loop. The entire result set is + materialised before the first yield — ``batch_size`` controls individual + batch shape but not peak memory usage. Args: query: SQL query string. Table names should be double-quoted diff --git a/tests/test_databases/test_sqlite_connector.py b/tests/test_databases/test_sqlite_connector.py index 6b505d99..cdc48b0e 100644 --- a/tests/test_databases/test_sqlite_connector.py +++ b/tests/test_databases/test_sqlite_connector.py @@ -508,7 +508,6 @@ async def test_async_iter_batches_returns_correct_rows(self) -> None: @pytest.mark.asyncio async def test_async_iter_batches_matches_sync_data(self) -> None: - import pyarrow as pa connector = self._setup() sync_table = pa.Table.from_batches( From c6db94a5ce91b953a9c8273a955db54a4f046208 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 01:12:35 +0000 Subject: [PATCH 14/19] test(postgresql): add async integration tests (PLT-1453) Co-Authored-By: Claude Sonnet 4.6 --- .../test_postgresql_connector_integration.py | 151 ++++++++++++++++++ 1 file changed, 151 insertions(+) diff --git a/tests/test_databases/test_postgresql_connector_integration.py b/tests/test_databases/test_postgresql_connector_integration.py index 42301eb3..235e009f 100644 --- a/tests/test_databases/test_postgresql_connector_integration.py +++ b/tests/test_databases/test_postgresql_connector_integration.py @@ -20,6 +20,7 @@ """ from __future__ import annotations +import asyncio import os import uuid from collections.abc import Iterator @@ -440,3 +441,153 @@ def test_schema_mismatch_raises(self, db: ConnectorArrowDatabase) -> None: db.add_record(("fn", "mismatch"), record_id="r2", record=r2) with pytest.raises(ValueError, match="Schema mismatch"): db.flush() + + +# --------------------------------------------------------------------------- +# Async lifecycle +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncLifecycle: + @pytest.mark.asyncio + async def test_aenter_opens_async_conn(self, connector: PostgreSQLConnector) -> None: + assert connector._async_conn is None + async with connector: + assert connector._async_conn is not None + + @pytest.mark.asyncio + async def test_async_close_idempotent(self, connector: PostgreSQLConnector) -> None: + async with connector: + pass + await connector.async_close() # second call must not raise + + def test_require_async_open_raises_outside_context( + self, connector: PostgreSQLConnector + ) -> None: + with pytest.raises(RuntimeError, match="async context manager"): + connector._require_async_open() + + +# --------------------------------------------------------------------------- +# Async schema introspection +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncSchemaIntrospection: + @pytest.mark.asyncio + async def test_async_get_table_names_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t1" (id INTEGER PRIMARY KEY)') + cur.execute('CREATE TABLE "t2" (id INTEGER PRIMARY KEY)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_table_names() + sync_result = connector.get_table_names() + assert async_result == sync_result + + @pytest.mark.asyncio + async def test_async_get_pk_columns_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t" (id TEXT PRIMARY KEY, val REAL)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_pk_columns("t") + sync_result = connector.get_pk_columns("t") + assert async_result == sync_result + + @pytest.mark.asyncio + async def test_async_get_column_info_matches_sync( + self, connector: PostgreSQLConnector + ) -> None: + with connector._conn.cursor() as cur: + cur.execute('CREATE TABLE "t" (id TEXT NOT NULL, val REAL)') + connector._conn.commit() + async with connector: + async_result = await connector.async_get_column_info("t") + sync_result = connector.get_column_info("t") + assert async_result == sync_result + + +# --------------------------------------------------------------------------- +# Async iter_batches +# --------------------------------------------------------------------------- + + +@pytest.mark.postgres +class TestAsyncIterBatches: + def _setup_data(self, connector: PostgreSQLConnector) -> None: + cols = [ + ColumnInfo("id", pa.large_string(), nullable=False), + ColumnInfo("score", pa.float64(), nullable=True), + ] + connector.create_table_if_not_exists("data", cols, "id") + records = pa.table({ + "id": pa.array(["a", "b", "c"], type=pa.large_string()), + "score": pa.array([1.5, 2.5, 3.5], type=pa.float64()), + }) + connector.upsert_records("data", records, "id") + + @pytest.mark.asyncio + async def test_returns_all_rows(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + assert sum(b.num_rows for b in batches) == 3 + + @pytest.mark.asyncio + async def test_correct_arrow_types(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + table = pa.Table.from_batches(batches) + assert table.schema.field("id").type == pa.large_string() + assert table.schema.field("score").type == pa.float64() + + @pytest.mark.asyncio + async def test_batch_size_respected(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "data"', batch_size=2 + ) + ] + assert len(batches) == 2 + assert batches[0].num_rows == 2 + assert batches[1].num_rows == 1 + + @pytest.mark.asyncio + async def test_empty_result(self, connector: PostgreSQLConnector) -> None: + self._setup_data(connector) + async with connector: + batches = [ + b async for b in connector.async_iter_batches( + 'SELECT * FROM "data" WHERE 1=0' + ) + ] + assert batches == [] + + @pytest.mark.asyncio + async def test_early_abandonment_closes_cursor( + self, connector: PostgreSQLConnector + ) -> None: + self._setup_data(connector) + async with connector: + gen = connector.async_iter_batches('SELECT * FROM "data"', batch_size=1) + await gen.__anext__() # consume one batch + await gen.aclose() # abandon — must not leak server-side cursor + # Subsequent queries must still work + batches = [ + b async for b in connector.async_iter_batches('SELECT * FROM "data"') + ] + assert sum(b.num_rows for b in batches) == 3 From 6ae4d9724ffb9474e2dfed4fe244f1e5719996b2 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 01:18:25 +0000 Subject: [PATCH 15/19] refactor(test): remove unused asyncio import in pg integration tests (PLT-1453) --- tests/test_databases/test_postgresql_connector_integration.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_databases/test_postgresql_connector_integration.py b/tests/test_databases/test_postgresql_connector_integration.py index 235e009f..aa5a5cf5 100644 --- a/tests/test_databases/test_postgresql_connector_integration.py +++ b/tests/test_databases/test_postgresql_connector_integration.py @@ -20,7 +20,6 @@ """ from __future__ import annotations -import asyncio import os import uuid from collections.abc import Iterator From 182a3c87e236207d17ac80e84e67d6702bb40a5b Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 02:11:11 +0000 Subject: [PATCH 16/19] fix(test): compute sync results before entering async context manager async_close() closes both async and sync connections, so sync schema methods called after 'async with connector:' exits would raise. Move the three get_table_names/get_pk_columns/get_column_info sync calls to before the async context in the integration test suite. Also update the Protocol signature snippet in the spec and plan docs: async_iter_batches is declared as 'def' (not 'async def') in the Protocol body so that async-generator concrete implementations satisfy it statically. Co-Authored-By: Claude Sonnet 4.6 --- superpowers/plans/2026-05-23-async-db-connector-protocol.md | 2 +- .../specs/2026-05-23-async-db-connector-protocol-design.md | 2 +- .../test_databases/test_postgresql_connector_integration.py | 6 +++--- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/superpowers/plans/2026-05-23-async-db-connector-protocol.md b/superpowers/plans/2026-05-23-async-db-connector-protocol.md index bbaddfe5..9c6d4a94 100644 --- a/superpowers/plans/2026-05-23-async-db-connector-protocol.md +++ b/superpowers/plans/2026-05-23-async-db-connector-protocol.md @@ -123,7 +123,7 @@ class AsyncDBConnectorProtocol(Protocol): # ── Read ────────────────────────────────────────────────────────────────── - async def async_iter_batches( + def async_iter_batches( self, query: str, params: Any = None, diff --git a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md index 85af6142..25513b13 100644 --- a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md +++ b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md @@ -72,7 +72,7 @@ Python's structural typing handles this naturally. Callers that need both can ch `async_iter_batches` signature mirrors `iter_batches` exactly: ```python -async def async_iter_batches( +def async_iter_batches( self, query: str, params: Any = None, diff --git a/tests/test_databases/test_postgresql_connector_integration.py b/tests/test_databases/test_postgresql_connector_integration.py index aa5a5cf5..9b09b651 100644 --- a/tests/test_databases/test_postgresql_connector_integration.py +++ b/tests/test_databases/test_postgresql_connector_integration.py @@ -483,9 +483,9 @@ async def test_async_get_table_names_matches_sync( cur.execute('CREATE TABLE "t1" (id INTEGER PRIMARY KEY)') cur.execute('CREATE TABLE "t2" (id INTEGER PRIMARY KEY)') connector._conn.commit() + sync_result = connector.get_table_names() async with connector: async_result = await connector.async_get_table_names() - sync_result = connector.get_table_names() assert async_result == sync_result @pytest.mark.asyncio @@ -495,9 +495,9 @@ async def test_async_get_pk_columns_matches_sync( with connector._conn.cursor() as cur: cur.execute('CREATE TABLE "t" (id TEXT PRIMARY KEY, val REAL)') connector._conn.commit() + sync_result = connector.get_pk_columns("t") async with connector: async_result = await connector.async_get_pk_columns("t") - sync_result = connector.get_pk_columns("t") assert async_result == sync_result @pytest.mark.asyncio @@ -507,9 +507,9 @@ async def test_async_get_column_info_matches_sync( with connector._conn.cursor() as cur: cur.execute('CREATE TABLE "t" (id TEXT NOT NULL, val REAL)') connector._conn.commit() + sync_result = connector.get_column_info("t") async with connector: async_result = await connector.async_get_column_info("t") - sync_result = connector.get_column_info("t") assert async_result == sync_result From 3e8445a5b863955850376f271e435ccab48c41e4 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 02:27:56 +0000 Subject: [PATCH 17/19] docs(spec): clarify async_iter_batches materialises full result before first yield The SQLite async_iter_batches section said batches are "yielded lazily" but the implementation collects list(self.iter_batches(...)) in a worker thread before yielding any batch. Update wording to reflect that the full result set is materialised before the first yield and that batch_size does not enable true streaming. Co-Authored-By: Claude Sonnet 4.6 --- .../specs/2026-05-23-async-db-connector-protocol-design.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md index 25513b13..b3859fe6 100644 --- a/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md +++ b/superpowers/specs/2026-05-23-async-db-connector-protocol-design.md @@ -234,7 +234,9 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: ### `async_iter_batches` The entire sync iteration runs in the thread pool (blocking I/O stays off the event loop), -then batches are yielded lazily from the async generator: +collecting all batches into a list before returning to the event loop. The full result set +is materialised before the first yield — ``batch_size`` controls individual batch sizes +but does not enable true streaming: ```python async def async_iter_batches(self, query, params=None, batch_size=1000): From c0daf87163313156ddd04a78283541dde4b5820d Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Tue, 26 May 2026 02:42:34 +0000 Subject: [PATCH 18/19] fix(postgresql): avoid blocking event loop in async_close and async_iter_batches async_close() was calling self.close() directly, which acquires a threading.RLock and performs blocking psycopg3 network I/O on the event loop thread. Fixed by using await asyncio.to_thread(self.close) so the sync shutdown runs in a worker thread. async_iter_batches() was wrapping the _dsn read and _require_open() guard in `with self._lock:`, which could stall the event loop if a sync thread held the lock. _dsn is immutable after __init__ and _require_open() is a simple null-check, so neither needs lock protection from async context. Removed the lock and annotated _dsn read accordingly. Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/postgresql_connector.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index dbc18c22..b02297ce 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -12,6 +12,7 @@ """ from __future__ import annotations +import asyncio import itertools import logging import re @@ -562,7 +563,7 @@ async def async_close(self) -> None: if self._async_conn is not None: await self._async_conn.close() self._async_conn = None - self.close() + await asyncio.to_thread(self.close) # ── Async schema introspection ──────────────────────────────────────────── @@ -662,10 +663,8 @@ async def async_iter_batches( import pyarrow as _pa self._require_async_open() # guard: raise if context manager not entered - - with self._lock: - self._require_open() # guard: raise if sync connector is closed - dsn = self._dsn + self._require_open() # guard: raise if sync connector is closed + dsn = self._dsn # immutable after __init__; no lock needed read_conn = await psycopg.AsyncConnection.connect(dsn, autocommit=False) cursor_name = f"orcapod_{next(self._cursor_seq)}" From 9d3ddbc470fd671c37c16f6ef6f0ec1fa4ef1485 Mon Sep 17 00:00:00 2001 From: "agent-kurodo[bot]" <268466204+agent-kurodo[bot]@users.noreply.github.com> Date: Wed, 27 May 2026 03:55:48 +0000 Subject: [PATCH 19/19] =?UTF-8?q?refactor(connectors):=20address=20PR=20re?= =?UTF-8?q?view=20=E2=80=94=20consolidate=20SQL/query=20logic,=20fix=20exp?= =?UTF-8?q?orts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Extract _SQL_TABLE_NAMES, _SQL_PK_COLUMNS, _SQL_COLUMN_INFO module-level constants; sync and async schema methods now share a single copy of each SQL string (no duplication) - Extract _parse_table_from_query() helper from the two _resolve_column_type_lookup functions, which were identical except for the final DB call; each lookup function is now ~3 lines - Remove AsyncDBConnectorProtocol from orcapod.databases — protocols belong in orcapod.protocols (eywalker review) - Add DBConnectorProtocol to orcapod.protocols.__init__ alongside AsyncDBConnectorProtocol for a consistent export surface (Copilot review) - Wrap each async_iter_batches cleanup step (cur.close, rollback, read_conn.close) in its own try/except so one failure does not prevent the others from running Co-Authored-By: Claude Sonnet 4.6 --- src/orcapod/databases/__init__.py | 2 - src/orcapod/databases/postgresql_connector.py | 203 ++++++++---------- src/orcapod/protocols/__init__.py | 2 + 3 files changed, 88 insertions(+), 119 deletions(-) diff --git a/src/orcapod/databases/__init__.py b/src/orcapod/databases/__init__.py index adc4428c..15f95515 100644 --- a/src/orcapod/databases/__init__.py +++ b/src/orcapod/databases/__init__.py @@ -5,10 +5,8 @@ from .spiraldb_connector import SpiralDBConnector from .sqlite_connector import SQLiteConnector from .postgresql_connector import PostgreSQLConnector -from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol __all__ = [ - "AsyncDBConnectorProtocol", "ConnectorArrowDatabase", "DeltaTableDatabase", "InMemoryArrowDatabase", diff --git a/src/orcapod/databases/postgresql_connector.py b/src/orcapod/databases/postgresql_connector.py index b02297ce..8974409a 100644 --- a/src/orcapod/databases/postgresql_connector.py +++ b/src/orcapod/databases/postgresql_connector.py @@ -33,6 +33,40 @@ logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# SQL query strings (shared between sync and async methods) +# --------------------------------------------------------------------------- + +_SQL_TABLE_NAMES = """ + SELECT table_name + FROM information_schema.tables + WHERE table_schema = current_schema() + AND table_type = 'BASE TABLE' + ORDER BY table_name +""" + +_SQL_PK_COLUMNS = """ + SELECT kcu.column_name + FROM information_schema.key_column_usage kcu + JOIN information_schema.table_constraints tc + ON kcu.constraint_name = tc.constraint_name + AND kcu.table_schema = tc.table_schema + AND kcu.table_name = tc.table_name + WHERE tc.constraint_type = 'PRIMARY KEY' + AND kcu.table_schema = current_schema() + AND kcu.table_name = %s + ORDER BY kcu.ordinal_position +""" + +_SQL_COLUMN_INFO = """ + SELECT column_name, data_type, udt_name, is_nullable + FROM information_schema.columns + WHERE table_schema = current_schema() + AND table_name = %s + ORDER BY ordinal_position +""" + + # --------------------------------------------------------------------------- # Module-level helpers (pure functions, no I/O) # These are kept module-level so AsyncPostgreSQLConnector (future) can reuse @@ -151,31 +185,22 @@ def _arrow_type_to_pg_sql(arrow_type: pa.DataType) -> str: return "TEXT" -def _resolve_column_type_lookup( - query: str, - connector: "PostgreSQLConnector", -) -> dict[str, pa.DataType]: - """Parse the FROM clause of query to find the source table, then return - a column-name → Arrow-type dict from get_column_info. +def _parse_table_from_query(query: str) -> str | None: + """Extract the single unambiguous source table name from a SELECT query. - Returns an empty dict if no single unambiguous table can be identified, - causing iter_batches to fall back to pa.large_string() for all columns. + Returns ``None`` when the query references more than one table (JOINs, + comma-separated tables, subqueries, CTEs) so callers fall back to treating + all columns as ``pa.large_string()``. Args: query: SQL query string. - connector: The connector to call get_column_info on. Returns: - Dict mapping column name to Arrow DataType. + The table name string, or ``None`` if no single table can be identified. """ - # Be conservative: only resolve types when we can unambiguously identify - # a single source table. For multi-table queries (JOINs, comma-separated - # tables, multiple FROM clauses, subqueries, etc.) return {} so callers - # fall back to treating all columns as pa.large_string(). - # Fast path: any JOIN keyword means multi-table. if re.search(r"\bJOIN\b", query, re.IGNORECASE): - return {} + return None # Find all FROM occurrences. We only proceed when there is # exactly one (multiple FROMs indicate subqueries or CTEs). @@ -185,7 +210,7 @@ def _resolve_column_type_lookup( ) from_matches = list(from_pattern.finditer(query)) if len(from_matches) != 1: - return {} + return None match = from_matches[0] table_name = match.group(1) or match.group(2) @@ -202,57 +227,54 @@ def _resolve_column_type_lookup( from_tail = from_tail[: clause_boundary.start()] if "," in from_tail: - return {} + return None - return {ci.name: ci.arrow_type for ci in connector.get_column_info(table_name)} + return table_name -async def _async_resolve_column_type_lookup( +def _resolve_column_type_lookup( query: str, connector: "PostgreSQLConnector", ) -> dict[str, pa.DataType]: - """Async version of ``_resolve_column_type_lookup``. - - Parses the FROM clause of query to find the source table, then returns a - column-name → Arrow-type dict by calling ``async_get_column_info``. + """Return a column-name → Arrow-type dict for the source table of ``query``. Returns an empty dict if no single unambiguous table can be identified, - causing ``async_iter_batches`` to fall back to ``pa.large_string()`` for - all columns. + causing ``iter_batches`` to fall back to ``pa.large_string()`` for all + columns. Args: query: SQL query string. - connector: The connector to call ``async_get_column_info`` on. + connector: The connector to call ``get_column_info`` on. Returns: Dict mapping column name to Arrow DataType. """ - if re.search(r"\bJOIN\b", query, re.IGNORECASE): + table_name = _parse_table_from_query(query) + if table_name is None: return {} + return {ci.name: ci.arrow_type for ci in connector.get_column_info(table_name)} - from_pattern = re.compile( - r'\bFROM\b\s+(?:"([^"]+)"|(\w+))', - re.IGNORECASE, - ) - from_matches = list(from_pattern.finditer(query)) - if len(from_matches) != 1: - return {} - match = from_matches[0] - table_name = match.group(1) or match.group(2) +async def _async_resolve_column_type_lookup( + query: str, + connector: "PostgreSQLConnector", +) -> dict[str, pa.DataType]: + """Async counterpart to ``_resolve_column_type_lookup``. - from_tail = query[match.end():] - clause_boundary = re.search( - r"\b(WHERE|GROUP\s+BY|ORDER\s+BY|LIMIT|OFFSET|HAVING|UNION|EXCEPT|INTERSECT)\b", - from_tail, - re.IGNORECASE, - ) - if clause_boundary: - from_tail = from_tail[: clause_boundary.start()] + Returns an empty dict if no single unambiguous table can be identified, + causing ``async_iter_batches`` to fall back to ``pa.large_string()`` for + all columns. - if "," in from_tail: - return {} + Args: + query: SQL query string. + connector: The connector to call ``async_get_column_info`` on. + Returns: + Dict mapping column name to Arrow DataType. + """ + table_name = _parse_table_from_query(query) + if table_name is None: + return {} return {ci.name: ci.arrow_type for ci in await connector.async_get_column_info(table_name)} @@ -324,15 +346,7 @@ def get_table_names(self) -> list[str]: with self._lock: conn = self._require_open() with conn.cursor() as cur: - cur.execute( - """ - SELECT table_name - FROM information_schema.tables - WHERE table_schema = current_schema() - AND table_type = 'BASE TABLE' - ORDER BY table_name - """ - ) + cur.execute(_SQL_TABLE_NAMES) return [row[0] for row in cur.fetchall()] def get_pk_columns(self, table_name: str) -> list[str]: @@ -341,21 +355,7 @@ def get_pk_columns(self, table_name: str) -> list[str]: conn = self._require_open() self._validate_table_name(table_name) with conn.cursor() as cur: - cur.execute( - """ - SELECT kcu.column_name - FROM information_schema.key_column_usage kcu - JOIN information_schema.table_constraints tc - ON kcu.constraint_name = tc.constraint_name - AND kcu.table_schema = tc.table_schema - AND kcu.table_name = tc.table_name - WHERE tc.constraint_type = 'PRIMARY KEY' - AND kcu.table_schema = current_schema() - AND kcu.table_name = %s - ORDER BY kcu.ordinal_position - """, - (table_name,), - ) + cur.execute(_SQL_PK_COLUMNS, (table_name,)) return [row[0] for row in cur.fetchall()] def get_column_info(self, table_name: str) -> list[ColumnInfo]: @@ -364,16 +364,7 @@ def get_column_info(self, table_name: str) -> list[ColumnInfo]: conn = self._require_open() self._validate_table_name(table_name) with conn.cursor() as cur: - cur.execute( - """ - SELECT column_name, data_type, udt_name, is_nullable - FROM information_schema.columns - WHERE table_schema = current_schema() - AND table_name = %s - ORDER BY ordinal_position - """, - (table_name,), - ) + cur.execute(_SQL_COLUMN_INFO, (table_name,)) return [ ColumnInfo( name=row[0], @@ -571,15 +562,7 @@ async def async_get_table_names(self) -> list[str]: """Return all user table names in this database (sorted, excludes views).""" conn = self._require_async_open() async with conn.cursor() as cur: - await cur.execute( - """ - SELECT table_name - FROM information_schema.tables - WHERE table_schema = current_schema() - AND table_type = 'BASE TABLE' - ORDER BY table_name - """ - ) + await cur.execute(_SQL_TABLE_NAMES) return [row[0] for row in await cur.fetchall()] async def async_get_pk_columns(self, table_name: str) -> list[str]: @@ -591,21 +574,7 @@ async def async_get_pk_columns(self, table_name: str) -> list[str]: conn = self._require_async_open() self._validate_table_name(table_name) async with conn.cursor() as cur: - await cur.execute( - """ - SELECT kcu.column_name - FROM information_schema.key_column_usage kcu - JOIN information_schema.table_constraints tc - ON kcu.constraint_name = tc.constraint_name - AND kcu.table_schema = tc.table_schema - AND kcu.table_name = tc.table_name - WHERE tc.constraint_type = 'PRIMARY KEY' - AND kcu.table_schema = current_schema() - AND kcu.table_name = %s - ORDER BY kcu.ordinal_position - """, - (table_name,), - ) + await cur.execute(_SQL_PK_COLUMNS, (table_name,)) return [row[0] for row in await cur.fetchall()] async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: @@ -613,16 +582,7 @@ async def async_get_column_info(self, table_name: str) -> list[ColumnInfo]: conn = self._require_async_open() self._validate_table_name(table_name) async with conn.cursor() as cur: - await cur.execute( - """ - SELECT column_name, data_type, udt_name, is_nullable - FROM information_schema.columns - WHERE table_schema = current_schema() - AND table_name = %s - ORDER BY ordinal_position - """, - (table_name,), - ) + await cur.execute(_SQL_COLUMN_INFO, (table_name,)) return [ ColumnInfo( name=row[0], @@ -690,12 +650,21 @@ async def async_iter_batches( yield _pa.RecordBatch.from_arrays(arrays, schema=schema) rows = await cur.fetchmany(batch_size) finally: - await cur.close() + # Each step runs independently so one failure doesn't block the + # others. Note: CancelledError (BaseException subclass) can still + # interrupt an individual await; full shield() protection is deferred. + try: + await cur.close() + except Exception: + pass try: await read_conn.rollback() except Exception: pass - await read_conn.close() + try: + await read_conn.close() + except Exception: + pass # ── Serialization ───────────────────────────────────────────────────────── diff --git a/src/orcapod/protocols/__init__.py b/src/orcapod/protocols/__init__.py index 22e35c06..16380c9f 100644 --- a/src/orcapod/protocols/__init__.py +++ b/src/orcapod/protocols/__init__.py @@ -4,10 +4,12 @@ ) from orcapod.protocols.pipeline_protocols import PipelineProtocol from orcapod.protocols.async_db_connector_protocol import AsyncDBConnectorProtocol +from orcapod.protocols.db_connector_protocol import DBConnectorProtocol __all__ = [ "AsyncDBConnectorProtocol", "DataExecutionLoggerProtocol", + "DBConnectorProtocol", "ExecutionObserverProtocol", "PipelineProtocol", ]