From f8e8b359bc83c160ce5a4536b6684cce2aaa5b4e Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 25 Jul 2025 16:20:48 +0530 Subject: [PATCH] Add debug logs for signature subscription --- http_proxy.py | 29 ++++++----- main.py | 99 +++++++++++++++++++++++++++--------- providers.py | 40 +++++++-------- router.py | 41 +++++++-------- ws_proxy.py | 138 ++++++++++++++++++++++++++++++++++++-------------- 5 files changed, 230 insertions(+), 117 deletions(-) diff --git a/http_proxy.py b/http_proxy.py index fafac6f..5be8b23 100644 --- a/http_proxy.py +++ b/http_proxy.py @@ -7,10 +7,10 @@ from router import Router async def handle_rpc_request(request: web.Request) -> web.Response: router: Router = request.app['router'] logger = logging.getLogger(__name__) - + try: body = await request.json() - + if not isinstance(body, dict): return web.json_response({ "jsonrpc": "2.0", @@ -20,11 +20,11 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Invalid Request" } }, status=400) - + method = body.get("method") params = body.get("params", []) request_id = body.get("id", 1) - + if not method: return web.json_response({ "jsonrpc": "2.0", @@ -34,14 +34,22 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Missing method" } }, status=400) - + logger.info(f"Handling RPC request: {method}") + # Special logging for signature status requests + if method == "getSignatureStatuses": + signatures = params if isinstance(params, list) else [] + logger.info(f"GET_SIGNATURE_STATUSES: Checking signatures: {signatures}") + response = await router.route_request(method, params) + + if method == "getSignatureStatuses": + logger.info(f"GET_SIGNATURE_STATUSES: Response: {response}") response["id"] = request_id - + return web.json_response(response) - + except json.JSONDecodeError: return web.json_response({ "jsonrpc": "2.0", @@ -51,7 +59,7 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Parse error" } }, status=400) - + except Exception as e: logger.error(f"Unexpected error: {e}") return web.json_response({ @@ -64,10 +72,5 @@ async def handle_rpc_request(request: web.Request) -> web.Response: }, status=500) - - - def setup_routes(app: web.Application) -> None: app.router.add_post('/', handle_rpc_request) - - diff --git a/main.py b/main.py index 63678c2..31b4c9f 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ import os import logging +import asyncio from dotenv import load_dotenv from aiohttp import web from providers import create_providers @@ -10,11 +11,31 @@ from http_proxy import setup_routes from ws_proxy import setup_ws_routes +@web.middleware +async def cors_middleware(request, handler): + """Add CORS headers to all responses""" + if request.method == 'OPTIONS': + # Handle preflight requests + return web.Response(headers={ + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS', + 'Access-Control-Allow-Headers': '*', + 'Access-Control-Max-Age': '86400' + }) + + response = await handler(request) + response.headers['Access-Control-Allow-Origin'] = '*' + response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS' + response.headers['Access-Control-Allow-Headers'] = '*' + return response + + def load_config() -> dict: load_dotenv() - + return { - "proxy_port": int(os.getenv("PROXY_PORT", 8545)), + "rpc_port": int(os.getenv("RPC_PORT", 8545)), + "ws_port": int(os.getenv("WS_PORT", 8546)), "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), @@ -29,41 +50,69 @@ def setup_logging(log_level: str) -> None: ) -def create_app(config: dict) -> web.Application: - app = web.Application() - - providers = create_providers() - cache = Cache(size_limit_gb=config["cache_size_gb"]) - error_logger = ErrorLogger(db_path=config["error_db_path"]) - router = Router(providers, cache, error_logger) - +def create_rpc_app(config: dict, router: Router) -> web.Application: + app = web.Application(middlewares=[cors_middleware]) app['router'] = router app['config'] = config - setup_routes(app) + return app + +def create_ws_app(config: dict, router: Router) -> web.Application: + app = web.Application(middlewares=[cors_middleware]) + app['router'] = router + app['config'] = config setup_ws_routes(app) - return app +async def run_servers(config: dict) -> None: + # Create shared components + providers = create_providers() + cache = Cache(size_limit_gb=config["cache_size_gb"]) + error_logger = ErrorLogger(db_path=config["error_db_path"]) + router = Router(providers, cache, error_logger) + + # Create separate apps + rpc_app = create_rpc_app(config, router) + ws_app = create_ws_app(config, router) + + # Create runners + rpc_runner = web.AppRunner(rpc_app) + ws_runner = web.AppRunner(ws_app) + + await rpc_runner.setup() + await ws_runner.setup() + + # Create sites + rpc_site = web.TCPSite(rpc_runner, '0.0.0.0', config["rpc_port"]) + ws_site = web.TCPSite(ws_runner, '0.0.0.0', config["ws_port"]) + + # Start both servers + await rpc_site.start() + await ws_site.start() + + logger = logging.getLogger(__name__) + logger.info(f"RPC server started on port {config['rpc_port']}") + logger.info(f"WebSocket server started on port {config['ws_port']}") + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") + + # Keep servers running + try: + await asyncio.Event().wait() + except KeyboardInterrupt: + pass + finally: + await rpc_runner.cleanup() + await ws_runner.cleanup() + def main() -> None: config = load_config() setup_logging(config["log_level"]) - - logger = logging.getLogger(__name__) - logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - logger.info(f"Cache size limit: {config['cache_size_gb']}GB") - logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") - - app = create_app(config) - - web.run_app( - app, - host='0.0.0.0', - port=config["proxy_port"] - ) + + asyncio.run(run_servers(config)) if __name__ == "__main__": diff --git a/providers.py b/providers.py index 7da9c66..f6ab75a 100644 --- a/providers.py +++ b/providers.py @@ -8,28 +8,28 @@ class Provider(ABC): def __init__(self, name: str): self.name = name self.backoff_until: Optional[datetime] = None - + @property @abstractmethod def http_url(self) -> str: pass - + @property @abstractmethod def ws_url(self) -> str: pass - + def transform_request(self, request: Dict[str, Any]) -> Dict[str, Any]: return request - + def transform_response(self, response: Dict[str, Any]) -> Dict[str, Any]: return response - + def is_available(self) -> bool: if self.backoff_until is None: return True return datetime.now() > self.backoff_until - + def mark_failed(self, backoff_minutes: int = 30) -> None: self.backoff_until = datetime.now() + timedelta(minutes=backoff_minutes) @@ -38,11 +38,11 @@ class AlchemyProvider(Provider): def __init__(self): super().__init__("alchemy") self.api_key = os.getenv("ALCHEMY_API_KEY", "") - + @property def http_url(self) -> str: return f"https://solana-mainnet.g.alchemy.com/v2/{self.api_key}" - + @property def ws_url(self) -> str: return f"wss://solana-mainnet.g.alchemy.com/v2/{self.api_key}" @@ -51,11 +51,11 @@ class AlchemyProvider(Provider): class PublicNodeProvider(Provider): def __init__(self): super().__init__("publicnode") - + @property def http_url(self) -> str: return "https://solana-rpc.publicnode.com" - + @property def ws_url(self) -> str: return "wss://solana-rpc.publicnode.com" @@ -65,11 +65,11 @@ class HeliusProvider(Provider): def __init__(self): super().__init__("helius") self.api_key = os.getenv("HELIUS_API_KEY", "") - + @property def http_url(self) -> str: return f"https://mainnet.helius-rpc.com/?api-key={self.api_key}" - + @property def ws_url(self) -> str: return f"wss://mainnet.helius-rpc.com/?api-key={self.api_key}" @@ -80,11 +80,11 @@ class QuickNodeProvider(Provider): super().__init__("quicknode") self.endpoint = os.getenv("QUICKNODE_ENDPOINT", "") self.token = os.getenv("QUICKNODE_TOKEN", "") - + @property def http_url(self) -> str: return f"https://{self.endpoint}/{self.token}/" - + @property def ws_url(self) -> str: return f"wss://{self.endpoint}/{self.token}/" @@ -93,11 +93,11 @@ class QuickNodeProvider(Provider): class SolanaPublicProvider(Provider): def __init__(self): super().__init__("solana_public") - + @property def http_url(self) -> str: return "https://api.mainnet-beta.solana.com" - + @property def ws_url(self) -> str: return "wss://api.mainnet-beta.solana.com" @@ -105,9 +105,9 @@ class SolanaPublicProvider(Provider): def create_providers() -> list[Provider]: return [ - AlchemyProvider(), - PublicNodeProvider(), + SolanaPublicProvider(), HeliusProvider(), + AlchemyProvider(), QuickNodeProvider(), - SolanaPublicProvider() - ] \ No newline at end of file + PublicNodeProvider(), + ] diff --git a/router.py b/router.py index ee5b535..ffa1c50 100644 --- a/router.py +++ b/router.py @@ -14,17 +14,17 @@ class Router: self.error_logger = error_logger self.current_provider_index = 0 self.logger = logging.getLogger(__name__) - - async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: + + async def route_request(self, method: str, params: Any) -> Dict[str, Any]: request = {"method": method, "params": params} - + cached_response = self.cache.get(method, params) if cached_response: self.logger.debug(f"Cache hit for {method}") cached_response["_cached"] = True cached_response["_provider"] = "cache" return cached_response - + for attempt in range(len(self.providers)): provider = self.get_next_available_provider() if not provider: @@ -32,48 +32,49 @@ class Router: "All providers are currently unavailable", "NO_AVAILABLE_PROVIDERS" ) - + try: response = await self._make_request(provider, request) - + transformed_response = provider.transform_response(response) transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - + self.cache.set(method, params, transformed_response) self.logger.info(f"Request succeeded via {provider.name}") return transformed_response - + except Exception as error: error_id = self.error_logger.log_error(provider.name, request, error) self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})") provider.mark_failed() - + return self._create_error_response( "All providers failed to handle the request", "ALL_PROVIDERS_FAILED" ) - + def get_next_available_provider(self) -> Optional[Provider]: for _ in range(len(self.providers)): provider = self.providers[self.current_provider_index] - self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) - + if provider.is_available(): return provider - + else: + self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) + return None - + async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]: transformed_request = provider.transform_request(request) - + rpc_request = { "jsonrpc": "2.0", "id": 1, "method": transformed_request["method"], "params": transformed_request["params"] } - + timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( @@ -83,14 +84,14 @@ class Router: ) as response: if response.status != 200: raise Exception(f"HTTP {response.status}: {await response.text()}") - + result = await response.json() - + if "error" in result: raise Exception(f"RPC Error: {result['error']}") - + return result - + def _create_error_response(self, message: str, code: str) -> Dict[str, Any]: return { "jsonrpc": "2.0", diff --git a/ws_proxy.py b/ws_proxy.py index 76536eb..7d335b2 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -11,110 +11,170 @@ class WebSocketProxy: self.router = router self.logger = logging.getLogger(__name__) self.subscription_mappings: Dict[str, str] = {} - + async def handle_ws_connection(self, request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) - + self.logger.info("New WebSocket connection established") - + provider = self.router.get_next_available_provider() if not provider: await ws.close(code=1011, message=b'No available providers') return ws - + try: - provider_ws = await self._connect_to_provider(provider) - if not provider_ws: + provider_connection = await self._connect_to_provider(provider) + if not provider_connection: await ws.close(code=1011, message=b'Failed to connect to provider') return ws - - await asyncio.gather( - self._proxy_client_to_provider(ws, provider_ws, provider), - self._proxy_provider_to_client(provider_ws, ws, provider), - return_exceptions=True - ) - + + provider_ws, provider_session = provider_connection + + try: + await asyncio.gather( + self._proxy_client_to_provider(ws, provider_ws, provider), + self._proxy_provider_to_client(provider_ws, ws, provider), + return_exceptions=True + ) + finally: + # Clean up provider connection + if not provider_ws.closed: + await provider_ws.close() + await provider_session.close() + except Exception as e: self.logger.error(f"WebSocket proxy error: {e}") - + finally: if not ws.closed: await ws.close() - + return ws - - async def _connect_to_provider(self, provider) -> Optional[object]: + + async def _connect_to_provider(self, provider) -> Optional[tuple]: + session = None try: session = ClientSession() + self.logger.info(f"Attempting to connect to provider {provider.name} at {provider.ws_url}") ws = await session.ws_connect(provider.ws_url) - self.logger.info(f"Connected to provider {provider.name} WebSocket") - return ws + self.logger.info(f"Successfully connected to provider {provider.name} WebSocket at {provider.ws_url}") + return (ws, session) except Exception as e: - self.logger.error(f"Failed to connect to provider {provider.name}: {e}") + self.logger.error(f"Failed to connect to provider {provider.name} at {provider.ws_url}: {e}") + if session: + await session.close() return None - + async def _proxy_client_to_provider(self, client_ws: web.WebSocketResponse, provider_ws, provider) -> None: async for msg in client_ws: if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) - + method = data.get('method', 'unknown') + + self.logger.info(f"Received from client: {data}") + + # Handle ping messages locally + if method == "ping": + pong_response = { + "jsonrpc": "2.0", + "result": "pong", + "id": data.get("id") + } + await client_ws.send_str(json.dumps(pong_response)) + self.logger.info("Responded to ping with pong") + continue + + # Special logging for signature subscriptions + if method == "signatureSubscribe": + signature = data.get('params', [None])[0] if data.get('params') else None + self.logger.info(f"SIGNATURE_SUBSCRIBE: Forwarding to {provider.name} for signature: {signature}") + transformed_request = provider.transform_request(data) - + + if method == "signatureSubscribe": + self.logger.info(f"SIGNATURE_SUBSCRIBE: Sending to provider {provider.name}: {transformed_request}") + await provider_ws.send_str(json.dumps(transformed_request)) - self.logger.debug(f"Forwarded message to {provider.name}: {data.get('method', 'unknown')}") - + self.logger.info(f"Forwarded message to {provider.name}: {method}") + except json.JSONDecodeError: self.logger.warning("Received invalid JSON from client") except Exception as e: self.logger.error(f"Error forwarding to provider: {e}") break - + + elif msg.type == WSMsgType.PING: + await client_ws.pong(msg.data) + self.logger.debug("Responded to WebSocket ping with pong") + elif msg.type == WSMsgType.ERROR: self.logger.error(f'WebSocket error: {client_ws.exception()}') break - + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING): break - + async def _proxy_provider_to_client(self, provider_ws, client_ws: web.WebSocketResponse, provider) -> None: + self.logger.info(f"Starting provider-to-client message loop for {provider.name}") + message_count = 0 async for msg in provider_ws: + message_count += 1 + self.logger.info(f"Provider {provider.name} message #{message_count}, type: {msg.type}") if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) - + self.logger.info(f"Received from provider {provider.name}: {data}") + + # Special logging for signature subscription responses + if "result" in data and isinstance(data.get("result"), (int, str)): + self.logger.info(f"SIGNATURE_SUBSCRIBE: Got subscription ID response from {provider.name}: {data.get('result')}") + elif data.get("method") == "signatureNotification": + subscription_id = data.get("params", {}).get("subscription") + result = data.get("params", {}).get("result") + self.logger.info(f"SIGNATURE_NOTIFICATION: From {provider.name}, subscription {subscription_id}, result: {result}") + transformed_response = provider.transform_response(data) - + if "result" in transformed_response and "subscription" in str(transformed_response.get("result", {})): subscription_id = transformed_response.get("result") if subscription_id: self.subscription_mappings[str(subscription_id)] = provider.name - + self.logger.info(f"SIGNATURE_SUBSCRIBE: Mapped subscription {subscription_id} to {provider.name}") + transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - + method = transformed_response.get("method", "") params = transformed_response.get("params", {}) if method and params: self.router.cache.set(method, params, transformed_response) - + await client_ws.send_str(json.dumps(transformed_response)) - self.logger.debug(f"Forwarded response from {provider.name}") - + self.logger.info(f"Forwarded response to client from {provider.name}: {transformed_response}") + except json.JSONDecodeError: self.logger.warning(f"Received invalid JSON from provider {provider.name}") except Exception as e: self.logger.error(f"Error forwarding from provider: {e}") - break - + # Don't break here - continue processing other messages + continue + + elif msg.type == WSMsgType.PING: + await provider_ws.pong(msg.data) + self.logger.debug(f"Responded to provider WebSocket ping from {provider.name}") + elif msg.type == WSMsgType.ERROR: self.logger.error(f'Provider WebSocket error: {provider_ws.exception()}') break - + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING): + self.logger.warning(f"Provider WebSocket connection closed from {provider.name}") break + self.logger.warning(f"Provider-to-client message loop ended for {provider.name} after {message_count} messages") + async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: router: Router = request.app['router'] @@ -123,4 +183,4 @@ async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: def setup_ws_routes(app: web.Application) -> None: - app.router.add_get('/ws', handle_ws_connection) \ No newline at end of file + app.router.add_get('/', handle_ws_connection)