stack-orchestrator/app/deploy/k8s/helpers.py

58 lines
1.8 KiB
Python
Raw Normal View History

2023-10-27 16:19:44 +00:00
# Copyright © 2023 Vulcanize
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http:#www.gnu.org/licenses/>.
from kubernetes import client
import subprocess
from typing import Set
from app.opts import opts
def _run_command(command: str):
if opts.o.debug:
print(f"Running: {command}")
result = subprocess.run(command, shell=True)
if opts.o.debug:
print(f"Result: {result}")
def create_cluster(name: str):
_run_command(f"kind create cluster --name {name}")
def destroy_cluster(name: str):
_run_command(f"kind delete cluster --name {name}")
def load_images_into_kind(kind_cluster_name: str, image_set: Set[str]):
for image in image_set:
_run_command(f"kind load docker-image {image} --name {kind_cluster_name}")
def pods_in_deployment(core_api: client.CoreV1Api, deployment_name: str):
pods = []
pod_response = core_api.list_namespaced_pod(namespace="default", label_selector="app=test-app")
if opts.o.debug:
print(f"pod_response: {pod_response}")
for pod_info in pod_response.items:
pod_name = pod_info.metadata.name
pods.append(pod_name)
return pods
def log_stream_from_string(s: str):
# Note response has to be UTF-8 encoded because the caller expects to decode it
yield ("ignore", s.encode())