From 06997729cc4002030f25d97faf16d575908aeff4 Mon Sep 17 00:00:00 2001 From: JP Lemelin Date: Wed, 27 May 2026 13:39:38 -0400 Subject: [PATCH 1/2] WS: Add heartbeat to avoid connection close --- src/hyperliquid/websocket.py | 44 ++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/src/hyperliquid/websocket.py b/src/hyperliquid/websocket.py index a704fd5..5c22f73 100644 --- a/src/hyperliquid/websocket.py +++ b/src/hyperliquid/websocket.py @@ -18,6 +18,8 @@ def __init__(self, ws_url: str = "wss://api.hyperliquid.xyz/ws"): self.reconnect_delay = 5 self.subscriptions: Dict[str, Any] = {} self.callbacks: Dict[str, Callable] = {} + self.heartbeat_task: Optional[asyncio.Task] = None + self.heartbeat_interval = 55 # Send ping every 55 seconds (server closes at 60) async def connect(self): """Establish WebSocket connection""" @@ -27,6 +29,11 @@ async def connect(self): self.is_running = True logger.info("WebSocket connected successfully") + # Start heartbeat task + if self.heartbeat_task is None or self.heartbeat_task.done(): + self.heartbeat_task = asyncio.create_task(self._heartbeat()) + logger.info("Heartbeat task started") + # Resubscribe to channels after reconnection for channel, sub_data in self.subscriptions.items(): await self._send_subscription(sub_data) @@ -38,6 +45,15 @@ async def connect(self): async def disconnect(self): """Close WebSocket connection""" self.is_running = False + + # Stop heartbeat task + if self.heartbeat_task and not self.heartbeat_task.done(): + self.heartbeat_task.cancel() + try: + await self.heartbeat_task + except asyncio.CancelledError: + logger.info("Heartbeat task cancelled") + if self.ws: await self.ws.close() logger.info("WebSocket disconnected") @@ -51,6 +67,29 @@ async def _send_subscription(self, data: dict): except Exception as e: logger.error(f"Failed to send subscription: {e}") + async def _send_ping(self): + """Send ping message to keep connection alive""" + if self.ws and not self.ws.closed: + try: + ping_msg = {"method": "ping"} + await self.ws.send(json.dumps(ping_msg)) + logger.debug("❤️ Sent ping to keep connection alive") + except Exception as e: + logger.error(f"Failed to send ping: {e}") + + async def _heartbeat(self): + """Heartbeat task to keep WebSocket connection alive""" + while self.is_running: + try: + await asyncio.sleep(self.heartbeat_interval) + if self.is_running and self.ws and not self.ws.closed: + await self._send_ping() + except asyncio.CancelledError: + logger.debug("Heartbeat task cancelled") + break + except Exception as e: + logger.error(f"Error in heartbeat task: {e}") + async def subscribe_user(self, address: str, callback: Optional[Callable] = None): """ Subscribe to user updates (positions, orders, fills) @@ -141,6 +180,11 @@ async def _handle_message(self, message: str): # Determine the channel/type of update channel = data.get("channel", "unknown") + # Handle pong response from heartbeat + if channel == "pong": + logger.debug("❤️ Received pong from server") + return + # Log the parsed data logger.info(f"📦 Parsed - Channel: '{channel}', Keys: {list(data.keys())}") From b1a4c5ac97b53bd9127d8b6952283e66c0b89268 Mon Sep 17 00:00:00 2001 From: JP Lemelin Date: Wed, 27 May 2026 14:29:41 -0400 Subject: [PATCH 2/2] WS: Add subscribe to orderUpdates --- src/copy_engine/monitor.py | 28 ++++++++++++++++++++++++++-- src/hyperliquid/websocket.py | 30 +++++++++++++++++++++++++++++- 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/src/copy_engine/monitor.py b/src/copy_engine/monitor.py index 5762f28..6db9cf2 100644 --- a/src/copy_engine/monitor.py +++ b/src/copy_engine/monitor.py @@ -58,7 +58,10 @@ async def start_monitoring(self): await self.ws.connect() # Subscribe to user updates - await self.ws.subscribe_user(self.target_address, self._handle_update) + await self.ws.subscribe_user_events(self.target_address, self._handle_user_event) + + # Subscribe to order updates + await self.ws.subscribe_order_updates(self.target_address, self._handle_order_update) # Start listening await self.ws.listen() @@ -68,7 +71,7 @@ async def stop_monitoring(self): logger.info("Stopping wallet monitoring") await self.ws.stop() - async def _handle_update(self, update: WebSocketUpdate): + async def _handle_user_event(self, update: WebSocketUpdate): """Handle WebSocket updates from target wallet""" logger.info(f"🔔 WebSocket Update Received: {update.channel}") @@ -213,3 +216,24 @@ async def _handle_orders(self, orders: List[dict]): # Update state await self.get_current_state() + + async def _handle_order_update(self, update: WebSocketUpdate): + """Handle order updates from WebSocket""" + logger.info(f"🔔 Order Update Received: {update.channel}") + + try: + if "data" not in update.data: + logger.warning(f"⚠️ Update has no 'data' field: {update.data}") + return + + data = update.data["data"] + logger.info(f"📦 Order update data keys: {list(data.keys())}") + + if "orders" in data: + logger.success(f"📋 ORDERS UPDATE: {len(data['orders'])} orders") + await self._handle_orders(data["orders"]) + + except Exception as e: + logger.error(f"Error handling order update: {e}") + import traceback + logger.error(traceback.format_exc()) diff --git a/src/hyperliquid/websocket.py b/src/hyperliquid/websocket.py index 5c22f73..7ef4ba6 100644 --- a/src/hyperliquid/websocket.py +++ b/src/hyperliquid/websocket.py @@ -90,7 +90,7 @@ async def _heartbeat(self): except Exception as e: logger.error(f"Error in heartbeat task: {e}") - async def subscribe_user(self, address: str, callback: Optional[Callable] = None): + async def subscribe_user_events(self, address: str, callback: Optional[Callable] = None): """ Subscribe to user updates (positions, orders, fills) @@ -116,6 +116,34 @@ async def subscribe_user(self, address: str, callback: Optional[Callable] = None await self._send_subscription(subscription) logger.info(f"Subscribed to user updates for {address}") + + + async def subscribe_order_updates(self, address: str, callback: Optional[Callable] = None): + """ + Subscribe to orders updates + + Args: + address: Wallet address to monitor + callback: Function to call when updates are received + """ + channel = f"orderUpdates:{address}" + + subscription = { + "method": "subscribe", + "subscription": { + "type": "orderUpdates", + "user": address + } + } + + self.subscriptions[channel] = subscription + if callback: + self.callbacks[channel] = callback + + if self.ws: + await self._send_subscription(subscription) + + logger.info(f"Subscribed to 'orderUpdates' for {address}") async def subscribe_trades(self, symbol: str, callback: Optional[Callable] = None): """