vega-frontend-monorepo/apps/trading/e2e/conftest.py

342 lines
12 KiB
Python

import logging
import pytest
import os
import json
import requests
import time
import docker
import http.server
import sys
from dotenv import load_dotenv
from playwright.sync_api import Error as PlaywrightError
from docker.models.containers import Container
from docker.errors import APIError
from contextlib import contextmanager
from vega_sim.null_service import VegaServiceNull, Ports
from playwright.sync_api import Browser, Page
from config import console_image_name, vega_version
from datetime import datetime, timedelta
from fixtures.market import (
setup_simple_market,
setup_opening_auction_market,
setup_continuous_market,
setup_perps_market,
)
# Workaround for current xdist issue with displaying live logs from multiple workers
# https://github.com/pytest-dev/pytest-xdist/issues/402
sys.stdout = sys.stderr
docker_client = docker.from_env()
logger = logging.getLogger()
load_dotenv()
@pytest.hookimpl(tryfirst=True)
def pytest_runtest_makereport(item, call):
outcome = "passed" if call.excinfo is None else "failed"
item.config.cache.set(item.nodeid, outcome)
def pytest_configure(config):
worker_id = os.environ.get("PYTEST_XDIST_WORKER")
if worker_id is not None:
log_dir = os.path.join(os.getcwd(), "logs")
log_name = f"tests_{worker_id}.log"
if not os.path.exists(log_dir):
os.makedirs(log_dir)
logging.basicConfig(
format=config.getini("log_file_format"),
datefmt=config.getini("log_file_date_format"),
filename=os.path.join(log_dir, log_name),
level=config.getini("log_file_level"),
)
class CustomHttpRequestHandler(http.server.SimpleHTTPRequestHandler):
def do_GET(self):
# Set the path to your website's directory here
if self.path == "/":
self.path = "dist/apps/trading/exported/index.html"
return http.server.SimpleHTTPRequestHandler.do_GET(self)
# Start VegaServiceNull
@contextmanager
def init_vega(request=None):
local_server = os.getenv("LOCAL_SERVER", "false").lower() == "true"
port_config = None
if local_server:
port_config = {
Ports.DATA_NODE_REST: 8001,
}
default_seconds = 1
seconds_per_block = default_seconds
if request and hasattr(request, "param"):
seconds_per_block = request.param
logger.info(
"Starting VegaServiceNull",
extra={"worker_id": os.environ.get("PYTEST_XDIST_WORKER")},
)
logger.info(f"Using console image: {console_image_name}")
logger.info(f"Using vega version: {vega_version}")
vega_service_args = {
"run_with_console": False,
"launch_graphql": False,
"retain_log_files": True,
"use_full_vega_wallet": True,
"store_transactions": True,
"transactions_per_block": 1000,
"seconds_per_block": seconds_per_block,
"genesis_time": datetime.now() - timedelta(hours=1),
}
if port_config is not None:
vega_service_args["port_config"] = port_config
with VegaServiceNull(**vega_service_args) as vega:
try:
container = docker_client.containers.run(
console_image_name, detach=True, ports={"80/tcp": vega.console_port}
)
if not isinstance(container, Container):
raise Exception("container instance invalid")
logger.info(
f"Container {container.id} started",
extra={"worker_id": os.environ.get("PYTEST_XDIST_WORKER")},
)
vega.container = container
logger.info(f"Container ID: {container.id}, Name: {container.name}, Status: {container.status}")
yield vega
except APIError as e:
logger.info(f"Container creation failed.")
logger.info(e)
raise e
finally:
logger.info(f"Stopping container {container.id}")
container.stop()
logger.info(f"Container ID: {container.id}, Name: {container.name}, Status: {container.status}")
logger.info(f"Removing container {container.id}")
container.remove()
logger.info(f"Container ID: {container.id}, Name: {container.name}, Status: {container.status}")
@contextmanager
def init_page(vega: VegaServiceNull, browser: Browser, request: pytest.FixtureRequest):
local_server = os.getenv("LOCAL_SERVER", "false").lower() == "true"
server_port = "4200" if local_server else str(vega.console_port)
with browser.new_context(
viewport={"width": 1920, "height": 1080},
base_url=f"http://localhost:{server_port}",
) as context, context.new_page() as page:
context.tracing.start(screenshots=True, snapshots=True, sources=True)
try:
# Wait for the console to be up and running before any tests are run
attempts = 0
while attempts < 100:
try:
code = requests.get(f"http://localhost:{server_port}/").status_code
if code == 200:
break
except requests.exceptions.ConnectionError as e:
attempts += 1
if attempts < 100:
time.sleep(0.1)
continue
else:
raise e
# Set window._env_ so built app uses datanode from vega market sim
env = json.dumps(
{
"VEGA_URL": f"http://localhost:{vega.data_node_rest_port}/graphql",
"VEGA_WALLET_URL": f"http://localhost:{vega.wallet_port}",
}
)
window_env = f"window._env_ = Object.assign({{}}, window._env_, {env})"
page.add_init_script(script=window_env)
yield page
finally:
try:
if not os.path.exists("apps/trading/e2e/traces"):
os.makedirs("apps/trading/e2e/traces")
except OSError as e:
print(f"Failed to create directory '{'apps/trading/e2e/traces'}': {e}")
# Check whether this test failed or passed
outcome = request.config.cache.get(request.node.nodeid, None)
if outcome != "passed":
try:
trace_path = os.path.join("traces", request.node.name + "trace.zip")
context.tracing.stop(path=trace_path)
except Exception as e:
logger.error(f"Failed to save trace: {e}")
@pytest.fixture
def vega(request):
with init_vega(request) as vega_instance:
request.addfinalizer(lambda: cleanup_container(vega_instance))
yield vega_instance
def cleanup_container(vega_instance):
# Ensure there is a list of containers to work with
if hasattr(vega_instance, 'containers') and vega_instance.containers:
for container in vega_instance.containers:
try:
# Check if each container is still running and then stop it
if container.status == 'running':
print(f"Stopping container {container.id}")
container.stop()
else:
print(f"Container {container.id} is not running.")
except docker.errors.NotFound:
print(f"Container {container.id} not found, may have been stopped and removed.")
except Exception as e:
print(f"Error during cleanup for container {container.id}: {str(e)}")
try:
# Attempt to remove the container after stopping it
container.remove()
print(f"Container {container.id} removed.")
except docker.errors.NotFound:
print(f"Container {container.id} not found, may have been removed.")
except Exception as e:
print(f"Error during container removal for container {container.id}: {str(e)}")
else:
print("No containers to cleanup.")
@pytest.fixture
def page(vega, browser, request):
with init_page(vega, browser, request) as page_instance:
yield page_instance
# Set auth token so eager connection for MarketSim wallet is successful
def auth_setup(vega: VegaServiceNull, page: Page):
DEFAULT_WALLET_NAME = "MarketSim" # This is the default wallet name within VegaServiceNull and CANNOT be changed
# Calling get_keypairs will internally call _load_tokens for the given wallet
keypairs = vega.wallet.get_keypairs(DEFAULT_WALLET_NAME)
wallet_api_token = vega.wallet.login_tokens[DEFAULT_WALLET_NAME]
# Set token to localStorage so eager connect hook picks it up and immediately connects
# no pubkey is set so it should default to the first pubkey
wallet_config = json.dumps(
{
"state":{
"chainId":"CUSTOM",
"current":"jsonRpc",
"jsonRpcToken": f"VWT {wallet_api_token}",
},
"version":0
}
)
onboarding_config = json.dumps({
"state": {
"dismissed": True,
"risk": "accepted"
},
"version": 0
})
# Before we set the wallet store, get any currently stored key and set it as well.
# This way if you navigate the page, reload or start a new test, the last used key
# will be persisted. Playwright will clear storage on every test and navigation event
storage_javascript = f"""
var currentStorage = JSON.parse(localStorage.getItem('vega_wallet_store'));
var defaultStorage = JSON.parse('{wallet_config}');
var pubKey = currentStorage?.state?.pubKey;
defaultStorage['state']['pubKey'] = pubKey || undefined;
localStorage.setItem('vega_wallet_store', JSON.stringify(defaultStorage));
localStorage.setItem('vega_onboarding', '{onboarding_config}');
"""
script = "".join(storage_javascript)
page.add_init_script(script)
return {
"wallet": DEFAULT_WALLET_NAME,
"wallet_api_token": wallet_api_token,
"public_key": keypairs["Key 1"],
}
@pytest.fixture(scope="function")
def auth(vega: VegaServiceNull, page: Page):
return auth_setup(vega, page)
# Set 'risk accepted' flag, so that the risk dialog doesn't show up
def risk_accepted_setup(page: Page):
onboarding_config = json.dumps({
"state": {
"dismissed": True,
"risk": "accepted"
},
"version": 0
})
storage_javascript = [
f"localStorage.setItem('vega_onboarding', '{onboarding_config}');",
"localStorage.setItem('vega_telemetry_approval', 'false');",
"localStorage.setItem('vega_telemetry_viewed', 'true');",
]
script = "".join(storage_javascript)
page.add_init_script(script)
@pytest.fixture(scope="function")
def risk_accepted(page: Page):
risk_accepted_setup(page)
@pytest.fixture(scope="function")
def simple_market(vega, request):
kwargs = {}
if hasattr(request, "param"):
kwargs.update(request.param)
return setup_simple_market(vega, **kwargs)
@pytest.fixture(scope="function")
def opening_auction_market(vega):
return setup_opening_auction_market(vega)
@pytest.fixture(scope="function")
def continuous_market(vega):
return setup_continuous_market(vega)
@pytest.fixture(scope="function")
def proposed_market(vega):
return setup_simple_market(vega, approve_proposal=False)
@pytest.fixture(scope="function")
def perps_market(vega, request):
kwargs = {}
if hasattr(request, "param"):
kwargs.update(request.param)
return setup_perps_market(vega, **kwargs)
@pytest.fixture(autouse=True)
def retry_on_http_error(request):
retry_count = 3
for i in range(retry_count):
try:
yield
return
except requests.exceptions.HTTPError:
if i < retry_count - 1:
print(f"Retrying due to HTTPError (attempt {i+1}/{retry_count})")
else:
raise