From 0dc3f060efecd8ac71535db99b2e16765623d70f Mon Sep 17 00:00:00 2001 From: Shreerang Kale Date: Fri, 1 Aug 2025 15:53:35 +0530 Subject: [PATCH] Update back off mechanism to only back off if the endpoint is not available --- .env.example | 2 +- cache.py | 19 +++++++------- cache_policy.py | 5 ++-- docs/solana-proxy-implementation-plan.md | 18 ++++++------- main.py | 4 +-- router.py | 33 ++++++++++++++++++++++-- 6 files changed, 55 insertions(+), 26 deletions(-) diff --git a/.env.example b/.env.example index fa0e7e6..8667386 100644 --- a/.env.example +++ b/.env.example @@ -6,7 +6,7 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 -CACHE_SIZE_GB=100 +CACHE_SIZE_GB=1 BACKOFF_MINUTES=30 # Logging diff --git a/cache.py b/cache.py index cd46a40..b290cca 100644 --- a/cache.py +++ b/cache.py @@ -6,7 +6,7 @@ import diskcache class Cache: - def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 100): + def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 1): self.cache_dir = cache_dir self.size_limit_bytes = size_limit_gb * 1024 * 1024 * 1024 self.cache = diskcache.Cache( @@ -14,17 +14,17 @@ class Cache: size_limit=self.size_limit_bytes, eviction_policy='least-recently-used' ) - + def _make_key(self, method: str, params: Dict[str, Any]) -> str: return f"{method}:{json.dumps(params, sort_keys=True)}" - + def get(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]: key = self._make_key(method, params) cached_data = self.cache.get(key) - + if cached_data is None: return None - + # Check if cached data has TTL and if it's expired if isinstance(cached_data, dict) and '_cache_expiry' in cached_data: if time.time() > cached_data['_cache_expiry']: @@ -35,12 +35,12 @@ class Cache: response = cached_data.copy() del response['_cache_expiry'] return response - + return cached_data - + def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None) -> None: key = self._make_key(method, params) - + # Add TTL metadata if specified if ttl is not None: cached_response = response.copy() @@ -48,7 +48,7 @@ class Cache: self.cache.set(key, cached_response) else: self.cache.set(key, response) - + def size_check(self) -> Dict[str, Any]: stats = self.cache.stats() return { @@ -57,4 +57,3 @@ class Cache: "count": stats[0], "limit_gb": self.size_limit_bytes / (1024 * 1024 * 1024) } - diff --git a/cache_policy.py b/cache_policy.py index 4b27d92..af5a194 100644 --- a/cache_policy.py +++ b/cache_policy.py @@ -44,14 +44,15 @@ class CachePolicy: Returns: True if the method should be cached, False otherwise """ - if method in self.CACHEABLE_IMMUTABLE: + + if method in self.CACHEABLE_WITH_TTL: # For getBlock, only cache finalized blocks if method == 'getBlock': commitment = self._get_commitment(params) return commitment == 'finalized' return True - if method in self.CACHEABLE_WITH_TTL: + if method in self.CACHEABLE_IMMUTABLE: return True # Default to not caching unknown methods diff --git a/docs/solana-proxy-implementation-plan.md b/docs/solana-proxy-implementation-plan.md index be340cb..d50d3ef 100644 --- a/docs/solana-proxy-implementation-plan.md +++ b/docs/solana-proxy-implementation-plan.md @@ -11,7 +11,7 @@ A Python-based reverse proxy for Solana RPC endpoints that provides unified acce ``` Provider class: - name: str -- http_url: str +- http_url: str - ws_url: str - transform_request(request) -> request - transform_response(response) -> response @@ -34,15 +34,15 @@ Provider class: Cache class: - get(method: str, params: dict) -> Optional[response] - set(method: str, params: dict, response: dict) -> None -- size_check() -> None # Enforce 100GB limit +- size_check() -> None # Enforce 1GB limit - clear_oldest() -> None # LRU eviction ``` **Implementation Notes**: - Use `diskcache` library for simplicity -- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"` +- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"` - Store both HTTP responses and WebSocket messages -- Implement 100GB limit with LRU eviction +- Implement 1GB limit with LRU eviction ### 3. Error Logger Module (`errors.py`) **Purpose**: SQLite-based error logging with UUID tracking @@ -90,7 +90,7 @@ Router class: - providers: List[Provider] - cache: Cache - error_logger: ErrorLogger -- +- - route_request(method: str, params: dict) -> response - get_available_provider() -> Optional[Provider] - mark_provider_failed(provider: Provider) -> None @@ -146,7 +146,7 @@ QUICKNODE_TOKEN=your_token_here # Proxy settings PROXY_PORT=8545 -CACHE_SIZE_GB=100 +CACHE_SIZE_GB=1 BACKOFF_MINUTES=30 # Logging @@ -227,7 +227,7 @@ Happy-path end-to-end tests only: ## Deployment Considerations -1. **Cache Storage**: Need ~100GB disk space +1. **Cache Storage**: Need ~1GB disk space 2. **Memory Usage**: Keep minimal, use disk cache 3. **Concurrent Clients**: Basic round-robin if multiple connect 4. **Monitoring**: Log all errors, provide error IDs @@ -273,8 +273,8 @@ aiohttp-cors==0.7.0 1. Single endpoint proxies to 5 providers 2. Automatic failover works -3. Responses are cached (up to 100GB) +3. Responses are cached (up to 1GB) 4. Errors logged with retrievable IDs 5. Both HTTP and WebSocket work 6. Response format is unified -7. Happy-path tests pass \ No newline at end of file +7. Happy-path tests pass diff --git a/main.py b/main.py index 8499c99..f73d907 100644 --- a/main.py +++ b/main.py @@ -35,7 +35,7 @@ def load_config() -> dict: return { "proxy_port": int(os.getenv("PROXY_PORT", 8545)), - "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)), + "cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 1)), "backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)), "log_level": os.getenv("LOG_LEVEL", "INFO"), "error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"), @@ -72,7 +72,7 @@ def main() -> None: logger = logging.getLogger(__name__) logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}") - logger.info(f"Intelligent caching enabled - Cache size limit: {config['cache_size_gb']}GB") + logger.info(f"Cache size limit: {config['cache_size_gb']}GB") logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes") app = create_app(config) diff --git a/router.py b/router.py index 1c3f309..858568b 100644 --- a/router.py +++ b/router.py @@ -20,7 +20,7 @@ class Router: async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]: request = {"method": method, "params": params} - # Check if this method should be cached based on intelligent caching policy + # Check if this method should be cached based on caching policy should_cache = self.cache_policy.should_cache(method, params) if should_cache: @@ -60,7 +60,13 @@ class Router: except Exception as error: error_id = self.error_logger.log_error(provider.name, request, error) self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})") - provider.mark_failed() + + # Only mark provider as failed for server/network issues, not RPC errors + if await self._is_server_failure(provider, error): + provider.mark_failed() + self.logger.warning(f"Provider {provider.name} marked as failed due to server issue") + else: + self.logger.debug(f"Provider {provider.name} had RPC error but server is available") return self._create_error_response( "All providers failed to handle the request", @@ -77,6 +83,29 @@ class Router: return None + async def _is_server_failure(self, provider: Provider, error: Exception) -> bool: + """ + Check if the provider server is actually down by making a simple health check. + Only mark as failed if server is unreachable. + """ + try: + # Quick health check with minimal timeout + timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout + async with aiohttp.ClientSession(timeout=timeout) as session: + # Try a simple HTTP GET to check server availability + from urllib.parse import urlparse + parsed_url = urlparse(provider.http_url) + health_url = f"{parsed_url.scheme}://{parsed_url.netloc}" + + async with session.get(health_url) as response: + # Server responded (even with error codes), so it's alive + return False + + except Exception as health_error: + # Server is actually unreachable + self.logger.debug(f"Health check failed for {provider.name}: {health_error}") + return True + async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]: transformed_request = provider.transform_request(request)