From f8e8b359bc83c160ce5a4536b6684cce2aaa5b4e Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 25 Jul 2025 16:20:48 +0530 Subject: [PATCH 01/12] Add debug logs for signature subscription --- http_proxy.py | 29 ++++++----- main.py | 99 +++++++++++++++++++++++++++--------- providers.py | 40 +++++++-------- router.py | 41 +++++++-------- ws_proxy.py | 138 ++++++++++++++++++++++++++++++++++++-------------- 5 files changed, 230 insertions(+), 117 deletions(-) diff --git a/http_proxy.py b/http_proxy.py index fafac6f..5be8b23 100644 --- a/http_proxy.py +++ b/http_proxy.py @@ -7,10 +7,10 @@ from router import Router async def handle_rpc_request(request: web.Request) -> web.Response: router: Router = request.app['router'] logger = logging.getLogger(__name__) - + try: body = await request.json() - + if not isinstance(body, dict): return web.json_response({ "jsonrpc": "2.0", @@ -20,11 +20,11 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Invalid Request" } }, status=400) - + method = body.get("method") params = body.get("params", []) request_id = body.get("id", 1) - + if not method: return web.json_response({ "jsonrpc": "2.0", @@ -34,14 +34,22 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Missing method" } }, status=400) - + logger.info(f"Handling RPC request: {method}") + # Special logging for signature status requests + if method == "getSignatureStatuses": + signatures = params if isinstance(params, list) else [] + logger.info(f"GET_SIGNATURE_STATUSES: Checking signatures: {signatures}") + response = await router.route_request(method, params) + + if method == "getSignatureStatuses": + logger.info(f"GET_SIGNATURE_STATUSES: Response: {response}") response["id"] = request_id - + return web.json_response(response) - + except json.JSONDecodeError: return web.json_response({ "jsonrpc": "2.0", @@ -51,7 +59,7 @@ async def handle_rpc_request(request: web.Request) -> web.Response: "message": "Parse error" } }, status=400) - + except Exception as e: logger.error(f"Unexpected error: {e}") return web.json_response({ @@ -64,10 +72,5 @@ async def handle_rpc_request(request: web.Request) -> web.Response: }, status=500) - - - def setup_routes(app: web.Application) -> None: app.router.add_post('/', handle_rpc_request) - - diff --git a/main.py b/main.py index 63678c2..31b4c9f 100644 --- a/main.py +++ b/main.py @@ -1,5 +1,6 @@ import os import logging +import asyncio from dotenv import load_dotenv from aiohttp import web from providers import create_providers @@ -10,11 +11,31 @@ from http_proxy import setup_routes from ws_proxy import setup_ws_routes +@web.middleware +async def cors_middleware(request, handler): + """Add CORS headers to all responses""" + if request.method == 'OPTIONS': + # Handle preflight requests + return web.Response(headers={ + 'Access-Control-Allow-Origin': '*', + 'Access-Control-Allow-Methods': 'GET, POST, OPTIONS', + 'Access-Control-Allow-Headers': '*', + 'Access-Control-Max-Age': '86400' + }) + + response = await handler(request) + response.headers['Access-Control-Allow-Origin'] = '*' + response.headers['Access-Control-Allow-Methods'] = 'GET, POST, OPTIONS' + response.headers['Access-Control-Allow-Headers'] = '*' + return response + + def load_config() -> dict: load_dotenv() - + return { - "proxy_port": int(os.getenv("PROXY_PORT", 8545)), + "rpc_port": int(os.getenv("RPC_PORT", 8545)), + "ws_port": int(os.getenv("WS_PORT", 8546)), "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), @@ -29,41 +50,69 @@ def setup_logging(log_level: str) -> None: ) -def create_app(config: dict) -> web.Application: - app = web.Application() - - providers = create_providers() - cache = Cache(size_limit_gb=config["cache_size_gb"]) - error_logger = ErrorLogger(db_path=config["error_db_path"]) - router = Router(providers, cache, error_logger) - +def create_rpc_app(config: dict, router: Router) -> web.Application: + app = web.Application(middlewares=[cors_middleware]) app['router'] = router app['config'] = config - setup_routes(app) + return app + +def create_ws_app(config: dict, router: Router) -> web.Application: + app = web.Application(middlewares=[cors_middleware]) + app['router'] = router + app['config'] = config setup_ws_routes(app) - return app +async def run_servers(config: dict) -> None: + # Create shared components + providers = create_providers() + cache = Cache(size_limit_gb=config["cache_size_gb"]) + error_logger = ErrorLogger(db_path=config["error_db_path"]) + router = Router(providers, cache, error_logger) + + # Create separate apps + rpc_app = create_rpc_app(config, router) + ws_app = create_ws_app(config, router) + + # Create runners + rpc_runner = web.AppRunner(rpc_app) + ws_runner = web.AppRunner(ws_app) + + await rpc_runner.setup() + await ws_runner.setup() + + # Create sites + rpc_site = web.TCPSite(rpc_runner, '0.0.0.0', config["rpc_port"]) + ws_site = web.TCPSite(ws_runner, '0.0.0.0', config["ws_port"]) + + # Start both servers + await rpc_site.start() + await ws_site.start() + + logger = logging.getLogger(__name__) + logger.info(f"RPC server started on port {config['rpc_port']}") + logger.info(f"WebSocket server started on port {config['ws_port']}") + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") + + # Keep servers running + try: + await asyncio.Event().wait() + except KeyboardInterrupt: + pass + finally: + await rpc_runner.cleanup() + await ws_runner.cleanup() + def main() -> None: config = load_config() setup_logging(config["log_level"]) - - logger = logging.getLogger(__name__) - logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - logger.info(f"Cache size limit: {config['cache_size_gb']}GB") - logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") - - app = create_app(config) - - web.run_app( - app, - host='0.0.0.0', - port=config["proxy_port"] - ) + + asyncio.run(run_servers(config)) if __name__ == "__main__": diff --git a/providers.py b/providers.py index 7da9c66..f6ab75a 100644 --- a/providers.py +++ b/providers.py @@ -8,28 +8,28 @@ class Provider(ABC): def __init__(self, name: str): self.name = name self.backoff_until: Optional[datetime] = None - + @property @abstractmethod def http_url(self) -> str: pass - + @property @abstractmethod def ws_url(self) -> str: pass - + def transform_request(self, request: Dict[str, Any]) -> Dict[str, Any]: return request - + def transform_response(self, response: Dict[str, Any]) -> Dict[str, Any]: return response - + def is_available(self) -> bool: if self.backoff_until is None: return True return datetime.now() > self.backoff_until - + def mark_failed(self, backoff_minutes: int = 30) -> None: self.backoff_until = datetime.now() + timedelta(minutes=backoff_minutes) @@ -38,11 +38,11 @@ class AlchemyProvider(Provider): def __init__(self): super().__init__("alchemy") self.api_key = os.getenv("ALCHEMY_API_KEY", "") - + @property def http_url(self) -> str: return f"https://solana-mainnet.g.alchemy.com/v2/{self.api_key}" - + @property def ws_url(self) -> str: return f"wss://solana-mainnet.g.alchemy.com/v2/{self.api_key}" @@ -51,11 +51,11 @@ class AlchemyProvider(Provider): class PublicNodeProvider(Provider): def __init__(self): super().__init__("publicnode") - + @property def http_url(self) -> str: return "https://solana-rpc.publicnode.com" - + @property def ws_url(self) -> str: return "wss://solana-rpc.publicnode.com" @@ -65,11 +65,11 @@ class HeliusProvider(Provider): def __init__(self): super().__init__("helius") self.api_key = os.getenv("HELIUS_API_KEY", "") - + @property def http_url(self) -> str: return f"https://mainnet.helius-rpc.com/?api-key={self.api_key}" - + @property def ws_url(self) -> str: return f"wss://mainnet.helius-rpc.com/?api-key={self.api_key}" @@ -80,11 +80,11 @@ class QuickNodeProvider(Provider): super().__init__("quicknode") self.endpoint = os.getenv("QUICKNODE_ENDPOINT", "") self.token = os.getenv("QUICKNODE_TOKEN", "") - + @property def http_url(self) -> str: return f"https://{self.endpoint}/{self.token}/" - + @property def ws_url(self) -> str: return f"wss://{self.endpoint}/{self.token}/" @@ -93,11 +93,11 @@ class QuickNodeProvider(Provider): class SolanaPublicProvider(Provider): def __init__(self): super().__init__("solana_public") - + @property def http_url(self) -> str: return "https://api.mainnet-beta.solana.com" - + @property def ws_url(self) -> str: return "wss://api.mainnet-beta.solana.com" @@ -105,9 +105,9 @@ class SolanaPublicProvider(Provider): def create_providers() -> list[Provider]: return [ - AlchemyProvider(), - PublicNodeProvider(), + SolanaPublicProvider(), HeliusProvider(), + AlchemyProvider(), QuickNodeProvider(), - SolanaPublicProvider() - ] \ No newline at end of file + PublicNodeProvider(), + ] diff --git a/router.py b/router.py index ee5b535..ffa1c50 100644 --- a/router.py +++ b/router.py @@ -14,17 +14,17 @@ class Router: self.error_logger = error_logger self.current_provider_index = 0 self.logger = logging.getLogger(__name__) - - async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: + + async def route_request(self, method: str, params: Any) -> Dict[str, Any]: request = {"method": method, "params": params} - + cached_response = self.cache.get(method, params) if cached_response: self.logger.debug(f"Cache hit for {method}") cached_response["_cached"] = True cached_response["_provider"] = "cache" return cached_response - + for attempt in range(len(self.providers)): provider = self.get_next_available_provider() if not provider: @@ -32,48 +32,49 @@ class Router: "All providers are currently unavailable", "NO_AVAILABLE_PROVIDERS" ) - + try: response = await self._make_request(provider, request) - + transformed_response = provider.transform_response(response) transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - + self.cache.set(method, params, transformed_response) self.logger.info(f"Request succeeded via {provider.name}") return transformed_response - + except Exception as error: error_id = self.error_logger.log_error(provider.name, request, error) self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})") provider.mark_failed() - + return self._create_error_response( "All providers failed to handle the request", "ALL_PROVIDERS_FAILED" ) - + def get_next_available_provider(self) -> Optional[Provider]: for _ in range(len(self.providers)): provider = self.providers[self.current_provider_index] - self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) - + if provider.is_available(): return provider - + else: + self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) + return None - + async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]: transformed_request = provider.transform_request(request) - + rpc_request = { "jsonrpc": "2.0", "id": 1, "method": transformed_request["method"], "params": transformed_request["params"] } - + timeout = aiohttp.ClientTimeout(total=30) async with aiohttp.ClientSession(timeout=timeout) as session: async with session.post( @@ -83,14 +84,14 @@ class Router: ) as response: if response.status != 200: raise Exception(f"HTTP {response.status}: {await response.text()}") - + result = await response.json() - + if "error" in result: raise Exception(f"RPC Error: {result['error']}") - + return result - + def _create_error_response(self, message: str, code: str) -> Dict[str, Any]: return { "jsonrpc": "2.0", diff --git a/ws_proxy.py b/ws_proxy.py index 76536eb..7d335b2 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -11,110 +11,170 @@ class WebSocketProxy: self.router = router self.logger = logging.getLogger(__name__) self.subscription_mappings: Dict[str, str] = {} - + async def handle_ws_connection(self, request: web.Request) -> web.WebSocketResponse: ws = web.WebSocketResponse() await ws.prepare(request) - + self.logger.info("New WebSocket connection established") - + provider = self.router.get_next_available_provider() if not provider: await ws.close(code=1011, message=b'No available providers') return ws - + try: - provider_ws = await self._connect_to_provider(provider) - if not provider_ws: + provider_connection = await self._connect_to_provider(provider) + if not provider_connection: await ws.close(code=1011, message=b'Failed to connect to provider') return ws - - await asyncio.gather( - self._proxy_client_to_provider(ws, provider_ws, provider), - self._proxy_provider_to_client(provider_ws, ws, provider), - return_exceptions=True - ) - + + provider_ws, provider_session = provider_connection + + try: + await asyncio.gather( + self._proxy_client_to_provider(ws, provider_ws, provider), + self._proxy_provider_to_client(provider_ws, ws, provider), + return_exceptions=True + ) + finally: + # Clean up provider connection + if not provider_ws.closed: + await provider_ws.close() + await provider_session.close() + except Exception as e: self.logger.error(f"WebSocket proxy error: {e}") - + finally: if not ws.closed: await ws.close() - + return ws - - async def _connect_to_provider(self, provider) -> Optional[object]: + + async def _connect_to_provider(self, provider) -> Optional[tuple]: + session = None try: session = ClientSession() + self.logger.info(f"Attempting to connect to provider {provider.name} at {provider.ws_url}") ws = await session.ws_connect(provider.ws_url) - self.logger.info(f"Connected to provider {provider.name} WebSocket") - return ws + self.logger.info(f"Successfully connected to provider {provider.name} WebSocket at {provider.ws_url}") + return (ws, session) except Exception as e: - self.logger.error(f"Failed to connect to provider {provider.name}: {e}") + self.logger.error(f"Failed to connect to provider {provider.name} at {provider.ws_url}: {e}") + if session: + await session.close() return None - + async def _proxy_client_to_provider(self, client_ws: web.WebSocketResponse, provider_ws, provider) -> None: async for msg in client_ws: if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) - + method = data.get('method', 'unknown') + + self.logger.info(f"Received from client: {data}") + + # Handle ping messages locally + if method == "ping": + pong_response = { + "jsonrpc": "2.0", + "result": "pong", + "id": data.get("id") + } + await client_ws.send_str(json.dumps(pong_response)) + self.logger.info("Responded to ping with pong") + continue + + # Special logging for signature subscriptions + if method == "signatureSubscribe": + signature = data.get('params', [None])[0] if data.get('params') else None + self.logger.info(f"SIGNATURE_SUBSCRIBE: Forwarding to {provider.name} for signature: {signature}") + transformed_request = provider.transform_request(data) - + + if method == "signatureSubscribe": + self.logger.info(f"SIGNATURE_SUBSCRIBE: Sending to provider {provider.name}: {transformed_request}") + await provider_ws.send_str(json.dumps(transformed_request)) - self.logger.debug(f"Forwarded message to {provider.name}: {data.get('method', 'unknown')}") - + self.logger.info(f"Forwarded message to {provider.name}: {method}") + except json.JSONDecodeError: self.logger.warning("Received invalid JSON from client") except Exception as e: self.logger.error(f"Error forwarding to provider: {e}") break - + + elif msg.type == WSMsgType.PING: + await client_ws.pong(msg.data) + self.logger.debug("Responded to WebSocket ping with pong") + elif msg.type == WSMsgType.ERROR: self.logger.error(f'WebSocket error: {client_ws.exception()}') break - + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING): break - + async def _proxy_provider_to_client(self, provider_ws, client_ws: web.WebSocketResponse, provider) -> None: + self.logger.info(f"Starting provider-to-client message loop for {provider.name}") + message_count = 0 async for msg in provider_ws: + message_count += 1 + self.logger.info(f"Provider {provider.name} message #{message_count}, type: {msg.type}") if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) - + self.logger.info(f"Received from provider {provider.name}: {data}") + + # Special logging for signature subscription responses + if "result" in data and isinstance(data.get("result"), (int, str)): + self.logger.info(f"SIGNATURE_SUBSCRIBE: Got subscription ID response from {provider.name}: {data.get('result')}") + elif data.get("method") == "signatureNotification": + subscription_id = data.get("params", {}).get("subscription") + result = data.get("params", {}).get("result") + self.logger.info(f"SIGNATURE_NOTIFICATION: From {provider.name}, subscription {subscription_id}, result: {result}") + transformed_response = provider.transform_response(data) - + if "result" in transformed_response and "subscription" in str(transformed_response.get("result", {})): subscription_id = transformed_response.get("result") if subscription_id: self.subscription_mappings[str(subscription_id)] = provider.name - + self.logger.info(f"SIGNATURE_SUBSCRIBE: Mapped subscription {subscription_id} to {provider.name}") + transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - + method = transformed_response.get("method", "") params = transformed_response.get("params", {}) if method and params: self.router.cache.set(method, params, transformed_response) - + await client_ws.send_str(json.dumps(transformed_response)) - self.logger.debug(f"Forwarded response from {provider.name}") - + self.logger.info(f"Forwarded response to client from {provider.name}: {transformed_response}") + except json.JSONDecodeError: self.logger.warning(f"Received invalid JSON from provider {provider.name}") except Exception as e: self.logger.error(f"Error forwarding from provider: {e}") - break - + # Don't break here - continue processing other messages + continue + + elif msg.type == WSMsgType.PING: + await provider_ws.pong(msg.data) + self.logger.debug(f"Responded to provider WebSocket ping from {provider.name}") + elif msg.type == WSMsgType.ERROR: self.logger.error(f'Provider WebSocket error: {provider_ws.exception()}') break - + elif msg.type in (WSMsgType.CLOSE, WSMsgType.CLOSING): + self.logger.warning(f"Provider WebSocket connection closed from {provider.name}") break + self.logger.warning(f"Provider-to-client message loop ended for {provider.name} after {message_count} messages") + async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: router: Router = request.app['router'] @@ -123,4 +183,4 @@ async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: def setup_ws_routes(app: web.Application) -> None: - app.router.add_get('/ws', handle_ws_connection) \ No newline at end of file + app.router.add_get('/', handle_ws_connection) -- 2.45.2 From 02ee35437b9bfe0c29cb7dfe89c2c0722bc8059b Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 15:21:06 +0530 Subject: [PATCH 02/12] Remove debug logs --- http_proxy.py | 9 +-------- ws_proxy.py | 20 -------------------- 2 files changed, 1 insertion(+), 28 deletions(-) diff --git a/http_proxy.py b/http_proxy.py index 5be8b23..c49b073 100644 --- a/http_proxy.py +++ b/http_proxy.py @@ -36,16 +36,9 @@ async def handle_rpc_request(request: web.Request) -> web.Response: }, status=400) logger.info(f"Handling RPC request: {method}") - - # Special logging for signature status requests - if method == "getSignatureStatuses": - signatures = params if isinstance(params, list) else [] - logger.info(f"GET_SIGNATURE_STATUSES: Checking signatures: {signatures}") response = await router.route_request(method, params) - - if method == "getSignatureStatuses": - logger.info(f"GET_SIGNATURE_STATUSES: Response: {response}") + response["id"] = request_id return web.json_response(response) diff --git a/ws_proxy.py b/ws_proxy.py index 7d335b2..35e6789 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -86,16 +86,8 @@ class WebSocketProxy: self.logger.info("Responded to ping with pong") continue - # Special logging for signature subscriptions - if method == "signatureSubscribe": - signature = data.get('params', [None])[0] if data.get('params') else None - self.logger.info(f"SIGNATURE_SUBSCRIBE: Forwarding to {provider.name} for signature: {signature}") - transformed_request = provider.transform_request(data) - if method == "signatureSubscribe": - self.logger.info(f"SIGNATURE_SUBSCRIBE: Sending to provider {provider.name}: {transformed_request}") - await provider_ws.send_str(json.dumps(transformed_request)) self.logger.info(f"Forwarded message to {provider.name}: {method}") @@ -117,24 +109,12 @@ class WebSocketProxy: break async def _proxy_provider_to_client(self, provider_ws, client_ws: web.WebSocketResponse, provider) -> None: - self.logger.info(f"Starting provider-to-client message loop for {provider.name}") - message_count = 0 async for msg in provider_ws: - message_count += 1 - self.logger.info(f"Provider {provider.name} message #{message_count}, type: {msg.type}") if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) self.logger.info(f"Received from provider {provider.name}: {data}") - # Special logging for signature subscription responses - if "result" in data and isinstance(data.get("result"), (int, str)): - self.logger.info(f"SIGNATURE_SUBSCRIBE: Got subscription ID response from {provider.name}: {data.get('result')}") - elif data.get("method") == "signatureNotification": - subscription_id = data.get("params", {}).get("subscription") - result = data.get("params", {}).get("result") - self.logger.info(f"SIGNATURE_NOTIFICATION: From {provider.name}, subscription {subscription_id}, result: {result}") - transformed_response = provider.transform_response(data) if "result" in transformed_response and "subscription" in str(transformed_response.get("result", {})): -- 2.45.2 From 04b31f6a55f6fab410e5bc94a8dd6172109944c7 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 15:47:04 +0530 Subject: [PATCH 03/12] Update return type for route_request method --- providers.py | 4 ++-- router.py | 2 +- ws_proxy.py | 14 +++++++------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/providers.py b/providers.py index f6ab75a..708842d 100644 --- a/providers.py +++ b/providers.py @@ -106,8 +106,8 @@ class SolanaPublicProvider(Provider): def create_providers() -> list[Provider]: return [ SolanaPublicProvider(), - HeliusProvider(), AlchemyProvider(), - QuickNodeProvider(), PublicNodeProvider(), + HeliusProvider(), + QuickNodeProvider(), ] diff --git a/router.py b/router.py index ffa1c50..fd1dca5 100644 --- a/router.py +++ b/router.py @@ -15,7 +15,7 @@ class Router: self.current_provider_index = 0 self.logger = logging.getLogger(__name__) - async def route_request(self, method: str, params: Any) -> Dict[str, Any]: + async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: request = {"method": method, "params": params} cached_response = self.cache.get(method, params) diff --git a/ws_proxy.py b/ws_proxy.py index 35e6789..f3141a5 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -73,7 +73,7 @@ class WebSocketProxy: data = json.loads(msg.data) method = data.get('method', 'unknown') - self.logger.info(f"Received from client: {data}") + self.logger.debug(f"Received from client: {data}") # Handle ping messages locally if method == "ping": @@ -83,13 +83,13 @@ class WebSocketProxy: "id": data.get("id") } await client_ws.send_str(json.dumps(pong_response)) - self.logger.info("Responded to ping with pong") + self.logger.debug("Responded to ping with pong") continue transformed_request = provider.transform_request(data) await provider_ws.send_str(json.dumps(transformed_request)) - self.logger.info(f"Forwarded message to {provider.name}: {method}") + self.logger.debug(f"Forwarded message to {provider.name}: {method}") except json.JSONDecodeError: self.logger.warning("Received invalid JSON from client") @@ -113,7 +113,7 @@ class WebSocketProxy: if msg.type == WSMsgType.TEXT: try: data = json.loads(msg.data) - self.logger.info(f"Received from provider {provider.name}: {data}") + self.logger.debug(f"Received from provider {provider.name}: {data}") transformed_response = provider.transform_response(data) @@ -121,7 +121,7 @@ class WebSocketProxy: subscription_id = transformed_response.get("result") if subscription_id: self.subscription_mappings[str(subscription_id)] = provider.name - self.logger.info(f"SIGNATURE_SUBSCRIBE: Mapped subscription {subscription_id} to {provider.name}") + self.logger.debug(f"SIGNATURE_SUBSCRIBE: Mapped subscription {subscription_id} to {provider.name}") transformed_response["_cached"] = False transformed_response["_provider"] = provider.name @@ -132,7 +132,7 @@ class WebSocketProxy: self.router.cache.set(method, params, transformed_response) await client_ws.send_str(json.dumps(transformed_response)) - self.logger.info(f"Forwarded response to client from {provider.name}: {transformed_response}") + self.logger.debug(f"Forwarded response to client from {provider.name}: {transformed_response}") except json.JSONDecodeError: self.logger.warning(f"Received invalid JSON from provider {provider.name}") @@ -153,7 +153,7 @@ class WebSocketProxy: self.logger.warning(f"Provider WebSocket connection closed from {provider.name}") break - self.logger.warning(f"Provider-to-client message loop ended for {provider.name} after {message_count} messages") + self.logger.warning(f"Provider-to-client message loop ended for {provider.name}") async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: -- 2.45.2 From e44fa4f72b1068535bc7db4ac97fd419af464bc2 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 15:48:40 +0530 Subject: [PATCH 04/12] Remove nested try block --- ws_proxy.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/ws_proxy.py b/ws_proxy.py index f3141a5..9aeae94 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -31,17 +31,16 @@ class WebSocketProxy: provider_ws, provider_session = provider_connection - try: - await asyncio.gather( - self._proxy_client_to_provider(ws, provider_ws, provider), - self._proxy_provider_to_client(provider_ws, ws, provider), - return_exceptions=True - ) - finally: - # Clean up provider connection - if not provider_ws.closed: - await provider_ws.close() - await provider_session.close() + await asyncio.gather( + self._proxy_client_to_provider(ws, provider_ws, provider), + self._proxy_provider_to_client(provider_ws, ws, provider), + return_exceptions=True + ) + + # Clean up provider connection + if not provider_ws.closed: + await provider_ws.close() + await provider_session.close() except Exception as e: self.logger.error(f"WebSocket proxy error: {e}") -- 2.45.2 From 0735b3082282ffe2ca2eb48011e7caedb2418618 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 16:51:34 +0530 Subject: [PATCH 05/12] Change back ws port to 8545 --- main.py | 71 +++++++++++++++------------------------------------ normalizer.py | 8 +++--- ws_proxy.py | 2 +- 3 files changed, 25 insertions(+), 56 deletions(-) diff --git a/main.py b/main.py index 31b4c9f..3914a43 100644 --- a/main.py +++ b/main.py @@ -34,8 +34,7 @@ def load_config() -> dict: load_dotenv() return { - "rpc_port": int(os.getenv("RPC_PORT", 8545)), - "ws_port": int(os.getenv("WS_PORT", 8546)), + "proxy_port": int(os.getenv("PROXY_PORT", 8545)), "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), @@ -50,70 +49,40 @@ def setup_logging(log_level: str) -> None: ) -def create_rpc_app(config: dict, router: Router) -> web.Application: +def create_app(config: dict) -> web.Application: app = web.Application(middlewares=[cors_middleware]) - app['router'] = router - app['config'] = config - setup_routes(app) - return app -def create_ws_app(config: dict, router: Router) -> web.Application: - app = web.Application(middlewares=[cors_middleware]) - app['router'] = router - app['config'] = config - setup_ws_routes(app) - return app - - - - -async def run_servers(config: dict) -> None: - # Create shared components providers = create_providers() cache = Cache(size_limit_gb=config["cache_size_gb"]) error_logger = ErrorLogger(db_path=config["error_db_path"]) router = Router(providers, cache, error_logger) - # Create separate apps - rpc_app = create_rpc_app(config, router) - ws_app = create_ws_app(config, router) + app['router'] = router + app['config'] = config - # Create runners - rpc_runner = web.AppRunner(rpc_app) - ws_runner = web.AppRunner(ws_app) + setup_routes(app) + setup_ws_routes(app) - await rpc_runner.setup() - await ws_runner.setup() + return app - # Create sites - rpc_site = web.TCPSite(rpc_runner, '0.0.0.0', config["rpc_port"]) - ws_site = web.TCPSite(ws_runner, '0.0.0.0', config["ws_port"]) - - # Start both servers - await rpc_site.start() - await ws_site.start() - - logger = logging.getLogger(__name__) - logger.info(f"RPC server started on port {config['rpc_port']}") - logger.info(f"WebSocket server started on port {config['ws_port']}") - logger.info(f"Cache size limit: {config['cache_size_gb']}GB") - logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") - - # Keep servers running - try: - await asyncio.Event().wait() - except KeyboardInterrupt: - pass - finally: - await rpc_runner.cleanup() - await ws_runner.cleanup() def main() -> None: config = load_config() setup_logging(config["log_level"]) - asyncio.run(run_servers(config)) + logger = logging.getLogger(__name__) + logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") + + app = create_app(config) + + web.run_app( + app, + host='0.0.0.0', + port=config["proxy_port"] + ) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/normalizer.py b/normalizer.py index dfb94c0..98a9964 100644 --- a/normalizer.py +++ b/normalizer.py @@ -3,16 +3,16 @@ from typing import Dict, Any def normalize_response(provider: str, response: Dict[str, Any]) -> Dict[str, Any]: normalized = response.copy() - + # Ensure consistent field names if "result" in normalized and normalized["result"] is None: # Some providers return null, others omit the field pass - + # Handle null vs missing fields consistently if "error" in normalized and normalized["error"] is None: del normalized["error"] - + return normalized @@ -25,4 +25,4 @@ def normalize_error(error: Exception, error_id: str) -> Dict[str, Any]: "message": str(error), "data": {"error_id": error_id} } - } \ No newline at end of file + } diff --git a/ws_proxy.py b/ws_proxy.py index 9aeae94..0885337 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -162,4 +162,4 @@ async def handle_ws_connection(request: web.Request) -> web.WebSocketResponse: def setup_ws_routes(app: web.Application) -> None: - app.router.add_get('/', handle_ws_connection) + app.router.add_get('/ws', handle_ws_connection) -- 2.45.2 From 07c6888bb6030098483b90fba7d7070d9c2126aa Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 17:11:17 +0530 Subject: [PATCH 06/12] Add flag to disable caching responses --- .env.example | 1 + main.py | 8 ++++++-- router.py | 19 +++++++++++-------- ws_proxy.py | 2 +- 4 files changed, 19 insertions(+), 11 deletions(-) diff --git a/.env.example b/.env.example index 845f69b..afc68a9 100644 --- a/.env.example +++ b/.env.example @@ -7,6 +7,7 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 CACHE_SIZE_GB=100 +DISABLE_CACHE=true BACKOFF_MINUTES=30 # Logging diff --git a/main.py b/main.py index 3914a43..cf8785a 100644 --- a/main.py +++ b/main.py @@ -36,6 +36,7 @@ def load_config() -> dict: return { "proxy_port": int(os.getenv("PROXY_PORT", 8545)), "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), + "disable_cache": os.getenv("DISABLE_CACHE", "true").lower() == "true", "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), "error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"), @@ -55,7 +56,7 @@ def create_app(config: dict) -> web.Application: providers = create_providers() cache = Cache(size_limit_gb=config["cache_size_gb"]) error_logger = ErrorLogger(db_path=config["error_db_path"]) - router = Router(providers, cache, error_logger) + router = Router(providers, cache, error_logger, config["disable_cache"]) app['router'] = router app['config'] = config @@ -72,7 +73,10 @@ def main() -> None: logger = logging.getLogger(__name__) logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - logger.info(f"Cache size limit: {config['cache_size_gb']}GB") + if config['disable_cache']: + logger.info("Cache is DISABLED - all responses will be fresh") + else: + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") app = create_app(config) diff --git a/router.py b/router.py index fd1dca5..19c9693 100644 --- a/router.py +++ b/router.py @@ -8,22 +8,24 @@ from errors import ErrorLogger class Router: - def __init__(self, providers: List[Provider], cache: Cache, error_logger: ErrorLogger): + def __init__(self, providers: List[Provider], cache: Cache, error_logger: ErrorLogger, disable_cache: bool = False): self.providers = providers self.cache = cache self.error_logger = error_logger + self.disable_cache = disable_cache self.current_provider_index = 0 self.logger = logging.getLogger(__name__) async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: request = {"method": method, "params": params} - cached_response = self.cache.get(method, params) - if cached_response: - self.logger.debug(f"Cache hit for {method}") - cached_response["_cached"] = True - cached_response["_provider"] = "cache" - return cached_response + if not self.disable_cache: + cached_response = self.cache.get(method, params) + if cached_response: + self.logger.debug(f"Cache hit for {method}") + cached_response["_cached"] = True + cached_response["_provider"] = "cache" + return cached_response for attempt in range(len(self.providers)): provider = self.get_next_available_provider() @@ -40,7 +42,8 @@ class Router: transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - self.cache.set(method, params, transformed_response) + if not self.disable_cache: + self.cache.set(method, params, transformed_response) self.logger.info(f"Request succeeded via {provider.name}") return transformed_response diff --git a/ws_proxy.py b/ws_proxy.py index 0885337..8477c27 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -127,7 +127,7 @@ class WebSocketProxy: method = transformed_response.get("method", "") params = transformed_response.get("params", {}) - if method and params: + if method and params and not self.router.disable_cache: self.router.cache.set(method, params, transformed_response) await client_ws.send_str(json.dumps(transformed_response)) -- 2.45.2 From 3d91cca7c6bb96639ef05229542cd11ceebaa730 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 18:01:18 +0530 Subject: [PATCH 07/12] Add steps to setup and start proxy --- .env.example | 4 ++-- README.md | 24 +++++++++++++++++++++++- router.py | 1 + 3 files changed, 26 insertions(+), 3 deletions(-) diff --git a/.env.example b/.env.example index afc68a9..e1ff357 100644 --- a/.env.example +++ b/.env.example @@ -1,4 +1,4 @@ -# Provider endpoints and auth +# Provider endpoints and auth (optional) ALCHEMY_API_KEY=your_key_here HELIUS_API_KEY=your_key_here QUICKNODE_ENDPOINT=your_endpoint.quiknode.pro @@ -12,4 +12,4 @@ BACKOFF_MINUTES=30 # Logging LOG_LEVEL=INFO -ERROR_DB_PATH=./errors.db \ No newline at end of file +ERROR_DB_PATH=./errors.db diff --git a/README.md b/README.md index 781020d..e3fa07a 100644 --- a/README.md +++ b/README.md @@ -1 +1,23 @@ -The trenches are brutal. +# Solana Proxy + +## Setup + +- Copy `.env.example` to `.env`: + + ```bash + cp .env.example .env + ``` + +- The proxy will work without making any changes to the `.env` file but you can optionally set the API keys for different providers + +## Run + +- Start the proxy: + + ```bash + python3 main.py + ``` + +- This will start the proxy with, + - RPC endpoint at: + - WS endpoint at: diff --git a/router.py b/router.py index 19c9693..5410bc1 100644 --- a/router.py +++ b/router.py @@ -44,6 +44,7 @@ class Router: if not self.disable_cache: self.cache.set(method, params, transformed_response) + self.logger.info(f"Request succeeded via {provider.name}") return transformed_response -- 2.45.2 From 4a6331c71f98e260d56b2564467c5e705c8ac4c5 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Mon, 28 Jul 2025 19:55:43 +0530 Subject: [PATCH 08/12] Remove ping responses --- README.md | 2 +- ws_proxy.py | 19 ------------------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/README.md b/README.md index e3fa07a..ad76eed 100644 --- a/README.md +++ b/README.md @@ -20,4 +20,4 @@ - This will start the proxy with, - RPC endpoint at: - - WS endpoint at: + - WS endpoint at: diff --git a/ws_proxy.py b/ws_proxy.py index 8477c27..dd73c66 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -74,17 +74,6 @@ class WebSocketProxy: self.logger.debug(f"Received from client: {data}") - # Handle ping messages locally - if method == "ping": - pong_response = { - "jsonrpc": "2.0", - "result": "pong", - "id": data.get("id") - } - await client_ws.send_str(json.dumps(pong_response)) - self.logger.debug("Responded to ping with pong") - continue - transformed_request = provider.transform_request(data) await provider_ws.send_str(json.dumps(transformed_request)) @@ -96,10 +85,6 @@ class WebSocketProxy: self.logger.error(f"Error forwarding to provider: {e}") break - elif msg.type == WSMsgType.PING: - await client_ws.pong(msg.data) - self.logger.debug("Responded to WebSocket ping with pong") - elif msg.type == WSMsgType.ERROR: self.logger.error(f'WebSocket error: {client_ws.exception()}') break @@ -140,10 +125,6 @@ class WebSocketProxy: # Don't break here - continue processing other messages continue - elif msg.type == WSMsgType.PING: - await provider_ws.pong(msg.data) - self.logger.debug(f"Responded to provider WebSocket ping from {provider.name}") - elif msg.type == WSMsgType.ERROR: self.logger.error(f'Provider WebSocket error: {provider_ws.exception()}') break -- 2.45.2 From 3da690f2d104ec7fd32edf6c3b5e2404405a70bf Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 1 Aug 2025 10:27:58 +0530 Subject: [PATCH 09/12] Update gitignore file --- .gitignore | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.gitignore b/.gitignore index 2eea525..fe1c538 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,3 @@ -.env \ No newline at end of file +.env +__pycache__ +*.db -- 2.45.2 From 23d69eec14ac0f7302adbb40e965a5210d4d0806 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 1 Aug 2025 14:23:23 +0530 Subject: [PATCH 10/12] Implement selective caching for RPC method responses --- .gitignore | 1 + cache.py | 30 ++++++++++++++-- cache_policy.py | 95 +++++++++++++++++++++++++++++++++++++++++++++++++ main.py | 8 ++--- router.py | 24 ++++++++----- ws_proxy.py | 5 --- 6 files changed, 141 insertions(+), 22 deletions(-) create mode 100644 cache_policy.py diff --git a/.gitignore b/.gitignore index fe1c538..3e9c748 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ .env __pycache__ *.db +cache diff --git a/cache.py b/cache.py index e840864..cd46a40 100644 --- a/cache.py +++ b/cache.py @@ -1,5 +1,6 @@ import json import os +import time from typing import Dict, Any, Optional import diskcache @@ -19,11 +20,34 @@ class Cache: def get(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: key = self._make_key(method, params) - return self.cache.get(key) + cached_data = self.cache.get(key) + + if cached_data is None: + return None + + # Check if cached data has TTL and if it's expired + if isinstance(cached_data, dict) and '_cache_expiry' in cached_data: + if time.time() > cached_data['_cache_expiry']: + # Remove expired entry + self.cache.delete(key) + return None + # Remove cache metadata before returning + response = cached_data.copy() + del response['_cache_expiry'] + return response + + return cached_data - def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any]) -> None: + def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None) -> None: key = self._make_key(method, params) - self.cache.set(key, response) + + # Add TTL metadata if specified + if ttl is not None: + cached_response = response.copy() + cached_response['_cache_expiry'] = time.time() + ttl + self.cache.set(key, cached_response) + else: + self.cache.set(key, response) def size_check(self) -> Dict[str, Any]: stats = self.cache.stats() diff --git a/cache_policy.py b/cache_policy.py new file mode 100644 index 0000000..4b27d92 --- /dev/null +++ b/cache_policy.py @@ -0,0 +1,95 @@ +from typing import Dict, Any, Optional +import time + + +class CachePolicy: + """ + Determines caching behavior for Solana RPC methods based on their characteristics. + """ + + # Methods that return immutable data - cache indefinitely + CACHEABLE_IMMUTABLE = { + 'getGenesisHash' # Network genesis hash never changes + } + + # Methods with time-based TTL caching based on data change frequency + CACHEABLE_WITH_TTL = { + # Network/validator information - changes periodically + 'getVoteAccounts': 120, # Validator vote accounts change every few minutes + 'getSupply': 300, # Total SOL supply changes slowly + + # Epoch and network info - changes with epoch boundaries (~2-3 days) + 'getEpochInfo': 3600, # Current epoch info changes slowly + 'getInflationRate': 1800, # Inflation rate changes infrequently + 'getInflationGovernor': 3600, # Inflation governor params rarely change + + # Network constants - change very rarely or never + 'getEpochSchedule': 86400, # Epoch schedule rarely changes + 'getVersion': 3600, # RPC version changes occasionally + 'getIdentity': 3600, # Node identity changes rarely + + # Never change for the given parameters but will add new entry in the DB if the input parameters change + 'getBlock': 86400, + 'getTransaction':86400 + } + + def should_cache(self, method: str, params: Dict[str, Any]) -> bool: + """ + Determine if a method should be cached based on the method name and parameters. + + Args: + method: The RPC method name + params: The method parameters + + Returns: + True if the method should be cached, False otherwise + """ + if method in self.CACHEABLE_IMMUTABLE: + # For getBlock, only cache finalized blocks + if method == 'getBlock': + commitment = self._get_commitment(params) + return commitment == 'finalized' + return True + + if method in self.CACHEABLE_WITH_TTL: + return True + + # Default to not caching unknown methods + return False + + def get_cache_ttl(self, method: str, params: Dict[str, Any]) -> Optional[int]: + """ + Get the Time To Live (TTL) for a cached method in seconds. + + Args: + method: The RPC method name + params: The method parameters + + Returns: + TTL in seconds, or None for indefinite caching + """ + if method in self.CACHEABLE_IMMUTABLE: + return None # Cache indefinitely + + if method in self.CACHEABLE_WITH_TTL: + return self.CACHEABLE_WITH_TTL[method] + + return None + + def _get_commitment(self, params: Dict[str, Any]) -> str: + """ + Extract the commitment level from RPC parameters. + + Args: + params: The method parameters + + Returns: + The commitment level, defaults to 'processed' + """ + if isinstance(params, list) and len(params) > 1: + if isinstance(params[1], dict) and 'commitment' in params[1]: + return params[1]['commitment'] + elif isinstance(params, dict) and 'commitment' in params: + return params['commitment'] + + return 'processed' # Default commitment level diff --git a/main.py b/main.py index cf8785a..8499c99 100644 --- a/main.py +++ b/main.py @@ -36,7 +36,6 @@ def load_config() -> dict: return { "proxy_port": int(os.getenv("PROXY_PORT", 8545)), "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), - "disable_cache": os.getenv("DISABLE_CACHE", "true").lower() == "true", "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), "error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"), @@ -56,7 +55,7 @@ def create_app(config: dict) -> web.Application: providers = create_providers() cache = Cache(size_limit_gb=config["cache_size_gb"]) error_logger = ErrorLogger(db_path=config["error_db_path"]) - router = Router(providers, cache, error_logger, config["disable_cache"]) + router = Router(providers, cache, error_logger) app['router'] = router app['config'] = config @@ -73,10 +72,7 @@ def main() -> None: logger = logging.getLogger(__name__) logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - if config['disable_cache']: - logger.info("Cache is DISABLED - all responses will be fresh") - else: - logger.info(f"Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Intelligent caching enabled - Cache size limit: {config['cache_size_gb']}GB") logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") app = create_app(config) diff --git a/router.py b/router.py index 5410bc1..1c3f309 100644 --- a/router.py +++ b/router.py @@ -4,22 +4,26 @@ import logging from typing import Dict, Any, Optional, List from providers import Provider from cache import Cache +from cache_policy import CachePolicy from errors import ErrorLogger class Router: - def __init__(self, providers: List[Provider], cache: Cache, error_logger: ErrorLogger, disable_cache: bool = False): + def __init__(self, providers: List[Provider], cache: Cache, error_logger: ErrorLogger): self.providers = providers self.cache = cache + self.cache_policy = CachePolicy() self.error_logger = error_logger - self.disable_cache = disable_cache self.current_provider_index = 0 self.logger = logging.getLogger(__name__) async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: request = {"method": method, "params": params} - if not self.disable_cache: + # Check if this method should be cached based on intelligent caching policy + should_cache = self.cache_policy.should_cache(method, params) + + if should_cache: cached_response = self.cache.get(method, params) if cached_response: self.logger.debug(f"Cache hit for {method}") @@ -42,10 +46,15 @@ class Router: transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - if not self.disable_cache: - self.cache.set(method, params, transformed_response) + # Cache the response if caching policy allows it + if should_cache: + ttl = self.cache_policy.get_cache_ttl(method, params) + self.cache.set(method, params, transformed_response, ttl) + cache_info = f" (cached {'indefinitely' if ttl is None else f'for {ttl}s'})" + self.logger.info(f"Request succeeded via {provider.name}{cache_info}") + else: + self.logger.info(f"Request succeeded via {provider.name} (not cached)") - self.logger.info(f"Request succeeded via {provider.name}") return transformed_response except Exception as error: @@ -61,11 +70,10 @@ class Router: def get_next_available_provider(self) -> Optional[Provider]: for _ in range(len(self.providers)): provider = self.providers[self.current_provider_index] + self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) if provider.is_available(): return provider - else: - self.current_provider_index = (self.current_provider_index + 1) % len(self.providers) return None diff --git a/ws_proxy.py b/ws_proxy.py index dd73c66..a2f2e1e 100644 --- a/ws_proxy.py +++ b/ws_proxy.py @@ -110,11 +110,6 @@ class WebSocketProxy: transformed_response["_cached"] = False transformed_response["_provider"] = provider.name - method = transformed_response.get("method", "") - params = transformed_response.get("params", {}) - if method and params and not self.router.disable_cache: - self.router.cache.set(method, params, transformed_response) - await client_ws.send_str(json.dumps(transformed_response)) self.logger.debug(f"Forwarded response to client from {provider.name}: {transformed_response}") -- 2.45.2 From 307abfc78fdf99c7c832033627e5a8615fe460d8 Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 1 Aug 2025 14:36:35 +0530 Subject: [PATCH 11/12] Update example env --- .env.example | 1 - 1 file changed, 1 deletion(-) diff --git a/.env.example b/.env.example index e1ff357..fa0e7e6 100644 --- a/.env.example +++ b/.env.example @@ -7,7 +7,6 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 CACHE_SIZE_GB=100 -DISABLE_CACHE=true BACKOFF_MINUTES=30 # Logging -- 2.45.2 From 0dc3f060efecd8ac71535db99b2e16765623d70f Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 1 Aug 2025 15:53:35 +0530 Subject: [PATCH 12/12] Update back off mechanism to only back off if the endpoint is not available --- .env.example | 2 +- cache.py | 19 +++++++------- cache_policy.py | 5 ++-- docs/solana-proxy-implementation-plan.md | 18 ++++++------- main.py | 4 +-- router.py | 33 ++++++++++++++++++++++-- 6 files changed, 55 insertions(+), 26 deletions(-) diff --git a/.env.example b/.env.example index fa0e7e6..8667386 100644 --- a/.env.example +++ b/.env.example @@ -6,7 +6,7 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 -CACHE_SIZE_GB=100 +CACHE_SIZE_GB=1 BACKOFF_MINUTES=30 # Logging diff --git a/cache.py b/cache.py index cd46a40..b290cca 100644 --- a/cache.py +++ b/cache.py @@ -6,7 +6,7 @@ import diskcache class Cache: - def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 100): + def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 1): self.cache_dir = cache_dir self.size_limit_bytes = size_limit_gb * 1024 * 1024 * 1024 self.cache = diskcache.Cache( @@ -14,17 +14,17 @@ class Cache: size_limit=self.size_limit_bytes, eviction_policy='least-recently-used' ) - + def _make_key(self, method: str, params: Dict[str, Any]) -> str: return f"{method}:{json.dumps(params, sort_keys=True)}" - + def get(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: key = self._make_key(method, params) cached_data = self.cache.get(key) - + if cached_data is None: return None - + # Check if cached data has TTL and if it's expired if isinstance(cached_data, dict) and '_cache_expiry' in cached_data: if time.time() > cached_data['_cache_expiry']: @@ -35,12 +35,12 @@ class Cache: response = cached_data.copy() del response['_cache_expiry'] return response - + return cached_data - + def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None) -> None: key = self._make_key(method, params) - + # Add TTL metadata if specified if ttl is not None: cached_response = response.copy() @@ -48,7 +48,7 @@ class Cache: self.cache.set(key, cached_response) else: self.cache.set(key, response) - + def size_check(self) -> Dict[str, Any]: stats = self.cache.stats() return { @@ -57,4 +57,3 @@ class Cache: "count": stats[0], "limit_gb": self.size_limit_bytes / (1024 * 1024 * 1024) } - diff --git a/cache_policy.py b/cache_policy.py index 4b27d92..af5a194 100644 --- a/cache_policy.py +++ b/cache_policy.py @@ -44,14 +44,15 @@ class CachePolicy: Returns: True if the method should be cached, False otherwise """ - if method in self.CACHEABLE_IMMUTABLE: + + if method in self.CACHEABLE_WITH_TTL: # For getBlock, only cache finalized blocks if method == 'getBlock': commitment = self._get_commitment(params) return commitment == 'finalized' return True - if method in self.CACHEABLE_WITH_TTL: + if method in self.CACHEABLE_IMMUTABLE: return True # Default to not caching unknown methods diff --git a/docs/solana-proxy-implementation-plan.md b/docs/solana-proxy-implementation-plan.md index be340cb..d50d3ef 100644 --- a/docs/solana-proxy-implementation-plan.md +++ b/docs/solana-proxy-implementation-plan.md @@ -11,7 +11,7 @@ A Python-based reverse proxy for Solana RPC endpoints that provides unified acce ``` Provider class: - name: str -- http_url: str +- http_url: str - ws_url: str - transform_request(request) -> request - transform_response(response) -> response @@ -34,15 +34,15 @@ Provider class: Cache class: - get(method: str, params: dict) -> Optional[response] - set(method: str, params: dict, response: dict) -> None -- size_check() -> None # Enforce 100GB limit +- size_check() -> None # Enforce 1GB limit - clear_oldest() -> None # LRU eviction ``` **Implementation Notes**: - Use `diskcache` library for simplicity -- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"` +- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"` - Store both HTTP responses and WebSocket messages -- Implement 100GB limit with LRU eviction +- Implement 1GB limit with LRU eviction ### 3. Error Logger Module (`errors.py`) **Purpose**: SQLite-based error logging with UUID tracking @@ -90,7 +90,7 @@ Router class: - providers: List[Provider] - cache: Cache - error_logger: ErrorLogger -- +- - route_request(method: str, params: dict) -> response - get_available_provider() -> Optional[Provider] - mark_provider_failed(provider: Provider) -> None @@ -146,7 +146,7 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 -CACHE_SIZE_GB=100 +CACHE_SIZE_GB=1 BACKOFF_MINUTES=30 # Logging @@ -227,7 +227,7 @@ Happy-path end-to-end tests only: ## Deployment Considerations -1. **Cache Storage**: Need ~100GB disk space +1. **Cache Storage**: Need ~1GB disk space 2. **Memory Usage**: Keep minimal, use disk cache 3. **Concurrent Clients**: Basic round-robin if multiple connect 4. **Monitoring**: Log all errors, provide error IDs @@ -273,8 +273,8 @@ aiohttp-cors==0.7.0 1. Single endpoint proxies to 5 providers 2. Automatic failover works -3. Responses are cached (up to 100GB) +3. Responses are cached (up to 1GB) 4. Errors logged with retrievable IDs 5. Both HTTP and WebSocket work 6. Response format is unified -7. Happy-path tests pass \ No newline at end of file +7. Happy-path tests pass diff --git a/main.py b/main.py index 8499c99..f73d907 100644 --- a/main.py +++ b/main.py @@ -35,7 +35,7 @@ def load_config() -> dict: return { "proxy_port": int(os.getenv("PROXY_PORT", 8545)), - "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), + "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 1)), "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), "error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"), @@ -72,7 +72,7 @@ def main() -> None: logger = logging.getLogger(__name__) logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - logger.info(f"Intelligent caching enabled - Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") app = create_app(config) diff --git a/router.py b/router.py index 1c3f309..858568b 100644 --- a/router.py +++ b/router.py @@ -20,7 +20,7 @@ class Router: async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: request = {"method": method, "params": params} - # Check if this method should be cached based on intelligent caching policy + # Check if this method should be cached based on caching policy should_cache = self.cache_policy.should_cache(method, params) if should_cache: @@ -60,7 +60,13 @@ class Router: except Exception as error: error_id = self.error_logger.log_error(provider.name, request, error) self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})") - provider.mark_failed() + + # Only mark provider as failed for server/network issues, not RPC errors + if await self._is_server_failure(provider, error): + provider.mark_failed() + self.logger.warning(f"Provider {provider.name} marked as failed due to server issue") + else: + self.logger.debug(f"Provider {provider.name} had RPC error but server is available") return self._create_error_response( "All providers failed to handle the request", @@ -77,6 +83,29 @@ class Router: return None + async def _is_server_failure(self, provider: Provider, error: Exception) -> bool: + """ + Check if the provider server is actually down by making a simple health check. + Only mark as failed if server is unreachable. + """ + try: + # Quick health check with minimal timeout + timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout + async with aiohttp.ClientSession(timeout=timeout) as session: + # Try a simple HTTP GET to check server availability + from urllib.parse import urlparse + parsed_url = urlparse(provider.http_url) + health_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + + async with session.get(health_url) as response: + # Server responded (even with error codes), so it's alive + return False + + except Exception as health_error: + # Server is actually unreachable + self.logger.debug(f"Health check failed for {provider.name}: {health_error}") + return True + async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]: transformed_request = provider.transform_request(request) -- 2.45.2