From 24369b01d66e7ee5d62c6f2e5340fbc312c353b1 Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 22:45:50 -0400 Subject: [PATCH 1/6] fix(ai): support async with on streaming responses (Fixes #393) The async AI wrappers returned a bare async generator for streaming responses. Async generators support `async for` but not `async with`, so libraries that enter the stream as a context manager (e.g. pydantic-ai's `async with response:`) raised: TypeError: 'async_generator' object does not support the asynchronous context manager protocol Add `AsyncStreamWrapper`, which wraps the PostHog tracking generator and adds the async context manager protocol. On `__aexit__` it closes the tracking generator (so the `finally` block that fires the PostHog usage event always runs, even on early exit) and then closes the underlying provider stream to release the HTTP connection, matching native SDK behaviour. Attribute access is proxied to the provider stream so metadata such as `.response` keeps working. Applied to OpenAI chat completions, OpenAI responses, and Anthropic messages streaming. Adds wrapper unit tests plus real-client regression tests covering `async with` and early-exit stream closure. Supersedes #622. Co-authored-by: devteamaegis Co-Authored-By: Claude Opus 4.8 (1M context) --- .../async-stream-context-manager.md | 5 + posthog/ai/anthropic/anthropic_async.py | 3 +- posthog/ai/openai/openai_async.py | 5 +- posthog/ai/stream.py | 84 +++++++++++++ posthog/test/ai/anthropic/test_anthropic.py | 91 ++++++++++++++ posthog/test/ai/openai/test_openai.py | 116 ++++++++++++++++++ posthog/test/ai/test_async_stream_wrapper.py | 99 +++++++++++++++ 7 files changed, 400 insertions(+), 3 deletions(-) create mode 100644 .sampo/changesets/async-stream-context-manager.md create mode 100644 posthog/ai/stream.py create mode 100644 posthog/test/ai/test_async_stream_wrapper.py diff --git a/.sampo/changesets/async-stream-context-manager.md b/.sampo/changesets/async-stream-context-manager.md new file mode 100644 index 00000000..05e76e58 --- /dev/null +++ b/.sampo/changesets/async-stream-context-manager.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: patch +--- + +Fix `TypeError: 'async_generator' object does not support the asynchronous context manager protocol` when using the async AI wrappers (`posthog.ai.openai.AsyncOpenAI`, `posthog.ai.anthropic.AsyncAnthropic`) with libraries such as pydantic-ai that consume streaming responses via `async with`. Streaming responses now support both `async for` and `async with`, and exiting the context closes the underlying provider stream. diff --git a/posthog/ai/anthropic/anthropic_async.py b/posthog/ai/anthropic/anthropic_async.py index 9b02e35c..8fd7a5ab 100644 --- a/posthog/ai/anthropic/anthropic_async.py +++ b/posthog/ai/anthropic/anthropic_async.py @@ -11,6 +11,7 @@ from typing import Any, Dict, List, Optional from posthog import setup +from posthog.ai.stream import AsyncStreamWrapper from posthog.ai.types import StreamingContentBlock, TokenUsage, ToolInProgress from posthog.ai.utils import ( call_llm_and_track_usage_async, @@ -225,7 +226,7 @@ async def generator(): stop_reason=stop_reason, ) - return generator() + return AsyncStreamWrapper(generator(), response) async def _capture_streaming_event( self, diff --git a/posthog/ai/openai/openai_async.py b/posthog/ai/openai/openai_async.py index 8e4644ff..d03fc25e 100644 --- a/posthog/ai/openai/openai_async.py +++ b/posthog/ai/openai/openai_async.py @@ -2,6 +2,7 @@ import uuid from typing import Any, Dict, List, Optional +from posthog.ai.stream import AsyncStreamWrapper from posthog.ai.types import TokenUsage try: @@ -221,7 +222,7 @@ async def async_generator(): stop_reason=stop_reason, ) - return async_generator() + return AsyncStreamWrapper(async_generator(), response) async def _capture_streaming_event( self, @@ -515,7 +516,7 @@ async def async_generator(): stop_reason=stop_reason, ) - return async_generator() + return AsyncStreamWrapper(async_generator(), response) async def _capture_streaming_event( self, diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py new file mode 100644 index 00000000..3b7d7dba --- /dev/null +++ b/posthog/ai/stream.py @@ -0,0 +1,84 @@ +"""Shared async streaming utilities for PostHog AI wrappers.""" + +from typing import Any, AsyncGenerator, Optional, TypeVar + +T = TypeVar("T") + + +class AsyncStreamWrapper: + """Wraps an async generator so it also implements the async context manager protocol. + + The OpenAI and Anthropic SDKs return stream objects that support both + ``async for`` iteration **and** ``async with`` (i.e. they are both async + iterators and async context managers). PostHog's streaming wrappers + previously returned a bare async generator, which only supports ``async + for``. Libraries such as pydantic-ai call ``async with response:`` before + iterating, causing:: + + TypeError: 'async_generator' object does not support the + asynchronous context manager protocol + + This class wraps the PostHog tracking generator and adds the missing + ``__aenter__`` / ``__aexit__`` methods. When available, it also keeps a + reference to the original provider stream so that: + + - On ``__aexit__`` the tracking generator is closed (so the ``finally`` + block that fires the PostHog usage event always runs, even on early + exit) **and** the underlying provider stream is closed (releasing the + HTTP connection, matching the native SDK behaviour). + - Attribute access not handled here is proxied to the provider stream, so + provider-specific metadata such as ``.response`` keeps working. + """ + + def __init__( + self, + generator: AsyncGenerator[T, None], + stream: Optional[Any] = None, + ) -> None: + self._generator = generator + self._stream = stream + + # ------------------------------------------------------------------ # + # Async iterator protocol # + # ------------------------------------------------------------------ # + + def __aiter__(self) -> "AsyncStreamWrapper": + return self + + async def __anext__(self) -> T: + return await self._generator.__anext__() + + # ------------------------------------------------------------------ # + # Async context manager protocol # + # ------------------------------------------------------------------ # + + async def __aenter__(self) -> "AsyncStreamWrapper": + return self + + async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: + # Close our tracking generator first so its `finally` block runs and + # the PostHog usage event is captured, even when the caller breaks out + # of the loop early. If it is already exhausted this is a no-op. + await self._generator.aclose() + + # Then close the underlying provider stream to release the HTTP + # connection, matching native SDK behaviour. Provider streams expose an + # async `close()`; bare async generators (e.g. in tests) expose + # `aclose()`. + if self._stream is not None: + close = getattr(self._stream, "aclose", None) or getattr( + self._stream, "close", None + ) + if close is not None: + await close() + + return False + + # ------------------------------------------------------------------ # + # Attribute proxy – forward any other attribute access to the # + # underlying provider stream (e.g. `.response`) when available. # + # ------------------------------------------------------------------ # + + def __getattr__(self, name: str) -> Any: + target = self._stream if self._stream is not None else self._generator + return getattr(target, name) diff --git a/posthog/test/ai/anthropic/test_anthropic.py b/posthog/test/ai/anthropic/test_anthropic.py index ee7aa640..9d7ef836 100644 --- a/posthog/test/ai/anthropic/test_anthropic.py +++ b/posthog/test/ai/anthropic/test_anthropic.py @@ -1421,3 +1421,94 @@ def test_integration_stop_reason(mock_client): assert props["$ai_stop_reason"] in ("end_turn", "max_tokens") assert props["$ai_provider"] == "anthropic" assert props["$ai_input_tokens"] > 0 + + +class _RecordingAnthropicStream: + """Mock Anthropic async stream that is iterable and records when closed.""" + + def __init__(self, events): + self._events = list(events) + self.closed = False + self.response = "provider-response" + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._events: + raise StopAsyncIteration + return self._events.pop(0) + + async def close(self): + self.closed = True + + +def _anthropic_stream_events(): + final = MockStreamEvent("message_delta") + final.usage = MockUsage( + input_tokens=10, + output_tokens=5, + cache_read_input_tokens=0, + cache_creation_input_tokens=0, + ) + return [ + MockStreamEvent("message_start"), + MockStreamEvent("content_block_delta", text="Hi"), + final, + ] + + +@pytest.mark.asyncio +async def test_async_messages_create_streaming_supports_async_with(mock_client): + """Regression test for #393: messages.create(stream=True) must support + `async with`.""" + + async def mock_async_create(**kwargs): + return _RecordingAnthropicStream(_anthropic_stream_events()) + + with patch( + "anthropic.resources.messages.AsyncMessages.create", + side_effect=mock_async_create, + ): + client = AsyncAnthropic(posthog_client=mock_client) + response = await client.messages.create( + model="claude-3-opus-20240229", + messages=[{"role": "user", "content": "Foo"}], + stream=True, + max_tokens=1, + ) + + async with response as stream: + events = [event async for event in stream] + + assert len(events) == 3 + assert mock_client.capture.call_count == 1 + + +@pytest.mark.asyncio +async def test_async_messages_streaming_early_exit_closes_provider_stream(mock_client): + """Breaking out early must close the underlying Anthropic stream and still + capture the event.""" + source = _RecordingAnthropicStream(_anthropic_stream_events()) + + async def mock_async_create(**kwargs): + return source + + with patch( + "anthropic.resources.messages.AsyncMessages.create", + side_effect=mock_async_create, + ): + client = AsyncAnthropic(posthog_client=mock_client) + response = await client.messages.create( + model="claude-3-opus-20240229", + messages=[{"role": "user", "content": "Foo"}], + stream=True, + max_tokens=1, + ) + + async with response as stream: + async for _ in stream: + break # early exit + + assert source.closed is True + assert mock_client.capture.call_count == 1 diff --git a/posthog/test/ai/openai/test_openai.py b/posthog/test/ai/openai/test_openai.py index 7049fb40..d0b83a23 100644 --- a/posthog/test/ai/openai/test_openai.py +++ b/posthog/test/ai/openai/test_openai.py @@ -2352,3 +2352,119 @@ def test_integration_stop_reason(mock_client): assert props["$ai_stop_reason"] in ("stop", "length") assert props["$ai_provider"] == "openai" assert props["$ai_input_tokens"] > 0 + + +class _RecordingAsyncStream: + """Mock OpenAI async stream that is iterable and records when closed. + + Mirrors the real ``openai.AsyncStream``: it supports ``async for`` and + exposes an async ``close()`` plus a ``response`` attribute. + """ + + def __init__(self, chunks): + self._chunks = list(chunks) + self.closed = False + self.response = "provider-response" + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._chunks: + raise StopAsyncIteration + return self._chunks.pop(0) + + async def close(self): + self.closed = True + + +@pytest.mark.asyncio +async def test_async_chat_streaming_supports_async_with( + mock_client, streaming_tool_call_chunks +): + """Regression test for #393: chat completions stream=True must support + `async with` (the protocol pydantic-ai relies on).""" + + async def mock_create(self, **kwargs): + return _RecordingAsyncStream(streaming_tool_call_chunks) + + with patch( + "openai.resources.chat.completions.AsyncCompletions.create", new=mock_create + ): + client = AsyncOpenAI(api_key="test-key", posthog_client=mock_client) + + response = await client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": "Hi"}], + stream=True, + posthog_distinct_id="test-id", + ) + + chunks = [] + async with response as stream: + async for chunk in stream: + chunks.append(chunk) + + assert chunks == streaming_tool_call_chunks + assert mock_client.capture.call_count == 1 + + +@pytest.mark.asyncio +async def test_async_responses_streaming_supports_async_with(mock_client): + """Regression test for #393: responses stream=True must support + `async with`.""" + from unittest.mock import MagicMock + + chunk = MagicMock() + chunk.type = "response.text.delta" + chunk.text = "hello" + + async def mock_create(self, **kwargs): + return _RecordingAsyncStream([chunk]) + + with patch("openai.resources.responses.AsyncResponses.create", new=mock_create): + client = AsyncOpenAI(api_key="test-key", posthog_client=mock_client) + + response = await client.responses.create( + model="gpt-4o-mini", + input=[{"role": "user", "content": "Hi"}], + stream=True, + posthog_distinct_id="test-id", + ) + + async with response as stream: + received = [c async for c in stream] + + assert received == [chunk] + assert mock_client.capture.call_count == 1 + + +@pytest.mark.asyncio +async def test_async_chat_streaming_early_exit_closes_provider_stream( + mock_client, streaming_tool_call_chunks +): + """Breaking out of the stream early must close the underlying provider + stream (release the HTTP connection) and still capture the event.""" + source = _RecordingAsyncStream(streaming_tool_call_chunks) + + async def mock_create(self, **kwargs): + return source + + with patch( + "openai.resources.chat.completions.AsyncCompletions.create", new=mock_create + ): + client = AsyncOpenAI(api_key="test-key", posthog_client=mock_client) + + response = await client.chat.completions.create( + model="gpt-4", + messages=[{"role": "user", "content": "Hi"}], + stream=True, + posthog_distinct_id="test-id", + ) + + async with response as stream: + async for _ in stream: + break # early exit + + assert source.closed is True + assert mock_client.capture.call_count == 1 diff --git a/posthog/test/ai/test_async_stream_wrapper.py b/posthog/test/ai/test_async_stream_wrapper.py new file mode 100644 index 00000000..1b7b6395 --- /dev/null +++ b/posthog/test/ai/test_async_stream_wrapper.py @@ -0,0 +1,99 @@ +"""Unit tests for AsyncStreamWrapper (no external SDKs required).""" + +import pytest + +from posthog.ai.stream import AsyncStreamWrapper + + +class RecordingStream: + """Minimal async-iterable provider stream that records when it is closed.""" + + def __init__(self, items): + self._items = list(items) + self.closed = False + self.response = "provider-response" # provider-specific metadata + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._items: + raise StopAsyncIteration + return self._items.pop(0) + + async def close(self): + self.closed = True + + +@pytest.mark.asyncio +async def test_async_for_iteration_still_works(): + async def gen(): + yield 1 + yield 2 + yield 3 + + wrapper = AsyncStreamWrapper(gen()) + assert [item async for item in wrapper] == [1, 2, 3] + + +@pytest.mark.asyncio +async def test_async_with_yields_self_and_iterates(): + async def gen(): + yield "a" + yield "b" + + wrapper = AsyncStreamWrapper(gen()) + async with wrapper as stream: + assert stream is wrapper + assert [item async for item in stream] == ["a", "b"] + + +@pytest.mark.asyncio +@pytest.mark.parametrize("consume_all", [False, True]) +async def test_finally_block_runs_on_exit(consume_all): + """The generator's finally block must run on context exit, whether the + caller exhausts the stream or breaks out of it early.""" + captured = [] + + async def gen(): + try: + yield 1 + yield 2 + yield 3 + finally: + captured.append("done") + + async with AsyncStreamWrapper(gen()) as stream: + async for item in stream: + if not consume_all and item == 1: + break + + assert captured == ["done"] + + +@pytest.mark.asyncio +async def test_exit_closes_underlying_provider_stream(): + source = RecordingStream([1, 2, 3]) + + async def gen(): + async for item in source: + yield item + + async with AsyncStreamWrapper(gen(), source) as stream: + async for _ in stream: + break # early exit + + assert source.closed is True + + +@pytest.mark.asyncio +async def test_getattr_proxies_to_provider_stream(): + source = RecordingStream([]) + + async def gen(): + if False: + yield # make this an async generator + + wrapper = AsyncStreamWrapper(gen(), source) + # `.response` lives on the provider stream, not the generator. + assert wrapper.response == "provider-response" From cec962f3e8f8d16305c757f7cfc3f6988107a50b Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 22:49:53 -0400 Subject: [PATCH 2/6] fix(ai): make AsyncStreamWrapper Generic[T] to satisfy mypy mypy flagged stream.py:49 because T was used across __init__ and __anext__ without the class being parameterized, so they resolved to unrelated typevars. Declare the class Generic[T] (it is genuinely generic over the chunk type) and carry [T] through the self-returning annotations. Co-Authored-By: Claude Opus 4.8 (1M context) --- posthog/ai/stream.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py index 3b7d7dba..177c962d 100644 --- a/posthog/ai/stream.py +++ b/posthog/ai/stream.py @@ -1,11 +1,11 @@ """Shared async streaming utilities for PostHog AI wrappers.""" -from typing import Any, AsyncGenerator, Optional, TypeVar +from typing import Any, AsyncGenerator, Generic, Optional, TypeVar T = TypeVar("T") -class AsyncStreamWrapper: +class AsyncStreamWrapper(Generic[T]): """Wraps an async generator so it also implements the async context manager protocol. The OpenAI and Anthropic SDKs return stream objects that support both @@ -42,7 +42,7 @@ def __init__( # Async iterator protocol # # ------------------------------------------------------------------ # - def __aiter__(self) -> "AsyncStreamWrapper": + def __aiter__(self) -> "AsyncStreamWrapper[T]": return self async def __anext__(self) -> T: @@ -52,7 +52,7 @@ async def __anext__(self) -> T: # Async context manager protocol # # ------------------------------------------------------------------ # - async def __aenter__(self) -> "AsyncStreamWrapper": + async def __aenter__(self) -> "AsyncStreamWrapper[T]": return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: From a0585a592fbc541287bd6b3ce1c8ddcd83272d95 Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 22:56:40 -0400 Subject: [PATCH 3/6] refactor(ai): address review on async stream wrapper - Wrap the generator close in try/finally so the underlying provider stream is always closed even if the PostHog capture in the generator's finally raises (avoids leaking the HTTP connection). - Extract the duplicated mock stream into a shared posthog/test/ai/utils.py::RecordingAsyncStream used by all three test modules. - Trim verbose comments/docstrings. Co-Authored-By: Claude Opus 4.8 (1M context) --- posthog/ai/stream.py | 70 ++++++-------------- posthog/test/ai/anthropic/test_anthropic.py | 27 ++------ posthog/test/ai/openai/test_openai.py | 33 ++------- posthog/test/ai/test_async_stream_wrapper.py | 28 ++------ posthog/test/ai/utils.py | 27 ++++++++ 5 files changed, 60 insertions(+), 125 deletions(-) create mode 100644 posthog/test/ai/utils.py diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py index 177c962d..2e5e6a54 100644 --- a/posthog/ai/stream.py +++ b/posthog/ai/stream.py @@ -6,28 +6,14 @@ class AsyncStreamWrapper(Generic[T]): - """Wraps an async generator so it also implements the async context manager protocol. - - The OpenAI and Anthropic SDKs return stream objects that support both - ``async for`` iteration **and** ``async with`` (i.e. they are both async - iterators and async context managers). PostHog's streaming wrappers - previously returned a bare async generator, which only supports ``async - for``. Libraries such as pydantic-ai call ``async with response:`` before - iterating, causing:: - - TypeError: 'async_generator' object does not support the - asynchronous context manager protocol - - This class wraps the PostHog tracking generator and adds the missing - ``__aenter__`` / ``__aexit__`` methods. When available, it also keeps a - reference to the original provider stream so that: - - - On ``__aexit__`` the tracking generator is closed (so the ``finally`` - block that fires the PostHog usage event always runs, even on early - exit) **and** the underlying provider stream is closed (releasing the - HTTP connection, matching the native SDK behaviour). - - Attribute access not handled here is proxied to the provider stream, so - provider-specific metadata such as ``.response`` keeps working. + """Adds the async context manager protocol to a PostHog streaming generator. + + The OpenAI and Anthropic SDK streams support both ``async for`` and + ``async with``. PostHog's wrappers returned a bare async generator, which + only supports ``async for``, so ``async with response:`` (used by + pydantic-ai) raised a TypeError. This wraps the tracking generator and, + when given the original provider stream, closes it and proxies attribute + access (e.g. ``.response``) to it. """ def __init__( @@ -38,47 +24,31 @@ def __init__( self._generator = generator self._stream = stream - # ------------------------------------------------------------------ # - # Async iterator protocol # - # ------------------------------------------------------------------ # - def __aiter__(self) -> "AsyncStreamWrapper[T]": return self async def __anext__(self) -> T: return await self._generator.__anext__() - # ------------------------------------------------------------------ # - # Async context manager protocol # - # ------------------------------------------------------------------ # - async def __aenter__(self) -> "AsyncStreamWrapper[T]": return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: - # Close our tracking generator first so its `finally` block runs and - # the PostHog usage event is captured, even when the caller breaks out - # of the loop early. If it is already exhausted this is a no-op. - await self._generator.aclose() - - # Then close the underlying provider stream to release the HTTP - # connection, matching native SDK behaviour. Provider streams expose an - # async `close()`; bare async generators (e.g. in tests) expose - # `aclose()`. - if self._stream is not None: - close = getattr(self._stream, "aclose", None) or getattr( - self._stream, "close", None - ) - if close is not None: - await close() + # Close the tracking generator first so its `finally` block captures the + # PostHog event, even on early exit. try/finally guarantees the provider + # stream is still closed if that capture raises. + try: + await self._generator.aclose() + finally: + if self._stream is not None: + close = getattr(self._stream, "aclose", None) or getattr( + self._stream, "close", None + ) + if close is not None: + await close() return False - # ------------------------------------------------------------------ # - # Attribute proxy – forward any other attribute access to the # - # underlying provider stream (e.g. `.response`) when available. # - # ------------------------------------------------------------------ # - def __getattr__(self, name: str) -> Any: target = self._stream if self._stream is not None else self._generator return getattr(target, name) diff --git a/posthog/test/ai/anthropic/test_anthropic.py b/posthog/test/ai/anthropic/test_anthropic.py index 9d7ef836..a2dd451b 100644 --- a/posthog/test/ai/anthropic/test_anthropic.py +++ b/posthog/test/ai/anthropic/test_anthropic.py @@ -10,6 +10,7 @@ from anthropic.types import Message, Usage from posthog.ai.anthropic import Anthropic, AsyncAnthropic + from posthog.test.ai.utils import RecordingAsyncStream ANTHROPIC_AVAILABLE = True except ImportError: @@ -1423,26 +1424,6 @@ def test_integration_stop_reason(mock_client): assert props["$ai_input_tokens"] > 0 -class _RecordingAnthropicStream: - """Mock Anthropic async stream that is iterable and records when closed.""" - - def __init__(self, events): - self._events = list(events) - self.closed = False - self.response = "provider-response" - - def __aiter__(self): - return self - - async def __anext__(self): - if not self._events: - raise StopAsyncIteration - return self._events.pop(0) - - async def close(self): - self.closed = True - - def _anthropic_stream_events(): final = MockStreamEvent("message_delta") final.usage = MockUsage( @@ -1464,7 +1445,7 @@ async def test_async_messages_create_streaming_supports_async_with(mock_client): `async with`.""" async def mock_async_create(**kwargs): - return _RecordingAnthropicStream(_anthropic_stream_events()) + return RecordingAsyncStream(_anthropic_stream_events()) with patch( "anthropic.resources.messages.AsyncMessages.create", @@ -1489,7 +1470,7 @@ async def mock_async_create(**kwargs): async def test_async_messages_streaming_early_exit_closes_provider_stream(mock_client): """Breaking out early must close the underlying Anthropic stream and still capture the event.""" - source = _RecordingAnthropicStream(_anthropic_stream_events()) + source = RecordingAsyncStream(_anthropic_stream_events()) async def mock_async_create(**kwargs): return source @@ -1508,7 +1489,7 @@ async def mock_async_create(**kwargs): async with response as stream: async for _ in stream: - break # early exit + break assert source.closed is True assert mock_client.capture.call_count == 1 diff --git a/posthog/test/ai/openai/test_openai.py b/posthog/test/ai/openai/test_openai.py index d0b83a23..107668c5 100644 --- a/posthog/test/ai/openai/test_openai.py +++ b/posthog/test/ai/openai/test_openai.py @@ -39,6 +39,7 @@ from posthog.ai.openai import OpenAI from posthog.ai.openai.openai_async import AsyncOpenAI from posthog.ai.openai.wrapper_utils import reset_fallback_warnings + from posthog.test.ai.utils import RecordingAsyncStream OPENAI_AVAILABLE = True except ImportError: @@ -2354,30 +2355,6 @@ def test_integration_stop_reason(mock_client): assert props["$ai_input_tokens"] > 0 -class _RecordingAsyncStream: - """Mock OpenAI async stream that is iterable and records when closed. - - Mirrors the real ``openai.AsyncStream``: it supports ``async for`` and - exposes an async ``close()`` plus a ``response`` attribute. - """ - - def __init__(self, chunks): - self._chunks = list(chunks) - self.closed = False - self.response = "provider-response" - - def __aiter__(self): - return self - - async def __anext__(self): - if not self._chunks: - raise StopAsyncIteration - return self._chunks.pop(0) - - async def close(self): - self.closed = True - - @pytest.mark.asyncio async def test_async_chat_streaming_supports_async_with( mock_client, streaming_tool_call_chunks @@ -2386,7 +2363,7 @@ async def test_async_chat_streaming_supports_async_with( `async with` (the protocol pydantic-ai relies on).""" async def mock_create(self, **kwargs): - return _RecordingAsyncStream(streaming_tool_call_chunks) + return RecordingAsyncStream(streaming_tool_call_chunks) with patch( "openai.resources.chat.completions.AsyncCompletions.create", new=mock_create @@ -2420,7 +2397,7 @@ async def test_async_responses_streaming_supports_async_with(mock_client): chunk.text = "hello" async def mock_create(self, **kwargs): - return _RecordingAsyncStream([chunk]) + return RecordingAsyncStream([chunk]) with patch("openai.resources.responses.AsyncResponses.create", new=mock_create): client = AsyncOpenAI(api_key="test-key", posthog_client=mock_client) @@ -2445,7 +2422,7 @@ async def test_async_chat_streaming_early_exit_closes_provider_stream( ): """Breaking out of the stream early must close the underlying provider stream (release the HTTP connection) and still capture the event.""" - source = _RecordingAsyncStream(streaming_tool_call_chunks) + source = RecordingAsyncStream(streaming_tool_call_chunks) async def mock_create(self, **kwargs): return source @@ -2464,7 +2441,7 @@ async def mock_create(self, **kwargs): async with response as stream: async for _ in stream: - break # early exit + break assert source.closed is True assert mock_client.capture.call_count == 1 diff --git a/posthog/test/ai/test_async_stream_wrapper.py b/posthog/test/ai/test_async_stream_wrapper.py index 1b7b6395..b57a69d2 100644 --- a/posthog/test/ai/test_async_stream_wrapper.py +++ b/posthog/test/ai/test_async_stream_wrapper.py @@ -3,26 +3,7 @@ import pytest from posthog.ai.stream import AsyncStreamWrapper - - -class RecordingStream: - """Minimal async-iterable provider stream that records when it is closed.""" - - def __init__(self, items): - self._items = list(items) - self.closed = False - self.response = "provider-response" # provider-specific metadata - - def __aiter__(self): - return self - - async def __anext__(self): - if not self._items: - raise StopAsyncIteration - return self._items.pop(0) - - async def close(self): - self.closed = True +from posthog.test.ai.utils import RecordingAsyncStream @pytest.mark.asyncio @@ -73,7 +54,7 @@ async def gen(): @pytest.mark.asyncio async def test_exit_closes_underlying_provider_stream(): - source = RecordingStream([1, 2, 3]) + source = RecordingAsyncStream([1, 2, 3]) async def gen(): async for item in source: @@ -81,19 +62,18 @@ async def gen(): async with AsyncStreamWrapper(gen(), source) as stream: async for _ in stream: - break # early exit + break assert source.closed is True @pytest.mark.asyncio async def test_getattr_proxies_to_provider_stream(): - source = RecordingStream([]) + source = RecordingAsyncStream([]) async def gen(): if False: yield # make this an async generator wrapper = AsyncStreamWrapper(gen(), source) - # `.response` lives on the provider stream, not the generator. assert wrapper.response == "provider-response" diff --git a/posthog/test/ai/utils.py b/posthog/test/ai/utils.py new file mode 100644 index 00000000..99fc3f02 --- /dev/null +++ b/posthog/test/ai/utils.py @@ -0,0 +1,27 @@ +"""Shared test helpers for the AI wrapper test suites.""" + + +class RecordingAsyncStream: + """Mock provider async stream that is iterable and records when closed. + + Mirrors the real ``openai.AsyncStream`` / ``anthropic.AsyncStream``: it + supports ``async for`` and exposes an async ``close()`` plus a ``response`` + attribute, so tests can assert both iteration and that the underlying + stream is closed on context exit. + """ + + def __init__(self, items): + self._items = list(items) + self.closed = False + self.response = "provider-response" + + def __aiter__(self): + return self + + async def __anext__(self): + if not self._items: + raise StopAsyncIteration + return self._items.pop(0) + + async def close(self): + self.closed = True From 1dd976ca0a50532a4644c3b31e17138b93aa6c77 Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 23:02:58 -0400 Subject: [PATCH 4/6] harden(ai): don't proxy private/dunder attrs in AsyncStreamWrapper.__getattr__ Guard __getattr__ against names starting with '_'. Prevents infinite recursion if the instance is built without __init__ (e.g. copy/pickle) and stops hasattr/copy probes from leaking to the provider stream. Only public metadata like .response is proxied. Co-Authored-By: Claude Opus 4.8 (1M context) --- posthog/ai/stream.py | 5 +++++ posthog/test/ai/test_async_stream_wrapper.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py index 2e5e6a54..121547ea 100644 --- a/posthog/ai/stream.py +++ b/posthog/ai/stream.py @@ -50,5 +50,10 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: return False def __getattr__(self, name: str) -> Any: + # Only proxy public attributes (e.g. `.response`). Private/dunder names + # are not forwarded — this avoids infinite recursion if `_stream` isn't + # set yet and stops `hasattr`/copy probes leaking to the provider stream. + if name.startswith("_"): + raise AttributeError(name) target = self._stream if self._stream is not None else self._generator return getattr(target, name) diff --git a/posthog/test/ai/test_async_stream_wrapper.py b/posthog/test/ai/test_async_stream_wrapper.py index b57a69d2..2443c769 100644 --- a/posthog/test/ai/test_async_stream_wrapper.py +++ b/posthog/test/ai/test_async_stream_wrapper.py @@ -77,3 +77,15 @@ async def gen(): wrapper = AsyncStreamWrapper(gen(), source) assert wrapper.response == "provider-response" + + +@pytest.mark.asyncio +async def test_getattr_does_not_proxy_private_names(): + source = RecordingAsyncStream([]) + + async def gen(): + if False: + yield + + wrapper = AsyncStreamWrapper(gen(), source) + assert not hasattr(wrapper, "_nonexistent_private") From 7c5d6774512f2db5b73df8e43e71a4988a65770f Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 23:17:50 -0400 Subject: [PATCH 5/6] fix(ai): restore aclose() on async streams + wrap async Gemini MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Address review findings on #645: - Forward async-generator protocol methods (aclose/asend/athrow) to the tracking generator instead of the provider stream. After pointing __getattr__ at the provider stream (which exposes close(), not aclose()), `await response.aclose()` regressed to AttributeError and dropped the $ai_generation event. Forwarding to the generator restores the pre-wrapper behaviour (aclose runs the finally, fires the event). - Wrap async Gemini streaming (gemini_async.py) in AsyncStreamWrapper — it had the identical `async with` gap. - Pass the provider stream by keyword (stream=response) at all call sites. - Tests: aclose-fires-event, try/finally closes provider stream when the capture raises, exception propagation through __aexit__, payload assertions on the async-with path, and a Gemini async-with regression. - Changeset: note the return-type change and that sync wrappers are unchanged. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../async-stream-context-manager.md | 4 +- posthog/ai/anthropic/anthropic_async.py | 2 +- posthog/ai/gemini/gemini_async.py | 3 +- posthog/ai/openai/openai_async.py | 4 +- posthog/ai/stream.py | 9 +++ posthog/test/ai/gemini/test_gemini_async.py | 40 +++++++++++++ posthog/test/ai/openai/test_openai.py | 5 ++ posthog/test/ai/test_async_stream_wrapper.py | 60 +++++++++++++++++++ 8 files changed, 122 insertions(+), 5 deletions(-) diff --git a/.sampo/changesets/async-stream-context-manager.md b/.sampo/changesets/async-stream-context-manager.md index 05e76e58..23147b29 100644 --- a/.sampo/changesets/async-stream-context-manager.md +++ b/.sampo/changesets/async-stream-context-manager.md @@ -2,4 +2,6 @@ pypi/posthog: patch --- -Fix `TypeError: 'async_generator' object does not support the asynchronous context manager protocol` when using the async AI wrappers (`posthog.ai.openai.AsyncOpenAI`, `posthog.ai.anthropic.AsyncAnthropic`) with libraries such as pydantic-ai that consume streaming responses via `async with`. Streaming responses now support both `async for` and `async with`, and exiting the context closes the underlying provider stream. +Fix `TypeError: 'async_generator' object does not support the asynchronous context manager protocol` when using the async AI wrappers (`posthog.ai.openai.AsyncOpenAI`, `posthog.ai.anthropic.AsyncAnthropic`, `posthog.ai.gemini.AsyncClient`) with libraries such as pydantic-ai that consume streaming responses via `async with`. Streaming responses now support both `async for` and `async with`, and exiting the context closes the underlying provider stream. + +Note: the async streaming return type changes from a bare `AsyncGenerator` to `AsyncStreamWrapper` (`async for`, `await response.aclose()`, and attribute access like `.response` are preserved, but `inspect.isasyncgen(response)` is now `False`). Sync streaming wrappers are unchanged and still return a bare generator. diff --git a/posthog/ai/anthropic/anthropic_async.py b/posthog/ai/anthropic/anthropic_async.py index 8fd7a5ab..df098955 100644 --- a/posthog/ai/anthropic/anthropic_async.py +++ b/posthog/ai/anthropic/anthropic_async.py @@ -226,7 +226,7 @@ async def generator(): stop_reason=stop_reason, ) - return AsyncStreamWrapper(generator(), response) + return AsyncStreamWrapper(generator(), stream=response) async def _capture_streaming_event( self, diff --git a/posthog/ai/gemini/gemini_async.py b/posthog/ai/gemini/gemini_async.py index cd2b962f..07ba3f02 100644 --- a/posthog/ai/gemini/gemini_async.py +++ b/posthog/ai/gemini/gemini_async.py @@ -3,6 +3,7 @@ import uuid from typing import Any, Dict, Optional +from posthog.ai.stream import AsyncStreamWrapper from posthog.ai.types import TokenUsage, StreamingEventData from posthog.ai.utils import merge_system_prompt @@ -354,7 +355,7 @@ async def async_generator(): stop_reason=stop_reason, ) - return async_generator() + return AsyncStreamWrapper(async_generator(), stream=response) def _capture_streaming_event( self, diff --git a/posthog/ai/openai/openai_async.py b/posthog/ai/openai/openai_async.py index d03fc25e..7e7b5838 100644 --- a/posthog/ai/openai/openai_async.py +++ b/posthog/ai/openai/openai_async.py @@ -222,7 +222,7 @@ async def async_generator(): stop_reason=stop_reason, ) - return AsyncStreamWrapper(async_generator(), response) + return AsyncStreamWrapper(async_generator(), stream=response) async def _capture_streaming_event( self, @@ -516,7 +516,7 @@ async def async_generator(): stop_reason=stop_reason, ) - return AsyncStreamWrapper(async_generator(), response) + return AsyncStreamWrapper(async_generator(), stream=response) async def _capture_streaming_event( self, diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py index 121547ea..7af4de48 100644 --- a/posthog/ai/stream.py +++ b/posthog/ai/stream.py @@ -49,11 +49,20 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: return False + # Async-generator protocol methods belong to the tracking generator, not + # the provider stream (provider AsyncStreams expose `close()`, not these). + # Forwarding `aclose()` to the generator preserves the pre-wrapper behaviour + # where `await response.aclose()` runs the generator's `finally` (firing the + # PostHog event) instead of raising AttributeError. + _GENERATOR_METHODS = ("aclose", "asend", "athrow") + def __getattr__(self, name: str) -> Any: # Only proxy public attributes (e.g. `.response`). Private/dunder names # are not forwarded — this avoids infinite recursion if `_stream` isn't # set yet and stops `hasattr`/copy probes leaking to the provider stream. if name.startswith("_"): raise AttributeError(name) + if name in self._GENERATOR_METHODS: + return getattr(self._generator, name) target = self._stream if self._stream is not None else self._generator return getattr(target, name) diff --git a/posthog/test/ai/gemini/test_gemini_async.py b/posthog/test/ai/gemini/test_gemini_async.py index 2823626e..53f85b26 100644 --- a/posthog/test/ai/gemini/test_gemini_async.py +++ b/posthog/test/ai/gemini/test_gemini_async.py @@ -1110,3 +1110,43 @@ async def test_async_embed_content_integration_batch(mock_client): assert response.embeddings is not None assert len(response.embeddings) == len(inputs) + + +async def test_async_client_streaming_supports_async_with( + mock_client, mock_google_genai_client +): + """Regression test for #393: generate_content_stream must support `async with`.""" + + async def mock_streaming_response(): + chunk = MagicMock() + chunk.text = "Hi" + usage = MagicMock() + usage.prompt_token_count = 5 + usage.candidates_token_count = 3 + usage.cached_content_token_count = 0 + usage.thoughts_token_count = 0 + chunk.usage_metadata = usage + yield chunk + + mock_google_genai_client.aio.models.generate_content_stream = AsyncMock( + return_value=mock_streaming_response() + ) + + client = AsyncClient(api_key="test-key", posthog_client=mock_client) + + response = await client.models.generate_content_stream( + model="gemini-2.0-flash", + contents=["Hi"], + posthog_distinct_id="test-id", + ) + + chunks = [] + async with response as stream: + async for chunk in stream: + chunks.append(chunk) + + assert len(chunks) == 1 + assert mock_client.capture.call_count == 1 + call_args = mock_client.capture.call_args[1] + assert call_args["event"] == "$ai_generation" + assert call_args["properties"]["$ai_provider"] == "gemini" diff --git a/posthog/test/ai/openai/test_openai.py b/posthog/test/ai/openai/test_openai.py index 107668c5..e75b429b 100644 --- a/posthog/test/ai/openai/test_openai.py +++ b/posthog/test/ai/openai/test_openai.py @@ -2384,6 +2384,11 @@ async def mock_create(self, **kwargs): assert chunks == streaming_tool_call_chunks assert mock_client.capture.call_count == 1 + call_args = mock_client.capture.call_args[1] + props = call_args["properties"] + assert call_args["event"] == "$ai_generation" + assert props["$ai_provider"] == "openai" + assert props["$ai_model"] == "gpt-4" @pytest.mark.asyncio diff --git a/posthog/test/ai/test_async_stream_wrapper.py b/posthog/test/ai/test_async_stream_wrapper.py index 2443c769..3df5c3b6 100644 --- a/posthog/test/ai/test_async_stream_wrapper.py +++ b/posthog/test/ai/test_async_stream_wrapper.py @@ -67,6 +67,45 @@ async def gen(): assert source.closed is True +@pytest.mark.asyncio +async def test_provider_stream_closed_even_if_generator_aclose_raises(): + """The try/finally in __aexit__ must still close the provider stream when + the generator's finally (the PostHog capture) raises.""" + source = RecordingAsyncStream([1, 2, 3]) + + async def gen(): + try: + async for item in source: + yield item + finally: + raise RuntimeError("capture blew up") + + with pytest.raises(RuntimeError, match="capture blew up"): + async with AsyncStreamWrapper(gen(), source) as stream: + async for _ in stream: + break + + assert source.closed is True + + +@pytest.mark.asyncio +async def test_exception_in_body_propagates(): + """__aexit__ returns False, so exceptions in the body must propagate (not be + swallowed), and the provider stream is still closed on the error path.""" + source = RecordingAsyncStream([1, 2, 3]) + + async def gen(): + async for item in source: + yield item + + with pytest.raises(ValueError, match="boom"): + async with AsyncStreamWrapper(gen(), source) as stream: + async for _ in stream: + raise ValueError("boom") + + assert source.closed is True + + @pytest.mark.asyncio async def test_getattr_proxies_to_provider_stream(): source = RecordingAsyncStream([]) @@ -79,6 +118,27 @@ async def gen(): assert wrapper.response == "provider-response" +@pytest.mark.asyncio +async def test_aclose_runs_generator_finally_and_captures(): + """`await response.aclose()` must close the tracking generator (firing its + finally) rather than proxying to the provider stream, which has no aclose.""" + source = RecordingAsyncStream([1, 2, 3]) + captured = [] + + async def gen(): + try: + async for item in source: + yield item + finally: + captured.append("done") + + wrapper = AsyncStreamWrapper(gen(), source) + await wrapper.__anext__() + await wrapper.aclose() + + assert captured == ["done"] + + @pytest.mark.asyncio async def test_getattr_does_not_proxy_private_names(): source = RecordingAsyncStream([]) From 4390085180b1470abcea5f86fe1b7cfb1a591299 Mon Sep 17 00:00:00 2001 From: Anna Garcia Date: Tue, 2 Jun 2026 23:23:00 -0400 Subject: [PATCH 6/6] chore(ai): simplify changeset, trim comments Simplify the changeset to a plain user-facing entry (drop the behavior note). Remove redundant test docstrings whose names already say it. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../changesets/async-stream-context-manager.md | 4 +--- posthog/ai/stream.py | 16 +++++----------- posthog/test/ai/test_async_stream_wrapper.py | 8 -------- 3 files changed, 6 insertions(+), 22 deletions(-) diff --git a/.sampo/changesets/async-stream-context-manager.md b/.sampo/changesets/async-stream-context-manager.md index 23147b29..e2cbf340 100644 --- a/.sampo/changesets/async-stream-context-manager.md +++ b/.sampo/changesets/async-stream-context-manager.md @@ -2,6 +2,4 @@ pypi/posthog: patch --- -Fix `TypeError: 'async_generator' object does not support the asynchronous context manager protocol` when using the async AI wrappers (`posthog.ai.openai.AsyncOpenAI`, `posthog.ai.anthropic.AsyncAnthropic`, `posthog.ai.gemini.AsyncClient`) with libraries such as pydantic-ai that consume streaming responses via `async with`. Streaming responses now support both `async for` and `async with`, and exiting the context closes the underlying provider stream. - -Note: the async streaming return type changes from a bare `AsyncGenerator` to `AsyncStreamWrapper` (`async for`, `await response.aclose()`, and attribute access like `.response` are preserved, but `inspect.isasyncgen(response)` is now `False`). Sync streaming wrappers are unchanged and still return a bare generator. +Fix async streaming responses from the AI wrappers (OpenAI, Anthropic, Gemini) so they support `async with` as well as `async for`. Previously, consuming a stream via `async with` (e.g. with pydantic-ai) raised `TypeError: 'async_generator' object does not support the asynchronous context manager protocol`. diff --git a/posthog/ai/stream.py b/posthog/ai/stream.py index 7af4de48..4ed8ca94 100644 --- a/posthog/ai/stream.py +++ b/posthog/ai/stream.py @@ -34,9 +34,8 @@ async def __aenter__(self) -> "AsyncStreamWrapper[T]": return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: - # Close the tracking generator first so its `finally` block captures the - # PostHog event, even on early exit. try/finally guarantees the provider - # stream is still closed if that capture raises. + # Close the generator first so its `finally` captures the event, even on + # early exit. try/finally still closes the provider stream if that raises. try: await self._generator.aclose() finally: @@ -49,17 +48,12 @@ async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: return False - # Async-generator protocol methods belong to the tracking generator, not - # the provider stream (provider AsyncStreams expose `close()`, not these). - # Forwarding `aclose()` to the generator preserves the pre-wrapper behaviour - # where `await response.aclose()` runs the generator's `finally` (firing the - # PostHog event) instead of raising AttributeError. + # aclose/asend/athrow belong to the generator; provider streams expose + # close(), not these. Forwarding aclose() keeps it firing the event. _GENERATOR_METHODS = ("aclose", "asend", "athrow") def __getattr__(self, name: str) -> Any: - # Only proxy public attributes (e.g. `.response`). Private/dunder names - # are not forwarded — this avoids infinite recursion if `_stream` isn't - # set yet and stops `hasattr`/copy probes leaking to the provider stream. + # Proxy only public attributes (e.g. `.response`) to the provider stream. if name.startswith("_"): raise AttributeError(name) if name in self._GENERATOR_METHODS: diff --git a/posthog/test/ai/test_async_stream_wrapper.py b/posthog/test/ai/test_async_stream_wrapper.py index 3df5c3b6..7cdba829 100644 --- a/posthog/test/ai/test_async_stream_wrapper.py +++ b/posthog/test/ai/test_async_stream_wrapper.py @@ -32,8 +32,6 @@ async def gen(): @pytest.mark.asyncio @pytest.mark.parametrize("consume_all", [False, True]) async def test_finally_block_runs_on_exit(consume_all): - """The generator's finally block must run on context exit, whether the - caller exhausts the stream or breaks out of it early.""" captured = [] async def gen(): @@ -69,8 +67,6 @@ async def gen(): @pytest.mark.asyncio async def test_provider_stream_closed_even_if_generator_aclose_raises(): - """The try/finally in __aexit__ must still close the provider stream when - the generator's finally (the PostHog capture) raises.""" source = RecordingAsyncStream([1, 2, 3]) async def gen(): @@ -90,8 +86,6 @@ async def gen(): @pytest.mark.asyncio async def test_exception_in_body_propagates(): - """__aexit__ returns False, so exceptions in the body must propagate (not be - swallowed), and the provider stream is still closed on the error path.""" source = RecordingAsyncStream([1, 2, 3]) async def gen(): @@ -120,8 +114,6 @@ async def gen(): @pytest.mark.asyncio async def test_aclose_runs_generator_finally_and_captures(): - """`await response.aclose()` must close the tracking generator (firing its - finally) rather than proxying to the provider stream, which has no aclose.""" source = RecordingAsyncStream([1, 2, 3]) captured = []