Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 26 additions & 2 deletions src/copy_engine/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ async def start_monitoring(self):
await self.ws.connect()

# Subscribe to user updates
await self.ws.subscribe_user(self.target_address, self._handle_update)
await self.ws.subscribe_user_events(self.target_address, self._handle_user_event)

# Subscribe to order updates
await self.ws.subscribe_order_updates(self.target_address, self._handle_order_update)

# Start listening
await self.ws.listen()
Expand All @@ -68,7 +71,7 @@ async def stop_monitoring(self):
logger.info("Stopping wallet monitoring")
await self.ws.stop()

async def _handle_update(self, update: WebSocketUpdate):
async def _handle_user_event(self, update: WebSocketUpdate):
"""Handle WebSocket updates from target wallet"""
logger.info(f"🔔 WebSocket Update Received: {update.channel}")

Expand Down Expand Up @@ -213,3 +216,24 @@ async def _handle_orders(self, orders: List[dict]):

# Update state
await self.get_current_state()

async def _handle_order_update(self, update: WebSocketUpdate):
"""Handle order updates from WebSocket"""
logger.info(f"🔔 Order Update Received: {update.channel}")

try:
if "data" not in update.data:
logger.warning(f"⚠️ Update has no 'data' field: {update.data}")
return

data = update.data["data"]
logger.info(f"📦 Order update data keys: {list(data.keys())}")

if "orders" in data:
logger.success(f"📋 ORDERS UPDATE: {len(data['orders'])} orders")
await self._handle_orders(data["orders"])

except Exception as e:
logger.error(f"Error handling order update: {e}")
import traceback
logger.error(traceback.format_exc())
74 changes: 73 additions & 1 deletion src/hyperliquid/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ def __init__(self, ws_url: str = "wss://api.hyperliquid.xyz/ws"):
self.reconnect_delay = 5
self.subscriptions: Dict[str, Any] = {}
self.callbacks: Dict[str, Callable] = {}
self.heartbeat_task: Optional[asyncio.Task] = None
self.heartbeat_interval = 55 # Send ping every 55 seconds (server closes at 60)

async def connect(self):
"""Establish WebSocket connection"""
Expand All @@ -27,6 +29,11 @@ async def connect(self):
self.is_running = True
logger.info("WebSocket connected successfully")

# Start heartbeat task
if self.heartbeat_task is None or self.heartbeat_task.done():
self.heartbeat_task = asyncio.create_task(self._heartbeat())
logger.info("Heartbeat task started")

# Resubscribe to channels after reconnection
for channel, sub_data in self.subscriptions.items():
await self._send_subscription(sub_data)
Expand All @@ -38,6 +45,15 @@ async def connect(self):
async def disconnect(self):
"""Close WebSocket connection"""
self.is_running = False

# Stop heartbeat task
if self.heartbeat_task and not self.heartbeat_task.done():
self.heartbeat_task.cancel()
try:
await self.heartbeat_task
except asyncio.CancelledError:
logger.info("Heartbeat task cancelled")

if self.ws:
await self.ws.close()
logger.info("WebSocket disconnected")
Expand All @@ -51,7 +67,30 @@ async def _send_subscription(self, data: dict):
except Exception as e:
logger.error(f"Failed to send subscription: {e}")

async def subscribe_user(self, address: str, callback: Optional[Callable] = None):
async def _send_ping(self):
"""Send ping message to keep connection alive"""
if self.ws and not self.ws.closed:
try:
ping_msg = {"method": "ping"}
await self.ws.send(json.dumps(ping_msg))
logger.debug("❤️ Sent ping to keep connection alive")
except Exception as e:
logger.error(f"Failed to send ping: {e}")

async def _heartbeat(self):
"""Heartbeat task to keep WebSocket connection alive"""
while self.is_running:
try:
await asyncio.sleep(self.heartbeat_interval)
if self.is_running and self.ws and not self.ws.closed:
await self._send_ping()
except asyncio.CancelledError:
logger.debug("Heartbeat task cancelled")
break
except Exception as e:
logger.error(f"Error in heartbeat task: {e}")

async def subscribe_user_events(self, address: str, callback: Optional[Callable] = None):
"""
Subscribe to user updates (positions, orders, fills)

Expand All @@ -77,6 +116,34 @@ async def subscribe_user(self, address: str, callback: Optional[Callable] = None
await self._send_subscription(subscription)

logger.info(f"Subscribed to user updates for {address}")


async def subscribe_order_updates(self, address: str, callback: Optional[Callable] = None):
"""
Subscribe to orders updates

Args:
address: Wallet address to monitor
callback: Function to call when updates are received
"""
channel = f"orderUpdates:{address}"

subscription = {
"method": "subscribe",
"subscription": {
"type": "orderUpdates",
"user": address
}
}

self.subscriptions[channel] = subscription
if callback:
self.callbacks[channel] = callback

if self.ws:
await self._send_subscription(subscription)

logger.info(f"Subscribed to 'orderUpdates' for {address}")

async def subscribe_trades(self, symbol: str, callback: Optional[Callable] = None):
"""
Expand Down Expand Up @@ -141,6 +208,11 @@ async def _handle_message(self, message: str):
# Determine the channel/type of update
channel = data.get("channel", "unknown")

# Handle pong response from heartbeat
if channel == "pong":
logger.debug("❤️ Received pong from server")
return

# Log the parsed data
logger.info(f"📦 Parsed - Channel: '{channel}', Keys: {list(data.keys())}")

Expand Down