Part of https://www.notion.so/Laconic-Mainnet-Plan-1eca6b22d47280569cd0d1e6d711d949 Co-authored-by: Shreerang Kale <shreerangkale@gmail.com> Reviewed-on: #1 Co-authored-by: shreerang <shreerang@noreply.git.vdb.to> Co-committed-by: shreerang <shreerang@noreply.git.vdb.to>
147 lines
5.9 KiB
Python
147 lines
5.9 KiB
Python
import aiohttp
|
|
import json
|
|
import logging
|
|
from typing import Dict, Any, Optional, List
|
|
from providers import Provider
|
|
from cache import Cache
|
|
from cache_policy import CachePolicy
|
|
from errors import ErrorLogger
|
|
|
|
|
|
class Router:
    """Route JSON-RPC requests across several providers with caching and failover.

    Providers are tried in round-robin order. Successful responses are cached
    when the cache policy allows it. A provider is only marked as failed when a
    follow-up health check shows that its server cannot be reached.
    """

    def __init__(self, providers: List["Provider"], cache: "Cache", error_logger: "ErrorLogger"):
        """Store the collaborators and reset the round-robin cursor.

        Args:
            providers: Upstream providers to rotate through.
            cache: Response cache queried with ``get(method, params)`` and
                written with ``set(method, params, response, ttl)``.
            error_logger: Sink for provider errors; ``log_error`` returns an
                identifier that is included in the warning log line.
        """
        self.providers = providers
        self.cache = cache
        # The policy is always a fresh default instance; it is not injectable.
        self.cache_policy = CachePolicy()
        self.error_logger = error_logger
        # Index of the provider that will be examined next.
        self.current_provider_index = 0
        self.logger = logging.getLogger(__name__)

    async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
        """Serve a request from the cache or from the first provider that succeeds.

        Args:
            method: RPC method name.
            params: RPC parameters.

        Returns:
            The response dict annotated with ``_cached`` and ``_provider``.
            When no provider is available, or every attempt fails, a proxy
            error response built by ``_create_error_response`` is returned
            instead of raising.
        """
        request = {"method": method, "params": params}

        # Check if this method should be cached based on caching policy
        should_cache = self.cache_policy.should_cache(method, params)

        if should_cache:
            cached_response = self.cache.get(method, params)
            if cached_response:
                self.logger.debug(f"Cache hit for {method}")
                # Annotate a shallow copy: writing the markers into the object
                # returned by the cache could mutate the stored entry itself.
                hit = dict(cached_response)
                hit["_cached"] = True
                hit["_provider"] = "cache"
                return hit

        # At most one attempt per configured provider.
        for _ in range(len(self.providers)):
            provider = self.get_next_available_provider()
            if provider is None:
                return self._create_error_response(
                    "All providers are currently unavailable",
                    "NO_AVAILABLE_PROVIDERS"
                )

            try:
                response = await self._make_request(provider, request)

                transformed_response = provider.transform_response(response)
                transformed_response["_cached"] = False
                transformed_response["_provider"] = provider.name

                # Cache the response if caching policy allows it
                if should_cache:
                    ttl = self.cache_policy.get_cache_ttl(method, params)
                    self.cache.set(method, params, transformed_response, ttl)
                    cache_info = f" (cached {'indefinitely' if ttl is None else f'for {ttl}s'})"
                    self.logger.info(f"Request succeeded via {provider.name}{cache_info}")
                else:
                    self.logger.info(f"Request succeeded via {provider.name} (not cached)")

                return transformed_response

            except Exception as error:
                # Deliberately broad: any failure (transport, HTTP status, RPC
                # error, response transformation, cache write) moves on to the
                # next provider instead of propagating to the caller.
                error_id = self.error_logger.log_error(provider.name, request, error)
                self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})")

                # Only mark provider as failed for server/network issues, not RPC errors
                if await self._is_server_failure(provider, error):
                    provider.mark_failed()
                    self.logger.warning(f"Provider {provider.name} marked as failed due to server issue")
                else:
                    self.logger.debug(f"Provider {provider.name} had RPC error but server is available")

        return self._create_error_response(
            "All providers failed to handle the request",
            "ALL_PROVIDERS_FAILED"
        )

    def get_next_available_provider(self) -> Optional["Provider"]:
        """Return the next available provider in round-robin order.

        The cursor advances past every provider that is examined, including
        unavailable ones, so consecutive calls rotate through the list.

        Returns:
            The first provider whose ``is_available()`` is truthy, or ``None``
            when the list is empty or no provider is available.
        """
        provider_count = len(self.providers)

        for _ in range(provider_count):
            candidate = self.providers[self.current_provider_index]
            self.current_provider_index = (self.current_provider_index + 1) % provider_count

            if candidate.is_available():
                return candidate

        return None

    async def _is_server_failure(self, provider: "Provider", error: Exception) -> bool:
        """
        Check if the provider server is actually down by making a simple health check.
        Only mark as failed if server is unreachable.

        Args:
            provider: Provider whose ``http_url`` origin (scheme and host) is probed.
            error: The exception that triggered the check. It is currently not
                inspected; the verdict depends on the health check alone.

        Returns:
            ``False`` when the server answers the GET with any HTTP status,
            ``True`` when the probe itself raises (connection error, timeout, ...).
        """
        from urllib.parse import urlparse

        try:
            # Quick health check with minimal timeout
            timeout = aiohttp.ClientTimeout(total=5)  # 5 second timeout
            async with aiohttp.ClientSession(timeout=timeout) as session:
                # Try a simple HTTP GET to check server availability
                parsed_url = urlparse(provider.http_url)
                health_url = f"{parsed_url.scheme}://{parsed_url.netloc}"

                async with session.get(health_url):
                    # Server responded (even with error codes), so it's alive
                    return False

        except Exception as health_error:
            # Server is actually unreachable
            self.logger.debug(f"Health check failed for {provider.name}: {health_error}")
            return True

    async def _make_request(self, provider: "Provider", request: Dict[str, Any]) -> Dict[str, Any]:
        """POST the request to the provider as a JSON-RPC 2.0 call.

        Args:
            provider: Target provider; supplies ``transform_request`` and ``http_url``.
            request: Dict with ``method`` and ``params`` keys.

        Returns:
            The decoded JSON body of a successful response.

        Raises:
            Exception: If the HTTP status is not 200 or the body contains an
                ``error`` member. Errors raised by aiohttp propagate unchanged.
        """
        transformed_request = provider.transform_request(request)

        rpc_request = {
            "jsonrpc": "2.0",
            "id": 1,
            "method": transformed_request["method"],
            "params": transformed_request["params"]
        }

        timeout = aiohttp.ClientTimeout(total=30)
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(
                provider.http_url,
                json=rpc_request,
                headers={"Content-Type": "application/json"}
            ) as response:
                if response.status != 200:
                    raise Exception(f"HTTP {response.status}: {await response.text()}")

                result = await response.json()

                if "error" in result:
                    raise Exception(f"RPC Error: {result['error']}")

                return result

    def _create_error_response(self, message: str, code: str) -> Dict[str, Any]:
        """Build a JSON-RPC error response originating from the proxy itself.

        Args:
            message: Human-readable error message.
            code: Proxy-specific error identifier, exposed as
                ``error.data.proxy_error_code``.

        Returns:
            A response dict with a fixed ``id`` of 1 and error code -32000,
            marked as not cached and attributed to ``"proxy_error"``.
        """
        return {
            "jsonrpc": "2.0",
            "id": 1,
            "error": {
                "code": -32000,
                "message": message,
                "data": {"proxy_error_code": code}
            },
            "_cached": False,
            "_provider": "proxy_error"
        }