# solana-proxy/router.py
# 2025-07-28 18:01:18 +05:30
#
# 110 lines
# 4.0 KiB
# Python

import aiohttp
import json
import logging
from typing import Dict, Any, Optional, List
from providers import Provider
from cache import Cache
from errors import ErrorLogger
class Router:
    """Dispatch JSON-RPC requests to one of several providers, with caching.

    A request is first looked up in the cache (unless caching is disabled).
    On a miss, providers are tried starting at ``current_provider_index``;
    a provider that raises is reported to the error logger, marked as
    failed, and skipped in favour of the next one. At most
    ``len(providers)`` attempts are made per request.

    The router relies on these provider members: ``name``, ``http_url``,
    ``is_available()``, ``mark_failed()``, ``transform_request()`` and
    ``transform_response()``.
    """

    # Total time budget, in seconds, for one HTTP round trip to a provider.
    REQUEST_TIMEOUT_SECONDS = 30

    def __init__(
        self,
        providers: "List[Provider]",
        cache: "Cache",
        error_logger: "ErrorLogger",
        disable_cache: bool = False,
    ):
        """Store the collaborators and start the rotation at the first provider.

        Args:
            providers: Providers to route to, in rotation order.
            cache: Object exposing ``get(method, params)`` and
                ``set(method, params, response)``.
            error_logger: Object exposing
                ``log_error(provider_name, request, error)``, whose return
                value is used as an error id in log messages.
            disable_cache: When true, the cache is neither read nor written.
        """
        self.providers = providers
        self.cache = cache
        self.error_logger = error_logger
        self.disable_cache = disable_cache
        self.current_provider_index = 0
        self.logger = logging.getLogger(__name__)

    async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Serve ``method``/``params`` from the cache or from a provider.

        Args:
            method: RPC method name.
            params: RPC parameters, forwarded unchanged.

        Returns:
            The (transformed) provider response with two extra keys:
            ``_cached`` (bool) and ``_provider`` (provider name, ``"cache"``
            or ``"proxy_error"``). When no provider can serve the request,
            an error response built by ``_create_error_response`` is
            returned instead of raising.
        """
        request = {"method": method, "params": params}

        if not self.disable_cache:
            cached_response = self.cache.get(method, params)
            if cached_response:
                self.logger.debug("Cache hit for %s", method)
                # Annotate a shallow copy: writing the markers into the
                # object returned by the cache would also rewrite the cached
                # entry (and any response previously handed to a caller)
                # when the cache stores references.
                return {**cached_response, "_cached": True, "_provider": "cache"}

        for _ in range(len(self.providers)):
            provider = self.get_next_available_provider()
            if provider is None:
                return self._create_error_response(
                    "All providers are currently unavailable",
                    "NO_AVAILABLE_PROVIDERS",
                )
            try:
                response = await self._make_request(provider, request)
                transformed_response = provider.transform_response(response)
                transformed_response["_cached"] = False
                transformed_response["_provider"] = provider.name
                if not self.disable_cache:
                    self.cache.set(method, params, transformed_response)
                self.logger.info("Request succeeded via %s", provider.name)
                return transformed_response
            except Exception as error:
                # Deliberately broad: any failure of this provider must fall
                # through to the next one instead of aborting the request.
                error_id = self.error_logger.log_error(provider.name, request, error)
                self.logger.warning(
                    "Provider %s failed: %s (ID: %s)", provider.name, error, error_id
                )
                provider.mark_failed()
                # Whether mark_failed() makes is_available() return False
                # right away is up to the provider, so step past it
                # explicitly; otherwise the next attempt could pick the very
                # same provider again. Only move the cursor if it still
                # points at the provider that failed (a concurrent request
                # may already have moved it).
                if self.providers[self.current_provider_index] is provider:
                    self._advance_provider()

        return self._create_error_response(
            "All providers failed to handle the request",
            "ALL_PROVIDERS_FAILED",
        )

    def get_next_available_provider(self) -> "Optional[Provider]":
        """Return the first available provider at or after the cursor.

        The cursor (``current_provider_index``) is advanced past every
        unavailable provider it inspects and is left pointing at the
        provider that is returned.

        Returns:
            An available provider, or ``None`` when the list is empty or a
            full pass found none available.
        """
        for _ in range(len(self.providers)):
            provider = self.providers[self.current_provider_index]
            if provider.is_available():
                return provider
            self._advance_provider()
        return None

    def _advance_provider(self) -> None:
        """Move the rotation cursor to the next provider, wrapping around."""
        self.current_provider_index = (self.current_provider_index + 1) % len(self.providers)

    async def _make_request(self, provider: "Provider", request: Dict[str, Any]) -> Dict[str, Any]:
        """POST ``request`` to ``provider`` as a JSON-RPC 2.0 call.

        Args:
            provider: Target provider; its ``transform_request`` is applied
                first and its ``http_url`` receives the POST.
            request: Dict with ``"method"`` and ``"params"`` keys.

        Returns:
            The decoded JSON body of the response.

        Raises:
            Exception: On a non-200 HTTP status, or when the decoded body
                contains an ``"error"`` member. Errors raised by aiohttp
                (timeouts, connection or decoding failures) propagate
                unchanged.
        """
        transformed_request = provider.transform_request(request)
        rpc_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": transformed_request["method"],
            "params": transformed_request["params"],
        }
        timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
        # NOTE(review): a new ClientSession is opened per request, so
        # connections are not reused between calls.
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                provider.http_url,
                json=rpc_request,
                headers={"Content-Type": "application/json"},
            ) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")
                result = await response.json()
                if "error" in result:
                    raise Exception(f"RPC Error: {result['error']}")
                return result

    def _create_error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Build a JSON-RPC style error response originating from the proxy.

        Args:
            message: Human-readable error message.
            code: Proxy-specific error code, placed under
                ``error.data.proxy_error_code``.

        Returns:
            A dict with JSON-RPC error code ``-32000`` and the ``_cached`` /
            ``_provider`` markers set to ``False`` / ``"proxy_error"``.
        """
        return {
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32000,
                "message": message,
                "data": {"proxy_error_code": code},
            },
            "_cached": False,
            "_provider": "proxy_error",
        }