2023-11-14 18:05:07 +00:00
from collections import namedtuple
from playwright . sync_api import Page
from vega_sim . null_service import VegaServiceNull
from typing import Optional
# Description of a wallet used by these helpers: the key ``name`` and its
# ``passphrase``. The type name and field names must be valid identifiers
# (no surrounding whitespace), otherwise ``namedtuple`` raises ``ValueError``
# as soon as the module is imported.
WalletConfig = namedtuple("WalletConfig", ["name", "passphrase"])

# Asset symbol looked up when a caller does not pass ``symbol`` explicitly.
ASSET_NAME = "tDAI"
2023-12-12 10:15:20 +00:00
2023-11-14 18:05:07 +00:00
def wait_for_toast_confirmation(page: Page, timeout: int = 30000):
    """Wait until the toast content contains 'AWAITING CONFIRMATION'.

    Evaluates a JavaScript predicate in the page via
    ``page.wait_for_function``. The predicate is truthy once an element with
    ``data-testid="toast-content"`` exists and its ``innerText`` includes
    the text ``AWAITING CONFIRMATION``.

    Args:
        page: Playwright page to run the predicate in.
        timeout: Forwarded unchanged to ``page.wait_for_function``
            (Playwright timeouts are expressed in milliseconds).
    """
    # The existence check comes first so that ``innerText`` is never read
    # from ``null`` while the toast has not been rendered yet.
    page.wait_for_function(
        """
    document.querySelector('[data-testid="toast-content"]') &&
    document.querySelector('[data-testid="toast-content"]').innerText.includes('AWAITING CONFIRMATION')
    """,
        timeout=timeout,
    )
2023-12-12 10:15:20 +00:00
2023-11-14 18:05:07 +00:00
def create_and_faucet_wallet(
    vega: VegaServiceNull,
    wallet: WalletConfig,
    symbol: Optional[str] = None,
    amount: float = 1e4,
):
    """Create a key named ``wallet.name`` and mint ``amount`` of an asset to it.

    Args:
        vega: Service used to look up the asset, create the key and mint.
        wallet: Wallet description; only its ``name`` is used here.
        symbol: Symbol of the asset to mint. ``None`` selects ``ASSET_NAME``.
        amount: Quantity passed to ``vega.mint``.
    """
    # Only ``None`` falls back to the default; any other value (including an
    # empty string) is forwarded as given.
    if symbol is None:
        asset_symbol = ASSET_NAME
    else:
        asset_symbol = symbol

    minted_asset_id = vega.find_asset_id(symbol=asset_symbol)

    key_name = wallet.name
    vega.create_key(key_name)
    vega.mint(key_name, minted_asset_id, amount)
2023-12-12 10:15:20 +00:00
2023-11-14 18:05:07 +00:00
def next_epoch(vega: VegaServiceNull):
    """Step ``vega`` forward until its epoch sequence number changes.

    Calls ``vega.wait_fn(1)`` repeatedly until
    ``vega.statistics().epoch_seq`` differs from the value read on entry,
    then performs one more ``vega.wait_fn(1)`` followed by
    ``vega.wait_for_total_catchup()``.

    Args:
        vega: Service whose epoch should advance.

    Raises:
        Exception: If the epoch sequence is still unchanged after more than
            ``2 * 10 * 60`` single-step forwards.
    """
    # Upper bound on forwards before giving up. The error message describes
    # it as the duration of two epochs (presumably 2 epochs x 10 minutes x
    # 60 one-second steps -- TODO confirm against the network config).
    max_forwards = 2 * 10 * 60

    forwards = 0
    starting_epoch = vega.statistics().epoch_seq
    while starting_epoch == vega.statistics().epoch_seq:
        vega.wait_fn(1)
        forwards += 1
        if forwards > max_forwards:
            raise Exception(
                "Epoch not started after forwarding the duration of two epochs."
            )

    # One extra step after the epoch change, then wait for total catch-up
    # before returning to the caller.
    vega.wait_fn(1)
    vega.wait_for_total_catchup()
2023-12-12 10:15:20 +00:00
2023-11-21 14:36:02 +00:00
def truncate_middle(market_id, start=6, end=4):
    """Shorten ``market_id`` to its leading and trailing characters.

    Strings shorter than 11 characters are returned unchanged. Longer ones
    are rebuilt as the first ``start`` characters, a single horizontal
    ellipsis (U+2026), and the last ``end`` characters.

    Args:
        market_id: String to shorten.
        start: Number of leading characters to keep.
        end: Number of trailing characters to keep; ``0`` keeps none.

    Returns:
        The original string, or its shortened form.
    """
    if len(market_id) < 11:
        return market_id
    # ``market_id[-0:]`` is the whole string, so an ``end`` of 0 needs an
    # explicit empty tail instead of the negative-index slice.
    tail = market_id[-end:] if end else ""
    return market_id[:start] + "\u2026" + tail
2023-12-12 10:15:20 +00:00
def change_keys(page: Page, vega: VegaServiceNull, key_name):
    """Select the wallet key called ``key_name`` in the UI, then reload.

    Clicks the ``manage-vega-wallet`` element, clicks the element whose test
    id is ``key-<public key>``, clicks the ``.inline-flex`` element inside
    it, and finally reloads the page.

    Args:
        page: Playwright page showing the application.
        vega: Service whose wallet resolves ``key_name`` to a public key.
        key_name: Name of the key to switch to.
    """
    page.get_by_test_id("manage-vega-wallet").click()

    # Resolve the public key once; both selectors below target the same
    # ``key-<public key>`` test id.
    key_test_id = "key-" + vega.wallet.public_key(key_name)
    page.get_by_test_id(key_test_id).click()
    page.click(f"data-testid={key_test_id} >> .inline-flex")

    page.reload()
2023-12-12 10:15:20 +00:00
def forward_time(vega: VegaServiceNull, forward_epoch: bool = False):
    """Step ``vega`` once, wait for total catch-up, optionally change epoch.

    Args:
        vega: Service to step forward.
        forward_epoch: When true, ``next_epoch(vega)`` is called after the
            single step and catch-up.
    """
    vega.wait_fn(1)
    vega.wait_for_total_catchup()
    if not forward_epoch:
        return
    next_epoch(vega)
def selector_contains_text(page: Page, selector, expected_text, timeout=5000):
    """Report whether ``selector`` comes to contain ``expected_text``.

    Meant for elements that load right away but may initially hold an
    outdated value: the call waits for ``selector`` to contain the expected
    text rather than merely to exist.

    Args:
        page: Playwright page to query.
        selector: Selector of the element expected to hold the text.
        expected_text: Text to wait for inside that element.
        timeout: Forwarded unchanged to ``page.wait_for_selector``.

    Returns:
        ``True`` once the text is found; ``False`` if waiting times out or
        raises any other ``Exception``.
    """
    try:
        page.wait_for_selector(
            f"{selector} >> text={expected_text}", timeout=timeout
        )
    except Exception:
        # Deliberately best-effort: any failure means "text not present".
        # ``KeyboardInterrupt`` / ``SystemExit`` are not ``Exception``
        # subclasses and still propagate, so a run can be aborted.
        return False
    return True