k8s: add start() hook for post-deployment k8s resource creation
Some checks failed
Lint Checks / Run linter (pull_request) Successful in 2m7s
Deploy Test / Run deploy test suite (pull_request) Successful in 5m5s
K8s Deploy Test / Run deploy test suite on kind/k8s (pull_request) Failing after 5m19s
K8s Deployment Control Test / Run deployment control suite on kind/k8s (pull_request) Failing after 6m32s
Webapp Test / Run webapp test suite (pull_request) Successful in 6m7s
Smoke Test / Run basic test suite (pull_request) Successful in 3m31s

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Prathamesh Musale 2026-03-09 09:49:24 +00:00
parent 86c11ff241
commit 8769df6c35
2 changed files with 24 additions and 0 deletions

View File

@ -265,6 +265,25 @@ def call_stack_deploy_create(deployment_context, extra_args):
imported_stack.create(deployment_context, extra_args)
def call_stack_deploy_start(deployment_context):
"""Call start() hooks after k8s deployments and jobs are created.
The start() hook receives the DeploymentContext, allowing stacks to
create additional k8s resources (Services, etc.) in the deployment namespace.
The namespace can be derived as f"laconic-{deployment_context.id}".
"""
python_file_paths = _commands_plugin_paths(deployment_context.stack.name)
for python_file_path in python_file_paths:
if python_file_path.exists():
spec = util.spec_from_file_location("commands", python_file_path)
if spec is None or spec.loader is None:
continue
imported_stack = util.module_from_spec(spec)
spec.loader.exec_module(imported_stack)
if _has_method(imported_stack, "start"):
imported_stack.start(deployment_context)
# Inspect the pod yaml to find config files referenced in subdirectories
# other than the one associated with the pod
def _find_extra_config_dirs(parsed_pod_file, pod):

View File

@ -461,6 +461,11 @@ class K8sDeployer(Deployer):
print("NodePort created:")
print(f"{nodeport_resp}")
# Call start() hooks — stacks can create additional k8s resources
if self.deployment_context:
from stack_orchestrator.deploy.deployment_create import call_stack_deploy_start
call_stack_deploy_start(self.deployment_context)
def down(self, timeout, volumes, skip_cluster_management):
self.skip_cluster_management = skip_cluster_management
self.connect_api()