Add a command to run jobs in deployments

Prathamesh Musale 2025-12-02 11:25:22 +05:30
parent 04463d22ef
commit f2c6fb28d1
8 changed files with 293 additions and 53 deletions

View File

@@ -94,6 +94,40 @@ class DockerDeployer(Deployer):
        except DockerException as e:
            raise DeployerException(e)

    def run_job(self, job_name: str, release_name: str = None):
        # release_name is ignored for Docker deployments (only used for K8s/Helm)
        if not opts.o.dry_run:
            try:
                # Find job compose file in compose-jobs directory
                # The deployment should have compose-jobs/docker-compose-<job_name>.yml
                if not self.docker.compose_files:
                    raise DeployerException("No compose files configured")

                # Deployment directory is parent of compose directory
                compose_dir = Path(self.docker.compose_files[0]).parent
                deployment_dir = compose_dir.parent
                job_compose_file = deployment_dir / "compose-jobs" / f"docker-compose-{job_name}.yml"

                if not job_compose_file.exists():
                    raise DeployerException(f"Job compose file not found: {job_compose_file}")

                if opts.o.verbose:
                    print(f"Running job from: {job_compose_file}")

                # Create a DockerClient for the job compose file with same project name and env file
                # This allows the job to access volumes from the main deployment
                job_docker = DockerClient(
                    compose_files=[job_compose_file],
                    compose_project_name=self.docker.compose_project_name,
                    compose_env_file=self.docker.compose_env_file
                )

                # Run the job with --rm flag to remove container after completion
                return job_docker.compose.run(service=job_name, remove=True, tty=True)
            except DockerException as e:
                raise DeployerException(e)


class DockerDeployerConfigGenerator(DeployerConfigGenerator):
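The Docker run_job path relies on python_on_whales running the job's service under the same compose project name as the deployment, so the one-off container can resolve the deployment's named volumes and network. A minimal, self-contained sketch of that pattern (paths and names below are hypothetical, not part of this commit):

from pathlib import Path
from python_on_whales import DockerClient

deployment_dir = Path("/srv/my-deployment")    # hypothetical deployment directory
job_name = "migrate-db"                        # hypothetical job name

# Point a client at the job's own compose file, but reuse the deployment's
# project name and env file so named volumes/networks resolve to the same objects.
job_docker = DockerClient(
    compose_files=[deployment_dir / "compose-jobs" / f"docker-compose-{job_name}.yml"],
    compose_project_name="laconic-1a2b3c4d",   # hypothetical existing project name
    compose_env_file=deployment_dir / "config.env",
)

# Equivalent to `docker compose run --rm --tty <service>`
job_docker.compose.run(service=job_name, remove=True, tty=True)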

View File

@@ -84,7 +84,22 @@ def create_deploy_context(
    # Extract the cluster name from the deployment, if we have one
    if deployment_context and cluster is None:
        cluster = deployment_context.get_cluster_id()
    cluster_context = _make_cluster_context(global_context, stack, include, exclude, cluster, env_file)

    # Check if this is a helm chart deployment (has chart/ but no compose/)
    # TODO: Add a new deployment type for helm chart deployments
    # to avoid relying on chart existence in such cases
    is_helm_chart_deployment = False
    if deployment_context:
        chart_dir = deployment_context.deployment_dir / "chart"
        compose_dir = deployment_context.deployment_dir / "compose"
        is_helm_chart_deployment = chart_dir.exists() and not compose_dir.exists()

    # For helm chart deployments, skip compose file loading
    if is_helm_chart_deployment:
        cluster_context = ClusterContext(global_context, cluster, [], [], [], None, env_file)
    else:
        cluster_context = _make_cluster_context(global_context, stack, include, exclude, cluster, env_file)
    deployer = getDeployer(deploy_to, deployment_context, compose_files=cluster_context.compose_files,
                           compose_project_name=cluster_context.cluster,
                           compose_env_file=cluster_context.env_file)
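The branch above keys off a directory convention rather than an explicit deployment type (hence the TODO): a helm-chart deployment directory contains chart/ and no compose/. A small sketch of that check in isolation (hypothetical helper, not part of the commit):

from pathlib import Path

def looks_like_helm_chart_deployment(deployment_dir: Path) -> bool:
    # Generated with the --helm-chart flag: has chart/, never compose/
    return (deployment_dir / "chart").exists() and not (deployment_dir / "compose").exists()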
@@ -188,6 +203,18 @@ def logs_operation(ctx, tail: int, follow: bool, extra_args: str):
            print(stream_content.decode("utf-8"), end="")


def run_job_operation(ctx, job_name: str, release_name: str = None):
    global_context = ctx.parent.parent.obj
    if not global_context.dry_run:
        print(f"Running job: {job_name}")
        try:
            ctx.obj.deployer.run_job(job_name, release_name)
            print(f"Job {job_name} completed successfully")
        except Exception as e:
            print(f"Error running job {job_name}: {e}")
            sys.exit(1)


@command.command()
@click.argument('extra_args', nargs=-1)  # help: command: up <service1> <service2>
@click.pass_context

View File

@@ -55,6 +55,10 @@ class Deployer(ABC):
    def run(self, image: str, command=None, user=None, volumes=None, entrypoint=None, env={}, ports=[], detach=False):
        pass

    @abstractmethod
    def run_job(self, job_name: str, release_name: str = None):
        pass


class DeployerException(Exception):
    def __init__(self, *args: object) -> None:
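Because run_job is added as an abstract method, every concrete Deployer subclass must now implement it or become uninstantiable. A self-contained illustration of that contract (toy classes, not project code):

from abc import ABC, abstractmethod

class MiniDeployer(ABC):
    @abstractmethod
    def run_job(self, job_name: str, release_name: str = None):
        pass

class Incomplete(MiniDeployer):
    pass

try:
    Incomplete()
except TypeError as err:
    print(err)  # Can't instantiate abstract class Incomplete with abstract method run_job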

View File

@@ -167,3 +167,14 @@ def status(ctx):
def update(ctx):
    ctx.obj = make_deploy_context(ctx)
    update_operation(ctx)


@command.command()
@click.argument('job_name')
@click.option('--release-name', help='Helm release name (only for k8s helm chart deployments, defaults to chart name)')
@click.pass_context
def run_job(ctx, job_name, release_name):
    '''run a one-time job from the stack'''
    from stack_orchestrator.deploy.deploy import run_job_operation
    ctx.obj = make_deploy_context(ctx)
    run_job_operation(ctx, job_name, release_name)
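Click derives the command name from the function name, so run_job is exposed on the CLI as run-job, with --release-name mapped onto the release_name parameter; the command would presumably be invoked as `laconic-so deployment --dir <deployment-dir> run-job <job-name> [--release-name <name>]`. A self-contained sketch of that wiring (toy group, not the project's CLI):

import click
from click.testing import CliRunner

@click.group()
def command():
    pass

@command.command()
@click.argument('job_name')
@click.option('--release-name')
def run_job(job_name, release_name):
    click.echo(f"would run job {job_name} (release: {release_name})")

# Click 7+ turns the function name run_job into the command name "run-job"
result = CliRunner().invoke(command, ["run-job", "migrate-db", "--release-name", "my-release"])
print(result.output)  # -> would run job migrate-db (release: my-release)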

View File

@@ -461,13 +461,6 @@ def create_operation(deployment_command_context, spec_file, deployment_dir, helm
    stack_name = parsed_spec["stack"]
    deployment_type = parsed_spec[constants.deploy_to_key]

    # Branch to Helm chart generation flow early if --helm-chart flag is set
    if deployment_type == "k8s" and helm_chart:
        from stack_orchestrator.deploy.k8s.helm.chart_generator import generate_helm_chart
        generate_helm_chart(stack_name, spec_file, deployment_dir)
        return  # Exit early, completely separate from existing k8s deployment flow

    # Existing deployment flow continues unchanged
    stack_file = get_stack_path(stack_name).joinpath(constants.stack_file_name)
    parsed_stack = get_parsed_stack_config(stack_name)
    if opts.o.debug:
@@ -482,7 +475,17 @@ def create_operation(deployment_command_context, spec_file, deployment_dir, helm
    # Copy spec file and the stack file into the deployment dir
    copyfile(spec_file, deployment_dir_path.joinpath(constants.spec_file_name))
    copyfile(stack_file, deployment_dir_path.joinpath(constants.stack_file_name))

    # Create deployment.yml with cluster-id
    _create_deployment_file(deployment_dir_path)

    # Branch to Helm chart generation flow if --helm-chart flag is set
    if deployment_type == "k8s" and helm_chart:
        from stack_orchestrator.deploy.k8s.helm.chart_generator import generate_helm_chart
        generate_helm_chart(stack_name, spec_file, deployment_dir_path)
        return  # Exit early for helm chart generation

    # Existing deployment flow continues unchanged
    # Copy any config variables from the spec file into an env file suitable for compose
    _write_config_file(spec_file, deployment_dir_path.joinpath(constants.config_file_name))
    # Copy any k8s config file into the deployment dir
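Moving _create_deployment_file ahead of the --helm-chart branch means deployment.yml (with its cluster-id) exists before the chart generator runs, which is what lets generate_helm_chart read the id back later. A rough sketch of the file this step is expected to produce (assumed shape, hypothetical helper; the real implementation lives elsewhere in the codebase):

import secrets
from pathlib import Path

def write_deployment_file(deployment_dir: Path) -> str:
    # Assumed shape: deployment.yml carries a cluster-id like "laconic-<hex>"
    cluster_id = f"laconic-{secrets.token_hex(8)}"
    (deployment_dir / "deployment.yml").write_text(f"cluster-id: {cluster_id}\n")
    return cluster_id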

View File

@@ -510,6 +510,26 @@ class K8sDeployer(Deployer):
        # We need to figure out how to do this -- check why we're being called first
        pass

    def run_job(self, job_name: str, release_name: str = None):
        if not opts.o.dry_run:
            from stack_orchestrator.deploy.k8s.helm.job_runner import run_helm_job

            # Check if this is a helm-based deployment
            chart_dir = self.deployment_dir / "chart"
            if not chart_dir.exists():
                # TODO: Implement job support for compose-based K8s deployments
                raise Exception(f"Job support is only available for helm-based deployments. Chart directory not found: {chart_dir}")

            # Run the job using the helm job runner
            run_helm_job(
                chart_dir=chart_dir,
                job_name=job_name,
                release_name=release_name,
                namespace=self.k8s_namespace,
                timeout=600,
                verbose=opts.o.verbose
            )

    def is_kind(self):
        return self.type == "k8s-kind"

View File

@@ -57,7 +57,9 @@ def _wrap_job_templates_with_conditionals(chart_dir: Path, jobs: list) -> None:
        content = job_template_file.read_text()

        # Wrap with conditional (default false)
        wrapped_content = f"""{{{{- if .Values.jobs.{job_name}.enabled | default false }}}}
        # Use 'index' function to handle job names with dashes
        # Provide default dict for .Values.jobs to handle case where it doesn't exist
        wrapped_content = f"""{{{{- if (index (.Values.jobs | default dict) "{job_name}" | default dict).enabled | default false }}}}
{content}{{{{- end }}}}
"""
@@ -104,34 +106,29 @@ def _post_process_chart(chart_dir: Path, chart_name: str, jobs: list) -> None:
    _wrap_job_templates_with_conditionals(chart_dir, jobs)


def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = None) -> None:
def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir_path: Path) -> None:
    """
    Generate a self-sufficient Helm chart from stack compose files using Kompose.

    Args:
        stack_path: Path to the stack directory
        spec_file: Path to the deployment spec file
        deployment_dir: Optional directory for deployment output
        deployment_dir_path: Deployment directory path (already created with deployment.yml)

    Output structure:
        deployment-dir/
            spec.yml        # Reference
            stack.yml       # Reference
            chart/          # Self-sufficient Helm chart
            deployment.yml  # Contains cluster-id
            spec.yml        # Reference
            stack.yml       # Reference
            chart/          # Self-sufficient Helm chart
                Chart.yaml
                README.md
                templates/
                    *.yaml

    TODO: Enhancements:
        - Parse generated templates and extract values to values.yaml
        - Replace hardcoded image tags with {{ .Values.image.tag }}
        - Replace hardcoded PVC sizes with {{ .Values.persistence.size }}
        - Convert Deployments to StatefulSets for stateful services (zenithd, postgres)
        - Add _helpers.tpl with common label/selector functions
        - Embed config files (scripts, templates) into ConfigMap templates
        - Generate Secret templates for validator keys with placeholders
        - Add init containers for genesis/config setup
        - Enhance Chart.yaml with proper metadata (version, description, etc.)
    """
@@ -142,35 +139,31 @@ def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = N
    if not check_kompose_available():
        error_exit("kompose not found in PATH.\n")

    # 2. Setup deployment directory
    if deployment_dir:
        deployment_dir_path = Path(deployment_dir)
    else:
        deployment_dir_path = Path(f"{stack_name}-deployment")
    # 2. Read cluster-id from deployment.yml
    deployment_file = deployment_dir_path / constants.deployment_file_name
    if not deployment_file.exists():
        error_exit(f"Deployment file not found: {deployment_file}")
    if deployment_dir_path.exists():
        error_exit(f"Deployment directory already exists: {deployment_dir_path}")
    yaml = get_yaml()
    deployment_config = yaml.load(open(deployment_file, "r"))
    cluster_id = deployment_config.get(constants.cluster_id_key)
    if not cluster_id:
        error_exit(f"cluster-id not found in {deployment_file}")

    # 3. Derive chart name from stack name + cluster-id suffix
    # Sanitize stack name for use in chart name
    sanitized_stack_name = stack_name.replace("_", "-").replace(" ", "-")
    # Extract hex suffix from cluster-id (after the prefix)
    # cluster-id format: "laconic-<hex>" -> extract the hex part
    cluster_id_suffix = cluster_id.split("-", 1)[1] if "-" in cluster_id else cluster_id
    # Combine to create human-readable + unique chart name
    chart_name = f"{sanitized_stack_name}-{cluster_id_suffix}"
    if opts.o.debug:
        print(f"Creating deployment directory: {deployment_dir_path}")
    deployment_dir_path.mkdir(parents=True)
    # 3. Copy spec and stack files to deployment directory (for reference)
    spec_path = Path(spec_file).resolve()
    if not spec_path.exists():
        error_exit(f"Spec file not found: {spec_file}")
    stack_file_path = get_stack_path(stack_path).joinpath(constants.stack_file_name)
    if not stack_file_path.exists():
        error_exit(f"Stack file not found: {stack_file_path}")
    shutil.copy(spec_path, deployment_dir_path / constants.spec_file_name)
    shutil.copy(stack_file_path, deployment_dir_path / constants.stack_file_name)
    if opts.o.debug:
        print(f"Copied spec file: {spec_path}")
        print(f"Copied stack file: {stack_file_path}")
        print(f"Cluster ID: {cluster_id}")
        print(f"Chart name: {chart_name}")

    # 4. Get compose files from stack (pods + jobs)
    pods = get_pod_list(parsed_stack)
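Worked through with hypothetical values, the chart-name derivation above combines the sanitized stack name with the hex suffix of the cluster-id read from deployment.yml:

stack_name = "my_test_stack"             # hypothetical
cluster_id = "laconic-1a2b3c4d5e6f7a8b"  # hypothetical, read from deployment.yml

sanitized_stack_name = stack_name.replace("_", "-").replace(" ", "-")
cluster_id_suffix = cluster_id.split("-", 1)[1] if "-" in cluster_id else cluster_id
chart_name = f"{sanitized_stack_name}-{cluster_id_suffix}"
print(chart_name)  # my-test-stack-1a2b3c4d5e6f7a8b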
@@ -179,9 +172,6 @@ def generate_helm_chart(stack_path: str, spec_file: str, deployment_dir: str = N
    jobs = get_job_list(parsed_stack)

    # Get clean stack name from stack.yml
    chart_name = stack_name.replace("_", "-").replace(" ", "-")

    if opts.o.debug:
        print(f"Found {len(pods)} pod(s) in stack: {pods}")
        if jobs:
@@ -249,6 +239,9 @@ Generated by laconic-so from stack: `{stack_path}`
# Install the chart
helm install {chart_name} {chart_dir}
# Alternatively, install with your own release name
# helm install <your-release-name> {chart_dir}
# Check deployment status
kubectl get pods
```
@@ -301,9 +294,10 @@ Edit the generated template files in `templates/` to customize:
    print("\nDeployment directory structure:")
    print(f" {deployment_dir_path}/")
    print(" ├── spec.yml (reference)")
    print(" ├── stack.yml (reference)")
    print(" └── chart/ (self-sufficient Helm chart)")
    print(" ├── deployment.yml (cluster-id)")
    print(" ├── spec.yml (reference)")
    print(" ├── stack.yml (reference)")
    print(" └── chart/ (self-sufficient Helm chart)")

    print("\nNext steps:")
    print(" 1. Review the chart:")
@@ -313,9 +307,12 @@ Edit the generated template files in `templates/` to customize:
    print(" 2. Review generated templates:")
    print(" ls templates/")
    print("")
    print(" 3. Install to Kubernetes:")
    print(f" 3. Install to Kubernetes:")
    print(f" helm install {chart_name} {chart_dir}")
    print("")
    print(f" # Or use your own release name")
    print(f" helm install <your-release-name> {chart_dir}")
    print("")
    print(" 4. Check deployment:")
    print(" kubectl get pods")
    print("")

View File

@@ -0,0 +1,144 @@
# Copyright © 2025 Vulcanize

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import subprocess
import tempfile
import os
import json
from pathlib import Path

from stack_orchestrator.util import get_yaml
def get_release_name_from_chart(chart_dir: Path) -> str:
    """
    Read the chart name from Chart.yaml to use as the release name.

    Args:
        chart_dir: Path to the Helm chart directory

    Returns:
        Chart name from Chart.yaml

    Raises:
        Exception if Chart.yaml not found or name is missing
    """
    chart_yaml_path = chart_dir / "Chart.yaml"
    if not chart_yaml_path.exists():
        raise Exception(f"Chart.yaml not found: {chart_yaml_path}")

    yaml = get_yaml()
    chart_yaml = yaml.load(open(chart_yaml_path, "r"))

    if "name" not in chart_yaml:
        raise Exception(f"Chart name not found in {chart_yaml_path}")

    return chart_yaml["name"]
def run_helm_job(
    chart_dir: Path,
    job_name: str,
    release_name: str = None,
    namespace: str = "default",
    timeout: int = 600,
    verbose: bool = False
) -> None:
    """
    Run a one-time job from a Helm chart.

    This function:
    1. Uses provided release name, or reads it from Chart.yaml if not provided
    2. Uses helm template to render the job manifest with the job enabled
    3. Applies the job manifest to the cluster
    4. Waits for the job to complete

    Args:
        chart_dir: Path to the Helm chart directory
        job_name: Name of the job to run (without -job suffix)
        release_name: Optional Helm release name (defaults to chart name from Chart.yaml)
        namespace: Kubernetes namespace
        timeout: Timeout in seconds for job completion (default: 600)
        verbose: Enable verbose output

    Raises:
        Exception if the job fails or times out
    """
    if not chart_dir.exists():
        raise Exception(f"Chart directory not found: {chart_dir}")

    # Use provided release name, or get it from Chart.yaml
    if release_name is None:
        release_name = get_release_name_from_chart(chart_dir)
        if verbose:
            print(f"Using release name from Chart.yaml: {release_name}")
    else:
        if verbose:
            print(f"Using provided release name: {release_name}")

    job_template_file = f"templates/{job_name}-job.yaml"

    if verbose:
        print(f"Running job '{job_name}' from helm chart: {chart_dir}")

    # Use helm template to render the job manifest
    with tempfile.NamedTemporaryFile(mode='w', suffix='.yaml', delete=False) as tmp_file:
        try:
            # Render job template with job enabled
            # Use --set-json to properly handle job names with dashes
            # Shape matches the template conditional: (index .Values.jobs "<job_name>").enabled
            values_json = json.dumps({job_name: {"enabled": True}})
            helm_cmd = [
                "helm", "template", release_name, str(chart_dir),
                "--show-only", job_template_file,
                "--set-json", f"jobs={values_json}"
            ]
            if verbose:
                print(f"Running: {' '.join(helm_cmd)}")
            result = subprocess.run(helm_cmd, check=True, capture_output=True, text=True)
            tmp_file.write(result.stdout)
            tmp_file.flush()

            if verbose:
                print(f"Generated job manifest:\n{result.stdout}")

            # Apply the job manifest
            kubectl_apply_cmd = ["kubectl", "apply", "-f", tmp_file.name, "-n", namespace]
            subprocess.run(kubectl_apply_cmd, check=True, capture_output=True, text=True)

            if verbose:
                print(f"Job {job_name} created, waiting for completion...")

            # Wait for job completion
            job_full_name = f"{release_name}-{job_name}"
            wait_cmd = [
                "kubectl", "wait", "--for=condition=complete",
                f"job/{job_full_name}",
                f"--timeout={timeout}s",
                "-n", namespace
            ]
            subprocess.run(wait_cmd, check=True, capture_output=True, text=True)

            if verbose:
                print(f"Job {job_name} completed successfully")

        except subprocess.CalledProcessError as e:
            error_msg = e.stderr if e.stderr else str(e)
            raise Exception(f"Job failed: {error_msg}")
        finally:
            # Clean up temp file
            if os.path.exists(tmp_file.name):
                os.unlink(tmp_file.name)
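For a concrete picture of the helm invocation assembled above, here is what the rendered command looks like for a hypothetical dashed job name; per the comment in the commit, --set-json keeps the dashed name as a single map key under .Values.jobs, matching the (index .Values.jobs "<job_name>").enabled conditional in the chart:

import json

release_name = "my-test-stack-1a2b3c4d5e6f7a8b"  # hypothetical
job_name = "init-db"                             # hypothetical

values_json = json.dumps({job_name: {"enabled": True}})
helm_cmd = [
    "helm", "template", release_name, "./chart",
    "--show-only", f"templates/{job_name}-job.yaml",
    "--set-json", f"jobs={values_json}",
]
print(" ".join(helm_cmd))
# helm template my-test-stack-1a2b3c4d5e6f7a8b ./chart --show-only templates/init-db-job.yaml --set-json jobs={"init-db": {"enabled": true}}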