From 6923e1c23bfada209389fb32ea7686ddbd3c335f Mon Sep 17 00:00:00 2001 From: "A. F. Dudley" Date: Fri, 20 Mar 2026 15:20:50 +0000 Subject: [PATCH] refactor: extract methods from K8sDeployer.up to fix C901 complexity Split up() into _setup_cluster(), _create_ingress(), _create_nodeports(). Reduces cyclomatic complexity below the flake8 threshold. Co-Authored-By: Claude Opus 4.6 (1M context) --- stack_orchestrator/deploy/k8s/deploy_k8s.py | 113 ++++++++++---------- 1 file changed, 54 insertions(+), 59 deletions(-) diff --git a/stack_orchestrator/deploy/k8s/deploy_k8s.py b/stack_orchestrator/deploy/k8s/deploy_k8s.py index eb257ef6..438c1c81 100644 --- a/stack_orchestrator/deploy/k8s/deploy_k8s.py +++ b/stack_orchestrator/deploy/k8s/deploy_k8s.py @@ -562,68 +562,40 @@ class K8sDeployer(Deployer): return cert return None - def up(self, detach, skip_cluster_management, services, image_overrides=None): - self.image_overrides = image_overrides - self.skip_cluster_management = skip_cluster_management - if not opts.o.dry_run: - if self.is_kind() and not self.skip_cluster_management: - # Create the kind cluster (or reuse existing one) - kind_config = str( - self.deployment_dir.joinpath(constants.kind_config_filename) + def _setup_cluster(self): + """Create/reuse kind cluster, load images, ensure namespace.""" + if self.is_kind() and not self.skip_cluster_management: + kind_config = str( + self.deployment_dir.joinpath(constants.kind_config_filename) + ) + actual_cluster = create_cluster(self.kind_cluster_name, kind_config) + if actual_cluster != self.kind_cluster_name: + self.kind_cluster_name = actual_cluster + # Only load locally-built images into kind + local_containers = self.deployment_context.stack.obj.get("containers", []) + if local_containers: + local_images = { + img + for img in self.cluster_info.image_set + if any(c in img for c in local_containers) + } + if local_images: + load_images_into_kind(self.kind_cluster_name, local_images) + self.connect_api() + self._ensure_namespace() + if self.is_kind() and not self.skip_cluster_management: + if not is_ingress_running(): + install_ingress_for_kind(self.cluster_info.spec.get_acme_email()) + wait_for_ingress_in_kind() + if self.cluster_info.spec.get_unlimited_memlock(): + _create_runtime_class( + constants.high_memlock_runtime, + constants.high_memlock_runtime, ) - actual_cluster = create_cluster(self.kind_cluster_name, kind_config) - if actual_cluster != self.kind_cluster_name: - # An existing cluster was found, use it instead - self.kind_cluster_name = actual_cluster - # Only load locally-built images into kind - # Registry images (docker.io, ghcr.io, etc.) will be pulled by k8s - local_containers = self.deployment_context.stack.obj.get( - "containers", [] - ) - if local_containers: - # Filter image_set to only images matching local containers - local_images = { - img - for img in self.cluster_info.image_set - if any(c in img for c in local_containers) - } - if local_images: - load_images_into_kind(self.kind_cluster_name, local_images) - # Note: if no local containers defined, all images come from registries - self.connect_api() - # Create deployment-specific namespace for resource isolation - self._ensure_namespace() - if self.is_kind() and not self.skip_cluster_management: - # Configure ingress controller (not installed by default in kind) - # Skip if already running (idempotent for shared cluster) - if not is_ingress_running(): - install_ingress_for_kind(self.cluster_info.spec.get_acme_email()) - # Wait for ingress to start - # (deployment provisioning will fail unless this is done) - wait_for_ingress_in_kind() - # Create RuntimeClass if unlimited_memlock is enabled - if self.cluster_info.spec.get_unlimited_memlock(): - _create_runtime_class( - constants.high_memlock_runtime, - constants.high_memlock_runtime, - ) - - else: - print("Dry run mode enabled, skipping k8s API connect") - - # Create registry secret if configured - from stack_orchestrator.deploy.deployment_create import create_registry_secret - - create_registry_secret( - self.cluster_info.spec, self.cluster_info.app_name, self.k8s_namespace - ) - - self._create_volume_data() - self._create_deployment() - self._create_jobs() + def _create_ingress(self): + """Create or update Ingress with TLS certificate lookup.""" http_proxy_info = self.cluster_info.spec.get_http_proxy() - # Note: we don't support tls for kind (enabling tls causes errors) use_tls = http_proxy_info and not self.is_kind() certificates = None if use_tls: @@ -669,6 +641,8 @@ class K8sDeployer(Deployer): if opts.o.debug: print("No ingress configured") + def _create_nodeports(self): + """Create or update NodePort services.""" nodeports: List[client.V1Service] = self.cluster_info.get_nodeports() for nodeport in nodeports: if opts.o.debug: @@ -696,6 +670,27 @@ class K8sDeployer(Deployer): else: raise + def up(self, detach, skip_cluster_management, services, image_overrides=None): + self.image_overrides = image_overrides + self.skip_cluster_management = skip_cluster_management + if not opts.o.dry_run: + self._setup_cluster() + else: + print("Dry run mode enabled, skipping k8s API connect") + + # Create registry secret if configured + from stack_orchestrator.deploy.deployment_create import create_registry_secret + + create_registry_secret( + self.cluster_info.spec, self.cluster_info.app_name, self.k8s_namespace + ) + + self._create_volume_data() + self._create_deployment() + self._create_jobs() + self._create_ingress() + self._create_nodeports() + # Call start() hooks — stacks can create additional k8s resources if self.deployment_context: from stack_orchestrator.deploy.deployment_create import (