lockdrop-simulation/tests/base_test.py
shreerang 779b091ccd Make generated directory path configurable (#1)
Part of https://www.notion.so/Implement-stacks-1b5a6b22d472806a82f5dafed6955138

Co-authored-by: Shreerang Kale <shreerangkale@gmail.com>
Reviewed-on: #1
Co-authored-by: shreerang <shreerang@noreply.git.vdb.to>
Co-committed-by: shreerang <shreerang@noreply.git.vdb.to>
2025-08-04 13:17:31 +00:00

177 lines
7.3 KiB
Python

import json
import os
import unittest
import requests
from collections import defaultdict
from datetime import datetime
import urbitob
# Length of a 365.25-day year, in whole seconds (365.25 days * 86,400 s/day).
SECONDS_PER_YEAR = int(365.25 * 86_400)

# Seconds per block; setUpClass divides the configured unlock frequency by
# this value to express it as a block count.
# NOTE(review): presumably the chain's target block time — confirm.
BLOCK_DURATION_SECONDS = 2
class BaseAllocationTest(unittest.TestCase):
    """Base test class with shared setup and helper methods.

    ``setUpClass`` reads the endpoints from the environment and loads the
    JSON fixtures; the ``_get_*_from_api`` helpers wrap the HTTP queries made
    against the REST and RPC endpoints.
    """

    @classmethod
    def setUpClass(cls):
        """Load endpoints and fixture data once for all tests.

        Reads ``REST_API_ENDPOINT``, ``RPC_API_ENDPOINT`` and ``GENERATED_DIR``
        (default ``./generated``) from the environment, then loads the watcher
        events, participants, notebook allocations and distribution config.

        Raises:
            unittest.SkipTest: If either endpoint variable is unset or empty.
        """
        cls.rest_api_endpoint = os.getenv('REST_API_ENDPOINT')
        cls.rpc_api_endpoint = os.getenv('RPC_API_ENDPOINT')
        cls.generated_dir = os.getenv('GENERATED_DIR', './generated')

        if not cls.rest_api_endpoint:
            raise unittest.SkipTest("REST_API_ENDPOINT environment variable not set")
        if not cls.rpc_api_endpoint:
            raise unittest.SkipTest("RPC_API_ENDPOINT environment variable not set")

        # Load data files
        cls.watcher_events = cls._load_json(f'{cls.generated_dir}/watcher-events.json')
        cls.participants = cls._load_json(f'{cls.generated_dir}/generated-participants.json')
        cls.notebook_allocations = cls._load_json('lockdrop_allocations_notebook.json')

        # Needs watcher_events and participants to be loaded already.
        cls.points_by_duration = cls._get_first_points()

        # Load distribution config for unlock frequency
        distribution_config = cls._load_json('distribution-simulate-lockdrop.json')
        for category in distribution_config:
            if category['category'] == 'lockdrop':
                cls.unlock_frequency_blocks = (
                    category['unlock_params']['unlock_frequency'] // BLOCK_DURATION_SECONDS
                )
                break
        # NOTE: when the config has no 'lockdrop' category,
        # ``unlock_frequency_blocks`` is left unset.

    @staticmethod
    def _load_json(path):
        """Return the parsed contents of the JSON file at ``path``.

        The file is read as UTF-8 explicitly so the result does not depend on
        the locale's default encoding.
        """
        with open(path, 'r', encoding='utf-8') as f:
            return json.load(f)

    @classmethod
    def _get_first_points(cls):
        """Extract first star and galaxy for each lock duration.

        Walks ``PointLockedEvent`` entries of the watcher events in order and
        keeps, per lock period, the first point of each type.

        Returns:
            dict: ``lock_period -> {'star': info | None, 'galaxy': info | None}``
            where ``info`` holds ``point``, ``azimuth_id``, ``zenith_address``,
            ``lock_period`` and ``block_timestamp``.
        """
        points = defaultdict(lambda: {'star': None, 'galaxy': None})

        for event_data in cls.watcher_events['data']['eventsInRange']:
            event = event_data['event']
            if event['__typename'] != 'PointLockedEvent':
                continue

            point = event['point']
            lock_period = event['lock_period']
            azimuth_id = event['azimuth_id']

            # Point numbers below 256 are classified as galaxies; everything
            # else is treated as a star.
            point_num = urbitob.patp_to_num(point)
            point_type = "galaxy" if point_num < 256 else "star"

            if points[lock_period][point_type] is None:
                # Find zenith address for this point
                points[lock_period][point_type] = {
                    'point': point,
                    'azimuth_id': azimuth_id,
                    'zenith_address': cls._find_zenith_address(azimuth_id),
                    'lock_period': lock_period,
                    'block_timestamp': event_data['block']['timestamp'],
                }

        # Hand back a plain dict so later lookups of unknown lock periods do
        # not silently create entries.
        return dict(points)

    @classmethod
    def _find_zenith_address(cls, azimuth_id):
        """Find zenith address for given azimuth_id.

        Returns:
            The nested payload address of the first participant whose
            attestation payload address equals ``azimuth_id``, or ``None``
            when no participant matches.
        """
        for participant in cls.participants:
            payload = participant['attestation']['payload']
            if payload['address'] == azimuth_id:
                return payload['payload']['address']
        return None

    def _get_point_allocation_amount_from_api(self, zenith_address, point):
        """Query API endpoint for allocation amount.

        Delegates the HTTP query to ``_get_point_allocation_from_api`` so the
        URL and response handling are defined in one place.

        Returns:
            int | None: The first allocation's ``allocated_amount.amount`` as
            an int, or ``None`` when no allocation was returned.
        """
        allocation = self._get_point_allocation_from_api(zenith_address, point)
        if allocation is None:
            return None
        try:
            return int(allocation['allocated_amount']['amount'])
        except (KeyError, TypeError, ValueError) as e:
            # Malformed allocation payloads are reported as a test failure.
            self.fail(f"zenithd request failed for {point}: {e}")

    def _get_total_address_allocation_amount_from_api(self, zenith_address):
        """Get total allocation for an address from API.

        Returns:
            int: Sum of ``allocated_amount.amount`` over all allocations in
            the response; ``0`` when the status is not 200 or the response
            has no ``allocations`` key.
        """
        url = f"{self.rest_api_endpoint}/laconic/immutabletreasury/v1/allocations/{zenith_address}"
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                data = response.json()
                if 'allocations' in data:
                    return sum(
                        int(alloc['allocated_amount']['amount'])
                        for alloc in data['allocations']
                    )
        except Exception as e:
            # Any transport or parsing error is turned into a test failure.
            self.fail(f"zenithd request failed for address {zenith_address}: {e}")
        return 0

    def _get_point_allocation_from_api(self, zenith_address, point):
        """Get allocation with unlock schedule from API.

        Returns:
            The first entry of the response's ``allocations`` list, or
            ``None`` when the status is not 200 or the list is missing/empty.
        """
        point_num = urbitob.patp_to_num(point)
        url = f"{self.rest_api_endpoint}/laconic/immutabletreasury/v1/allocations/{zenith_address}/{point_num}"
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                data = response.json()
                if 'allocations' in data and len(data['allocations']) > 0:
                    return data['allocations'][0]
        except Exception as e:
            # Any transport or parsing error is turned into a test failure.
            self.fail(f"zenithd request failed for {point}: {e}")
        return None

    @staticmethod
    def _parse_block_time(time_str):
        """Convert an ISO-8601 block time string to an integer Unix timestamp.

        Fractional seconds are dropped before parsing because older versions
        of ``datetime.fromisoformat`` reject nanosecond-precision fractions.
        Only the fractional digits are removed: a trailing ``Z`` or explicit
        UTC offset is kept, and a string with a fraction but no zone
        designator is treated as UTC.
        """
        head, dot, tail = time_str.partition('.')
        if dot:
            # Whatever follows the fractional digits is the zone designator.
            zone = tail.lstrip('0123456789')
            time_str = head + (zone or 'Z')
        # ``fromisoformat`` does not accept a literal 'Z' on older Pythons.
        parsed = datetime.fromisoformat(time_str.replace('Z', '+00:00'))
        return int(parsed.timestamp())

    def _get_genesis_time_from_api(self):
        """Get genesis time from node.

        Returns:
            int | None: Unix timestamp taken from the header time of the
            block at height 1, or ``None`` when the status is not 200.
        """
        url = f"{self.rpc_api_endpoint}/block?height=1"
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                data = response.json()
                genesis_time_str = data['result']['block']['header']['time']
                return self._parse_block_time(genesis_time_str)
        except Exception as e:
            # Any transport or parsing error is turned into a test failure.
            self.fail(f"Failed to get genesis time: {e}")
        return None

    def _get_point_accrual_state_from_api(self, zenith_address, point, block_height):
        """Get accrual state for a point at specific block height.

        The height is sent in the ``x-cosmos-block-height`` request header.

        Returns:
            The response's ``accrual_state`` value, or ``None`` when the
            status is not 200 or the key is absent.
        """
        point_num = urbitob.patp_to_num(point)
        url = f"{self.rest_api_endpoint}/laconic/immutabletreasury/v1/accrual_state/{zenith_address}/{point_num}"
        headers = {"x-cosmos-block-height": str(block_height)}
        try:
            response = requests.get(url, headers=headers, timeout=30)
            if response.status_code == 200:
                data = response.json()
                if 'accrual_state' in data:
                    return data['accrual_state']
        except Exception as e:
            # Any transport or parsing error is turned into a test failure.
            self.fail(f"zenithd request failed for accrual state {point} at block {block_height}: {e}")
        return None

    def _get_latest_block_height_from_api(self):
        """Get latest block height from node.

        Returns:
            int | None: ``sync_info.latest_block_height`` from the RPC
            ``/status`` response, or ``None`` when the status is not 200.
        """
        url = f"{self.rpc_api_endpoint}/status"
        try:
            response = requests.get(url, timeout=30)
            if response.status_code == 200:
                data = response.json()
                return int(data['result']['sync_info']['latest_block_height'])
        except Exception as e:
            # Any transport or parsing error is turned into a test failure.
            self.fail(f"Failed to get latest block height: {e}")
        return None