Part of https://www.notion.so/Laconic-Mainnet-Plan-1eca6b22d47280569cd0d1e6d711d949 Co-authored-by: Shreerang Kale <shreerangkale@gmail.com> Reviewed-on: #1 Co-authored-by: shreerang <shreerang@noreply.git.vdb.to> Co-committed-by: shreerang <shreerang@noreply.git.vdb.to>
114 lines
2.8 KiB
Python
114 lines
2.8 KiB
Python
from abc import ABC, abstractmethod
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, Any, Optional
|
|
import os
|
|
|
|
|
|
class Provider(ABC):
|
|
def __init__(self, name: str):
|
|
self.name = name
|
|
self.backoff_until: Optional[datetime] = None
|
|
|
|
@property
|
|
@abstractmethod
|
|
def http_url(self) -> str:
|
|
pass
|
|
|
|
@property
|
|
@abstractmethod
|
|
def ws_url(self) -> str:
|
|
pass
|
|
|
|
def transform_request(self, request: Dict[str, Any]) -> Dict[str, Any]:
|
|
return request
|
|
|
|
def transform_response(self, response: Dict[str, Any]) -> Dict[str, Any]:
|
|
return response
|
|
|
|
def is_available(self) -> bool:
|
|
if self.backoff_until is None:
|
|
return True
|
|
return datetime.now() > self.backoff_until
|
|
|
|
def mark_failed(self, backoff_minutes: int = 30) -> None:
|
|
self.backoff_until = datetime.now() + timedelta(minutes=backoff_minutes)
|
|
|
|
|
|
class AlchemyProvider(Provider):
|
|
def __init__(self):
|
|
super().__init__("alchemy")
|
|
self.api_key = os.getenv("ALCHEMY_API_KEY", "")
|
|
|
|
@property
|
|
def http_url(self) -> str:
|
|
return f"https://solana-mainnet.g.alchemy.com/v2/{self.api_key}"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return f"wss://solana-mainnet.g.alchemy.com/v2/{self.api_key}"
|
|
|
|
|
|
class PublicNodeProvider(Provider):
|
|
def __init__(self):
|
|
super().__init__("publicnode")
|
|
|
|
@property
|
|
def http_url(self) -> str:
|
|
return "https://solana-rpc.publicnode.com"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return "wss://solana-rpc.publicnode.com"
|
|
|
|
|
|
class HeliusProvider(Provider):
|
|
def __init__(self):
|
|
super().__init__("helius")
|
|
self.api_key = os.getenv("HELIUS_API_KEY", "")
|
|
|
|
@property
|
|
def http_url(self) -> str:
|
|
return f"https://mainnet.helius-rpc.com/?api-key={self.api_key}"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return f"wss://mainnet.helius-rpc.com/?api-key={self.api_key}"
|
|
|
|
|
|
class QuickNodeProvider(Provider):
|
|
def __init__(self):
|
|
super().__init__("quicknode")
|
|
self.endpoint = os.getenv("QUICKNODE_ENDPOINT", "")
|
|
self.token = os.getenv("QUICKNODE_TOKEN", "")
|
|
|
|
@property
|
|
def http_url(self) -> str:
|
|
return f"https://{self.endpoint}/{self.token}/"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return f"wss://{self.endpoint}/{self.token}/"
|
|
|
|
|
|
class SolanaPublicProvider(Provider):
|
|
def __init__(self):
|
|
super().__init__("solana_public")
|
|
|
|
@property
|
|
def http_url(self) -> str:
|
|
return "https://api.mainnet-beta.solana.com"
|
|
|
|
@property
|
|
def ws_url(self) -> str:
|
|
return "wss://api.mainnet-beta.solana.com"
|
|
|
|
|
|
def create_providers() -> list[Provider]:
|
|
return [
|
|
SolanaPublicProvider(),
|
|
AlchemyProvider(),
|
|
PublicNodeProvider(),
|
|
HeliusProvider(),
|
|
QuickNodeProvider(),
|
|
]
|