-
Notifications
You must be signed in to change notification settings - Fork 70
fix(ai): support async with on async streaming responses (Fixes #393)
#645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+454
−4
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
24369b0
fix(ai): support async with on streaming responses (Fixes #393)
turnipdabeets cec962f
fix(ai): make AsyncStreamWrapper Generic[T] to satisfy mypy
turnipdabeets a0585a5
refactor(ai): address review on async stream wrapper
turnipdabeets 1dd976c
harden(ai): don't proxy private/dunder attrs in AsyncStreamWrapper.__…
turnipdabeets 7c5d677
fix(ai): restore aclose() on async streams + wrap async Gemini
turnipdabeets 4390085
chore(ai): simplify changeset, trim comments
turnipdabeets File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| --- | ||
| pypi/posthog: patch | ||
| --- | ||
|
|
||
| Fix async streaming responses from the AI wrappers (OpenAI, Anthropic, Gemini) so they support `async with` as well as `async for`. Previously, consuming a stream via `async with` (e.g. with pydantic-ai) raised `TypeError: 'async_generator' object does not support the asynchronous context manager protocol`. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,62 @@ | ||
| """Shared async streaming utilities for PostHog AI wrappers.""" | ||
|
|
||
| from typing import Any, AsyncGenerator, Generic, Optional, TypeVar | ||
|
|
||
| T = TypeVar("T") | ||
|
|
||
|
|
||
| class AsyncStreamWrapper(Generic[T]): | ||
| """Adds the async context manager protocol to a PostHog streaming generator. | ||
|
|
||
| The OpenAI and Anthropic SDK streams support both ``async for`` and | ||
| ``async with``. PostHog's wrappers returned a bare async generator, which | ||
| only supports ``async for``, so ``async with response:`` (used by | ||
| pydantic-ai) raised a TypeError. This wraps the tracking generator and, | ||
| when given the original provider stream, closes it and proxies attribute | ||
| access (e.g. ``.response``) to it. | ||
| """ | ||
|
|
||
| def __init__( | ||
| self, | ||
| generator: AsyncGenerator[T, None], | ||
| stream: Optional[Any] = None, | ||
| ) -> None: | ||
| self._generator = generator | ||
| self._stream = stream | ||
|
|
||
| def __aiter__(self) -> "AsyncStreamWrapper[T]": | ||
| return self | ||
|
|
||
| async def __anext__(self) -> T: | ||
| return await self._generator.__anext__() | ||
|
|
||
| async def __aenter__(self) -> "AsyncStreamWrapper[T]": | ||
| return self | ||
|
|
||
| async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> bool: | ||
| # Close the generator first so its `finally` captures the event, even on | ||
| # early exit. try/finally still closes the provider stream if that raises. | ||
| try: | ||
| await self._generator.aclose() | ||
| finally: | ||
| if self._stream is not None: | ||
| close = getattr(self._stream, "aclose", None) or getattr( | ||
| self._stream, "close", None | ||
| ) | ||
| if close is not None: | ||
| await close() | ||
|
|
||
| return False | ||
|
|
||
| # aclose/asend/athrow belong to the generator; provider streams expose | ||
| # close(), not these. Forwarding aclose() keeps it firing the event. | ||
| _GENERATOR_METHODS = ("aclose", "asend", "athrow") | ||
|
|
||
| def __getattr__(self, name: str) -> Any: | ||
| # Proxy only public attributes (e.g. `.response`) to the provider stream. | ||
| if name.startswith("_"): | ||
| raise AttributeError(name) | ||
| if name in self._GENERATOR_METHODS: | ||
| return getattr(self._generator, name) | ||
| target = self._stream if self._stream is not None else self._generator | ||
| return getattr(target, name) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.