# Viewer metadata (not part of the module): 109 lines, 4.0 KiB, Python
import aiohttp
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Optional, List
|
|
from providers import Provider
|
|
from cache import Cache
|
|
from errors import ErrorLogger
|
|
|
|
|
|
class Router:
    """Send JSON-RPC requests to the first working provider, with optional caching.

    Attributes:
        providers: Candidate providers, tried in list order starting at
            ``current_provider_index``.
        cache: Response cache queried with ``get(method, params)`` and filled
            with ``set(method, params, response)``.
        error_logger: Sink for provider failures; ``log_error`` returns an
            identifier that is included in the warning log line.
        disable_cache: When true, the cache is neither read nor written.
        current_provider_index: Index of the provider tried first on the
            next request.
        logger: Module-level logger used for diagnostics.
    """

    def __init__(
        self,
        providers: "List[Provider]",
        cache: "Cache",
        error_logger: "ErrorLogger",
        disable_cache: bool = False,
    ):
        """Store the collaborators and start routing at the first provider."""
        self.providers = providers
        self.cache = cache
        self.error_logger = error_logger
        self.disable_cache = disable_cache
        self.current_provider_index = 0
        self.logger = logging.getLogger(__name__)

    async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Answer a request from the cache or from the first provider that succeeds.

        Args:
            method: RPC method name.
            params: RPC parameters, passed through to the cache and providers.

        Returns:
            The provider's transformed response (or a cached one), tagged with
            ``_cached`` and ``_provider``. If no provider is available, or
            every attempt fails, a JSON-RPC style error response built by
            ``_create_error_response`` is returned instead of raising.
        """
        request = {"method": method, "params": params}

        if not self.disable_cache:
            cached_response = self.cache.get(method, params)
            if cached_response:
                self.logger.debug("Cache hit for %s", method)
                # Tag a copy so the object held by the cache is not mutated.
                return {**cached_response, "_cached": True, "_provider": "cache"}

        # At most one attempt per configured provider.
        for _ in range(len(self.providers)):
            provider = self.get_next_available_provider()
            if provider is None:
                return self._create_error_response(
                    "All providers are currently unavailable",
                    "NO_AVAILABLE_PROVIDERS",
                )

            # Only provider-specific work lives in the try block, so that a
            # cache problem is never blamed on the provider.
            try:
                response = await self._make_request(provider, request)
                transformed_response = provider.transform_response(response)
                transformed_response["_cached"] = False
                transformed_response["_provider"] = provider.name
            except Exception as error:
                error_id = self.error_logger.log_error(provider.name, request, error)
                self.logger.warning(
                    "Provider %s failed: %s (ID: %s)", provider.name, error, error_id
                )
                provider.mark_failed()
                # Move on even if mark_failed() leaves the provider available,
                # otherwise every remaining attempt would hit it again.
                self._advance_past_provider(provider)
                continue

            if not self.disable_cache:
                self._store_in_cache(method, params, transformed_response)
            self.logger.info("Request succeeded via %s", provider.name)
            return transformed_response

        return self._create_error_response(
            "All providers failed to handle the request",
            "ALL_PROVIDERS_FAILED",
        )

    def get_next_available_provider(self) -> "Optional[Provider]":
        """Return the first available provider, scanning from the current index.

        The index is left pointing at the returned provider, so later requests
        keep using it; unavailable providers are skipped by advancing the
        index with wrap-around.

        Returns:
            An available provider, or ``None`` when none reports itself
            available (including when the provider list is empty).
        """
        total = len(self.providers)
        for _ in range(total):
            provider = self.providers[self.current_provider_index]
            if provider.is_available():
                return provider
            self.current_provider_index = (self.current_provider_index + 1) % total
        return None

    def _advance_past_provider(self, provider: "Provider") -> None:
        """Point the rotation at the provider after ``provider``.

        The index is only moved while it still refers to ``provider``, so a
        concurrent request that already rotated is not undone.
        """
        if self.providers[self.current_provider_index] is provider:
            self.current_provider_index = (
                self.current_provider_index + 1
            ) % len(self.providers)

    def _store_in_cache(
        self, method: str, params: Dict[str, Any], response: Dict[str, Any]
    ) -> None:
        """Cache a copy of ``response``; a cache failure is logged, not raised."""
        try:
            # Store a copy so later caller-side edits cannot alter the entry.
            self.cache.set(method, params, dict(response))
        except Exception as error:
            self.logger.warning("Failed to cache response for %s: %s", method, error)

    async def _make_request(
        self, provider: "Provider", request: Dict[str, Any]
    ) -> Dict[str, Any]:
        """POST the request to ``provider.http_url`` as a JSON-RPC 2.0 call.

        Args:
            provider: Target provider; supplies ``transform_request`` and
                ``http_url``.
            request: Dict with ``method`` and ``params`` keys.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: On a non-200 HTTP status, or when the decoded body
                contains an ``error`` member. Errors raised by aiohttp
                (timeouts, connection or decoding failures) propagate as-is.
        """
        transformed_request = provider.transform_request(request)

        # NOTE: the request id is fixed at 1 for every call.
        rpc_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": transformed_request["method"],
            "params": transformed_request["params"],
        }

        # A fresh session is opened per call, with a 30 second total timeout.
        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                provider.http_url,
                json=rpc_request,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")

                result = await response.json()

                if "error" in result:
                    raise Exception(f"RPC Error: {result['error']}")

                return result

    def _create_error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Build the JSON-RPC style error payload returned when routing fails.

        Args:
            message: Human-readable error text.
            code: Proxy-specific error code, placed under
                ``error.data.proxy_error_code``.

        Returns:
            A dict with JSON-RPC error code ``-32000``, tagged as uncached
            and as originating from ``proxy_error``.
        """
        return {
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32000,
                "message": message,
                "data": {"proxy_error_code": code},
            },
            "_cached": False,
            "_provider": "proxy_error",
        }