solana-proxy/router.py
shreerang 75eaba600a Handle CORS and selectively cache responses for appropriate methods (#1)
Part of https://www.notion.so/Laconic-Mainnet-Plan-1eca6b22d47280569cd0d1e6d711d949

Co-authored-by: Shreerang Kale <shreerangkale@gmail.com>
Reviewed-on: #1
Co-authored-by: shreerang <shreerang@noreply.git.vdb.to>
Co-committed-by: shreerang <shreerang@noreply.git.vdb.to>
2025-08-01 10:37:06 +00:00

147 lines
5.9 KiB
Python

import aiohttp
import json
import logging
from typing import Dict, Any, Optional, List
from providers import Provider
from cache import Cache
from cache_policy import CachePolicy
from errors import ErrorLogger
class Router:
def __init__(self, providers: List[Provider], cache: Cache, error_logger: ErrorLogger):
self.providers = providers
self.cache = cache
self.cache_policy = CachePolicy()
self.error_logger = error_logger
self.current_provider_index = 0
self.logger = logging.getLogger(__name__)
async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
request = {"method": method, "params": params}
# Check if this method should be cached based on caching policy
should_cache = self.cache_policy.should_cache(method, params)
if should_cache:
cached_response = self.cache.get(method, params)
if cached_response:
self.logger.debug(f"Cache hit for {method}")
cached_response["_cached"] = True
cached_response["_provider"] = "cache"
return cached_response
for attempt in range(len(self.providers)):
provider = self.get_next_available_provider()
if not provider:
return self._create_error_response(
"All providers are currently unavailable",
"NO_AVAILABLE_PROVIDERS"
)
try:
response = await self._make_request(provider, request)
transformed_response = provider.transform_response(response)
transformed_response["_cached"] = False
transformed_response["_provider"] = provider.name
# Cache the response if caching policy allows it
if should_cache:
ttl = self.cache_policy.get_cache_ttl(method, params)
self.cache.set(method, params, transformed_response, ttl)
cache_info = f" (cached {'indefinitely' if ttl is None else f'for {ttl}s'})"
self.logger.info(f"Request succeeded via {provider.name}{cache_info}")
else:
self.logger.info(f"Request succeeded via {provider.name} (not cached)")
return transformed_response
except Exception as error:
error_id = self.error_logger.log_error(provider.name, request, error)
self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})")
# Only mark provider as failed for server/network issues, not RPC errors
if await self._is_server_failure(provider, error):
provider.mark_failed()
self.logger.warning(f"Provider {provider.name} marked as failed due to server issue")
else:
self.logger.debug(f"Provider {provider.name} had RPC error but server is available")
return self._create_error_response(
"All providers failed to handle the request",
"ALL_PROVIDERS_FAILED"
)
def get_next_available_provider(self) -> Optional[Provider]:
for _ in range(len(self.providers)):
provider = self.providers[self.current_provider_index]
self.current_provider_index = (self.current_provider_index + 1) % len(self.providers)
if provider.is_available():
return provider
return None
async def _is_server_failure(self, provider: Provider, error: Exception) -> bool:
"""
Check if the provider server is actually down by making a simple health check.
Only mark as failed if server is unreachable.
"""
try:
# Quick health check with minimal timeout
timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout
async with aiohttp.ClientSession(timeout=timeout) as session:
# Try a simple HTTP GET to check server availability
from urllib.parse import urlparse
parsed_url = urlparse(provider.http_url)
health_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
async with session.get(health_url) as response:
# Server responded (even with error codes), so it's alive
return False
except Exception as health_error:
# Server is actually unreachable
self.logger.debug(f"Health check failed for {provider.name}: {health_error}")
return True
async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]:
transformed_request = provider.transform_request(request)
rpc_request = {
"jsonrpc": "2.0",
"id": 1,
"method": transformed_request["method"],
"params": transformed_request["params"]
}
timeout = aiohttp.ClientTimeout(total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
provider.http_url,
json=rpc_request,
headers={"Content-Type": "application/json"}
) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status}: {await response.text()}")
result = await response.json()
if "error" in result:
raise Exception(f"RPC Error: {result['error']}")
return result
def _create_error_response(self, message: str, code: str) -> Dict[str, Any]:
return {
"jsonrpc": "2.0",
"id": 1,
"error": {
"code": -32000,
"message": message,
"data": {"proxy_error_code": code}
},
"_cached": False,
"_provider": "proxy_error"
}