import json
import logging
from typing import Any, Dict, List, Optional
from urllib.parse import urlparse

import aiohttp

from providers import Provider
from cache import Cache
from cache_policy import CachePolicy
from errors import ErrorLogger


class _ProviderResponseError(Exception):
    """A provider answered, but with a non-200 HTTP status or a JSON-RPC error.

    Receiving any answer proves the server is reachable, so the router does
    not need a separate health probe before deciding whether to mark the
    provider as failed.
    """


class Router:
    """Route JSON-RPC requests across a list of providers with caching.

    Providers are tried in round-robin order. Responses for methods that the
    ``CachePolicy`` approves are stored in, and served from, the given cache.
    """

    def __init__(
        self,
        providers: List[Provider],
        cache: Cache,
        error_logger: ErrorLogger,
    ) -> None:
        """Store the collaborators and start the rotation at the first provider."""
        self.providers = providers
        self.cache = cache
        self.cache_policy = CachePolicy()
        self.error_logger = error_logger
        self.current_provider_index = 0
        self.logger = logging.getLogger(__name__)

    async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Serve a request from the cache or from the first provider that succeeds.

        Args:
            method: RPC method name.
            params: RPC parameters, passed through to the provider.

        Returns:
            The provider's transformed response (or a cached one) annotated
            with ``_cached`` and ``_provider`` keys, or an error response
            built by ``_create_error_response`` when no provider is available
            or every attempt fails.
        """
        request = {"method": method, "params": params}

        # Ask the caching policy once; the answer is reused after a successful call.
        should_cache = self.cache_policy.should_cache(method, params)

        if should_cache:
            cached_response = self.cache.get(method, params)
            if cached_response:
                self.logger.debug("Cache hit for %s", method)
                # Annotate a shallow copy: writing the flags onto the object
                # returned by the cache could alter the stored entry (and the
                # response the original caller still holds) if the cache
                # returns the stored object itself.
                return {**cached_response, "_cached": True, "_provider": "cache"}

        # At most one attempt per configured provider.
        for _ in range(len(self.providers)):
            provider = self.get_next_available_provider()
            if not provider:
                return self._create_error_response(
                    "All providers are currently unavailable",
                    "NO_AVAILABLE_PROVIDERS",
                )

            try:
                response = await self._make_request(provider, request)
                transformed_response = provider.transform_response(response)
                transformed_response["_cached"] = False
                transformed_response["_provider"] = provider.name

                # Cache the response if the caching policy allows it.
                if should_cache:
                    ttl = self.cache_policy.get_cache_ttl(method, params)
                    self.cache.set(method, params, transformed_response, ttl)
                    duration = "indefinitely" if ttl is None else f"for {ttl}s"
                    self.logger.info(
                        "Request succeeded via %s (cached %s)",
                        provider.name,
                        duration,
                    )
                else:
                    self.logger.info(
                        "Request succeeded via %s (not cached)", provider.name
                    )

                return transformed_response
            except Exception as error:
                error_id = self.error_logger.log_error(provider.name, request, error)
                self.logger.warning(
                    "Provider %s failed: %s (ID: %s)",
                    provider.name,
                    error,
                    error_id,
                )

                # Only mark the provider as failed for server/network issues,
                # not for RPC-level errors.
                if await self._is_server_failure(provider, error):
                    provider.mark_failed()
                    self.logger.warning(
                        "Provider %s marked as failed due to server issue",
                        provider.name,
                    )
                else:
                    self.logger.debug(
                        "Provider %s had RPC error but server is available",
                        provider.name,
                    )

        return self._create_error_response(
            "All providers failed to handle the request",
            "ALL_PROVIDERS_FAILED",
        )

    def get_next_available_provider(self) -> Optional[Provider]:
        """Return the next provider in rotation whose ``is_available()`` is true.

        The rotation index advances past every provider inspected, so
        successive calls spread requests across providers. Returns ``None``
        when no provider is available (including when the list is empty).
        """
        for _ in range(len(self.providers)):
            provider = self.providers[self.current_provider_index]
            self.current_provider_index = (
                self.current_provider_index + 1
            ) % len(self.providers)
            if provider.is_available():
                return provider
        return None

    async def _is_server_failure(self, provider: Provider, error: Exception) -> bool:
        """Decide whether ``error`` means the provider's server is unreachable.

        Args:
            provider: The provider whose request just failed.
            error: The exception raised while handling the request.

        Returns:
            ``False`` when the server is known to be reachable, ``True`` when
            the health probe could not get any HTTP response.
        """
        # The server already answered (bad HTTP status or RPC error payload),
        # so it is reachable; skip the extra network round trip.
        if isinstance(error, _ProviderResponseError):
            return False

        try:
            # Quick probe with a short (5 second) total timeout.
            timeout = aiohttp.ClientTimeout(total=5)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                # Plain GET against the scheme://host root of the provider URL.
                parsed_url = urlparse(provider.http_url)
                health_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
                async with session.get(health_url):
                    # Any response, even an error status, means the server is alive.
                    return False
        except Exception as health_error:
            # No response at all: treat the server as unreachable.
            self.logger.debug(
                "Health check failed for %s: %s", provider.name, health_error
            )
            return True

    async def _make_request(
        self, provider: Provider, request: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST the request to the provider as JSON-RPC 2.0 and return the reply.

        Args:
            provider: Supplies ``transform_request`` and the ``http_url`` target.
            request: Dict with ``method`` and ``params`` keys.

        Returns:
            The decoded JSON body of the provider's response.

        Raises:
            Exception: On a non-200 HTTP status or when the body contains an
                ``error`` key (raised as a private ``Exception`` subclass so
                the router can tell the server did respond). Errors raised by
                aiohttp itself propagate unchanged.
        """
        transformed_request = provider.transform_request(request)

        rpc_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": transformed_request["method"],
            "params": transformed_request["params"],
        }

        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                provider.http_url,
                json=rpc_request,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status != 200:
                    raise _ProviderResponseError(
                        f"HTTP {response.status}: {await response.text()}"
                    )

                result = await response.json()

                if "error" in result:
                    raise _ProviderResponseError(f"RPC Error: {result['error']}")

                return result

    def _create_error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Build a JSON-RPC style error payload originating from the proxy itself.

        Args:
            message: Human-readable error message.
            code: Proxy-specific error code, placed under
                ``error.data.proxy_error_code``.

        Returns:
            A dict with the fixed JSON-RPC error code ``-32000``, marked as
            uncached and attributed to ``"proxy_error"``.
        """
        return {
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32000,
                "message": message,
                "data": {"proxy_error_code": code},
            },
            "_cached": False,
            "_provider": "proxy_error",
        }