Add $generate:type:length$ token support for K8s secrets

- Add GENERATE_TOKEN_PATTERN to detect $generate:hex:N$ and $generate:base64:N$ tokens
- Add _generate_and_store_secrets() to create K8s Secrets from spec.yml config
- Modify _write_config_file() to separate secrets from regular config
- Add env_from with secretRef to container spec in cluster_info.py
- Secrets are injected directly into containers via K8s native mechanism

This enables declarative secret generation in spec.yml:
  config:
    SESSION_SECRET: $generate:hex:32$
    DB_PASSWORD: $generate:hex:16$

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
A. F. Dudley 2026-02-03 00:55:14 -05:00
parent 2d3721efa4
commit ca090d2cd5
2 changed files with 109 additions and 8 deletions

View File

@ -16,6 +16,8 @@
import click
from importlib import util
import os
import re
import base64
from pathlib import Path
from typing import List, Optional
import random
@ -484,15 +486,99 @@ def init_operation(
get_yaml().dump(spec_file_content, output_file)
def _write_config_file(spec_file: Path, config_env_file: Path):
# Token pattern: $generate:hex:32$ or $generate:base64:16$
GENERATE_TOKEN_PATTERN = re.compile(r"\$generate:(\w+):(\d+)\$")
def _generate_and_store_secrets(config_vars: dict, deployment_name: str):
"""Generate secrets for $generate:...$ tokens and store in K8s Secret.
Called by `deploy create` - generates fresh secrets and stores them.
Returns the generated secrets dict for reference.
"""
from kubernetes import client, config as k8s_config
secrets = {}
for name, value in config_vars.items():
if not isinstance(value, str):
continue
match = GENERATE_TOKEN_PATTERN.search(value)
if not match:
continue
secret_type, length = match.group(1), int(match.group(2))
if secret_type == "hex":
secrets[name] = token_hex(length)
elif secret_type == "base64":
secrets[name] = base64.b64encode(os.urandom(length)).decode()
else:
secrets[name] = token_hex(length)
if not secrets:
return secrets
# Store in K8s Secret
try:
k8s_config.load_kube_config()
except Exception:
# Fall back to in-cluster config if available
try:
k8s_config.load_incluster_config()
except Exception:
print(
"Warning: Could not load kube config, secrets will not be stored in K8s"
)
return secrets
v1 = client.CoreV1Api()
secret_name = f"{deployment_name}-generated-secrets"
namespace = "default"
secret_data = {k: base64.b64encode(v.encode()).decode() for k, v in secrets.items()}
k8s_secret = client.V1Secret(
metadata=client.V1ObjectMeta(name=secret_name), data=secret_data, type="Opaque"
)
try:
v1.create_namespaced_secret(namespace, k8s_secret)
num_secrets = len(secrets)
print(f"Created K8s Secret '{secret_name}' with {num_secrets} secret(s)")
except client.exceptions.ApiException as e:
if e.status == 409: # Already exists
v1.replace_namespaced_secret(secret_name, namespace, k8s_secret)
num_secrets = len(secrets)
print(f"Updated K8s Secret '{secret_name}' with {num_secrets} secret(s)")
else:
raise
return secrets
def _write_config_file(
spec_file: Path, config_env_file: Path, deployment_name: Optional[str] = None
):
spec_content = get_parsed_deployment_spec(spec_file)
# Note: we want to write an empty file even if we have no config variables
config_vars = spec_content.get("config", {}) or {}
# Generate and store secrets in K8s if deployment_name provided and tokens exist
if deployment_name and config_vars:
has_generate_tokens = any(
isinstance(v, str) and GENERATE_TOKEN_PATTERN.search(v)
for v in config_vars.values()
)
if has_generate_tokens:
_generate_and_store_secrets(config_vars, deployment_name)
# Write non-secret config to config.env (exclude $generate:...$ tokens)
with open(config_env_file, "w") as output_file:
if "config" in spec_content and spec_content["config"]:
config_vars = spec_content["config"]
if config_vars:
for variable_name, variable_value in config_vars.items():
output_file.write(f"{variable_name}={variable_value}\n")
if config_vars:
for variable_name, variable_value in config_vars.items():
# Skip variables with generate tokens - they go to K8s Secret
if isinstance(variable_value, str) and GENERATE_TOKEN_PATTERN.search(
variable_value
):
continue
output_file.write(f"{variable_name}={variable_value}\n")
def _write_kube_config_file(external_path: Path, internal_path: Path):
@ -756,7 +842,11 @@ def _write_deployment_files(
_create_deployment_file(target_dir, stack_source=stack_source)
# Copy any config variables from the spec file into an env file suitable for compose
_write_config_file(spec_file, target_dir.joinpath(constants.config_file_name))
# Use stack_name as deployment_name for K8s secret naming
deployment_name = stack_name.replace("_", "-")
_write_config_file(
spec_file, target_dir.joinpath(constants.config_file_name), deployment_name
)
# Copy any k8s config file into the target dir
if deployment_type == "k8s":

View File

@ -453,6 +453,16 @@ class ClusterInfo:
if "command" in service_info:
cmd = service_info["command"]
container_args = cmd if isinstance(cmd, list) else cmd.split()
# Add env_from to pull secrets from K8s Secret
secret_name = f"{self.app_name}-generated-secrets"
env_from = [
client.V1EnvFromSource(
secret_ref=client.V1SecretEnvSource(
name=secret_name,
optional=True, # Don't fail if no secrets
)
)
]
container = client.V1Container(
name=container_name,
image=image_to_use,
@ -460,6 +470,7 @@ class ClusterInfo:
command=container_command,
args=container_args,
env=envs,
env_from=env_from,
ports=container_ports if container_ports else None,
volume_mounts=volume_mounts,
security_context=client.V1SecurityContext(