fetch-images command #768
@ -15,8 +15,11 @@
|
||||
|
||||
import click
|
||||
from dataclasses import dataclass
|
||||
import json
|
||||
import platform
|
||||
from python_on_whales import DockerClient
|
||||
from python_on_whales.components.manifest.cli_wrapper import ManifestCLI, ManifestList
|
||||
from python_on_whales.utils import run
|
||||
import requests
|
||||
from typing import List
|
||||
|
||||
@ -34,10 +37,20 @@ class RegistryInfo:
|
||||
registry_token: str
|
||||
|
||||
|
||||
# Extending this code to support the --verbose option, cnosider contributing upstream
|
||||
# https://github.com/gabrieldemarmiesse/python-on-whales/blob/master/python_on_whales/components/manifest/cli_wrapper.py#L129
|
||||
class ExtendedManifestCLI(ManifestCLI):
|
||||
def inspect_verbose(self, x: str) -> ManifestList:
|
||||
"""Returns a Docker manifest list object."""
|
||||
json_str = run(self.docker_cmd + ["manifest", "inspect", "--verbose", x])
|
||||
return json.loads(json_str)
|
||||
|
||||
|
||||
def _local_tag_for(container: str):
|
||||
return f"{container}:local"
|
||||
|
||||
|
||||
# See: https://docker-docs.uclv.cu/registry/spec/api/
|
||||
# Emulate this:
|
||||
# $ curl -u "my-username:my-token" -X GET "https://<container-registry-hostname>/v2/cerc-io/cerc/test-container/tags/list"
|
||||
# {"name":"cerc-io/cerc/test-container","tags":["202402232130","202402232208"]}
|
||||
@ -51,7 +64,7 @@ def _get_tags_for_container(container: str, registry_info: RegistryInfo) -> List
|
||||
if response.status_code == 200:
|
||||
tag_info = response.json()
|
||||
if opts.o.debug:
|
||||
print(f"Got this: {tag_info}")
|
||||
print(f"container tags list: {tag_info}")
|
||||
tags_array = tag_info["tags"]
|
||||
return tags_array
|
||||
else:
|
||||
@ -59,16 +72,37 @@ def _get_tags_for_container(container: str, registry_info: RegistryInfo) -> List
|
||||
|
||||
|
||||
def _find_latest(candidate_tags: List[str]):
|
||||
# HACK: return the first one for now
|
||||
return candidate_tags[0]
|
||||
# Lex sort should give us the latest first
|
||||
sorted_candidates = candidate_tags.sort()
|
||||
return sorted_candidates[0]
|
||||
|
||||
|
||||
def _filter_for_platform(container: str,
|
||||
registry_info: RegistryInfo,
|
||||
tag_list: List[str]) -> List[str] :
|
||||
this_platform = platform.architecture()
|
||||
print(f"architecture is: {this_platform}")
|
||||
return tag_list
|
||||
filtered_tags = []
|
||||
this_machine = platform.machine()
|
||||
# Translate between Python platform names and docker
|
||||
if this_machine == "x86_64":
|
||||
this_machine = "amd64"
|
||||
if opts.o.debug:
|
||||
print(f"Python says the architecture is: {this_machine}")
|
||||
docker = DockerClient()
|
||||
docker.login(registry_info.registry, registry_info.registry_username, registry_info.registry_token)
|
||||
for tag in tag_list:
|
||||
remote_tag = f"{registry_info.registry}/{container}:{tag}"
|
||||
manifest_cmd = ExtendedManifestCLI(docker.client_config)
|
||||
manifest = manifest_cmd.inspect_verbose(remote_tag)
|
||||
if opts.o.debug:
|
||||
print(f"manifest: {manifest}")
|
||||
image_architecture = manifest["Descriptor"]["platform"]["architecture"]
|
||||
if opts.o.debug:
|
||||
print(f"image_architecture: {image_architecture}")
|
||||
if this_machine == image_architecture:
|
||||
filtered_tags.append(tag)
|
||||
if opts.o.debug:
|
||||
print(f"Tags filtered for platform: {filtered_tags}")
|
||||
return filtered_tags
|
||||
|
||||
|
||||
def _get_latest_image(container: str, registry_info: RegistryInfo):
|
||||
|
Loading…
Reference in New Issue
Block a user