Update simulation notebook to use refactored code

This commit is contained in:
Prathamesh Musale 2025-08-13 11:13:10 +05:30
parent 7604f91d13
commit 331b4ac761
2 changed files with 167 additions and 1202 deletions

File diff suppressed because one or more lines are too long

View File

@ -6,14 +6,20 @@ penalty systems, bonus pools, and final allocations. It can be used by both
the simulation notebook and the experimental notebook.
"""
from decimal import Decimal, ROUND_DOWN, getcontext
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import json
import os
import urbitob
import ipywidgets as widgets
import traceback
from decimal import Decimal, ROUND_DOWN, getcontext
from collections import defaultdict
from tabulate import tabulate
from datetime import datetime
from IPython.display import clear_output
# Configure decimal precision
getcontext().prec = 28
@ -190,6 +196,8 @@ def calculate_dynamic_allocations(participation_counts):
'total_galaxies_locked': total_galaxies_locked,
'adjusted_max_allocation_per_star': adjusted_max_allocation_per_star,
'adjusted_max_allocation_per_galaxy': adjusted_max_allocation_per_galaxy,
'rounding_error_per_star': rounding_error_per_star,
'rounding_error_per_galaxy': rounding_error_per_galaxy,
'total_rounding_error_stars': total_rounding_error_stars,
'total_rounding_error_galaxies': total_rounding_error_galaxies,
'adjusted_z_per_star_per_block': adjusted_z_per_star_per_block,
@ -320,6 +328,8 @@ def print_allocation_calculations(allocation_data):
total_stars_locked = allocation_data['total_stars_locked']
total_galaxies_locked = allocation_data['total_galaxies_locked']
rounding_error_per_star = allocation_data['rounding_error_per_star']
rounding_error_per_galaxy = allocation_data['rounding_error_per_galaxy']
adjusted_max_allocation_per_star = allocation_data['adjusted_max_allocation_per_star']
adjusted_max_allocation_per_galaxy = allocation_data['adjusted_max_allocation_per_galaxy']
adjusted_z_per_star_per_block = allocation_data['adjusted_z_per_star_per_block']
@ -354,8 +364,6 @@ def print_allocation_calculations(allocation_data):
print_table_with_borders(quanta_df, "⚙️ QUANTA CALCULATION")
# Adjusted Allocations Table (after quanta calculation)
rounding_error_per_star = raw_allocation_per_star - adjusted_max_allocation_per_star
rounding_error_per_galaxy = raw_allocation_per_galaxy - adjusted_max_allocation_per_galaxy
percentage_rounding_error_stars = total_rounding_error_stars / TOTAL_SUPPLY * 100 if total_stars_locked > 0 else Decimal('0')
percentage_rounding_error_galaxies = total_rounding_error_galaxies / TOTAL_SUPPLY * 100 if total_galaxies_locked > 0 else Decimal('0')
@ -617,57 +625,9 @@ def create_visualization(allocation_data, final_data):
plt.subplots_adjust(bottom=0.08, top=0.88, left=0.05, right=0.98)
plt.show()
def load_watcher_events_data(generated_dir='generated'):
"""Load participation data from watcher events file."""
import urbitob
watcher_events_path = os.path.join(generated_dir, 'watcher-events.json')
try:
with open(watcher_events_path, 'r') as f:
data = json.load(f)
events = data['data']['eventsInRange']
# Analyze events
lock_duration_counts = {
'star': defaultdict(int),
'galaxy': defaultdict(int)
}
for event_data in events:
if event_data['event']['__typename'] == 'PointLockedEvent':
point = event_data['event']['point']
lock_period = event_data['event']['lock_period']
point_num = urbitob.patp_to_num(point)
point_type = "galaxy" if point_num < NUM_GALAXIES else "star"
lock_duration_counts[point_type][lock_period] += 1
# Extract counts
result = {}
for years in [1, 2, 3, 4, 5]:
result[f'stars_{years}_years'] = lock_duration_counts['star'][years]
result[f'galaxies_{years}_years'] = lock_duration_counts['galaxy'][years]
return result, True
except FileNotFoundError:
print(f"⚠️ Watcher events file not found at {watcher_events_path}")
return None, False
except Exception as e:
print(f"⚠️ Error loading watcher events: {e}")
return None, False
# Experimental notebook widget and interaction functions
def create_experiment_widgets():
"""Create input widgets for the experimental notebook."""
try:
import ipywidgets as widgets
except ImportError:
raise ImportError("ipywidgets is required for the experimental interface. Install with: pip install ipywidgets")
# Define reasonable default values
default_participants = {
'stars_1_years': 8000,
@ -742,13 +702,6 @@ def create_experiment_widgets():
# Set up export button handler
def on_export_click(button=None):
"""Handle export button click with timestamp."""
try:
from datetime import datetime
from IPython.display import clear_output
except ImportError:
print("❌ Required libraries not available")
return
# Generate timestamp for filename
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f'lockdrop_calculations_result_{timestamp}.json'
@ -824,10 +777,6 @@ def create_calculate_function(widget_dict):
"""Create the calculation and display function for the experimental notebook."""
def calculate_and_display(button=None):
"""Calculate allocations and display results based on input parameters."""
try:
from IPython.display import clear_output
except ImportError:
raise ImportError("IPython is required for the experimental interface")
widgets_map = widget_dict['widgets']
output_area = widget_dict['output_area']
@ -913,7 +862,6 @@ def create_calculate_function(widget_dict):
except Exception as e:
print(f"❌ Error during calculation: {str(e)}")
import traceback
traceback.print_exc()
return calculate_and_display
@ -983,52 +931,113 @@ def create_scenario_loader(widget_dict):
return load_scenario
def create_export_function(widget_dict):
"""Create export function for the experimental notebook."""
def export_current_scenario(filename='lockdrop_allocations_experiment.json'):
"""Export current scenario results to JSON file."""
widgets_map = widget_dict['widgets']
# Simulation-specific helper functions for processing generated data
# Get current values from widgets
participation_counts = {
'stars_1_years': widgets_map['stars_1_years'].value,
'stars_2_years': widgets_map['stars_2_years'].value,
'stars_3_years': widgets_map['stars_3_years'].value,
'stars_4_years': widgets_map['stars_4_years'].value,
'stars_5_years': widgets_map['stars_5_years'].value,
'galaxies_1_years': widgets_map['galaxies_1_years'].value,
'galaxies_2_years': widgets_map['galaxies_2_years'].value,
'galaxies_3_years': widgets_map['galaxies_3_years'].value,
'galaxies_4_years': widgets_map['galaxies_4_years'].value,
'galaxies_5_years': widgets_map['galaxies_5_years'].value
def simulation_load_watcher_events(file_path):
"""Load and parse PointLockedEvent events from JSON file (simulation-specific)."""
with open(file_path, 'r') as f:
data = json.load(f)
# Filter for only PointLockedEvent events during loading
point_locked_events = [
event_data for event_data in data['data']['eventsInRange']
if event_data['event']['__typename'] == 'PointLockedEvent'
]
return point_locked_events
def simulation_analyze_lockdrop_events(events):
"""Analyze PointLockedEvent events and return participation statistics (simulation-specific)."""
# Initialize counters
lock_duration_counts = {
'star': defaultdict(int),
'galaxy': defaultdict(int)
}
# Process events (already filtered to PointLockedEvent only)
for event_data in events:
point = event_data['event']['point']
lock_period = event_data['event']['lock_period']
# Determine if it's a galaxy or star
point_num = urbitob.patp_to_num(point)
point_type = "galaxy" if point_num < NUM_GALAXIES else "star"
# Count by lock period
lock_duration_counts[point_type][lock_period] += 1
# Extract counts for each year and point type
result = {}
for years in [1, 2, 3, 4, 5]:
result[f'stars_{years}_years'] = lock_duration_counts['star'][years]
result[f'galaxies_{years}_years'] = lock_duration_counts['galaxy'][years]
return result
def run_simulation_analysis(generated_dir=None):
"""Complete simulation analysis pipeline (simulation-specific)."""
# Determine generated directory
if generated_dir is None:
generated_dir = os.getenv('GENERATED_DIR', 'generated')
# Load and analyze watcher events
watcher_events_path = os.path.join(generated_dir, 'watcher-events.json')
print("=" * 80)
print("📁 WATCHER EVENTS DATA SUMMARY")
print("=" * 80)
try:
point_locked_events = simulation_load_watcher_events(watcher_events_path)
if point_locked_events:
total_events = len(point_locked_events)
print(f"✅ Successfully loaded {total_events} PointLockedEvent records from watcher file")
else:
print("⚠️ No PointLockedEvent events found")
return None
# Analyze events and convert to Decimal
lock_stats = simulation_analyze_lockdrop_events(point_locked_events)
participation_counts = {k: Decimal(v) for k, v in lock_stats.items()}
# Run calculations
allocation_data = calculate_dynamic_allocations(participation_counts)
bonus_data = calculate_bonus_pools(allocation_data)
final_data = calculate_final_allocations(allocation_data, bonus_data)
# Print comprehensive analysis
print_analysis_tables(allocation_data, bonus_data, final_data)
# Generate and save test output
test_output = generate_test_output(final_data)
allocations_output_file = 'lockdrop_allocations_notebook.json'
export_data = {
'metadata': {
'source': 'lockdrop-calculations-simulated.ipynb',
'participation_counts': lock_stats
},
'allocations': test_output
}
try:
# Calculate allocations
allocation_data = calculate_dynamic_allocations(participation_counts)
bonus_data = calculate_bonus_pools(allocation_data)
final_data = calculate_final_allocations(allocation_data, bonus_data)
with open(allocations_output_file, 'w') as f:
json.dump(export_data, f, indent=2)
# Generate test output
test_output = generate_test_output(final_data)
print("=" * 80)
print("💾 JSON OUTPUT GENERATED")
print(f"✅ Final allocations saved to: {allocations_output_file}")
print("=" * 80)
# Add metadata
export_data = {
'metadata': {
'source': 'lockdrop-calculations.ipynb',
'participation_counts': participation_counts
},
'allocations': test_output
}
# Return data for visualization
return allocation_data, final_data
# Save to file
with open(filename, 'w') as f:
json.dump(export_data, f, indent=2)
print(f"✅ Results exported to: {filename}")
print(f"📊 Total participants: {sum(participation_counts.values()):,}")
except Exception as e:
print(f"❌ Export failed: {str(e)}")
return export_current_scenario
except FileNotFoundError:
print(f"❌ Watcher events file not found: {watcher_events_path}")
print(" Make sure GENERATED_DIR environment variable points to the correct directory")
return None
except Exception as e:
print(f"❌ Error during simulation analysis: {e}")
return None