Update back off mechanism to only back off if the endpoint is not available
This commit is contained in:
parent
307abfc78f
commit
0dc3f060ef
@ -6,7 +6,7 @@ QUICKNODE_TOKEN=your_token_here
|
|||||||
|
|
||||||
# Proxy settings
|
# Proxy settings
|
||||||
PROXY_PORT=8545
|
PROXY_PORT=8545
|
||||||
CACHE_SIZE_GB=100
|
CACHE_SIZE_GB=1
|
||||||
BACKOFF_MINUTES=30
|
BACKOFF_MINUTES=30
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
|
19
cache.py
19
cache.py
@ -6,7 +6,7 @@ import diskcache
|
|||||||
|
|
||||||
|
|
||||||
class Cache:
|
class Cache:
|
||||||
def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 100):
|
def __init__(self, cache_dir: str = "./cache", size_limit_gb: int = 1):
|
||||||
self.cache_dir = cache_dir
|
self.cache_dir = cache_dir
|
||||||
self.size_limit_bytes = size_limit_gb * 1024 * 1024 * 1024
|
self.size_limit_bytes = size_limit_gb * 1024 * 1024 * 1024
|
||||||
self.cache = diskcache.Cache(
|
self.cache = diskcache.Cache(
|
||||||
@ -14,17 +14,17 @@ class Cache:
|
|||||||
size_limit=self.size_limit_bytes,
|
size_limit=self.size_limit_bytes,
|
||||||
eviction_policy='least-recently-used'
|
eviction_policy='least-recently-used'
|
||||||
)
|
)
|
||||||
|
|
||||||
def _make_key(self, method: str, params: Dict[str, Any]) -> str:
|
def _make_key(self, method: str, params: Dict[str, Any]) -> str:
|
||||||
return f"{method}:{json.dumps(params, sort_keys=True)}"
|
return f"{method}:{json.dumps(params, sort_keys=True)}"
|
||||||
|
|
||||||
def get(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
def get(self, method: str, params: Dict[str, Any]) -> Optional[Dict[str, Any]]:
|
||||||
key = self._make_key(method, params)
|
key = self._make_key(method, params)
|
||||||
cached_data = self.cache.get(key)
|
cached_data = self.cache.get(key)
|
||||||
|
|
||||||
if cached_data is None:
|
if cached_data is None:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Check if cached data has TTL and if it's expired
|
# Check if cached data has TTL and if it's expired
|
||||||
if isinstance(cached_data, dict) and '_cache_expiry' in cached_data:
|
if isinstance(cached_data, dict) and '_cache_expiry' in cached_data:
|
||||||
if time.time() > cached_data['_cache_expiry']:
|
if time.time() > cached_data['_cache_expiry']:
|
||||||
@ -35,12 +35,12 @@ class Cache:
|
|||||||
response = cached_data.copy()
|
response = cached_data.copy()
|
||||||
del response['_cache_expiry']
|
del response['_cache_expiry']
|
||||||
return response
|
return response
|
||||||
|
|
||||||
return cached_data
|
return cached_data
|
||||||
|
|
||||||
def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None) -> None:
|
def set(self, method: str, params: Dict[str, Any], response: Dict[str, Any], ttl: Optional[int] = None) -> None:
|
||||||
key = self._make_key(method, params)
|
key = self._make_key(method, params)
|
||||||
|
|
||||||
# Add TTL metadata if specified
|
# Add TTL metadata if specified
|
||||||
if ttl is not None:
|
if ttl is not None:
|
||||||
cached_response = response.copy()
|
cached_response = response.copy()
|
||||||
@ -48,7 +48,7 @@ class Cache:
|
|||||||
self.cache.set(key, cached_response)
|
self.cache.set(key, cached_response)
|
||||||
else:
|
else:
|
||||||
self.cache.set(key, response)
|
self.cache.set(key, response)
|
||||||
|
|
||||||
def size_check(self) -> Dict[str, Any]:
|
def size_check(self) -> Dict[str, Any]:
|
||||||
stats = self.cache.stats()
|
stats = self.cache.stats()
|
||||||
return {
|
return {
|
||||||
@ -57,4 +57,3 @@ class Cache:
|
|||||||
"count": stats[0],
|
"count": stats[0],
|
||||||
"limit_gb": self.size_limit_bytes / (1024 * 1024 * 1024)
|
"limit_gb": self.size_limit_bytes / (1024 * 1024 * 1024)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,14 +44,15 @@ class CachePolicy:
|
|||||||
Returns:
|
Returns:
|
||||||
True if the method should be cached, False otherwise
|
True if the method should be cached, False otherwise
|
||||||
"""
|
"""
|
||||||
if method in self.CACHEABLE_IMMUTABLE:
|
|
||||||
|
if method in self.CACHEABLE_WITH_TTL:
|
||||||
# For getBlock, only cache finalized blocks
|
# For getBlock, only cache finalized blocks
|
||||||
if method == 'getBlock':
|
if method == 'getBlock':
|
||||||
commitment = self._get_commitment(params)
|
commitment = self._get_commitment(params)
|
||||||
return commitment == 'finalized'
|
return commitment == 'finalized'
|
||||||
return True
|
return True
|
||||||
|
|
||||||
if method in self.CACHEABLE_WITH_TTL:
|
if method in self.CACHEABLE_IMMUTABLE:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Default to not caching unknown methods
|
# Default to not caching unknown methods
|
||||||
|
@ -11,7 +11,7 @@ A Python-based reverse proxy for Solana RPC endpoints that provides unified acce
|
|||||||
```
|
```
|
||||||
Provider class:
|
Provider class:
|
||||||
- name: str
|
- name: str
|
||||||
- http_url: str
|
- http_url: str
|
||||||
- ws_url: str
|
- ws_url: str
|
||||||
- transform_request(request) -> request
|
- transform_request(request) -> request
|
||||||
- transform_response(response) -> response
|
- transform_response(response) -> response
|
||||||
@ -34,15 +34,15 @@ Provider class:
|
|||||||
Cache class:
|
Cache class:
|
||||||
- get(method: str, params: dict) -> Optional[response]
|
- get(method: str, params: dict) -> Optional[response]
|
||||||
- set(method: str, params: dict, response: dict) -> None
|
- set(method: str, params: dict, response: dict) -> None
|
||||||
- size_check() -> None # Enforce 100GB limit
|
- size_check() -> None # Enforce 1GB limit
|
||||||
- clear_oldest() -> None # LRU eviction
|
- clear_oldest() -> None # LRU eviction
|
||||||
```
|
```
|
||||||
|
|
||||||
**Implementation Notes**:
|
**Implementation Notes**:
|
||||||
- Use `diskcache` library for simplicity
|
- Use `diskcache` library for simplicity
|
||||||
- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"`
|
- Key format: `f"{method}:{json.dumps(params, sort_keys=True)}"`
|
||||||
- Store both HTTP responses and WebSocket messages
|
- Store both HTTP responses and WebSocket messages
|
||||||
- Implement 100GB limit with LRU eviction
|
- Implement 1GB limit with LRU eviction
|
||||||
|
|
||||||
### 3. Error Logger Module (`errors.py`)
|
### 3. Error Logger Module (`errors.py`)
|
||||||
**Purpose**: SQLite-based error logging with UUID tracking
|
**Purpose**: SQLite-based error logging with UUID tracking
|
||||||
@ -90,7 +90,7 @@ Router class:
|
|||||||
- providers: List[Provider]
|
- providers: List[Provider]
|
||||||
- cache: Cache
|
- cache: Cache
|
||||||
- error_logger: ErrorLogger
|
- error_logger: ErrorLogger
|
||||||
-
|
-
|
||||||
- route_request(method: str, params: dict) -> response
|
- route_request(method: str, params: dict) -> response
|
||||||
- get_available_provider() -> Optional[Provider]
|
- get_available_provider() -> Optional[Provider]
|
||||||
- mark_provider_failed(provider: Provider) -> None
|
- mark_provider_failed(provider: Provider) -> None
|
||||||
@ -146,7 +146,7 @@ QUICKNODE_TOKEN=your_token_here
|
|||||||
|
|
||||||
# Proxy settings
|
# Proxy settings
|
||||||
PROXY_PORT=8545
|
PROXY_PORT=8545
|
||||||
CACHE_SIZE_GB=100
|
CACHE_SIZE_GB=1
|
||||||
BACKOFF_MINUTES=30
|
BACKOFF_MINUTES=30
|
||||||
|
|
||||||
# Logging
|
# Logging
|
||||||
@ -227,7 +227,7 @@ Happy-path end-to-end tests only:
|
|||||||
|
|
||||||
## Deployment Considerations
|
## Deployment Considerations
|
||||||
|
|
||||||
1. **Cache Storage**: Need ~100GB disk space
|
1. **Cache Storage**: Need ~1GB disk space
|
||||||
2. **Memory Usage**: Keep minimal, use disk cache
|
2. **Memory Usage**: Keep minimal, use disk cache
|
||||||
3. **Concurrent Clients**: Basic round-robin if multiple connect
|
3. **Concurrent Clients**: Basic round-robin if multiple connect
|
||||||
4. **Monitoring**: Log all errors, provide error IDs
|
4. **Monitoring**: Log all errors, provide error IDs
|
||||||
@ -273,8 +273,8 @@ aiohttp-cors==0.7.0
|
|||||||
|
|
||||||
1. Single endpoint proxies to 5 providers
|
1. Single endpoint proxies to 5 providers
|
||||||
2. Automatic failover works
|
2. Automatic failover works
|
||||||
3. Responses are cached (up to 100GB)
|
3. Responses are cached (up to 1GB)
|
||||||
4. Errors logged with retrievable IDs
|
4. Errors logged with retrievable IDs
|
||||||
5. Both HTTP and WebSocket work
|
5. Both HTTP and WebSocket work
|
||||||
6. Response format is unified
|
6. Response format is unified
|
||||||
7. Happy-path tests pass
|
7. Happy-path tests pass
|
||||||
|
4
main.py
4
main.py
@ -35,7 +35,7 @@ def load_config() -> dict:
|
|||||||
|
|
||||||
return {
|
return {
|
||||||
"proxy_port": int(os.getenv("PROXY_PORT", 8545)),
|
"proxy_port": int(os.getenv("PROXY_PORT", 8545)),
|
||||||
"cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 100)),
|
"cache_size_gb": int(os.getenv("CACHE_SIZE_GB", 1)),
|
||||||
"backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)),
|
"backoff_minutes": int(os.getenv("BACKOFF_MINUTES", 30)),
|
||||||
"log_level": os.getenv("LOG_LEVEL", "INFO"),
|
"log_level": os.getenv("LOG_LEVEL", "INFO"),
|
||||||
"error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"),
|
"error_db_path": os.getenv("ERROR_DB_PATH", "./errors.db"),
|
||||||
@ -72,7 +72,7 @@ def main() -> None:
|
|||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}")
|
logger.info(f"Starting Solana RPC Proxy on port {config['proxy_port']}")
|
||||||
logger.info(f"Intelligent caching enabled - Cache size limit: {config['cache_size_gb']}GB")
|
logger.info(f"Cache size limit: {config['cache_size_gb']}GB")
|
||||||
logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes")
|
logger.info(f"Provider backoff time: {config['backoff_minutes']} minutes")
|
||||||
|
|
||||||
app = create_app(config)
|
app = create_app(config)
|
||||||
|
33
router.py
33
router.py
@ -20,7 +20,7 @@ class Router:
|
|||||||
async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
async def route_request(self, method: str, params: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
request = {"method": method, "params": params}
|
request = {"method": method, "params": params}
|
||||||
|
|
||||||
# Check if this method should be cached based on intelligent caching policy
|
# Check if this method should be cached based on caching policy
|
||||||
should_cache = self.cache_policy.should_cache(method, params)
|
should_cache = self.cache_policy.should_cache(method, params)
|
||||||
|
|
||||||
if should_cache:
|
if should_cache:
|
||||||
@ -60,7 +60,13 @@ class Router:
|
|||||||
except Exception as error:
|
except Exception as error:
|
||||||
error_id = self.error_logger.log_error(provider.name, request, error)
|
error_id = self.error_logger.log_error(provider.name, request, error)
|
||||||
self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})")
|
self.logger.warning(f"Provider {provider.name} failed: {error} (ID: {error_id})")
|
||||||
provider.mark_failed()
|
|
||||||
|
# Only mark provider as failed for server/network issues, not RPC errors
|
||||||
|
if await self._is_server_failure(provider, error):
|
||||||
|
provider.mark_failed()
|
||||||
|
self.logger.warning(f"Provider {provider.name} marked as failed due to server issue")
|
||||||
|
else:
|
||||||
|
self.logger.debug(f"Provider {provider.name} had RPC error but server is available")
|
||||||
|
|
||||||
return self._create_error_response(
|
return self._create_error_response(
|
||||||
"All providers failed to handle the request",
|
"All providers failed to handle the request",
|
||||||
@ -77,6 +83,29 @@ class Router:
|
|||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
async def _is_server_failure(self, provider: Provider, error: Exception) -> bool:
|
||||||
|
"""
|
||||||
|
Check if the provider server is actually down by making a simple health check.
|
||||||
|
Only mark as failed if server is unreachable.
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
# Quick health check with minimal timeout
|
||||||
|
timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout
|
||||||
|
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||||
|
# Try a simple HTTP GET to check server availability
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
parsed_url = urlparse(provider.http_url)
|
||||||
|
health_url = f"{parsed_url.scheme}://{parsed_url.netloc}"
|
||||||
|
|
||||||
|
async with session.get(health_url) as response:
|
||||||
|
# Server responded (even with error codes), so it's alive
|
||||||
|
return False
|
||||||
|
|
||||||
|
except Exception as health_error:
|
||||||
|
# Server is actually unreachable
|
||||||
|
self.logger.debug(f"Health check failed for {provider.name}: {health_error}")
|
||||||
|
return True
|
||||||
|
|
||||||
async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]:
|
async def _make_request(self, provider: Provider, request: Dict[str, Any]) -> Dict[str, Any]:
|
||||||
transformed_request = provider.transform_request(request)
|
transformed_request = provider.transform_request(request)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user