refactor: extract methods from K8sDeployer.up to fix C901 complexity

Split up() into _setup_cluster(), _create_ingress(), _create_nodeports().
Reduces cyclomatic complexity below the flake8 threshold.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
A. F. Dudley 2026-03-20 15:20:50 +00:00
parent 5b8303f8f9
commit 6923e1c23b

View File

@ -562,68 +562,40 @@ class K8sDeployer(Deployer):
return cert
return None
def up(self, detach, skip_cluster_management, services, image_overrides=None):
self.image_overrides = image_overrides
self.skip_cluster_management = skip_cluster_management
if not opts.o.dry_run:
if self.is_kind() and not self.skip_cluster_management:
# Create the kind cluster (or reuse existing one)
kind_config = str(
self.deployment_dir.joinpath(constants.kind_config_filename)
def _setup_cluster(self):
"""Create/reuse kind cluster, load images, ensure namespace."""
if self.is_kind() and not self.skip_cluster_management:
kind_config = str(
self.deployment_dir.joinpath(constants.kind_config_filename)
)
actual_cluster = create_cluster(self.kind_cluster_name, kind_config)
if actual_cluster != self.kind_cluster_name:
self.kind_cluster_name = actual_cluster
# Only load locally-built images into kind
local_containers = self.deployment_context.stack.obj.get("containers", [])
if local_containers:
local_images = {
img
for img in self.cluster_info.image_set
if any(c in img for c in local_containers)
}
if local_images:
load_images_into_kind(self.kind_cluster_name, local_images)
self.connect_api()
self._ensure_namespace()
if self.is_kind() and not self.skip_cluster_management:
if not is_ingress_running():
install_ingress_for_kind(self.cluster_info.spec.get_acme_email())
wait_for_ingress_in_kind()
if self.cluster_info.spec.get_unlimited_memlock():
_create_runtime_class(
constants.high_memlock_runtime,
constants.high_memlock_runtime,
)
actual_cluster = create_cluster(self.kind_cluster_name, kind_config)
if actual_cluster != self.kind_cluster_name:
# An existing cluster was found, use it instead
self.kind_cluster_name = actual_cluster
# Only load locally-built images into kind
# Registry images (docker.io, ghcr.io, etc.) will be pulled by k8s
local_containers = self.deployment_context.stack.obj.get(
"containers", []
)
if local_containers:
# Filter image_set to only images matching local containers
local_images = {
img
for img in self.cluster_info.image_set
if any(c in img for c in local_containers)
}
if local_images:
load_images_into_kind(self.kind_cluster_name, local_images)
# Note: if no local containers defined, all images come from registries
self.connect_api()
# Create deployment-specific namespace for resource isolation
self._ensure_namespace()
if self.is_kind() and not self.skip_cluster_management:
# Configure ingress controller (not installed by default in kind)
# Skip if already running (idempotent for shared cluster)
if not is_ingress_running():
install_ingress_for_kind(self.cluster_info.spec.get_acme_email())
# Wait for ingress to start
# (deployment provisioning will fail unless this is done)
wait_for_ingress_in_kind()
# Create RuntimeClass if unlimited_memlock is enabled
if self.cluster_info.spec.get_unlimited_memlock():
_create_runtime_class(
constants.high_memlock_runtime,
constants.high_memlock_runtime,
)
else:
print("Dry run mode enabled, skipping k8s API connect")
# Create registry secret if configured
from stack_orchestrator.deploy.deployment_create import create_registry_secret
create_registry_secret(
self.cluster_info.spec, self.cluster_info.app_name, self.k8s_namespace
)
self._create_volume_data()
self._create_deployment()
self._create_jobs()
def _create_ingress(self):
"""Create or update Ingress with TLS certificate lookup."""
http_proxy_info = self.cluster_info.spec.get_http_proxy()
# Note: we don't support tls for kind (enabling tls causes errors)
use_tls = http_proxy_info and not self.is_kind()
certificates = None
if use_tls:
@ -669,6 +641,8 @@ class K8sDeployer(Deployer):
if opts.o.debug:
print("No ingress configured")
def _create_nodeports(self):
"""Create or update NodePort services."""
nodeports: List[client.V1Service] = self.cluster_info.get_nodeports()
for nodeport in nodeports:
if opts.o.debug:
@ -696,6 +670,27 @@ class K8sDeployer(Deployer):
else:
raise
def up(self, detach, skip_cluster_management, services, image_overrides=None):
self.image_overrides = image_overrides
self.skip_cluster_management = skip_cluster_management
if not opts.o.dry_run:
self._setup_cluster()
else:
print("Dry run mode enabled, skipping k8s API connect")
# Create registry secret if configured
from stack_orchestrator.deploy.deployment_create import create_registry_secret
create_registry_secret(
self.cluster_info.spec, self.cluster_info.app_name, self.k8s_namespace
)
self._create_volume_data()
self._create_deployment()
self._create_jobs()
self._create_ingress()
self._create_nodeports()
# Call start() hooks — stacks can create additional k8s resources
if self.deployment_context:
from stack_orchestrator.deploy.deployment_create import (