solidity/scripts/common/rest_api_helpers.py

190 lines
6.3 KiB
Python
Raw Permalink Normal View History

from pathlib import Path
from typing import List, Mapping, Optional
import functools
import json
import operator
import shutil
import requests
class APIHelperError(Exception):
pass
class JobNotSuccessful(APIHelperError):
def __init__(self, name: str, status: str):
assert status != 'success'
self.name = name
self.status = status
self.job_finished = (status in ['failed', 'blocked'])
if status == 'not_running':
message = f"Job {name} has not started yet."
elif status == 'blocked':
message = f"Job {name} will not run because one of its dependencies failed."
elif status == 'running':
message = f"Job {name} is still running."
elif status == 'failed':
message = f"Job {name} failed."
else:
message = f"Job {name} did not finish successfully. Current status: {status}."
super().__init__(message)
class JobMissing(APIHelperError):
pass
class InvalidResponse(APIHelperError):
pass
class FileAlreadyExists(APIHelperError):
pass
def query_api(url: str, params: Mapping[str, str], debug_requests=False) -> dict:
if debug_requests:
print(f'REQUEST URL: {url}')
if len(params) > 0:
print(f'QUERY: {params}')
2022-08-29 09:15:53 +00:00
response = requests.get(url, params=params, timeout=60)
response.raise_for_status()
if debug_requests:
json_response = response.json()
print('========== RESPONSE ==========')
if json_response is not None:
print(json.dumps(json_response, indent=4))
else:
print(response.content)
print('==============================')
return response.json()
def download_file(url: str, target_path: Path, overwrite=False):
if not overwrite and target_path.exists():
raise FileAlreadyExists(f"Refusing to overwrite existing file: '{target_path}'.")
2022-08-29 09:15:53 +00:00
with requests.get(url, stream=True, timeout=60) as request:
with open(target_path, 'wb') as target_file:
shutil.copyfileobj(request.raw, target_file)
class Github:
BASE_URL = 'https://api.github.com'
project_slug: str
debug_requests: bool
def __init__(self, project_slug: str, debug_requests: bool):
self.project_slug = project_slug
self.debug_requests = debug_requests
def pull_request(self, pr_id: int) -> dict:
return query_api(
f'{self.BASE_URL}/repos/{self.project_slug}/pulls/{pr_id}',
{},
self.debug_requests
)
class CircleCI:
# None might be a more logical default for max_pages but in most cases we'll actually
# want some limit to prevent flooding the API with requests in case of a bug.
DEFAULT_MAX_PAGES = 10
BASE_URL = 'https://circleci.com/api/v2'
project_slug: str
debug_requests: bool
def __init__(self, project_slug: str, debug_requests: bool):
self.project_slug = project_slug
self.debug_requests = debug_requests
def paginated_query_api_iterator(self, url: str, params: Mapping[str, str], max_pages: int = DEFAULT_MAX_PAGES):
assert 'page-token' not in params
page_count = 0
next_page_token = None
while max_pages is None or page_count < max_pages:
if next_page_token is not None:
params = {**params, 'page-token': next_page_token}
json_response = query_api(url, params, self.debug_requests)
yield json_response['items']
next_page_token = json_response['next_page_token']
page_count += 1
if next_page_token is None:
break
def paginated_query_api(self, url: str, params: Mapping[str, str], max_pages: int = DEFAULT_MAX_PAGES):
return functools.reduce(operator.add, self.paginated_query_api_iterator(url, params, max_pages), [])
def pipelines(
self,
branch: Optional[str] = None,
commit_hash: Optional[str] = None,
excluded_trigger_types: List[str] = None,
) -> List[dict]:
if excluded_trigger_types is None:
excluded_trigger_types = []
for items in self.paginated_query_api_iterator(
f'{self.BASE_URL}/project/gh/{self.project_slug}/pipeline',
{'branch': branch} if branch is not None else {},
max_pages=10,
):
matching_items = [
item
for item in items
if (
(commit_hash is None or item['vcs']['revision'] == commit_hash) and
item['trigger']['type'] not in excluded_trigger_types
)
]
if len(matching_items) > 0:
return matching_items
return []
def workflows(self, pipeline_id: str) -> dict:
return self.paginated_query_api(f'{self.BASE_URL}/pipeline/{pipeline_id}/workflow', {})
def jobs(self, workflow_id: str) -> Mapping[str, dict]:
items = self.paginated_query_api(f'{self.BASE_URL}/workflow/{workflow_id}/job', {})
jobs_by_name = {job['name']: job for job in items}
assert len(jobs_by_name) <= len(items)
if len(jobs_by_name) < len(items):
raise InvalidResponse("Job names in the workflow are not unique.")
return jobs_by_name
def job(self, workflow_id: str, name: str, require_success: bool = False) -> dict:
jobs = self.jobs(workflow_id)
if name not in jobs:
raise JobMissing(f"Job {name} is not present in the workflow.")
if require_success and jobs[name]['status'] != 'success':
raise JobNotSuccessful(name, jobs[name]['status'])
return jobs[name]
def artifacts(self, job_number: int) -> Mapping[str, dict]:
items = self.paginated_query_api(f'{self.BASE_URL}/project/gh/{self.project_slug}/{job_number}/artifacts', {})
artifacts_by_name = {artifact['path']: artifact for artifact in items}
assert len(artifacts_by_name) <= len(items)
if len(artifacts_by_name) < len(items):
raise InvalidResponse("Names of artifacts attached to the job are not unique.")
return artifacts_by_name
@staticmethod
def latest_item(items: dict) -> dict:
sorted_items = sorted(items, key=lambda item: item['created_at'], reverse=True)
return sorted_items[0] if len(sorted_items) > 0 else None