Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

All notable changes to this project will be documented in this file.

## [1.14.0] - 9999-99-99
## [1.14.0] - 2026-04-27
### Changed
- Updated all compiled protos for compatibility with Injective core v1.19.0 and Indexer v1.19.0

## [1.13.0] - 2026-02-13
### Changed
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ clean-all:
$(call clean_repos)

clone-injective-indexer:
git clone https://github.com/InjectiveLabs/injective-indexer.git -b v1.18.59 --depth 1 --single-branch
git clone https://github.com/InjectiveLabs/injective-indexer.git -b v1.19.0 --depth 1 --single-branch
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail
git ls-remote --heads --tags https://github.com/InjectiveLabs/injective-indexer.git v1.19.0

Repository: InjectiveLabs/sdk-python

Length of output: 186


Repository at the specified URL is not accessible—the build will fail immediately.

The clone target fails because https://github.com/InjectiveLabs/injective-indexer.git returns "Repository not found" (exit code 128). Verify the correct repository URL and ensure it is accessible before merging.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@Makefile` at line 29, The Makefile's hardcoded git clone command referencing
"https://github.com/InjectiveLabs/injective-indexer.git -b v1.19.0" is failing
because that repository URL is not accessible; update the clone target to point
to the correct, accessible repository URL or make the repository URL and branch
configurable (e.g., via a MAKEVAR like
INJECTIVE_INDEXER_REPO/INJECTIVE_INDEXER_REF) so builds don't hard-fail—locate
the git clone line in the Makefile and replace the broken URL/branch with the
verified repository URL or a variable that can be overridden in CI, and add a
short comment or fallback to prevent silent failures.


clone-all: clone-injective-indexer

Expand Down
2 changes: 1 addition & 1 deletion buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ inputs:
# tag: v1.0.1-inj
# subdir: proto
- git_repo: https://github.com/InjectiveLabs/injective-core
tag: v1.19.0-beta
tag: v1.19.0
subdir: proto
# - git_repo: https://github.com/InjectiveLabs/injective-core
# branch: c-655/add_chainlink_data_streams_oracle
Expand Down
13 changes: 7 additions & 6 deletions examples/exchange_client/oracle_rpc/1_StreamPrices.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ async def main() -> None:
# select network: local, testnet, mainnet
network = Network.testnet()
client = IndexerClient(network)
market = (await client.all_derivative_markets())[
"0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"
]
market_response = await client.fetch_derivative_market(
market_id="0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"
)
market = market_response["market"]

base_symbol = market.oracle_base
quote_symbol = market.oracle_quote
oracle_type = market.oracle_type.lower()
base_symbol = market["oracleBase"]
quote_symbol = market["oracleQuote"]
oracle_type = market["oracleType"].lower()

task = asyncio.get_event_loop().create_task(
client.listen_oracle_prices_updates(
Expand Down
41 changes: 41 additions & 0 deletions examples/exchange_client/oracle_rpc/5_StreamOracleList.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
from typing import Any, Dict

from grpc import RpcError

from pyinjective.core.network import Network
from pyinjective.indexer_client import IndexerClient


async def oracle_list_event_processor(event: Dict[str, Any]):
print(event)


def stream_error_processor(exception: RpcError):
print(f"There was an error listening to oracle list updates ({exception})")


def stream_closed_processor():
print("The oracle list updates stream has been closed")


async def main() -> None:
network = Network.testnet()
client = IndexerClient(network)

task = asyncio.get_event_loop().create_task(
client.listen_oracle_list_updates(
callback=oracle_list_event_processor,
on_end_callback=stream_closed_processor,
on_status_callback=stream_error_processor,
oracle_type="provider",
symbols=["TIA"],
)
)

await asyncio.sleep(delay=60)
task.cancel()


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
41 changes: 41 additions & 0 deletions examples/exchange_client/oracle_rpc/6_StreamPricesByMarkets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import asyncio
from typing import Any, Dict

from grpc import RpcError

from pyinjective.core.network import Network
from pyinjective.indexer_client import IndexerClient


async def price_event_processor(event: Dict[str, Any]):
print(event)


def stream_error_processor(exception: RpcError):
print(f"There was an error listening to oracle prices by markets updates ({exception})")


def stream_closed_processor():
print("The oracle prices by markets updates stream has been closed")


async def main() -> None:
network = Network.testnet()
client = IndexerClient(network)
market_id = "0x17ef48032cb24375ba7c2e39f384e56433bcab20cbee9a7357e4cba2eb00abe6"

task = asyncio.get_event_loop().create_task(
client.listen_oracle_prices_by_markets_updates(
market_ids=[market_id],
callback=price_event_processor,
on_end_callback=stream_closed_processor,
on_status_callback=stream_error_processor,
)
)

await asyncio.sleep(delay=60)
task.cancel()


if __name__ == "__main__":
asyncio.get_event_loop().run_until_complete(main())
Comment on lines +22 to +41
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Fix the example to call a real client method.

Line 25 calls client.all_derivative_markets(), but pyinjective/indexer_client.py only exposes fetch_derivative_markets(). As written, this script will raise AttributeError before the stream starts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@examples/exchange_client/oracle_rpc/6_StreamPricesByMarkets.py` around lines
22 - 43, The example calls client.all_derivative_markets() which doesn't exist;
replace that call with the actual method fetch_derivative_markets() and adjust
any handling if the return shape differs (e.g., ensure you select the same
market by the hex key and use market.id when passing market_ids to
client.listen_oracle_prices_by_markets_updates); update the reference to the
market variable only if fetch_derivative_markets() returns a different
mapping/sequence so the index/key lookup remains correct.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

class IndexerGrpcOracleStream:
def __init__(self, channel: Channel, cookie_assistant: CookieAssistant):
self._stub = self._stub = exchange_oracle_grpc.InjectiveOracleRPCStub(channel)
self._stub = exchange_oracle_grpc.InjectiveOracleRPCStub(channel)
self._assistant = GrpcApiStreamAssistant(cookie_assistant=cookie_assistant)

async def stream_oracle_prices(
Expand All @@ -38,6 +38,27 @@ async def stream_oracle_prices(
on_status_callback=on_status_callback,
)

async def stream_oracle_list(
self,
callback: Callable,
on_end_callback: Optional[Callable] = None,
on_status_callback: Optional[Callable] = None,
oracle_type: Optional[str] = None,
symbols: Optional[List[str]] = None,
):
request = exchange_oracle_pb.StreamOracleListRequest(
oracle_type=oracle_type,
symbols=symbols or [],
)

await self._assistant.listen_stream(
call=self._stub.StreamOracleList,
request=request,
callback=callback,
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
)

async def stream_oracle_prices_by_markets(
self,
market_ids: List[str],
Expand Down
30 changes: 30 additions & 0 deletions pyinjective/indexer_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,6 +663,36 @@ async def listen_oracle_prices_updates(
oracle_type=oracle_type,
)

async def listen_oracle_list_updates(
self,
callback: Callable,
on_end_callback: Optional[Callable] = None,
on_status_callback: Optional[Callable] = None,
oracle_type: Optional[str] = None,
symbols: Optional[List[str]] = None,
):
await self.oracle_stream_api.stream_oracle_list(
callback=callback,
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
oracle_type=oracle_type,
symbols=symbols,
)

async def listen_oracle_prices_by_markets_updates(
self,
market_ids: List[str],
callback: Callable,
on_end_callback: Optional[Callable] = None,
on_status_callback: Optional[Callable] = None,
):
await self.oracle_stream_api.stream_oracle_prices_by_markets(
market_ids=market_ids,
callback=callback,
on_end_callback=on_end_callback,
on_status_callback=on_status_callback,
)

# endregion

# region portfolio
Expand Down
18 changes: 11 additions & 7 deletions pyinjective/proto/exchange/injective_oracle_rpc_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions pyinjective/proto/exchange/injective_oracle_rpc_pb2_grpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ def __init__(self, channel):
request_serializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesRequest.SerializeToString,
response_deserializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesResponse.FromString,
_registered_method=True)
self.StreamOracleList = channel.unary_stream(
'/injective_oracle_rpc.InjectiveOracleRPC/StreamOracleList',
request_serializer=exchange_dot_injective__oracle__rpc__pb2.StreamOracleListRequest.SerializeToString,
response_deserializer=exchange_dot_injective__oracle__rpc__pb2.StreamOracleListResponse.FromString,
_registered_method=True)
self.StreamPricesByMarkets = channel.unary_stream(
'/injective_oracle_rpc.InjectiveOracleRPC/StreamPricesByMarkets',
request_serializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesByMarketsRequest.SerializeToString,
Expand Down Expand Up @@ -75,6 +80,14 @@ def StreamPrices(self, request, context):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StreamOracleList(self, request, context):
"""StreamOracleList streams oracle data updates filtered by oracle type and
optionally by symbols.
"""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')

def StreamPricesByMarkets(self, request, context):
"""StreamPrices streams new price changes markets
"""
Expand Down Expand Up @@ -105,6 +118,11 @@ def add_InjectiveOracleRPCServicer_to_server(servicer, server):
request_deserializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesRequest.FromString,
response_serializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesResponse.SerializeToString,
),
'StreamOracleList': grpc.unary_stream_rpc_method_handler(
servicer.StreamOracleList,
request_deserializer=exchange_dot_injective__oracle__rpc__pb2.StreamOracleListRequest.FromString,
response_serializer=exchange_dot_injective__oracle__rpc__pb2.StreamOracleListResponse.SerializeToString,
),
'StreamPricesByMarkets': grpc.unary_stream_rpc_method_handler(
servicer.StreamPricesByMarkets,
request_deserializer=exchange_dot_injective__oracle__rpc__pb2.StreamPricesByMarketsRequest.FromString,
Expand Down Expand Up @@ -230,6 +248,33 @@ def StreamPrices(request,
metadata,
_registered_method=True)

@staticmethod
def StreamOracleList(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_stream(
request,
target,
'/injective_oracle_rpc.InjectiveOracleRPC/StreamOracleList',
exchange_dot_injective__oracle__rpc__pb2.StreamOracleListRequest.SerializeToString,
exchange_dot_injective__oracle__rpc__pb2.StreamOracleListResponse.FromString,
options,
channel_credentials,
insecure,
call_credentials,
compression,
wait_for_ready,
timeout,
metadata,
_registered_method=True)

@staticmethod
def StreamPricesByMarkets(request,
target,
Expand Down
Loading
Loading