dify/docker/caddy/render_caddy.py
Joey Yakimowich-Payne 6038fc25f5
Add multi-site Caddy helpers and document usage
- add startup/shutdown scripts that render a Caddyfile from JSON config and run health checks

- add Python utilities and a sample sites.json for declarative multi-site configuration

- document the workflow and ignore generated Caddy state artifacts

- normalize double-quote style across challenge workflow controllers, nodes, and tests
2025-10-15 22:03:56 -06:00

296 lines
11 KiB
Python

"""Render a multi-site Caddyfile from JSON configuration.
This module is invoked by docker/startup-caddy.sh. It reads a JSON config,
expands environment variables, and writes out a Caddyfile plus site metadata
used for health checks and logging.
"""

from __future__ import annotations

import json
import os
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, Iterator, List, Tuple
from urllib.parse import urlparse


@dataclass
class SiteMetadata:
    """Compact representation of site health/check details."""

    name: str
    health_url: str
    skip_healthcheck: bool
    app_url: str

    def serialize(self) -> str:
        return "|".join(
            [
                self.name,
                self.health_url,
                "true" if self.skip_healthcheck else "false",
                self.app_url,
            ]
        )
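
# serialize() joins the fields with "|"; e.g. (illustrative values):
#   SiteMetadata("web", "http://localhost/health", False, "https://example.com").serialize()
#   -> "web|http://localhost/health|false|https://example.com"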


DEFAULT_PATTERN = re.compile(r"\${([^}:]+):-([^}]*)}")


def _expand_string(raw: str) -> str:
    def replace(match: re.Match[str]) -> str:
        var, default = match.group(1), match.group(2)
        current = os.environ.get(var)
        return current if current not in (None, "") else default

    substituted = DEFAULT_PATTERN.sub(replace, raw)
    return os.path.expandvars(substituted)
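
# Both ${VAR:-default} and plain $VAR / ${VAR} forms are supported; e.g. with
# API_PORT unset (illustrative): _expand_string("api:${API_PORT:-5001}") -> "api:5001",
# and with API_PORT=8080 exported -> "api:8080".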


def _expand(value):  # type: ignore[no-untyped-def]
    if isinstance(value, str):
        return _expand_string(value)
    if isinstance(value, list):
        return [_expand(v) for v in value]
    if isinstance(value, dict):
        return {k: _expand(v) for k, v in value.items()}
    return value


def _to_bool(value, default: bool = False) -> bool:
    if value is None:
        return default
    if isinstance(value, bool):
        return value
    if isinstance(value, (int, float)):
        return bool(value)
    value_str = str(value).strip().lower()
    if value_str in {"true", "1", "yes", "y", "on"}:
        return True
    if value_str in {"false", "0", "no", "n", "off"}:
        return False
    return default
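
# e.g. _to_bool("Yes") -> True, _to_bool("off") -> False,
# _to_bool("maybe", default=True) -> True (unrecognized strings fall back to default).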


def _iter_headers(value) -> Iterator[Tuple[str, str]]:  # type: ignore[no-untyped-def]
    if isinstance(value, dict):
        for key, val in value.items():
            if key and val is not None:
                yield str(key), str(val)
        return
    if isinstance(value, (list, tuple)):
        for item in value:
            if isinstance(item, dict):
                name = item.get("name") or item.get("header")
                val = item.get("value")
                if name and val is not None:
                    yield str(name), str(val)
            elif isinstance(item, (list, tuple)) and len(item) == 2:
                name, val = item
                if name and val is not None:
                    yield str(name), str(val)
    elif value is not None:
        raise ValueError("Unsupported header specification: %r" % (value,))
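
# Accepted header specs (illustrative): {"Host": "{http.request.host}"},
# [{"name": "X-Real-IP", "value": "{http.request.remote.host}"}], or
# [["X-Forwarded-Proto", "https"]] all yield ("name", "value") pairs.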


def render_sites(config_path: Path, log_dir: Path) -> tuple[str, List[SiteMetadata]]:
    with config_path.open("r", encoding="utf-8") as fh:
        data = json.load(fh)
    raw_sites = data.get("sites", [])
    if not raw_sites:
        raise ValueError("No sites defined in configuration file")
    log_dir.mkdir(parents=True, exist_ok=True)
    blocks: List[str] = []
    metadata: List[SiteMetadata] = []
    for raw_site in raw_sites:
        site = _expand(raw_site)
        name = site.get("name") or "site"
        address = site.get("address")
        if not address:
            raise ValueError(f"Site '{name}' is missing the required 'address' field")
        https_redirect = _to_bool(site.get("https_redirect"), default=False)
        acme_challenge = _to_bool(site.get("acme_challenge"), default=False)
        address = address.strip()

        # Resolve the access-log path, defaulting to <log_dir>/<name>-access.log.
        log_file = site.get("log_file")
        log_path = Path(log_file) if log_file else log_dir / f"{name}-access.log"
        if not log_path.is_absolute():
            log_path = log_dir / log_path
        log_path.parent.mkdir(parents=True, exist_ok=True)

        lines: List[str] = [f"{address} {{"]
        lines.append("    encode gzip")

        headers = site.get("headers") or {}
        if headers:
            lines.append("    header {")
            for key, value in headers.items():
                lines.append(f'        {key} "{value}"')
            lines.append("    }")

        health_path = site.get("health_path", "/health")
        if health_path:
            lines.append(f'    respond {health_path} "OK" 200')

        if acme_challenge:
            lines.extend(
                [
                    "    handle /.well-known/acme-challenge/* {",
                    "        file_server",
                    "    }",
                ]
            )

        for route in site.get("api_routes") or []:
            path = route.get("path")
            upstream = route.get("upstream")
            if not path or not upstream:
                continue
            headers_up = list(_iter_headers(route.get("headers_up")))
            headers_down = list(_iter_headers(route.get("headers_down")))
            lines.append(f"    handle {path} {{")
            if headers_up or headers_down:
                lines.append(f"        reverse_proxy {upstream} {{")
                for key, val in headers_up:
                    lines.append(f"            header_up {key} {val}")
                for key, val in headers_down:
                    lines.append(f"            header_down {key} {val}")
                lines.append("        }")
            else:
                lines.append(f"        reverse_proxy {upstream}")
            lines.append("    }")

        for route in site.get("static_routes") or []:
            path = route.get("path")
            root = route.get("root")
            if not path or not root:
                continue
            browse = _to_bool(route.get("browse"))
            lines.append(f"    handle {path} {{")
            lines.append(f"        root * {root}")
            lines.append("        file_server browse" if browse else "        file_server")
            lines.append("    }")

        if site.get("explore_route"):
            explore = site["explore_route"]
            upstream = explore.get("upstream")
            if upstream:
                lines.append("    handle /explore* {")
                lines.append(f"        reverse_proxy {upstream}")
                lines.append("    }")

        if site.get("hooks_route"):
            hook = site["hooks_route"]
            path = hook.get("path", "/e/*")
            upstream = hook.get("upstream")
            if upstream:
                headers_up = list(_iter_headers(hook.get("headers_up")))
                headers_down = list(_iter_headers(hook.get("headers_down")))
                lines.append(f"    handle {path} {{")
                if headers_up or headers_down:
                    lines.append(f"        reverse_proxy {upstream} {{")
                    for key, val in headers_up:
                        lines.append(f"            header_up {key} {val}")
                    for key, val in headers_down:
                        lines.append(f"            header_down {key} {val}")
                    lines.append("        }")
                else:
                    lines.append(f"        reverse_proxy {upstream}")
                lines.append("    }")

        # The catch-all frontend handler goes last so the explicit routes above win.
        frontend = site.get("frontend") or {}
        frontend_type = frontend.get("type", "reverse_proxy")
        if frontend_type == "reverse_proxy":
            upstream = frontend.get("upstream")
            if upstream:
                rp_lines = ["    handle {"]
                headers_up = list(_iter_headers(frontend.get("headers_up")))
                headers_down = list(_iter_headers(frontend.get("headers_down")))
                if headers_up or headers_down:
                    rp_lines.append(f"        reverse_proxy {upstream} {{")
                    for key, val in headers_up:
                        rp_lines.append(f"            header_up {key} {val}")
                    for key, val in headers_down:
                        rp_lines.append(f"            header_down {key} {val}")
                    rp_lines.append("        }")
                else:
                    rp_lines.append(f"        reverse_proxy {upstream}")
                rp_lines.append("    }")
                lines.extend(rp_lines)
        elif frontend_type == "static":
            root = frontend.get("root")
            if root:
                lines.append("    handle {")
                lines.append(f"        root * {root}")
                try_files = frontend.get("try_files") or []
                if try_files:
                    lines.append("        try_files " + " ".join(try_files))
                lines.append("        file_server")
                lines.append("    }")

        cache_static = site.get("cache_static") or {}
        cache_paths = cache_static.get("paths") or []
        cache_header = cache_static.get("header")
        if cache_paths and cache_header:
            lines.append("    @static {")
            lines.append("        path " + " ".join(cache_paths))
            lines.append("    }")
            lines.append(f'    header @static Cache-Control "{cache_header}"')

        lines.extend(
            [
                "    log {",
                f"        output file {log_path} {{",
                "            roll_size 100mb",
                "            roll_keep 10",
                "            roll_keep_for 720h",
                "        }",
                "        format json",
                "    }",
                "}",
            ]
        )
        blocks.append("\n".join(lines))

        # Optionally emit a separate plain-HTTP block that redirects to HTTPS.
        host_for_redirect = ""
        redirect_port = None
        if not address.startswith(":"):
            parsed_address = urlparse(address if "://" in address else f"https://{address}")
            host_for_redirect = parsed_address.hostname or ""
            redirect_port = parsed_address.port
        if https_redirect and host_for_redirect:
            port_segment = f":{redirect_port}" if redirect_port and redirect_port != 443 else ""
            blocks.append(
                "\n".join(
                    [
                        f"http://{host_for_redirect}{port_segment} {{",
                        f"    redir https://{host_for_redirect}{{uri}}",
                        "}",
                    ]
                )
            )

        health_cfg = site.get("health_check") or {}
        metadata.append(
            SiteMetadata(
                name=name,
                health_url=health_cfg.get("url", ""),
                skip_healthcheck=_to_bool(health_cfg.get("skip"), default=False),
                app_url=site.get("app_url", ""),
            )
        )
    return "\n\n".join(blocks) + "\n", metadata


def render_metadata_lines(entries: Iterable[SiteMetadata]) -> List[str]:
    return [entry.serialize() for entry in entries]
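
# Illustrative driver code: the real invocation lives in docker/startup-caddy.sh,
# and the paths below are assumptions, not the script's actual locations.
#
#     caddyfile, sites = render_sites(Path("sites.json"), Path("/var/log/caddy"))
#     Path("/etc/caddy/Caddyfile").write_text(caddyfile, encoding="utf-8")
#     for line in render_metadata_lines(sites):
#         print(line)  # one "name|health_url|skip|app_url" record per site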