fetch-images command #768
@ -14,6 +14,8 @@
|
||||
# along with this program. If not, see <http:#www.gnu.org/licenses/>.
|
||||
|
||||
import click
|
||||
from dataclasses import dataclass
|
||||
import platform
|
||||
from python_on_whales import DockerClient
|
||||
import requests
|
||||
from typing import List
|
||||
@ -25,6 +27,13 @@ from stack_orchestrator.build.build_util import get_containers_in_scope
|
||||
# Experimental fetch-container command
|
||||
|
||||
|
||||
@dataclass
|
||||
class RegistryInfo:
|
||||
registry: str
|
||||
registry_username: str
|
||||
registry_token: str
|
||||
|
||||
|
||||
def _local_tag_for(container: str):
|
||||
return f"{container}:local"
|
||||
|
||||
@ -32,13 +41,13 @@ def _local_tag_for(container: str):
|
||||
# Emulate this:
|
||||
# $ curl -u "my-username:my-token" -X GET "https://<container-registry-hostname>/v2/cerc-io/cerc/test-container/tags/list"
|
||||
# {"name":"cerc-io/cerc/test-container","tags":["202402232130","202402232208"]}
|
||||
def _get_tags_for_container(container: str, registry: str, registry_username: str, registry_token: str) -> List[str]:
|
||||
def _get_tags_for_container(container: str, registry_info: RegistryInfo) -> List[str]:
|
||||
# registry looks like: git.vdb.to/cerc-io
|
||||
registry_parts = registry.split("/")
|
||||
registry_parts = registry_info.registry.split("/")
|
||||
url = f"https://{registry_parts[0]}/v2/{registry_parts[1]}/{container}/tags/list"
|
||||
if opts.o.debug:
|
||||
print(f"Fetching tags from: {url}")
|
||||
response = requests.get(url, auth=(registry_username, registry_token))
|
||||
response = requests.get(url, auth=(registry_info.registry_username, registry_info.registry_token))
|
||||
if response.status_code == 200:
|
||||
tag_info = response.json()
|
||||
if opts.o.debug:
|
||||
@ -54,16 +63,25 @@ def _find_latest(candidate_tags: List[str]):
|
||||
return candidate_tags[0]
|
||||
|
||||
|
||||
def _get_latest_image(container: str, registry: str, registry_username: str, registry_token: str):
|
||||
all_tags = _get_tags_for_container(container, registry, registry_username, registry_token)
|
||||
latest_tag = _find_latest(all_tags)
|
||||
def _filter_for_platform(container: str,
|
||||
registry_info: RegistryInfo,
|
||||
tag_list: List[str]) -> List[str] :
|
||||
this_platform = platform.architecture()
|
||||
print(f"architecture is: {this_platform}")
|
||||
return tag_list
|
||||
|
||||
|
||||
def _get_latest_image(container: str, registry_info: RegistryInfo):
|
||||
all_tags = _get_tags_for_container(container, registry_info)
|
||||
tags_for_platform = _filter_for_platform(container, registry_info, all_tags)
|
||||
latest_tag = _find_latest(tags_for_platform)
|
||||
return f"{container}:{latest_tag}"
|
||||
|
||||
|
||||
def _fetch_image(tag: str, registry: str, registry_username: str, registry_token: str):
|
||||
def _fetch_image(tag: str, registry_info: RegistryInfo):
|
||||
docker = DockerClient()
|
||||
docker.login(registry, registry_username, registry_token)
|
||||
remote_tag = f"{registry}/{tag}"
|
||||
docker.login(registry_info.registry, registry_info.registry_username, registry_info.registry_token)
|
||||
remote_tag = f"{registry_info.registry}/{tag}"
|
||||
if opts.o.debug:
|
||||
print(f"Attempting to pull this image: {remote_tag}")
|
||||
docker.image.pull(remote_tag)
|
||||
@ -95,6 +113,7 @@ def _add_local_tag(remote_tag: str, registry: str, local_tag: str):
|
||||
def command(ctx, include, exclude, force_local_overwrite, image_registry, registry_username, registry_token):
|
||||
'''EXPERIMENTAL: fetch the images for a stack from remote registry'''
|
||||
|
||||
registry_info = RegistryInfo(image_registry, registry_username, registry_token)
|
||||
# Generate list of target containers
|
||||
stack = ctx.obj.stack
|
||||
containers_in_scope = get_containers_in_scope(stack)
|
||||
@ -105,10 +124,10 @@ def command(ctx, include, exclude, force_local_overwrite, image_registry, regist
|
||||
print(f"Processing: {container}")
|
||||
# For each container, attempt to find the latest of a set of
|
||||
# images with the correct name and platform in the specified registry
|
||||
image_to_fetch = _get_latest_image(container, image_registry, registry_username, registry_token)
|
||||
image_to_fetch = _get_latest_image(container, registry_info)
|
||||
if opts.o.debug:
|
||||
print(f"Fetching: {image_to_fetch}")
|
||||
_fetch_image(image_to_fetch, image_registry, registry_username, registry_token)
|
||||
_fetch_image(image_to_fetch, registry_info)
|
||||
# Now check if the target container already exists exists locally already
|
||||
if (_exists_locally(container)):
|
||||
if not opts.o.quiet:
|
||||
|
Loading…
Reference in New Issue
Block a user