stack-orchestrator/stack_orchestrator/deploy/k8s/helpers.py
A. F. Dudley d9c0614ba3
All checks were successful
Lint Checks / Run linter (push) Successful in 15s
Set ConfigMap defaultMode to 0755 for executable scripts
ConfigMaps containing scripts need execute permissions.
Without this, scripts mounted from ConfigMaps fail with
"permission denied" when used as container commands.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 06:02:26 -05:00

416 lines
16 KiB
Python

# Copyright © 2023 Vulcanize
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from kubernetes import client, utils, watch
import os
from pathlib import Path
import subprocess
import re
from typing import Set, Mapping, List
from stack_orchestrator.util import get_k8s_dir, error_exit
from stack_orchestrator.opts import opts
from stack_orchestrator.deploy.deploy_util import parsed_pod_files_map_from_file_names
from stack_orchestrator.deploy.deployer import DeployerException
def _run_command(command: str):
    """Run ``command`` through the shell and return the completed process.

    When ``opts.o.debug`` is set, the command line is printed before it runs
    and the resulting process object is printed afterwards. The exit status
    is not checked here; callers inspect ``returncode`` themselves.
    """
    if opts.o.debug:
        print(f"Running: {command}")
    completed = subprocess.run(command, shell=True)
    if opts.o.debug:
        print(f"Result: {completed}")
    return completed
def get_kind_cluster():
    """Return the name of an existing kind cluster, if there is one.

    Runs `kind get clusters` and reports the first name it lists.

    Returns:
        The first cluster name printed by the command, or None when the
        command exits non-zero or prints no names.
    """
    listing = subprocess.run(
        "kind get clusters", shell=True, capture_output=True, text=True
    )
    if listing.returncode != 0:
        return None
    cluster_names = listing.stdout.strip().splitlines()
    # Only the first listed cluster is ever reported
    return cluster_names[0] if cluster_names else None
def create_cluster(name: str, config_file: str):
    """Return a usable kind cluster, creating one only when none exists.

    If get_kind_cluster() reports a cluster, that cluster is reused and the
    arguments are ignored, which lets several deployments share one cluster.

    Args:
        name: The desired cluster name (used only if creating new)
        config_file: Path to kind config file (used only if creating new)

    Returns:
        The name of the cluster being used (either existing or newly created)

    Raises:
        DeployerException: if `kind create cluster` exits non-zero.
    """
    reusable = get_kind_cluster()
    if not reusable:
        print(f"Creating new cluster: {name}")
        outcome = _run_command(f"kind create cluster --name {name} --config {config_file}")
        if outcome.returncode != 0:
            raise DeployerException(f"kind create cluster failed: {outcome}")
        return name
    print(f"Using existing cluster: {reusable}")
    return reusable
def destroy_cluster(name: str):
    """Delete the kind cluster called ``name``.

    The exit status of `kind delete cluster` is not inspected.
    """
    delete_command = f"kind delete cluster --name {name}"
    _run_command(delete_command)
def wait_for_ingress_in_kind(ingress_type="caddy"):
    """Wait for ingress controller to become ready.

    Opens up to 20 pod watches (each requested with timeout_seconds=30) and
    returns as soon as an event reports the pod's first container as ready.
    Calls error_exit() if none of the watches sees a ready container.

    Args:
        ingress_type: "caddy" or "nginx" - determines which namespace to watch
    """
    core_v1 = client.CoreV1Api()
    namespace = "caddy-system" if ingress_type == "caddy" else "ingress-nginx"
    # Both controllers are selected with the same component label
    label_selector = "app.kubernetes.io/component=controller"
    # Tracked across retries so the "ready" confirmation is still printed when
    # readiness is only observed in a later watch window than the warning
    warned_waiting = False
    for _ in range(20):
        w = watch.Watch()
        for event in w.stream(func=core_v1.list_namespaced_pod,
                              namespace=namespace,
                              label_selector=label_selector,
                              timeout_seconds=30):
            container_statuses = event['object'].status.container_statuses
            if container_statuses and container_statuses[0].ready is True:
                if warned_waiting:
                    print(f"{ingress_type.capitalize()} ingress controller is ready")
                return
            print(f"Waiting for {ingress_type} ingress controller to become ready...")
            warned_waiting = True
    error_exit(f"ERROR: Timed out waiting for {ingress_type} ingress to become ready")
def install_ingress_for_kind(ingress_type="caddy"):
    """Apply the bundled ingress controller manifest to the kind cluster.

    Args:
        ingress_type: "caddy" selects the Caddy manifest; any other value
            selects the nginx manifest.
    """
    api_client = client.ApiClient()
    if ingress_type == "caddy":
        manifest_name = "ingress-caddy-kind-deploy.yaml"
        debug_message = "Installing Caddy ingress controller in kind cluster"
    else:
        manifest_name = "ingress-nginx-kind-deploy.yaml"
        debug_message = "Installing nginx ingress controller in kind cluster"
    manifest_path = get_k8s_dir().joinpath("components", "ingress", manifest_name)
    ingress_install = os.path.abspath(manifest_path)
    if opts.o.debug:
        print(debug_message)
    utils.create_from_yaml(api_client, yaml_file=ingress_install)
def load_images_into_kind(kind_cluster_name: str, image_set: Set[str]):
    """Load every image in ``image_set`` into the named kind cluster.

    Args:
        kind_cluster_name: name of the kind cluster to load into
        image_set: image references passed one at a time to
            `kind load docker-image`

    Raises:
        DeployerException: if the load command exits non-zero for any image.
    """
    for image in image_set:
        result = _run_command(f"kind load docker-image {image} --name {kind_cluster_name}")
        if result.returncode != 0:
            # Name the command that actually failed; this used to be reported
            # as a cluster creation failure
            raise DeployerException(f"kind load docker-image failed: {result}")
def pods_in_deployment(core_api: client.CoreV1Api, deployment_name: str):
    """Return the names of the pods labelled ``app=<deployment_name>``.

    Only the "default" namespace is queried.
    """
    selector = f"app={deployment_name}"
    pod_response = core_api.list_namespaced_pod(namespace="default", label_selector=selector)
    if opts.o.debug:
        print(f"pod_response: {pod_response}")
    return [pod_info.metadata.name for pod_info in pod_response.items]
def containers_in_pod(core_api: client.CoreV1Api, pod_name: str):
    """Return the names of the containers listed in the pod's spec.

    The pod is read from the "default" namespace.
    """
    pod_response = core_api.read_namespaced_pod(pod_name, namespace="default")
    if opts.o.debug:
        print(f"pod_response: {pod_response}")
    return [pod_container.name for pod_container in pod_response.spec.containers]
def log_stream_from_string(s: str):
    """Yield a single ``("ignore", <bytes>)`` tuple carrying ``s``."""
    # Note response has to be UTF-8 encoded because the caller expects to decode it
    payload = s.encode()
    yield "ignore", payload
def named_volumes_from_pod_files(parsed_pod_files):
    """Collect the named volumes declared at the top level of each compose file.

    Args:
        parsed_pod_files: mapping of pod file identifier to parsed compose content

    Returns:
        List of volume names in encounter order. A name declared in more than
        one file appears once per declaration.
    """
    # A volume definition looks like:
    # 'laconicd-data': None
    # so only the key is of interest.
    return [
        volume_name
        for parsed_pod_file in parsed_pod_files.values()
        if "volumes" in parsed_pod_file
        for volume_name, _ in parsed_pod_file["volumes"].items()
    ]
def get_kind_pv_bind_mount_path(volume_name: str):
    """Return the ``/mnt/<volume_name>`` path used for a volume's kind bind mount."""
    return "/mnt/{}".format(volume_name)
def volume_mounts_for_service(parsed_pod_files, service):
    """Build the V1VolumeMount list for one compose service.

    Args:
        parsed_pod_files: mapping of pod file identifier to parsed compose content
        service: name of the compose service to look up

    Returns:
        One client.V1VolumeMount per volume entry of every service whose name
        equals ``service``. A mount is read-only only when its third
        colon-separated field is "ro".
    """
    mounts = []
    for parsed_pod_file in parsed_pod_files.values():
        if "services" not in parsed_pod_file:
            continue
        for service_name, service_obj in parsed_pod_file["services"].items():
            if service_name != service or "volumes" not in service_obj:
                continue
            for mount_string in service_obj["volumes"]:
                # Looks like: test-data:/data or test-data:/data:ro or test-data:/data:rw
                if opts.o.debug:
                    print(f"mount_string: {mount_string}")
                parts = mount_string.split(":")
                volume_name, mount_path = parts[0], parts[1]
                mount_options = parts[2] if len(parts) == 3 else None
                if opts.o.debug:
                    print(f"volume_name: {volume_name}")
                    print(f"mount path: {mount_path}")
                    print(f"mount options: {mount_options}")
                mounts.append(
                    client.V1VolumeMount(
                        mount_path=mount_path,
                        name=volume_name,
                        read_only=mount_options == "ro",
                    )
                )
    return mounts
def volumes_for_pod_files(parsed_pod_files, spec, app_name):
    """Build the V1Volume list for the volumes declared in the compose files.

    A volume whose name appears in ``spec.get_configmaps()`` is backed by the
    ConfigMap ``<app_name>-<volume_name>``; every other volume is backed by
    the PersistentVolumeClaim of that same name.

    Args:
        parsed_pod_files: mapping of pod file identifier to parsed compose content
        spec: deployment spec object providing get_configmaps()
        app_name: prefix for the ConfigMap / claim names

    Returns:
        List of client.V1Volume, one per declared volume.
    """
    pod_volumes = []
    for parsed_pod_file in parsed_pod_files.values():
        if "volumes" not in parsed_pod_file:
            continue
        for volume_name in parsed_pod_file["volumes"].keys():
            qualified_name = f"{app_name}-{volume_name}"
            if volume_name in spec.get_configmaps():
                # Set defaultMode=0o755 to make scripts executable
                source = {
                    "config_map": client.V1ConfigMapVolumeSource(
                        name=qualified_name, default_mode=0o755
                    )
                }
            else:
                source = {
                    "persistent_volume_claim": client.V1PersistentVolumeClaimVolumeSource(
                        claim_name=qualified_name
                    )
                }
            pod_volumes.append(client.V1Volume(name=volume_name, **source))
    return pod_volumes
def _get_host_paths_for_volumes(deployment_context):
    """Return the volumes reported by the deployment's spec.

    The result is whatever ``deployment_context.spec.get_volumes()`` returns;
    within this module it is indexed by volume name to obtain a host path.
    """
    spec = deployment_context.spec
    return spec.get_volumes()
def _make_absolute_host_path(data_mount_path: Path, deployment_dir: Path) -> Path:
    """Return ``data_mount_path`` as an absolute path.

    An already-absolute path is returned untouched. A relative path is taken
    as relative to ``deployment_dir`` (itself taken relative to the current
    working directory when it is not absolute) and then resolved.
    """
    if not os.path.isabs(data_mount_path):
        # Joining onto cwd changes nothing when deployment_dir is already absolute
        under_deployment = deployment_dir.joinpath(data_mount_path)
        return Path.cwd().joinpath(under_deployment).resolve()
    return data_mount_path
def _generate_kind_mounts(parsed_pod_files, deployment_dir, deployment_context):
    """Build the ``extraMounts`` fragment of a kind node definition.

    Every volume entry of every compose service is examined. A volume that is
    not a ConfigMap and has a non-empty host path in the deployment spec
    yields one hostPath/containerPath pair.

    Args:
        parsed_pod_files: mapping of pod file identifier to parsed compose content
        deployment_dir: deployment directory that relative host paths are based on
        deployment_context: provides ``spec`` (volumes and configmaps)

    Returns:
        YAML text indented to sit under a ``- role:`` node entry, or "" when
        there is nothing to mount.
    """
    volume_definitions = []
    volume_host_path_map = _get_host_paths_for_volumes(deployment_context)
    # Note these paths are relative to the location of the pod files (at present)
    # So we need to fix up to make them correct and absolute because kind assumes
    # relative to the cwd.
    for pod in parsed_pod_files:
        parsed_pod_file = parsed_pod_files[pod]
        if "services" not in parsed_pod_file:
            continue
        services = parsed_pod_file["services"]
        for service_name in services:
            service_obj = services[service_name]
            if "volumes" not in service_obj:
                continue
            for mount_string in service_obj["volumes"]:
                # Looks like: test-data:/data or test-data:/data:ro or test-data:/data:rw
                if opts.o.debug:
                    print(f"mount_string: {mount_string}")
                mount_split = mount_string.split(":")
                volume_name = mount_split[0]
                mount_path = mount_split[1]
                if opts.o.debug:
                    print(f"volume_name: {volume_name}")
                    print(f"map: {volume_host_path_map}")
                    print(f"mount path: {mount_path}")
                if volume_name in deployment_context.spec.get_configmaps():
                    continue
                if volume_host_path_map[volume_name]:
                    host_path = _make_absolute_host_path(volume_host_path_map[volume_name], deployment_dir)
                    # Two-space base indent keeps the list a child of the node
                    # mapping; containerPath lines up with hostPath
                    volume_definitions.append(
                        f"  - hostPath: {host_path}\n"
                        f"    containerPath: {get_kind_pv_bind_mount_path(volume_name)}\n"
                    )
    if not volume_definitions:
        return ""
    return "  extraMounts:\n" + "".join(volume_definitions)
# TODO: decide if we need this functionality
def _generate_kind_port_mappings_from_services(parsed_pod_files):
    """Build an ``extraPortMappings`` fragment from the compose service ports.

    Each entry of each service's ``ports`` list is used verbatim as both the
    containerPort and the hostPort.

    Args:
        parsed_pod_files: mapping of pod file identifier to parsed compose content

    Returns:
        YAML text indented to sit under a ``- role:`` node entry, or "" when
        no service declares ports.
    """
    port_definitions = []
    for pod in parsed_pod_files:
        parsed_pod_file = parsed_pod_files[pod]
        if "services" not in parsed_pod_file:
            continue
        services = parsed_pod_file["services"]
        for service_name in services:
            service_obj = services[service_name]
            if "ports" not in service_obj:
                continue
            for port_string in service_obj["ports"]:
                # TODO handle the complex cases
                # Looks like: 80 or something more complicated
                port_definitions.append(
                    f"  - containerPort: {port_string}\n"
                    f"    hostPort: {port_string}\n"
                )
    if not port_definitions:
        return ""
    return "  extraPortMappings:\n" + "".join(port_definitions)
def _generate_kind_port_mappings(parsed_pod_files):
    """Build the fixed ``extraPortMappings`` fragment for the ingress controller.

    Args:
        parsed_pod_files: accepted for signature compatibility; not consulted

    Returns:
        YAML text, indented to sit under a ``- role:`` node entry, mapping
        ports 80 and 443 to the same host ports.
    """
    # Map port 80 (HTTP) and 443 (HTTPS) for the ingress controller
    port_definitions = [
        f"  - containerPort: {port_string}\n"
        f"    hostPort: {port_string}\n"
        for port_string in ("80", "443")
    ]
    return "  extraPortMappings:\n" + "".join(port_definitions)
# Note: this makes any duplicate definition in b overwrite a
def merge_envs(a: Mapping[str, str], b: Mapping[str, str]) -> Mapping[str, str]:
    """Return a new dict holding the entries of ``a`` overlaid with those of ``b``.

    Neither input mapping is modified.
    """
    merged = dict(a)
    merged.update(b)
    return merged
def _expand_shell_vars(raw_val: str, env_map: Mapping[str, str] = None) -> str:
    """Expand a docker-compose style variable reference.

    Only a value that consists of exactly one ``${...}`` expression is
    expanded; any other string is returned unchanged.

    Supported forms:
      ${VAR}          - use VAR value or empty string
      ${VAR:-default} - use VAR value or default if unset/empty
      ${VAR-default}  - use VAR value or default if unset

    Args:
        raw_val: value taken from the compose file. None yields "". A
            non-string scalar (e.g. a YAML integer) is returned as str(raw_val).
        env_map: variables available for substitution; None means no variables.

    Returns:
        The expanded string.
    """
    if env_map is None:
        env_map = {}
    if raw_val is None:
        return ""
    if not isinstance(raw_val, str):
        # YAML yields int/float for unquoted numbers, which re.search() rejects
        return str(raw_val)
    match = re.search(r"^\$\{([^}]+)\}$", raw_val)
    if not match:
        return raw_val
    inner = match.group(1)
    # The first "-" decides the operator, so a default that itself contains
    # "-" or ":-" is kept intact (e.g. ${VAR-a:-b} has the default "a:-b")
    dash = inner.find("-")
    if dash == -1:
        return env_map.get(inner, "")
    default_val = inner[dash + 1:]
    if dash > 0 and inner[dash - 1] == ":":
        # ":-" also replaces a value that is set but empty
        return env_map.get(inner[:dash - 1], "") or default_val
    return env_map.get(inner[:dash], default_val)
def envs_from_compose_file(compose_file_envs: Mapping[str, str], env_map: Mapping[str, str] = None) -> Mapping[str, str]:
    """Return the compose environment with each value passed through _expand_shell_vars.

    Args:
        compose_file_envs: environment entries as written in the compose file
        env_map: variables available for substitution

    Returns:
        A new dict with the same keys and the expanded values.
    """
    return {
        env_var: _expand_shell_vars(env_val, env_map)
        for env_var, env_val in compose_file_envs.items()
    }
def envs_from_environment_variables_map(map: Mapping[str, str]) -> List[client.V1EnvVar]:
    """Convert a name -> value mapping into a list of client.V1EnvVar objects."""
    return [client.V1EnvVar(env_var, env_val) for env_var, env_val in map.items()]
# This needs to know:
# The service ports for the cluster
# The bind mounted volumes for the cluster
#
# Make ports like this:
#  extraPortMappings:
#  - containerPort: 80
#    hostPort: 80
#    # optional: set the bind address on the host
#    # 0.0.0.0 is the current default
#    listenAddress: "127.0.0.1"
#    # optional: set the protocol to one of TCP, UDP, SCTP.
#    # TCP is the default
#    protocol: TCP
# Make bind mounts like this:
#  extraMounts:
#  - hostPath: /path/to/my/files
#    containerPath: /files
def generate_kind_config(deployment_dir: Path, deployment_context):
    """Render the kind cluster configuration for a deployment as YAML text.

    The configuration describes a single control-plane node labelled
    ``ingress-ready=true``. The port-mapping and mount fragments produced by
    _generate_kind_port_mappings() and _generate_kind_mounts() are inserted
    verbatim after the kubeadm patch.

    Args:
        deployment_dir: deployment directory; its "compose" subdirectory is
            scanned for pod files
        deployment_context: passed through to _generate_kind_mounts()

    Returns:
        The kind config file content as a string.
    """
    compose_file_dir = deployment_dir.joinpath("compose")
    # TODO: this should come from the stack file, not this way
    pod_files = [p for p in compose_file_dir.iterdir() if p.is_file()]
    parsed_pod_files_map = parsed_pod_files_map_from_file_names(pod_files)
    port_mappings_yml = _generate_kind_port_mappings(parsed_pod_files_map)
    mounts_yml = _generate_kind_mounts(parsed_pod_files_map, deployment_dir, deployment_context)
    # Indentation is significant: node-level keys sit two spaces in, under
    # "- role:", and the patch body is a block scalar nested below "- |"
    return (
        "kind: Cluster\n"
        "apiVersion: kind.x-k8s.io/v1alpha4\n"
        "nodes:\n"
        "- role: control-plane\n"
        "  kubeadmConfigPatches:\n"
        "    - |\n"
        "      kind: InitConfiguration\n"
        "      nodeRegistration:\n"
        "        kubeletExtraArgs:\n"
        "          node-labels: \"ingress-ready=true\"\n"
        f"{port_mappings_yml}\n"
        f"{mounts_yml}\n"
    )