fix(ai): support async with on async streaming responses (Fixes #393)#645
fix(ai): support async with on async streaming responses (Fixes #393)#645turnipdabeets wants to merge 6 commits into
async with on async streaming responses (Fixes #393)#645Conversation
The async AI wrappers returned a bare async generator for streaming
responses. Async generators support `async for` but not `async with`,
so libraries that enter the stream as a context manager (e.g.
pydantic-ai's `async with response:`) raised:
TypeError: 'async_generator' object does not support the
asynchronous context manager protocol
Add `AsyncStreamWrapper`, which wraps the PostHog tracking generator and
adds the async context manager protocol. On `__aexit__` it closes the
tracking generator (so the `finally` block that fires the PostHog usage
event always runs, even on early exit) and then closes the underlying
provider stream to release the HTTP connection, matching native SDK
behaviour. Attribute access is proxied to the provider stream so
metadata such as `.response` keeps working.
Applied to OpenAI chat completions, OpenAI responses, and Anthropic
messages streaming. Adds wrapper unit tests plus real-client regression
tests covering `async with` and early-exit stream closure.
Supersedes #622.
Co-authored-by: devteamaegis <devteam.aegis@gmail.com>
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
posthog-python Compliance ReportDate: 2026-06-03 03:26:51 UTC ✅ All Tests Passed!45/45 tests passed Capture Tests✅ 29/29 tests passed View Details
Feature_Flags Tests✅ 16/16 tests passed View Details
|
mypy flagged stream.py:49 because T was used across __init__ and __anext__ without the class being parameterized, so they resolved to unrelated typevars. Declare the class Generic[T] (it is genuinely generic over the chunk type) and carry [T] through the self-returning annotations. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Prompt To Fix All With AIFix the following 2 code review issues. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 2
posthog/ai/stream.py:58-75
If `self._generator.aclose()` raises — for example because `_capture_streaming_event` inside the tracking generator's `finally` block throws a network error — the provider stream's `close()` is never reached. This leaves the underlying HTTP connection open until GC collects the stream, which can exhaust connection-pool slots under load. Wrapping in `try/finally` guarantees `_stream.close()` always runs regardless of what `aclose()` does.
```suggestion
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool:
# Close our tracking generator first so its `finally` block runs and
# the PostHog usage event is captured, even when the caller breaks out
# of the loop early. If it is already exhausted this is a no-op.
try:
await self._generator.aclose()
finally:
# Then close the underlying provider stream to release the HTTP
# connection, matching native SDK behaviour. Provider streams expose an
# async `close()`; bare async generators (e.g. in tests) expose
# `aclose()`.
if self._stream is not None:
close = getattr(self._stream, "aclose", None) or getattr(
self._stream, "close", None
)
if close is not None:
await close()
return False
```
### Issue 2 of 2
posthog/test/ai/test_async_stream_wrapper.py:8-25
`RecordingStream` here, `_RecordingAsyncStream` in `test_openai.py`, and `_RecordingAnthropicStream` in `test_anthropic.py` are structurally identical — each has `_items`, `closed`, `response = "provider-response"`, `__aiter__`/`__anext__`, and `async def close()`. Moving a single shared fixture (e.g. into a `conftest.py` or a small `posthog/test/ai/utils.py`) would remove the duplication and make any future changes a one-line update.
Reviews (1): Last reviewed commit: "fix(ai): make AsyncStreamWrapper Generic..." | Re-trigger Greptile |
- Wrap the generator close in try/finally so the underlying provider stream is always closed even if the PostHog capture in the generator's finally raises (avoids leaking the HTTP connection). - Extract the duplicated mock stream into a shared posthog/test/ai/utils.py::RecordingAsyncStream used by all three test modules. - Trim verbose comments/docstrings. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
|
Both fixes look correct:
One small note: Tip: You can customize Greptile's behavior for this repo with |
…getattr__ Guard __getattr__ against names starting with '_'. Prevents infinite recursion if the instance is built without __init__ (e.g. copy/pickle) and stops hasattr/copy probes from leaking to the provider stream. Only public metadata like .response is proxied. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Address review findings on #645: - Forward async-generator protocol methods (aclose/asend/athrow) to the tracking generator instead of the provider stream. After pointing __getattr__ at the provider stream (which exposes close(), not aclose()), `await response.aclose()` regressed to AttributeError and dropped the $ai_generation event. Forwarding to the generator restores the pre-wrapper behaviour (aclose runs the finally, fires the event). - Wrap async Gemini streaming (gemini_async.py) in AsyncStreamWrapper — it had the identical `async with` gap. - Pass the provider stream by keyword (stream=response) at all call sites. - Tests: aclose-fires-event, try/finally closes provider stream when the capture raises, exception propagation through __aexit__, payload assertions on the async-with path, and a Gemini async-with regression. - Changeset: note the return-type change and that sync wrappers are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Simplify the changeset to a plain user-facing entry (drop the behavior note). Remove redundant test docstrings whose names already say it. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Prompt To Fix All With AIFix the following 1 code review issue. Work through them one at a time, proposing concise fixes.
---
### Issue 1 of 1
posthog/ai/stream.py:55-62
**`aclose()` bypasses provider-stream cleanup**
When a caller does `await response.aclose()` directly (without `async with`), `__getattr__` forwards to `self._generator.aclose()`, which fires the PostHog capture event in the generator's `finally` block — but `self._stream.close()` is never called. The underlying HTTP connection is left open until GC. The `test_aclose_runs_generator_finally_and_captures` test verifies the event fires but intentionally does not assert `source.closed is True`, confirming this gap. Overriding `aclose()` as a proper method (mirroring `__aexit__`'s `try/finally`) would close the provider stream on any teardown path, not only `async with` exit.
Reviews (2): Last reviewed commit: "chore(ai): simplify changeset, trim comm..." | Re-trigger Greptile |
💡 Motivation and Context
Fixes #393. Supersedes #622 (rebased onto current
main, with @marandaneto's review addressed).The async AI wrappers returned a bare async generator for streaming responses, which supports
async forbut notasync with. Libraries like pydantic-ai consume streams viaasync with response:, so they broke with:This adds an
AsyncStreamWrapperthat supportsasync with(closing the underlying provider stream on exit) while preservingasync forandawait response.aclose(). Applied to the OpenAI, Anthropic, and Gemini async streaming wrappers.Out of scope: Anthropic
messages.stream()(stillasync def) and the syncwithgap — left for a follow-up.💚 How did you test it?
Added unit tests for the wrapper (async with, async for, early-exit capture, provider-stream close,
aclose, exception propagation) and real-client regression tests coveringasync withfor OpenAI chat/responses, Anthropicmessages.create, and Gemini streaming. Confirmed the tests reproduce the #393TypeErrorwithout the fix. Full AI test suite passes;ruffandmypyclean.📝 Checklist
If releasing new changes
sampo addto generate a changeset file