diff --git a/deployments/charts/service/templates/_sidecar-helpers.tpl b/deployments/charts/service/templates/_sidecar-helpers.tpl index 809293cc9..5b830c74a 100644 --- a/deployments/charts/service/templates/_sidecar-helpers.tpl +++ b/deployments/charts/service/templates/_sidecar-helpers.tpl @@ -305,6 +305,9 @@ Authorization sidecar container imagePullPolicy: {{ .Values.sidecars.authz.imagePullPolicy }} args: - "--grpc-port={{ .Values.sidecars.authz.grpcPort }}" + {{- if .Values.services.configs.enabled }} + - "--roles-file=/etc/osmo/configs/config.yaml" + {{- else }} - "--postgres-host={{ .Values.services.postgres.serviceName }}" - "--postgres-port={{ .Values.services.postgres.port }}" - "--postgres-database={{ .Values.services.postgres.db }}" @@ -313,6 +316,7 @@ Authorization sidecar container - "--postgres-max-conns={{ .Values.sidecars.authz.postgres.maxConns }}" - "--postgres-min-conns={{ .Values.sidecars.authz.postgres.minConns }}" - "--postgres-max-conn-lifetime={{ .Values.sidecars.authz.postgres.maxConnLifetimeMin }}" + {{- end }} - "--cache-ttl={{ .Values.sidecars.authz.cache.ttl }}" - "--cache-max-size={{ .Values.sidecars.authz.cache.maxSize }}" {{- if .Values.global.logs.enabled }} @@ -340,11 +344,16 @@ Authorization sidecar container name: redis-secret key: redis-password {{- end }} - {{- if .Values.global.logs.enabled }} volumeMounts: + {{- if .Values.global.logs.enabled }} - name: logs mountPath: /logs {{- end }} + {{- if .Values.services.configs.enabled }} + - name: configs + mountPath: /etc/osmo/configs + readOnly: true + {{- end }} {{- with .Values.sidecars.authz.livenessProbe }} livenessProbe: {{- toYaml . 
| nindent 4 }} diff --git a/deployments/charts/service/templates/agent-service.yaml b/deployments/charts/service/templates/agent-service.yaml index 173f1018f..da3e6ba42 100644 --- a/deployments/charts/service/templates/agent-service.yaml +++ b/deployments/charts/service/templates/agent-service.yaml @@ -36,6 +36,9 @@ spec: {{- end }} annotations: {{- include "osmo.extra-annotations" .Values.services.agent | nindent 8 }} + {{- if .Values.services.configs.enabled }} + checksum/configs: {{ .Values.services.configs | toYaml | sha256sum }} + {{- end }} spec: {{- with .Values.services.agent.hostAliases }} hostAliases: @@ -214,6 +217,11 @@ spec: - name: logs emptyDir: {} {{- end}} + {{- if .Values.services.configs.enabled }} + - name: configs + configMap: + name: {{ .Values.services.service.serviceName }}-configs + {{- end }} {{- if .Values.services.configFile.enabled}} - configMap: defaultMode: 420 diff --git a/deployments/charts/service/templates/api-service.yaml b/deployments/charts/service/templates/api-service.yaml index 4f9caf6d8..13f19f138 100644 --- a/deployments/charts/service/templates/api-service.yaml +++ b/deployments/charts/service/templates/api-service.yaml @@ -35,6 +35,9 @@ spec: {{- end }} annotations: checksum/envoy-config: {{ .Values.sidecars.envoy | toYaml | sha256sum }} + {{- if .Values.services.configs.enabled }} + checksum/configs: {{ .Values.services.configs | toYaml | sha256sum }} + {{- end }} {{- include "osmo.extra-annotations" .Values.services.service | nindent 8 }} spec: {{- with .Values.services.service.hostAliases }} @@ -149,6 +152,10 @@ spec: - --default_admin_username - {{ .Values.services.defaultAdmin.username | quote }} {{- end }} + {{- if .Values.services.configs.enabled }} + - --config_file + - /etc/osmo/configs/config.yaml + {{- end }} {{- range $arg := .Values.services.service.extraArgs }} - {{ $arg | quote }} {{- end }} @@ -192,7 +199,7 @@ spec: ports: - name: metrics containerPort: 9464 - {{- if or .Values.services.configFile.enabled 
.Values.global.logs.enabled .Values.services.service.extraVolumeMounts }} + {{- if or .Values.services.configFile.enabled .Values.global.logs.enabled .Values.services.configs.enabled .Values.services.service.extraVolumeMounts }} volumeMounts: {{- end }} {{- if .Values.services.configFile.enabled}} @@ -200,6 +207,16 @@ spec: name: mek-volume subPath: mek.yaml {{- end }} + {{- if .Values.services.configs.enabled }} + - name: configs + mountPath: /etc/osmo/configs + readOnly: true + {{- range .Values.services.configs.secretRefs }} + - name: secret-{{ .secretName }} + mountPath: /etc/osmo/secrets/{{ .secretName }} + readOnly: true + {{- end }} + {{- end }} {{- if .Values.global.logs.enabled }} - name: logs mountPath: /logs @@ -262,6 +279,16 @@ spec: name: mek-config name: mek-volume {{- end}} + {{- if .Values.services.configs.enabled }} + - name: configs + configMap: + name: {{ .Values.services.service.serviceName }}-configs + {{- range .Values.services.configs.secretRefs }} + - name: secret-{{ .secretName }} + secret: + secretName: {{ .secretName }} + {{- end }} + {{- end }} --- {{- if .Values.sidecars.envoy.enabled }} diff --git a/deployments/charts/service/templates/configs.yaml b/deployments/charts/service/templates/configs.yaml new file mode 100644 index 000000000..1a1a0b98e --- /dev/null +++ b/deployments/charts/service/templates/configs.yaml @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 +{{- if .Values.services.configs.enabled }} +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Values.services.service.serviceName }}-configs + labels: + app: {{ .Values.services.service.serviceName }} +data: + config.yaml: | + {{- $cfg := .Values.services.configs }} + {{- if $cfg.service }} + service: + {{- $service := deepCopy $cfg.service }} + {{- if and (not (index $service "service_base_url")) .Values.services.service.hostname }} + {{- $_ := set $service "service_base_url" (printf "https://%s" .Values.services.service.hostname) }} + {{- end }} + {{- toYaml $service | nindent 6 }} + {{- end }} + {{- if $cfg.workflow }} + workflow: + {{- toYaml $cfg.workflow | nindent 6 }} + {{- end }} + {{- if $cfg.dataset }} + dataset: + {{- toYaml $cfg.dataset | nindent 6 }} + {{- end }} + {{- if $cfg.pools }} + pools: + {{- toYaml $cfg.pools | nindent 6 }} + {{- end }} + {{- if $cfg.podTemplates }} + pod_templates: + {{- toYaml $cfg.podTemplates | nindent 6 }} + {{- end }} + {{- if $cfg.resourceValidations }} + resource_validations: + {{- toYaml $cfg.resourceValidations | nindent 6 }} + {{- end }} + {{- if $cfg.backends }} + backends: + {{- toYaml $cfg.backends | nindent 6 }} + {{- end }} + {{- if $cfg.backendTests }} + backend_tests: + {{- toYaml $cfg.backendTests | nindent 6 }} + {{- end }} + {{- if $cfg.groupTemplates }} + group_templates: + {{- toYaml $cfg.groupTemplates | nindent 6 }} + {{- end }} + {{- if $cfg.roles }} + roles: + {{- toYaml $cfg.roles | nindent 6 }} + {{- end }} +{{- end }} diff --git a/deployments/charts/service/templates/logger-service.yaml b/deployments/charts/service/templates/logger-service.yaml index 86d85ca90..ed0fe4c67 100644 --- a/deployments/charts/service/templates/logger-service.yaml +++ b/deployments/charts/service/templates/logger-service.yaml @@ -36,6 +36,9 @@ spec: {{- end }} 
annotations: {{- include "osmo.extra-annotations" .Values.services.logger | nindent 8 }} + {{- if .Values.services.configs.enabled }} + checksum/configs: {{ .Values.services.configs | toYaml | sha256sum }} + {{- end }} spec: {{- with .Values.services.logger.hostAliases }} hostAliases: @@ -206,6 +209,11 @@ spec: - name: logs emptyDir: {} {{- end}} + {{- if .Values.services.configs.enabled }} + - name: configs + configMap: + name: {{ .Values.services.service.serviceName }}-configs + {{- end }} {{- if .Values.services.configFile.enabled}} - configMap: defaultMode: 420 diff --git a/deployments/charts/service/values.yaml b/deployments/charts/service/values.yaml index 495e24ed3..5e7fb2f59 100644 --- a/deployments/charts/service/values.yaml +++ b/deployments/charts/service/values.yaml @@ -229,6 +229,192 @@ services: ## passwordSecretKey: password + ## Dynamic configuration loaded from ConfigMap. + ## When enabled, all configs are served from in-memory (parsed from the mounted + ## ConfigMap file). CLI/API writes return 409. The authz_sidecar reads roles + ## from the same file instead of PostgreSQL. Changes are detected via watchdog/inotify. + ## + configs: + ## Enable ConfigMap-based configuration. + ## When true: all configs served from in-memory (parsed from ConfigMap), + ## CLI/API writes return 409, authz_sidecar reads roles from file. + ## When false: configs in database, CLI/API works normally. + ## + enabled: false + + ## Service config. service_base_url is auto-derived from + ## services.service.hostname if not set here. + ## + service: {} + + ## Workflow config (limits, timeouts, image credentials). + ## Use secretName in config fields to reference K8s Secrets. + ## Each secretName must also be listed in secretRefs below. + ## + workflow: + max_num_tasks: 100 + max_exec_timeout: "30d" + default_exec_timeout: "7d" + + ## Dataset config (buckets, credentials). + ## Use credentialSecretName to reference K8s Secrets for bucket creds. 
+ ## + dataset: {} + + ## Pod templates applied to workflow containers. + ## + podTemplates: + default_ctrl: + spec: + containers: + - name: osmo-ctrl + resources: + requests: + cpu: "1" + memory: "1Gi" + limits: + memory: "1Gi" + default_user: + spec: + containers: + - name: "{{USER_CONTAINER_NAME}}" + resources: + requests: + cpu: "{{USER_CPU}}" + memory: "{{USER_MEMORY}}" + nvidia.com/gpu: "{{USER_GPU}}" + ephemeral-storage: "{{USER_STORAGE}}" + limits: + memory: "{{USER_MEMORY}}" + nvidia.com/gpu: "{{USER_GPU}}" + ephemeral-storage: "{{USER_STORAGE}}" + + ## Resource validation rules. + ## + resourceValidations: + default_cpu: + - left_operand: cpu + operator: LE + right_operand: node_cpu + assert_message: "cpu must be <= node_cpu" + - left_operand: cpu + operator: GT + right_operand: "0" + assert_message: "cpu must be > 0" + default_memory: + - left_operand: memory + operator: LE + right_operand: node_memory + assert_message: "memory must be <= node_memory" + - left_operand: memory + operator: GT + right_operand: "0" + assert_message: "memory must be > 0" + + ## RBAC role definitions. Policies are product defaults. + ## Override external_roles per deployment to map IDP groups. 
+ ## + roles: + osmo-admin: + description: "Admin role — full access to all resources" + policies: + - effect: Allow + actions: ["*:*"] + resources: ["*"] + external_roles: [osmo-admin] + osmo-user: + description: "User role — standard workflow and data operations" + policies: + - effect: Allow + actions: + - "app:*" + - "auth:Token" + - "credentials:*" + - "dataset:*" + - "pool:List" + - "profile:Read" + - "profile:Update" + - "resources:Read" + - "user:List" + - "workflow:Cancel" + - "workflow:Create" + - "workflow:Delete" + - "workflow:Exec" + - "workflow:List" + - "workflow:PortForward" + - "workflow:Read" + - "workflow:Rsync" + - "workflow:Update" + resources: ["*"] + external_roles: [osmo-user] + osmo-default: + description: "Default role — login, profile, health check" + policies: + - effect: Allow + actions: + - "auth:Login" + - "auth:Refresh" + - "auth:Token" + - "profile:*" + - "system:Health" + resources: ["*"] + external_roles: [] + osmo-ctrl: + description: "Controller role — internal container communication" + policies: + - effect: Allow + actions: ["internal:Logger", "internal:Router"] + resources: ["*"] + external_roles: [osmo-ctrl] + osmo-backend: + description: "Backend role — internal operator communication" + policies: + - effect: Allow + actions: ["internal:Operator", "config:Read", "pool:List"] + resources: ["*"] + external_roles: [osmo-backend] + + ## Backend cluster definitions. The "default" backend is created by + ## configure_app() on first deploy — include it here so ConfigMap mode + ## serves it from memory. + ## + backends: + default: + description: "Default backend" + scheduler_settings: + scheduler_type: kai + scheduler_name: kai-scheduler + scheduler_timeout: 30 + + ## Pool definitions. The "default" pool references the "default" backend. + ## + pools: + default: + description: "Default pool" + backend: default + default_platform: default + platforms: + default: {} + + ## Backend test definitions. 
+ ## + backendTests: {} + + ## Group templates. + ## + groupTemplates: {} + + ## K8s Secrets to mount into the service pods. + ## Each entry generates a volume + volumeMount at /etc/osmo/secrets/<secretName>/. + ## The Python config loader resolves secretName references in configs at runtime. + ## + ## Example: + ## secretRefs: + ## - secretName: my-bucket-cred # mounts all keys + ## - secretName: imagepullsecret # mounts all keys + ## + secretRefs: [] + ## Redis cache service configuration ## Set enabled to false if using an external Redis deployment ## diff --git a/projects/configmap-configs.md b/projects/configmap-configs.md new file mode 100644 index 000000000..0290a4246 --- /dev/null +++ b/projects/configmap-configs.md @@ -0,0 +1,311 @@ + + +# ConfigMap-Sourced Dynamic Configuration + +**Author**: @vvnpn-nv&#13;
+**PIC**: @vvnpn-nv
+**Status**: v3 — authz_sidecar reads from file, product defaults in chart, zero DB for config + +## Overview + +Enable OSMO's configuration to be defined declaratively in Kubernetes ConfigMaps via Helm values, served from in-memory cache, and automatically reloaded on file changes. This follows the standard K8s pattern used by CoreDNS, Prometheus, NGINX Ingress, and Grafana: ConfigMap mounted as file, parsed into memory, served from memory, file watcher detects changes. + +### Motivation + +Today, after every OSMO deployment, an administrator must manually run `osmo config update` CLI commands to configure the instance. This is error-prone, not version-controlled, and doesn't fit a GitOps deployment model. + +### Problem + +- Config changes are imperative (CLI commands) rather than declarative (checked into Git) +- No way to reproduce a config state from source control +- Config drift between environments is invisible until something breaks +- New deployments require manual post-deploy setup steps + +## Architecture + +### Two Global Modes + +A single toggle `configs.enabled` controls the entire system: + +| | **ConfigMap Mode** (`enabled: true`) | **DB Mode** (`enabled: false` / absent) | +|---|---|---| +| Source of truth | ConfigMap (Helm values / GitOps) | Database (CLI/API) | +| Config writes | ALL write endpoints return 409 | Normal CLI/API behavior | +| Config reads | In-memory dict (parsed from file) | Database | +| How configs change | Edit Helm values, deploy via ArgoCD/Flux, kubelet updates mount, watchdog detects, reload | `osmo config update` / API calls | +| File watcher | Active (watchdog/inotify) | Not running | +| DB role | Runtime state only (agent heartbeats, k8s_uid) | Persistent store for everything | + +There are no per-config management modes. Either all configs come from ConfigMap or all configs come from the database. 
This simplifies the system and eliminates drift reconciliation (since CLI writes are fully blocked in ConfigMap mode, drift is impossible). + +### Config vs Runtime Data Separation + +Not everything in the system is configuration. Some data is generated at runtime by services or agents: + +| Data | Source | ConfigMap Mode | DB Mode | +|---|---|---|---| +| Singleton configs (service, workflow, dataset) | Admin | In-memory dict | DB | +| Backend config (scheduler_settings, node_conditions) | Admin | In-memory dict | DB | +| Backend runtime (k8s_uid, last_heartbeat, version) | Agent | DB (backends table) | DB | +| Pools, templates, validations, backend_tests | Admin | In-memory dict | DB | +| Roles + external role mappings | Admin | In-memory dict (both Python + Go) | DB | +| service_auth (RSA keys, login_info) | Service startup | DB (auto-generated) | DB | + +### Data Flow + +``` +ConfigMap Mode: + + Helm Values --> K8s ConfigMap --> Mounted File + (/etc/osmo/configs/config.yaml) + | + +------------------+------------------+ + | | + v v + Python ConfigMapWatcher Go authz_sidecar + (watchdog/inotify) (file poll 30s) + | | + v v + _parsed_configs dict FileRoleStore + (module-level, in-memory) (roles + external mappings + | + pool names in-memory) + | | + +--------------+--------------+ | + | | | v + get_configs() Backend.fetch() PodTemplate.fetch() resolveRoles() + (singleton) (config + DB (pure in-memory) (IDP group -> OSMO + runtime merge) role, in-memory) + + CLI / API --> config_service --> 409 Guard --> X (all writes blocked) + + K8s Secret --> Mounted File --> Resolved on parse --> Stored in dict + (credentials) (/etc/osmo/secrets/) +``` + +### Key Components + +| Component | File | Purpose | +|---|---|---| +| `configmap_state` | `src/utils/configmap_state.py` | Dependency-free module holding mode boolean + parsed config snapshot. Importable from both utils and service layers without circular deps. 
| +| `configmap_guard` | `src/service/core/config/configmap_guard.py` | 409 write protection. Single `reject_if_configmap_mode(username)` function. | +| `configmap_loader` | `src/service/core/config/configmap_loader.py` | ConfigMapWatcher class, watchdog event handler, validation, secret resolution. | +| Postgres model methods | `src/utils/connectors/postgres.py` | 13 interception points that check `configmap_state.get_snapshot()` and serve from memory. | +| `FileRoleStore` | `src/utils/roles/file_loader.go` | Go in-memory store for roles, external role mappings, and pool names. Loaded from ConfigMap file, polled for changes. | +| `authz_server` | `src/service/authz_sidecar/server/authz_server.go` | Dual-mode: file-backed (no DB) or DB-backed. Uses `FileRoleStore` for role resolution when `--roles-file` is set. | + +### How File Changes Are Detected + +**Python service**: Using the `watchdog` library (v6.0.0, already a project dependency for rsync) with inotify backend on Linux. Watches the parent directory to detect K8s ConfigMap atomic symlink swaps. 2-second debounce for rapid events. + +**Go authz_sidecar**: Polls file modification time every 30 seconds via `os.Stat()`. Reloads roles, external mappings, and pool names on change. + +### Validation + +All-or-nothing validation before applying — same pattern as `nginx -t`: + +1. Parse ConfigMap YAML +2. Resolve all secret file references +3. Validate each section by constructing Pydantic models (ServiceConfig, WorkflowConfig, etc.) +4. If ANY section fails validation: log error, keep previous config, service continues +5. If ALL pass: atomic swap of module-level dict reference + +On first deployment with bad config: service starts with DB defaults (from `configure_app()`), ConfigMap mode is NOT activated, ERROR log shows what's wrong. + +### 409 Write Protection + +In ConfigMap mode, ALL config write endpoints return HTTP 409 Conflict: + +``` +Configs are managed by ConfigMap and cannot be modified via CLI/API. 
+Update the Helm values and redeploy instead. +``` + +29 guard call sites across `config_service.py` (27) and `helpers.py` (2). + +### Secret Handling + +Credentials are **never stored in ConfigMap or Helm values**. Instead: + +1. K8s Secret created out-of-band (Vault, ExternalSecrets, or `kubectl create secret`) +2. Secret mounted into pod via Helm-generated volume/volumeMount +3. ConfigMap references the secret via `secretName` or `credentialSecretName` +4. Helm template transforms `secretName` to `secret_file` path +5. Loader reads the file during parse and injects resolved credentials into the in-memory dict + +Supports three secret formats: +- Simple string: `{value: "token-value"}` (e.g., Slack tokens) +- Docker registry: `.dockerconfigjson` format (auto-detected) +- YAML dict: arbitrary key-value pairs (e.g., storage credentials) + +### Backend Config + Runtime Merge + +Backend reads merge config from the in-memory dict with runtime data from DB: + +| ConfigMap | Agent Connected | Behavior | +|---|---|---| +| Declared | Yes | Config fields from dict, runtime (heartbeat, k8s_uid) from DB | +| Declared | No | Config fields from dict, runtime defaults to empty (offline) | +| Not declared | Yes | Excluded from config list (agent row exists but not managed) | + +Agent code (`service/agent/helpers.py`) is unchanged — it continues writing heartbeats and k8s_uid to the `backends` table. + +### Authz Sidecar — File-Backed Roles + +The Go authz_sidecar reads roles directly from the ConfigMap file via `FileRoleStore`, eliminating the PostgreSQL dependency in ConfigMap mode. + +**How it works:** +1. `--roles-file` flag set in Helm template when `configs.enabled=true` +2. `FileRoleStore` loads roles + `external_roles` mappings from YAML on startup +3. Builds reverse map: `externalRole -> []osmoRoleName` for fast IDP resolution +4. On each auth check, `ResolveExternalRoles()` maps JWT claims to OSMO roles in-memory +5. 
No `user_roles` DB table, no `SyncUserRoles` SQL — IDP groups are the source of truth +6. Pool names for RBAC evaluation also come from the file + +**User role management:** +- Users are assigned roles via IDP group membership (e.g., Azure AD groups) +- Adding a user to a role = adding them to the IDP group (done in Azure AD, not OSMO) +- The `external_roles` field in each role definition maps IDP groups to OSMO roles +- No per-user role assignments in OSMO — all role assignments are declarative via IDP + +**Helm template behavior:** +- `configs.enabled=true`: authz_sidecar gets `--roles-file`, no postgres args, ConfigMap volume mounted +- `configs.enabled=false`: authz_sidecar gets postgres args (legacy DB mode) + +### Runtime Field Injection + +`service_auth` (RSA keys) and `service_base_url` are auto-generated by `configure_app()` on startup — they are NOT in the ConfigMap. The loader injects them: + +- First load: reads from DB (configure_app has already written them) +- Subsequent reloads: carries forward from previous snapshot + +`service_base_url` is also auto-derived from `services.service.hostname` in the Helm template if not explicitly set. + +## Chart Defaults vs Per-Deployment Values + +The chart `values.yaml` ships with product defaults. Per-deployment values only need site-specific overrides. 
+ +### Chart Defaults (values.yaml) + +| Config | Default | +|---|---| +| Workflow limits | `max_num_tasks: 100`, `max_exec_timeout: 30d`, `default_exec_timeout: 7d` | +| Pod templates | `default_ctrl` (1 CPU, 1Gi), `default_user` (templated placeholders) | +| Resource validations | `default_cpu` (LE node_cpu, GT 0), `default_memory` (LE node_memory, GT 0) | +| Roles | `osmo-admin` (wildcard), `osmo-user` (workflow/data ops), `osmo-default` (login/profile), `osmo-ctrl` (internal), `osmo-backend` (internal) | +| Backend | `default` (kai scheduler, 30s timeout) | +| Pool | `default` (references default backend, default platform) | +| `service_base_url` | Auto-derived from `services.service.hostname` | + +### Per-Deployment Overrides (site values) + +| Config | Why site-specific | +|---|---| +| `configs.enabled` | Toggle ConfigMap mode | +| Workflow limit overrides | Different limits per environment | +| `backend_images.credential` | Registry secret name varies | +| `cli_config` versions | Tied to deployed version | +| Dataset buckets + credentials | Storage paths and secrets vary | +| Additional backends | Site-specific clusters | +| `external_roles` on roles | IDP group names are org-specific | + +### Example Per-Deployment Values + +```yaml +services: + configs: + enabled: true + + workflow: + config: + max_num_tasks: 200 + max_exec_timeout: "60d" + backend_images: + credential: + secretName: imagepullsecret + secretKey: .dockerconfigjson + + service: + config: + cli_config: + min_supported_version: "6.0.0" + latest_version: "6.2.12" + + dataset: + config: + default_bucket: sandbox + buckets: + sandbox: + dataset_path: "s3://my-bucket" + region: "us-west-2" + mode: "read-write" + default_credential: + credentialSecretName: my-bucket-cred +``` + +## Testing + +### Unit Tests (23 tests) +- ConfigMap guard: 409 when active, allow when inactive, bypass for configmap-sync +- Config state: snapshot set/get, atomic swap preserves old references +- Secret resolution: 
success, missing file, simple string, Docker registry, secretName conversion +- Validation: valid sections, invalid items type, unknown keys, empty configs +- ConfigMapWatcher: file not found, no managed_configs, populates snapshot, injects runtime fields, validation failure preserves previous config +- Event handler: ignores unrelated events, reacts to config file events, reacts to ..data symlink events + +### Integration Tests (testcontainers PostgreSQL) +- Singleton configs served from snapshot +- Named configs (PodTemplate, ResourceValidation) served from snapshot +- 409 rejection on patch/put endpoints +- 409 bypass for configmap-sync username +- ConfigMapWatcher loads configs into snapshot + +### Go Tests +- Existing `roles_test` and `server_test` pass (DB-backed path unchanged) +- `server_integration_test` passes (testcontainers PostgreSQL) + +### E2E Tests (validated on live dev instance) + +| Test | Result | +|---|---| +| Service startup: ConfigMap loaded, mode activated, watcher started | PASS | +| Authz sidecar: 5 roles loaded from file, ConfigMap mode, migration skipped | PASS | +| No errors in either container | PASS | +| GET workflow (max_num_tasks, registry credential) | PASS | +| GET service (service_base_url auto-derived, service_auth injected) | PASS | +| GET dataset (bucket credentials resolved from K8s Secret) | PASS | +| GET pod templates (from chart defaults) | PASS | +| GET backends (default, config from memory + runtime from DB) | PASS | +| GET pools (default, status computed from DB heartbeat) | PASS | +| GET resource validations (ResourceAssertion format, HTTP 200) | PASS | +| GET roles (5 roles from chart defaults) | PASS | +| 409: PATCH workflow, PUT pod template, DELETE backend | PASS | +| 409: DELETE pool, DELETE dataset bucket, rollback | PASS | + +## Backwards Compatibility + +- Fully backwards compatible: `configs.enabled: false` (default) preserves current behavior +- No DB schema changes required +- Agent code unchanged +- CLI 
works normally in DB mode +- Authz sidecar falls back to DB mode when `--roles-file` is not set + +## Open Questions + +- [x] Per-config or global mode? -> Global mode (team feedback) +- [x] Polling or event watching? -> Watchdog/inotify events with 2s debounce (team feedback) +- [x] Write to DB or serve from memory? -> In-memory, standard K8s pattern (team feedback) +- [x] How to handle runtime data (heartbeats)? -> Separate: config from memory, runtime from DB +- [x] How to handle roles (authz_sidecar)? -> File-backed FileRoleStore, no DB needed +- [x] What about service_auth? -> Inject from DB on first load, carry forward on reloads +- [x] What about user role assignments? -> IDP groups are source of truth, no per-user DB state +- [x] What should be chart defaults vs per-deployment? -> Product roles/templates/validations/defaults in chart, site-specific overrides per deployment +- [ ] Should the CLI display warnings when in ConfigMap mode? +- [ ] Health check endpoint for ConfigMap mode status? diff --git a/run/e2e/test_configs.py b/run/e2e/test_configs.py new file mode 100644 index 000000000..2899287d7 --- /dev/null +++ b/run/e2e/test_configs.py @@ -0,0 +1,259 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +"""E2E tests for ConfigMap-sourced dynamic configuration. 
+ +These tests validate the configmap_loader feature by deploying OSMO with +configs enabled via Helm values and verifying that configs are +correctly applied to the database on startup. + +Prerequisites: +- A KIND cluster with OSMO deployed using the configs values file + (run/minimal/osmo_configs_values.yaml) +- The OSMO_E2E_URL environment variable set to the service base URL + +See run/minimal/osmo_configs_values.yaml for the test values. +""" + +from typing import Any, Dict, List + +import pytest + +from run.e2e.e2e_client import OsmoE2EClient + + +CONFIGMAP_SYNC_USERNAME = 'configmap-sync' +CONFIGMAP_SYNC_TAG = 'configmap' + + +def _get_config_history( + client: OsmoE2EClient, + config_type: str, + name: str | None = None, + tags: list[str] | None = None, + limit: int = 100, +) -> List[Dict[str, Any]]: + """Fetch config history entries with optional filters.""" + params: Dict[str, Any] = { + 'config_types': config_type, + 'limit': limit, + 'order': 'DESC', + } + if name: + params['name'] = name + if tags: + params['tags'] = tags + response = client.get('/api/configs/history', params=params) + if response.status_code != 200: + return [] + return response.json().get('configs', []) + + +class TestDynamicConfigFreshDeployment: + """Scenario 1: Fresh deployment with ConfigMap configs. + + Verify that all config types defined in configs Helm values + appear in the database after startup. 
+ """ + + def test_workflow_config_applied( + self, e2e_client: OsmoE2EClient) -> None: + """Verify workflow config from ConfigMap is applied.""" + response = e2e_client.get('/api/configs/workflow') + assert response.status_code == 200 + data = response.json() + assert data.get('max_num_tasks') == 50 + + def test_pool_created_from_configmap( + self, e2e_client: OsmoE2EClient) -> None: + """Verify the 'e2e-test' pool was created from ConfigMap.""" + response = e2e_client.get('/api/configs/pool/e2e-test') + assert response.status_code == 200 + data = response.json() + assert data.get('description') == 'E2E test pool from ConfigMap' + + def test_pod_template_created_from_configmap( + self, e2e_client: OsmoE2EClient) -> None: + """Verify pod template was created from ConfigMap.""" + response = e2e_client.get('/api/configs/pod_template/e2e-compute') + assert response.status_code == 200 + + def test_resource_validation_created_from_configmap( + self, e2e_client: OsmoE2EClient) -> None: + """Verify resource validation was created from ConfigMap.""" + response = e2e_client.get( + '/api/configs/resource_validation/e2e-cpu-limit') + assert response.status_code == 200 + + def test_backend_created_from_configmap( + self, e2e_client: OsmoE2EClient) -> None: + """Verify backend was created from ConfigMap.""" + response = e2e_client.get('/api/configs/backend/e2e-backend') + assert response.status_code == 200 + data = response.json() + assert data.get('description') == 'E2E test backend' + + def test_config_history_shows_configmap_sync( + self, e2e_client: OsmoE2EClient) -> None: + """Verify config history entries have username=configmap-sync.""" + history = _get_config_history( + e2e_client, + config_type='WORKFLOW', + tags=[CONFIGMAP_SYNC_TAG], + ) + configmap_entries = [ + entry for entry in history + if entry.get('username') == CONFIGMAP_SYNC_USERNAME + ] + assert len(configmap_entries) > 0, ( + 'No config history entries found with ' + f'username={CONFIGMAP_SYNC_USERNAME}') + 
+ def test_config_history_has_configmap_tag( + self, e2e_client: OsmoE2EClient) -> None: + """Verify config history entries are tagged with 'configmap'.""" + history = _get_config_history( + e2e_client, + config_type='POOL', + name='e2e-test', + ) + configmap_entries = [ + entry for entry in history + if CONFIGMAP_SYNC_TAG in (entry.get('tags') or []) + ] + assert len(configmap_entries) > 0, ( + 'No config history entries found with ' + f'tag={CONFIGMAP_SYNC_TAG}') + + +class TestDynamicConfigConfigmapMode: + """Scenario 3: CLI update AFTER ConfigMap (configmap mode). + + Workflow config is deployed with managed_by=configmap. CLI changes + are ephemeral and will be overwritten on the next pod restart. + """ + + def test_workflow_overwrite_detection( + self, e2e_client: OsmoE2EClient) -> None: + """Verify workflow config matches the ConfigMap value. + + The configmap mode always overwrites on startup, so the + value should always be what the ConfigMap specifies. + """ + response = e2e_client.get('/api/configs/workflow') + assert response.status_code == 200 + data = response.json() + assert data.get('max_num_tasks') == 50 + + +class TestDynamicConfigHelmUpgrade: + """Scenario 4: ConfigMap update after CLI. + + After a helm upgrade with new values, the pod should restart + (due to checksum annotation) and apply the new config. + + NOTE: These tests verify the mechanism works by checking that + the checksum annotation triggers restarts. The actual helm + upgrade step requires kind/kubectl access. 
+ """ + + def test_pool_listing_includes_configmap_pools( + self, e2e_client: OsmoE2EClient) -> None: + """Verify ConfigMap-defined pools appear in pool listing.""" + response = e2e_client.get('/api/configs/pool') + assert response.status_code == 200 + data = response.json() + pool_names = [pool.get('name') for pool in data] + assert 'e2e-test' in pool_names, ( + f'e2e-test pool not found in listing: {pool_names}') + + def test_backend_listing_includes_configmap_backends( + self, e2e_client: OsmoE2EClient) -> None: + """Verify ConfigMap-defined backends appear in listing.""" + response = e2e_client.get('/api/configs/backend') + assert response.status_code == 200 + data = response.json() + backend_names = [backend.get('name') for backend in data] + assert 'e2e-backend' in backend_names, ( + f'e2e-backend not found in listing: {backend_names}') + + +class TestDynamicConfigErrorResilience: + """Scenario 6: Error resilience. + + If one config type fails validation, the service should still + start and other config types should be applied correctly. + + This is tested by deploying with an intentionally invalid config + for one type and verifying other types succeed. + """ + + def test_service_healthy_despite_config_errors( + self, e2e_client: OsmoE2EClient) -> None: + """Verify the service is healthy even if some configs failed.""" + response = e2e_client.get('/health') + assert response.status_code == 200 + + def test_valid_configs_applied_despite_errors( + self, e2e_client: OsmoE2EClient) -> None: + """Verify configs that were valid were still applied.""" + response = e2e_client.get('/api/configs/workflow') + assert response.status_code == 200 + data = response.json() + assert data.get('max_num_tasks') == 50 + + +class TestDynamicConfigMultiReplica: + """Scenario 7: Multi-replica startup. + + When multiple replicas start simultaneously, only one should + apply configs (via advisory lock). Config history should not + show duplicate entries. 
+ """ + + def test_no_duplicate_history_entries( + self, e2e_client: OsmoE2EClient) -> None: + """Verify configs are applied exactly once (no duplicates). + + Check config history for the pool created from ConfigMap. + There should be exactly one creation entry from configmap-sync. + """ + history = _get_config_history( + e2e_client, + config_type='POOL', + name='e2e-test', + tags=[CONFIGMAP_SYNC_TAG], + ) + creation_entries = [ + entry for entry in history + if entry.get('username') == CONFIGMAP_SYNC_USERNAME + ] + # With advisory lock, there should be exactly 1 entry + # per restart (not 2+ from concurrent replicas). + assert len(creation_entries) >= 1 + # If service was restarted N times, there should be at + # most N entries but never 2+ from the same startup + # cycle. We can't easily distinguish cycles, so we just + # verify no obvious duplication (same timestamps). + if len(creation_entries) > 1: + timestamps = [ + entry.get('created_at') + for entry in creation_entries + ] + unique_timestamps = set(timestamps) + # Each entry should have a unique timestamp (different restart cycles) + assert len(unique_timestamps) == len(timestamps), ( + f'Duplicate config history timestamps detected: {timestamps}') diff --git a/run/minimal/osmo_configs_values.yaml b/run/minimal/osmo_configs_values.yaml new file mode 100644 index 000000000..69803d7f1 --- /dev/null +++ b/run/minimal/osmo_configs_values.yaml @@ -0,0 +1,85 @@ +# SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# SPDX-License-Identifier: Apache-2.0 + +# Helm values for E2E testing of the dynamic config (ConfigMap loader) feature. +# Use as an overlay on top of the base osmo_values.yaml: +# helm upgrade --install osmo deployments/charts/service/ \ +# -f run/minimal/osmo_values.yaml \ +# -f run/minimal/osmo_configs_values.yaml \ +# -n osmo + +sidecars: + oauth2Proxy: + enabled: false + authz: + imagePullPolicy: IfNotPresent + +services: + service: + imagePullPolicy: IfNotPresent + worker: + imagePullPolicy: IfNotPresent + agent: + imagePullPolicy: IfNotPresent + logger: + imagePullPolicy: IfNotPresent + delayedJobMonitor: + imagePullPolicy: IfNotPresent + configs: + enabled: true + + # Singleton configs + workflow: + max_num_tasks: 50 + max_exec_timeout: "60d" + + service: + service_base_url: "http://ingress-nginx-controller.ingress-nginx.svc.cluster.local" + + # Named configs + backends: + e2e-backend: + description: "E2E test backend" + + pools: + e2e-test: + description: "E2E test pool from ConfigMap" + backend: "e2e-backend" + platforms: + default: + labels: + node_group: "compute" + + podTemplates: + e2e-compute: + spec: + containers: + - name: "{{USER_CONTAINER_NAME}}" + env: + - name: OSMO_LOGIN_DEV + value: "true" + - name: osmo-ctrl + env: + - name: OSMO_LOGIN_DEV + value: "true" + nodeSelector: + node_group: compute + + resourceValidations: + e2e-cpu-limit: + - resource: cpu + min: "1" + max: "64" diff --git a/run/minimal/osmo_values.yaml b/run/minimal/osmo_values.yaml index d27fd244e..5868edeb9 100644 --- a/run/minimal/osmo_values.yaml +++ 
b/run/minimal/osmo_values.yaml @@ -109,3 +109,6 @@ sidecars: rateLimit: enabled: false + +podMonitor: + enabled: false diff --git a/src/service/authz_sidecar/main.go b/src/service/authz_sidecar/main.go index e72bd6b9a..23b19a946 100644 --- a/src/service/authz_sidecar/main.go +++ b/src/service/authz_sidecar/main.go @@ -40,14 +40,18 @@ import ( ) const ( - defaultGRPCPort = 50052 - maxGRPCMsgSize = 4 * 1024 * 1024 // 4MB + defaultGRPCPort = 50052 + maxGRPCMsgSize = 4 * 1024 * 1024 // 4MB + filePollInterval = 30 * time.Second ) var ( grpcPort = flag.Int("grpc-port", defaultGRPCPort, "gRPC server port") enableReflection = flag.Bool("enable-reflection", false, "Enable gRPC reflection (for local testing only)") + rolesFile = flag.String("roles-file", "", + "Path to ConfigMap-mounted YAML file for roles. "+ + "When set, reads roles from file instead of PostgreSQL (ConfigMap mode).") // PostgreSQL flags - registered via postgres package postgresFlagPtrs = postgres.RegisterPostgresFlags() @@ -62,29 +66,22 @@ var ( func main() { flag.Parse() - // Setup structured logging using the OSMO service log format loggingConfig := loggingFlagPtrs.ToConfig() logger := logging.InitLogger("authz-sidecar", loggingConfig) - // Create PostgreSQL client - ctx := context.Background() - postgresConfig := postgresFlagPtrs.ToPostgresConfig() - pgClient, err := postgresConfig.CreateClient(logger) - if err != nil { - logger.Error("failed to create postgres client", slog.String("error", err.Error())) - os.Exit(1) - } - defer pgClient.Close() - - // Create caches - cacheConfig := cacheFlagPtrs.ToCacheConfig() - roleCache := roles.NewRoleCache(cacheConfig.MaxSize, cacheConfig.TTL, logger) - poolNameCache := roles.NewPoolNameCache(cacheConfig.TTL, logger) + var authzServer *server.AuthzServer - // Create authorization server - authzServer := server.NewAuthzServer(pgClient, roleCache, poolNameCache, logger) + if *rolesFile != "" { + // ConfigMap mode: read roles from file, no DB needed + authzServer = 
initFileBackedServer(*rolesFile, logger) + } else { + // DB mode: read roles from PostgreSQL (uses caches) + cacheConfig := cacheFlagPtrs.ToCacheConfig() + authzServer = initDBBackedServer(cacheConfig, logger) + } - // Migrate all roles to semantic format in the database + // Migrate roles (no-op in file-backed mode) + ctx := context.Background() if err := authzServer.MigrateRoles(ctx); err != nil { logger.Error("failed to migrate roles", slog.String("error", err.Error())) os.Exit(1) @@ -92,28 +89,10 @@ func main() { logger.Info("authz server initialized") - // Create gRPC server options - opts := []grpc.ServerOption{ - grpc.KeepaliveParams(keepalive.ServerParameters{ - Time: 60 * time.Second, - Timeout: 20 * time.Second, - }), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 30 * time.Second, - PermitWithoutStream: true, - }), - grpc.MaxRecvMsgSize(maxGRPCMsgSize), - grpc.MaxSendMsgSize(maxGRPCMsgSize), - } - - grpcServer := grpc.NewServer(opts...) - - // Register health service + grpcServer := createGRPCServer() healthServer := health.NewServer() grpc_health_v1.RegisterHealthServer(grpcServer, healthServer) healthServer.SetServingStatus("", grpc_health_v1.HealthCheckResponse_SERVING) - - // Register authorization service server.RegisterAuthzService(grpcServer, authzServer) if *enableReflection { @@ -121,12 +100,8 @@ func main() { logger.Warn("gRPC reflection enabled (not recommended for production)") } - logger.Info("authz server configured", - slog.Int("port", *grpcPort), - slog.String("postgres_host", postgresConfig.Host), - ) + logger.Info("authz server configured", slog.Int("port", *grpcPort)) - // Start gRPC server lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *grpcPort)) if err != nil { logger.Error("failed to listen", slog.String("error", err.Error())) @@ -139,3 +114,54 @@ func main() { os.Exit(1) } } + +func initFileBackedServer( + filePath string, + logger *slog.Logger, +) *server.AuthzServer { + fileStore := 
roles.NewFileRoleStore(filePath, logger) + if err := fileStore.Load(); err != nil { + logger.Error("failed to load roles from file", + slog.String("file", filePath), + slog.String("error", err.Error())) + os.Exit(1) + } + fileStore.Start(filePollInterval) + logger.Info("authz sidecar running in ConfigMap mode", + slog.String("roles_file", filePath)) + return server.NewFileBackedAuthzServer(fileStore, logger) +} + +func initDBBackedServer( + cacheConfig roles.CacheConfig, + logger *slog.Logger, +) *server.AuthzServer { + postgresConfig := postgresFlagPtrs.ToPostgresConfig() + pgClient, err := postgresConfig.CreateClient(logger) + if err != nil { + logger.Error("failed to create postgres client", + slog.String("error", err.Error())) + os.Exit(1) + } + roleCache := roles.NewRoleCache(cacheConfig.MaxSize, cacheConfig.TTL, logger) + poolNameCache := roles.NewPoolNameCache(cacheConfig.TTL, logger) + logger.Info("authz sidecar running in DB mode", + slog.String("postgres_host", postgresConfig.Host)) + return server.NewAuthzServer(pgClient, roleCache, poolNameCache, logger) +} + +func createGRPCServer() *grpc.Server { + opts := []grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + Time: 60 * time.Second, + Timeout: 20 * time.Second, + }), + grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: 30 * time.Second, + PermitWithoutStream: true, + }), + grpc.MaxRecvMsgSize(maxGRPCMsgSize), + grpc.MaxSendMsgSize(maxGRPCMsgSize), + } + return grpc.NewServer(opts...) 
+} diff --git a/src/service/authz_sidecar/server/authz_server.go b/src/service/authz_sidecar/server/authz_server.go index d31e0ffe6..1d364aef7 100644 --- a/src/service/authz_sidecar/server/authz_server.go +++ b/src/service/authz_sidecar/server/authz_server.go @@ -54,10 +54,11 @@ type AuthzServer struct { pgClient *postgres.PostgresClient roleCache *roles.RoleCache poolNameCache *roles.PoolNameCache + fileStore *roles.FileRoleStore // when set, reads from ConfigMap file instead of DB logger *slog.Logger } -// NewAuthzServer creates a new authorization server +// NewAuthzServer creates a new authorization server backed by PostgreSQL func NewAuthzServer( pgClient *postgres.PostgresClient, roleCache *roles.RoleCache, @@ -72,9 +73,27 @@ func NewAuthzServer( } } +// NewFileBackedAuthzServer creates a server that reads roles from a +// ConfigMap-mounted file instead of PostgreSQL. No DB connection needed. +// No caches needed — FileRoleStore is already in-memory. +func NewFileBackedAuthzServer( + fileStore *roles.FileRoleStore, + logger *slog.Logger, +) *AuthzServer { + return &AuthzServer{ + fileStore: fileStore, + logger: logger, + } +} + // MigrateRoles converts all legacy roles to semantic format and updates the database. // This should be called at startup to ensure all roles are in semantic format. +// Skipped when using file-backed roles (file always has semantic format). func (s *AuthzServer) MigrateRoles(ctx context.Context) error { + if s.fileStore != nil { + s.logger.Info("skipping role migration (file-backed mode)") + return nil + } // Get all role names from the database allRoleNames, err := roles.GetAllRoleNames(ctx, s.pgClient) if err != nil { @@ -153,21 +172,24 @@ func (s *AuthzServer) Check(ctx context.Context, req *envoy_service_auth_v3.Chec parseDone := time.Now() - // Sync user_roles table from external IDP roles and retrieve the user's - // complete set of assigned OSMO roles in a single atomic operation. 
- // This maps external role names (from the JWT) to OSMO roles via - // role_external_mappings and applies sync_mode logic (import/force). - // Skip sync for access tokens and workflow-originated requests, as their - // roles are already resolved from the access_token_roles table. + // Map external IDP roles to OSMO roles. + // In file-backed mode: pure in-memory lookup from ConfigMap data. + // In DB mode: sync user_roles table via SQL (legacy). + // Skip for access tokens and workflow requests (roles already resolved). if user != "" && tokenName == "" && workflowID == "" { - dbRoleNames, err := roles.SyncUserRoles(ctx, s.pgClient, user, roleNames, s.logger) - if err != nil { - s.logger.Error("failed to sync user roles", - slog.String("user", user), - slog.String("error", err.Error()), - ) + if s.fileStore != nil { + resolved := s.fileStore.ResolveExternalRoles(roleNames) + roleNames = append(roleNames, resolved...) + } else { + dbRoleNames, err := roles.SyncUserRoles(ctx, s.pgClient, user, roleNames, s.logger) + if err != nil { + s.logger.Error("failed to sync user roles", + slog.String("user", user), + slog.String("error", err.Error()), + ) + } + roleNames = append(roleNames, dbRoleNames...) } - roleNames = append(roleNames, dbRoleNames...) } // Add default role and deduplicate @@ -256,10 +278,15 @@ func (s *AuthzServer) Check(ctx context.Context, req *envoy_service_auth_v3.Chec return s.allowResponse(responseHeaders), nil } -// resolveRoles fetches role objects from cache/DB for the given role names. +// resolveRoles fetches role objects for the given role names. +// File-backed mode: direct in-memory lookup (no cache needed). +// DB mode: LRU cache with DB fallback. 
func (s *AuthzServer) resolveRoles(ctx context.Context, roleNames []string) ([]*roles.Role, error) { - cachedRoles, missingNames := s.roleCache.Get(roleNames) + if s.fileStore != nil { + return s.fileStore.GetRoles(roleNames), nil + } + cachedRoles, missingNames := s.roleCache.Get(roleNames) if len(missingNames) > 0 { dbRoles, err := roles.GetRoles(ctx, s.pgClient, missingNames, s.logger) if err != nil { @@ -270,7 +297,6 @@ func (s *AuthzServer) resolveRoles(ctx context.Context, roleNames []string) ([]* cachedRoles = append(cachedRoles, dbRoles...) } } - return cachedRoles, nil } @@ -287,17 +313,23 @@ func (s *AuthzServer) checkAccess( // hitting the database on every request. func (s *AuthzServer) computeAllowedPools( ctx context.Context, user string, userRoles []*roles.Role) []string { - allPoolNames, ok := s.poolNameCache.Get() - if !ok { - var err error - allPoolNames, err = roles.GetAllPoolNames(ctx, s.pgClient) - if err != nil { - s.logger.Error("failed to get pool names for allowed pools computation", - slog.String("user", user), - slog.String("error", err.Error())) - return []string{} + var allPoolNames []string + if s.fileStore != nil { + allPoolNames = s.fileStore.GetPoolNames() + } else { + var ok bool + allPoolNames, ok = s.poolNameCache.Get() + if !ok { + var err error + allPoolNames, err = roles.GetAllPoolNames(ctx, s.pgClient) + if err != nil { + s.logger.Error("failed to get pool names for allowed pools computation", + slog.String("user", user), + slog.String("error", err.Error())) + return []string{} + } + s.poolNameCache.Set(allPoolNames) } - s.poolNameCache.Set(allPoolNames) } allowed := roles.GetAllowedPools(userRoles, allPoolNames) diff --git a/src/service/core/config/BUILD b/src/service/core/config/BUILD index 4242f6001..ec9bee5de 100644 --- a/src/service/core/config/BUILD +++ b/src/service/core/config/BUILD @@ -1,5 +1,5 @@ """ -SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,16 +25,21 @@ osmo_py_library( srcs = [ "config_history_helpers.py", "config_service.py", + "configmap_guard.py", + "configmap_loader.py", "helpers.py", "objects.py", ], deps = [ requirement("fastapi"), requirement("pydantic"), + requirement("pyyaml"), + requirement("watchdog"), "//src/lib/utils:common", "//src/lib/utils:osmo_errors", "//src/lib/utils:config_history", "//src/service/core/workflow:workflow", + "//src/utils:configmap_state", "//src/utils/connectors", "//src/utils/job:job", "//src/utils/job:backend_job", diff --git a/src/service/core/config/config_service.py b/src/service/core/config/config_service.py index efd0bce29..6823b5b12 100644 --- a/src/service/core/config/config_service.py +++ b/src/service/core/config/config_service.py @@ -25,7 +25,9 @@ from src.lib.utils import common, osmo_errors from src.utils.job import workflow -from src.service.core.config import config_history_helpers, helpers, objects +from src.service.core.config import ( + config_history_helpers, configmap_guard, helpers, objects +) from src.service.core.workflow import ( helpers as workflow_helpers, objects as workflow_objects ) @@ -54,11 +56,19 @@ def _check_config_name(name: str, name_type: ConfigNameType): 'be alphanumeric and contain dash or underscore.' 
) + +def _add_configmap_mode_flag(response_dict: Dict) -> Dict: + """Add _configmap_mode flag to GET responses when ConfigMap mode is active.""" + if configmap_guard.is_configmap_mode(): + response_dict['_configmap_mode'] = True + return response_dict + + @router.get('/api/configs/service', response_class=common.PrettyJSONResponse) def read_service_configs() -> Dict: """Read all the service configurations""" postgres = connectors.PostgresConnector.get_instance() - return postgres.get_service_configs().dict(by_alias=True) + return _add_configmap_mode_flag(postgres.get_service_configs().dict(by_alias=True)) @router.put('/api/configs/service') @@ -77,6 +87,7 @@ def patch_service_configs( username: str = fastapi.Depends(connectors.parse_username), ) -> Dict: """Patch service configurations""" + return helpers.patch_configs(request, connectors.ConfigType.SERVICE, username) @@ -84,7 +95,7 @@ def patch_service_configs( def read_workflow_configs() -> Dict: """Read all the workflow configurations""" postgres = connectors.PostgresConnector.get_instance() - return postgres.get_workflow_configs().dict(by_alias=True) + return _add_configmap_mode_flag(postgres.get_workflow_configs().dict(by_alias=True)) @router.put('/api/configs/workflow') @@ -93,6 +104,7 @@ def put_workflow_configs( username: str = fastapi.Depends(connectors.parse_username), ) -> Dict: """Put workflow configurations""" + return helpers.put_configs(request, connectors.ConfigType.WORKFLOW, username) @@ -102,6 +114,7 @@ def patch_workflow_configs( username: str = fastapi.Depends(connectors.parse_username), ) -> Dict: """Patch workflow configurations""" + return helpers.patch_configs(request, connectors.ConfigType.WORKFLOW, username) @@ -109,7 +122,7 @@ def patch_workflow_configs( def read_dataset_configs() -> Dict: """Read all the dataset configurations""" postgres = connectors.PostgresConnector.get_instance() - return postgres.get_dataset_configs().dict(by_alias=True) + return 
_add_configmap_mode_flag(postgres.get_dataset_configs().dict(by_alias=True)) @router.put('/api/configs/dataset') @@ -118,6 +131,7 @@ def put_dataset_configs( username: str = fastapi.Depends(connectors.parse_username), ) -> Dict: """Put dataset configurations""" + return helpers.put_configs(request, connectors.ConfigType.DATASET, username) @@ -127,6 +141,7 @@ def patch_dataset_configs( username: str = fastapi.Depends(connectors.parse_username), ) -> Dict: """Patch dataset configurations""" + return helpers.patch_configs(request, connectors.ConfigType.DATASET, username) @@ -154,6 +169,7 @@ def delete_dataset( username: str = fastapi.Depends(connectors.parse_username), ): """Delete dataset configuration for a specific bucket""" + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() try: @@ -222,6 +238,7 @@ def update_backend( username: str = fastapi.Depends(connectors.parse_username), ): """ Override the config for a specific backend. """ + configmap_guard.reject_if_configmap_mode(username) helpers.update_backend(name, request, username) @@ -239,6 +256,7 @@ def delete_backend( username: str = fastapi.Depends(connectors.parse_username), ): """Remove a backend.""" + configmap_guard.reject_if_configmap_mode(username) # TODO: Resolve race condition where a workflow is submitted between checking for # running workflow and deleting backend if not request.force: @@ -331,6 +349,7 @@ def put_pools( username: str = fastapi.Depends(connectors.parse_username), ): """ Put Pool configurations """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() # Check all pool names in response before inserting any pool into the database @@ -366,6 +385,7 @@ def put_pools( helpers.update_backend_queues(backend) + @router.get('/api/configs/pool/{name}', response_class=common.PrettyJSONResponse) def read_pool(name: str, verbose: bool = False) -> Any: @@ -387,6 +407,7 @@ def put_pool( 
username: str = fastapi.Depends(connectors.parse_username), ): """ Put Pool configurations """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(name, ConfigNameType.POOL) for platform_name in request.configs.platforms.keys(): _check_config_name(platform_name, ConfigNameType.PLATFORM) @@ -426,6 +447,7 @@ def patch_pool( username: str = fastapi.Depends(connectors.parse_username), ): """ Patch Pool configurations """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() # Check platform names if they exist in the patch if 'platforms' in request.configs_dict: @@ -476,6 +498,7 @@ def rename_pool( username: str = fastapi.Depends(connectors.parse_username), ): """ Rename Pool """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(request.new_name, ConfigNameType.POOL) postgres = connectors.PostgresConnector.get_instance() connectors.Pool.rename(postgres, name, request.new_name) @@ -501,6 +524,7 @@ def delete_pool( username: str = fastapi.Depends(connectors.parse_username), ): """ Delete Pool configurations """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() try: pool = connectors.Pool.fetch_from_db(postgres, name) @@ -562,6 +586,7 @@ def put_platform_in_pool( username: str = fastapi.Depends(connectors.parse_username), ): """ Put Platform configurations """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(platform_name, ConfigNameType.PLATFORM) postgres = connectors.PostgresConnector.get_instance() old_platform: connectors.Platform | None = None @@ -591,6 +616,7 @@ def rename_platform_in_pool(name: str, platform_name: str, request: objects.RenamePoolPlatformRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Rename Platform """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(request.new_name, ConfigNameType.PLATFORM) postgres = 
connectors.PostgresConnector.get_instance() connectors.Pool.rename_platform(postgres, name, platform_name, request.new_name) @@ -622,6 +648,7 @@ def read_pod_template(name: str) -> Dict: def put_pod_templates(request: objects.PutPodTemplatesRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Set Dict of Pod Templates configurations """ + configmap_guard.reject_if_configmap_mode(username) for name in request.configs.keys(): _check_config_name(name, ConfigNameType.POD_TEMPLATE) @@ -656,6 +683,7 @@ def put_pod_template(name: str, request: objects.PutPodTemplateRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Put Pod Template configurations """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(name, ConfigNameType.POD_TEMPLATE) postgres = connectors.PostgresConnector.get_instance() old_pod_template = None @@ -690,6 +718,7 @@ def delete_pod_template( username: str = fastapi.Depends(connectors.parse_username), ): """ Delete Pod Template configurations """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() connectors.PodTemplate.delete_from_db(postgres, name) @@ -719,6 +748,7 @@ def read_group_template(name: str) -> Dict: def put_group_templates(request: objects.PutGroupTemplatesRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Set Dict of Group Templates configurations """ + configmap_guard.reject_if_configmap_mode(username) for name in request.configs.keys(): _check_config_name(name, ConfigNameType.GROUP_TEMPLATE) @@ -740,6 +770,7 @@ def put_group_template(name: str, request: objects.PutGroupTemplateRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Put Group Template configurations """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(name, ConfigNameType.GROUP_TEMPLATE) postgres = connectors.PostgresConnector.get_instance() group_template = 
connectors.GroupTemplate(group_template=request.configs) @@ -760,6 +791,7 @@ def delete_group_template( username: str = fastapi.Depends(connectors.parse_username), ): """ Delete Group Template configurations """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() connectors.GroupTemplate.delete_from_db(postgres, name) @@ -791,6 +823,7 @@ def put_resource_validations( username: str = fastapi.Depends(connectors.parse_username), ): """ Put Resource Validation configurations """ + configmap_guard.reject_if_configmap_mode(username) for name in request.configs_dict.keys(): _check_config_name(name, ConfigNameType.RESOURCE_VALIDATON) @@ -815,6 +848,7 @@ def put_resource_validation( username: str = fastapi.Depends(connectors.parse_username), ): """ Put Resource Validation configurations """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(name, ConfigNameType.RESOURCE_VALIDATON) postgres = connectors.PostgresConnector.get_instance() resource_validation = connectors.ResourceValidation( @@ -836,6 +870,7 @@ def delete_resource_validation( username: str = fastapi.Depends(connectors.parse_username), ): """Delete Resource Validation configurations""" + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() connectors.ResourceValidation.delete_from_db(postgres, name) helpers.create_resource_validation_config_history_entry( @@ -864,6 +899,7 @@ def read_role(name: str) -> connectors.Role: def put_roles(request: objects.PutRolesRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Put Roles """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() for role in request.configs: role.insert_into_db(postgres) @@ -881,6 +917,7 @@ def put_role(name: str, request: objects.PutRoleRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Patch Role configurations """ + 
configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() request.configs.insert_into_db(postgres) @@ -897,6 +934,7 @@ def delete_role(name: str, request: objects.ConfigsRequest, username: str = fastapi.Depends(connectors.parse_username)): """ Delete Role """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() connectors.Role.delete_from_db(postgres, name) @@ -921,6 +959,7 @@ def put_backend_tests( username: str = fastapi.Depends(connectors.parse_username), ): """ Put backend test configurations """ + configmap_guard.reject_if_configmap_mode(username) for name in request.configs.keys(): _check_config_name(name, ConfigNameType.BACKEND_TEST) @@ -952,6 +991,7 @@ def put_backend_test( username: str = fastapi.Depends(connectors.parse_username), ): """ Put backend test configuration """ + configmap_guard.reject_if_configmap_mode(username) _check_config_name(name, ConfigNameType.BACKEND_TEST) postgres = connectors.PostgresConnector.get_instance() request.configs.insert_into_db(postgres, name) @@ -973,6 +1013,7 @@ def patch_backend_test( username: str = fastapi.Depends(connectors.parse_username), ): """ Patch backend test configuration """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() try: current_test = connectors.BackendTests.fetch_from_db(postgres, name) @@ -1008,6 +1049,7 @@ def delete_backend_test( username: str = fastapi.Depends(connectors.parse_username), ): """ Delete test configuration """ + configmap_guard.reject_if_configmap_mode(username) postgres = connectors.PostgresConnector.get_instance() connectors.BackendTests.delete_from_db(postgres, name) @@ -1053,6 +1095,8 @@ def rollback_config( username: str = fastapi.Depends(connectors.parse_username), ): """Roll back a config to a particular revision.""" + configmap_guard.reject_if_configmap_mode(username) + postgres = 
connectors.PostgresConnector.get_instance() # Get the config history entry for the specified revision diff --git a/src/service/core/config/configmap_guard.py b/src/service/core/config/configmap_guard.py new file mode 100644 index 000000000..3d1fb3248 --- /dev/null +++ b/src/service/core/config/configmap_guard.py @@ -0,0 +1,46 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" + +from src.lib.utils import osmo_errors +from src.utils import configmap_state + +CONFIGMAP_SYNC_USERNAME = 'configmap-sync' + +# Delegate state to configmap_state (dependency-free module importable +# by both the service layer and the utils/connectors layer). +set_configmap_mode = configmap_state.set_configmap_mode +is_configmap_mode = configmap_state.is_configmap_mode +set_parsed_configs = configmap_state.set_parsed_configs +get_snapshot = configmap_state.get_snapshot + + +def reject_if_configmap_mode(username: str) -> None: + """Raise 409 if ConfigMap mode is active and caller is not + configmap-sync. + + Single enforcement point for all config write protection. + In ConfigMap mode, ALL config writes are blocked — configs are + managed via GitOps/kubectl only. 
+ """ + if username == CONFIGMAP_SYNC_USERNAME: + return + if configmap_state.is_configmap_mode(): + raise osmo_errors.OSMOUserError( + 'Configs are managed by ConfigMap and cannot be modified ' + 'via CLI/API. Update the Helm values and redeploy instead.', + status_code=409) diff --git a/src/service/core/config/configmap_loader.py b/src/service/core/config/configmap_loader.py new file mode 100644 index 000000000..bce95f083 --- /dev/null +++ b/src/service/core/config/configmap_loader.py @@ -0,0 +1,348 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" + +import json +import logging +import os +import threading +from typing import Any, Callable, Dict, List + +import yaml +from watchdog import events, observers + +from src.service.core.config import configmap_guard +from src.utils import connectors + + +# --------------------------------------------------------------------------- +# File event handler (watchdog) +# --------------------------------------------------------------------------- + +class ConfigFileEventHandler(events.FileSystemEventHandler): + """Watches for ConfigMap file changes with debounce. + + K8s ConfigMap volume mounts use atomic symlink swaps (..data → timestamped dir). + We watch the parent directory and filter for events affecting our config file + or the ..data symlink. 
+ """ + + def __init__(self, config_filename: str, reload_callback: Callable): + super().__init__() + self._config_filename = config_filename + self._reload_callback = reload_callback + self._debounce_timer: threading.Timer | None = None + self._debounce_delay = 2.0 + self._lock = threading.Lock() + + def on_any_event(self, event: events.FileSystemEvent) -> None: + path = str(event.src_path) + if not (path.endswith(self._config_filename) + or '..data' in path): + return + with self._lock: + if self._debounce_timer: + self._debounce_timer.cancel() + self._debounce_timer = threading.Timer( + self._debounce_delay, self._reload_callback) + self._debounce_timer.daemon = True + self._debounce_timer.start() + + +# --------------------------------------------------------------------------- +# ConfigMap watcher +# --------------------------------------------------------------------------- + +class ConfigMapWatcher: + """Watches a ConfigMap-mounted YAML file and serves configs from memory. + + On startup: parse file → validate → populate module-level dict → start watchdog. + On file change: re-parse → validate → atomic swap of dict reference. + Configs are served from the in-memory dict. DB is only used for + backend runtime data (agent writes heartbeats to backends table). 
+ """ + + def __init__(self, config_file_path: str, + postgres: connectors.PostgresConnector | None = None): + self._config_file_path = config_file_path + self._postgres = postgres + self._watch_directory = os.path.dirname(config_file_path) + self._config_filename = os.path.basename(config_file_path) + self._observer: Any = None + + def start(self) -> None: + """Load configs, activate ConfigMap mode, start file watcher.""" + success = self._load_and_apply() + if success: + configmap_guard.set_configmap_mode(True) + logging.info('ConfigMap mode activated — all config writes via CLI/API are blocked') + + self._observer = observers.Observer() + self._observer.schedule( + ConfigFileEventHandler(self._config_filename, self._load_and_apply), + path=self._watch_directory, + recursive=False) + self._observer.daemon = True + self._observer.start() + logging.info('Config file watcher started for %s', self._config_file_path) + + def stop(self) -> None: + if self._observer: + self._observer.stop() + self._observer.join(timeout=5) + + def _load_and_apply(self) -> bool: + """Parse, resolve secrets, validate, and swap the in-memory config dict. + + Returns True if configs were successfully loaded. 
+ """ + try: + with open(self._config_file_path, encoding='utf-8') as f: + raw_config = yaml.safe_load(f) + except (OSError, yaml.YAMLError) as error: + logging.error( + 'Failed to read/parse config file %s: %s', + self._config_file_path, error) + return False + + if not raw_config or not isinstance(raw_config, dict): + logging.warning( + 'Config file %s is empty or invalid', + self._config_file_path) + return False + + managed_configs = raw_config + + # Resolve secret file references (reads mounted K8s Secret files) + for section in managed_configs.values(): + if isinstance(section, dict): + _resolve_secret_file_references(section) + + # Dataset-specific: default endpoint from dataset_path + dataset_section = managed_configs.get('dataset') + if isinstance(dataset_section, dict): + _default_dataset_credential_endpoints(dataset_section) + + # Validate ConfigMap-provided fields BEFORE injecting runtime + # fields. Runtime fields (service_auth) are already validated + # by configure_app(). + validation_errors = _validate_configs(managed_configs) + if validation_errors: + logging.error( + 'ConfigMap validation failed, keeping previous config: %s', + validation_errors) + return False + + # Inject runtime-generated fields (service_auth, service_base_url) + # that are not in the ConfigMap but are needed by the service. + self._inject_runtime_fields(managed_configs) + + # Atomic swap — in-flight requests holding a reference to the old + # dict continue using it; new requests get the new dict. 
+ configmap_guard.set_parsed_configs(managed_configs) + if not configmap_guard.is_configmap_mode(): + configmap_guard.set_configmap_mode(True) + logging.info( + 'ConfigMap mode activated (deferred) — ' + 'all config writes via CLI/API are blocked') + logging.info( + 'ConfigMap configs loaded from %s', self._config_file_path) + + return True + + def _inject_runtime_fields( + self, managed_configs: Dict[str, Any], + ) -> None: + """Inject runtime-generated fields not present in ConfigMap. + + service_auth and service_base_url are auto-generated by + configure_app() on startup. On first load we read them from DB; + on subsequent reloads we carry them forward from the previous + snapshot so we never need ongoing DB reads. + """ + previous = configmap_guard.get_snapshot() + if previous is not None: + prev_service = previous.get('service', {}) + elif self._postgres is not None: + db_config = self._postgres.get_service_configs() + prev_service = db_config.plaintext_dict( + by_alias=True, exclude_unset=True) + else: + prev_service = {} + + service_config = managed_configs.setdefault('service', {}) + + for field in ('service_auth', 'service_base_url'): + if field not in service_config and field in prev_service: + service_config[field] = prev_service[field] + + +# --------------------------------------------------------------------------- +# Validation +# --------------------------------------------------------------------------- + +_EXPECTED_CONFIG_KEYS = { + 'service', 'workflow', 'dataset', 'resource_validations', 'pod_templates', + 'group_templates', 'backends', 'backend_tests', 'pools', 'roles', +} + + +def _validate_configs(managed_configs: Dict[str, Any]) -> List[str]: + """Validate ConfigMap data by constructing typed Pydantic models. + + Returns a list of error strings. Empty list means all valid. 
+ """ + errors: List[str] = [] + + unknown_keys = set(managed_configs.keys()) - _EXPECTED_CONFIG_KEYS + for key in unknown_keys: + logging.warning('Unknown config key: %s (expected one of: %s)', + key, ', '.join(sorted(_EXPECTED_CONFIG_KEYS))) + + # Validate singleton configs by constructing Pydantic models + for config_key, config_class in [ + ('service', connectors.ServiceConfig), + ('workflow', connectors.WorkflowConfig), + ('dataset', connectors.DatasetConfig), + ]: + section = managed_configs.get(config_key) + if not section: + continue + try: + config_class(**section) + except Exception as error: # pylint: disable=broad-exception-caught + errors.append(f'{config_key}: {error}') + + # Validate named config sections are dicts + for config_key in ['resource_validations', 'pod_templates', 'group_templates', + 'backends', 'backend_tests', 'pools', 'roles']: + section = managed_configs.get(config_key) + if section is not None and not isinstance(section, dict): + errors.append( + f'{config_key}: must be a dict, got {type(section).__name__}') + + return errors + + +# --------------------------------------------------------------------------- +# Secret resolution +# --------------------------------------------------------------------------- + +def _resolve_secret_file_references(config_data: Dict[str, Any], + parent_key: str = '') -> None: + """Recursively resolve secret_file / secretName references in a config dict. + + Walks the dict tree. 
When it finds a dict with 'secret_file' or 'secretName': + - Reads the YAML file from the mounted K8s Secret path + - If the file contains a dict: merges the file contents into the parent dict + - If the file contains a 'value' key: replaces the entire dict with that value + """ + if not isinstance(config_data, dict): + return + + keys_to_process = list(config_data.keys()) + for key in keys_to_process: + value = config_data[key] + if not isinstance(value, dict): + continue + + secret_file_path = value.get('secret_file') + if not secret_file_path: + secret_name = value.get('secretName') + if secret_name: + secret_key = value.get('secretKey', 'cred.yaml') + secret_file_path = f'/etc/osmo/secrets/{secret_name}/{secret_key}' + + if secret_file_path: + _resolve_single_secret(config_data, key, value, secret_file_path, + f'{parent_key}.{key}' if parent_key else key) + else: + _resolve_secret_file_references(value, f'{parent_key}.{key}' if parent_key else key) + + +def _resolve_single_secret(parent_dict: Dict[str, Any], key: str, + current_value: Dict[str, Any], + secret_file_path: str, path_label: str) -> None: + """Read a secret file and replace the reference with actual values. + + Supports three formats: + 1. Simple string: {value: "..."} -> replaces dict with the string + 2. Docker registry: {auths: {registry: {username, password, auth}}} + 3. 
YAML dict: merges all keys into the current dict + """ + try: + with open(secret_file_path, encoding='utf-8') as secret_file: + content = secret_file.read() + except OSError as error: + logging.error('Failed to read secret file %s for %s: %s', + secret_file_path, path_label, error) + return + + try: + secret_data = json.loads(content) + except (json.JSONDecodeError, ValueError): + try: + secret_data = yaml.safe_load(content) + except yaml.YAMLError as error: + logging.error('Failed to parse secret file %s for %s: %s', + secret_file_path, path_label, error) + return + + if not isinstance(secret_data, dict): + logging.error('Secret file %s for %s does not contain a mapping', + secret_file_path, path_label) + return + + if 'value' in secret_data and len(secret_data) == 1: + parent_dict[key] = secret_data['value'] + logging.info('Loaded secret for %s from secret file', path_label) + return + + if 'auths' in secret_data: + auths = secret_data['auths'] + if isinstance(auths, dict) and auths: + registry_url = next(iter(auths)) + registry_data = auths[registry_url] + extracted = { + 'registry': registry_url, + 'username': registry_data.get('username', ''), + 'auth': registry_data.get('auth', ''), + } + current_value.pop('secret_file', None) + current_value.pop('secretName', None) + current_value.pop('secretKey', None) + current_value.update(extracted) + logging.info('Loaded Docker registry credentials for %s from %s', + path_label, registry_url) + return + + current_value.pop('secret_file', None) + current_value.pop('secretName', None) + current_value.pop('secretKey', None) + current_value.update(secret_data) + logging.info('Loaded credentials for %s from secret file', path_label) + + +def _default_dataset_credential_endpoints(config_data: Dict[str, Any]) -> None: + """Dataset-specific: default 'endpoint' from 'dataset_path' for each bucket credential.""" + buckets = config_data.get('buckets', {}) + for bucket_config in buckets.values(): + if not isinstance(bucket_config, 
dict): + continue + credential = bucket_config.get('default_credential') + if isinstance(credential, dict) and 'endpoint' not in credential: + credential['endpoint'] = bucket_config.get('dataset_path', '') diff --git a/src/service/core/config/helpers.py b/src/service/core/config/helpers.py index 0e0787316..56cd218f1 100644 --- a/src/service/core/config/helpers.py +++ b/src/service/core/config/helpers.py @@ -26,6 +26,7 @@ import yaml from src.lib.utils import common, osmo_errors +from src.service.core.config import configmap_guard from src.utils.job import backend_jobs, kb_objects, workflow from src.service.core.config import objects as configs_objects from src.service.core.workflow import objects @@ -104,9 +105,14 @@ def put_configs( should_serialize: Whether to serialize the config before storing. Skip serialization when rolling back a config. + Raises: + OSMOUserError(409): If the config is managed by ConfigMap in configmap mode. + Returns: Dict containing the updated configuration """ + configmap_guard.reject_if_configmap_mode(username) + postgres = connectors.PostgresConnector.get_instance() if should_serialize: updated_configs = request.configs.serialize(postgres) @@ -151,7 +157,12 @@ def patch_configs( Returns: Dict containing the updated configuration fields. + + Raises: + OSMOUserError(409): If the config is managed by ConfigMap in configmap mode. """ + configmap_guard.reject_if_configmap_mode(username) + postgres = connectors.PostgresConnector.get_instance() current_configs_dict = postgres.get_configs(config_type).plaintext_dict( by_alias=True, exclude_unset=True) @@ -211,6 +222,7 @@ def patch_configs( new_configs_dict = postgres.get_configs(config_type).dict(by_alias=True, exclude_unset=True) return {key: value for key, value in new_configs_dict.items() if key in request.configs_dict} + def backend_action_request_helper(payload: Dict[str, Any], name: str): """ Helper function that implements support for exec and portforward. 
""" @@ -733,7 +745,6 @@ def update_backend_tests_cronjobs(backend_name: str, current_tests: List[str], logging.info('Fetched %d test configs for backend %s', len(test_configs), backend_name, extra={'workflow_uuid': getattr(context, 'workflow_uuid', None)}) - print(test_configs) # Create SynchronizeBackendTest job with test configurations sync_job = backend_jobs.BackendSynchronizeBackendTest( backend=backend_name, diff --git a/src/service/core/config/tests/BUILD b/src/service/core/config/tests/BUILD index 408299ae5..23aa777e7 100644 --- a/src/service/core/config/tests/BUILD +++ b/src/service/core/config/tests/BUILD @@ -17,6 +17,7 @@ SPDX-License-Identifier: Apache-2.0 """ load("//bzl:py.bzl", "osmo_py_test") +load("@osmo_python_deps//:requirements.bzl", "requirement") osmo_py_test( name = "test_config_history_helpers", @@ -38,3 +39,26 @@ osmo_py_test( tags = ["requires-network"], visibility = ["//visibility:public"], ) + +osmo_py_test( + name = "test_configmap_loader_unit", + srcs = ["test_configmap_loader_unit.py"], + deps = [ + requirement("pyyaml"), + "//src/service/core/config:config", + ], +) + +osmo_py_test( + name = "test_configmap_loader_integration", + srcs = ["test_configmap_loader_integration.py"], + deps = [ + requirement("pyyaml"), + "//src/service/core/config:config", + "//src/service/core/tests:fixture", + "//src/tests/common", + ], + size = "large", + tags = ["requires-network"], + visibility = ["//visibility:public"], +) diff --git a/src/service/core/config/tests/test_configmap_loader_integration.py b/src/service/core/config/tests/test_configmap_loader_integration.py new file mode 100644 index 000000000..4d23d81a8 --- /dev/null +++ b/src/service/core/config/tests/test_configmap_loader_integration.py @@ -0,0 +1,224 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# pylint: disable=line-too-long + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" + +# pylint: disable=protected-access + +import os +import tempfile +from typing import Any, Dict + +import yaml + +from src.lib.utils import osmo_errors +from src.service.core.config import ( + config_service, + configmap_guard, + configmap_loader, + objects as config_objects, +) +from src.service.core.tests import fixture +from src.tests.common import runner +from src.utils import configmap_state, connectors + + +class ConfigMapModeReadIntegrationTest(fixture.ServiceTestFixture): + """Integration tests: configs served from in-memory snapshot with real DB. + + Verifies that when ConfigMap mode is active, model methods read from + the in-memory snapshot while backend runtime data still comes from DB. 
+ """ + + def setUp(self): + super().setUp() + configmap_state.set_configmap_mode(False) + configmap_state.set_parsed_configs(None) + + def tearDown(self): + configmap_state.set_configmap_mode(False) + configmap_state.set_parsed_configs(None) + super().tearDown() + + def _get_postgres(self) -> connectors.PostgresConnector: + return connectors.PostgresConnector.get_instance() + + def _activate_configmap_mode(self, managed_configs: Dict[str, Any]): + """Set up ConfigMap mode with the given config snapshot.""" + configmap_state.set_parsed_configs(managed_configs) + configmap_state.set_configmap_mode(True) + + # ------------------------------------------------------------------- + # Singleton configs served from snapshot + # ------------------------------------------------------------------- + + def test_workflow_config_from_snapshot(self): + """get_workflow_configs() returns data from snapshot, not DB.""" + postgres = self._get_postgres() + self._activate_configmap_mode({ + 'workflow': { + 'max_num_tasks': 999, + 'max_exec_timeout': '30d', + 'default_exec_timeout': '7d', + }, + }) + + workflow_config = postgres.get_workflow_configs() + self.assertEqual(workflow_config.max_num_tasks, 999) + + # ------------------------------------------------------------------- + # Named configs served from snapshot + # ------------------------------------------------------------------- + + def test_pod_template_from_snapshot(self): + """PodTemplate.fetch_from_db reads from snapshot.""" + postgres = self._get_postgres() + self._activate_configmap_mode({ + 'pod_templates': { + 'test_tmpl': { + 'spec': { + 'containers': [ + {'name': 'ctrl', 'image': 'test:latest'} + ], + }, + }, + }, + }) + + result = connectors.PodTemplate.fetch_from_db(postgres, 'test_tmpl') + self.assertEqual(result['spec']['containers'][0]['name'], 'ctrl') + + def test_pod_template_not_found_in_snapshot(self): + """PodTemplate.fetch_from_db raises for missing name.""" + postgres = self._get_postgres() + 
self._activate_configmap_mode({ + 'pod_templates': {}, + }) + + with self.assertRaises(osmo_errors.OSMOUserError): + connectors.PodTemplate.fetch_from_db(postgres, 'nonexistent') + + def test_pod_template_list_from_snapshot(self): + """PodTemplate.list_from_db returns all items from snapshot.""" + postgres = self._get_postgres() + self._activate_configmap_mode({ + 'pod_templates': { + 'tmpl_a': {'spec': {}}, + 'tmpl_b': {'spec': {}}, + }, + }) + + result = connectors.PodTemplate.list_from_db(postgres) + self.assertEqual(set(result.keys()), {'tmpl_a', 'tmpl_b'}) + + def test_resource_validation_from_snapshot(self): + """ResourceValidation.fetch_from_db reads from snapshot.""" + postgres = self._get_postgres() + self._activate_configmap_mode({ + 'resource_validations': { + 'cpu_check': [ + {'resource': 'cpu', 'operator': 'LE', + 'threshold': 'node_cpu'}, + ], + }, + }) + + result = connectors.ResourceValidation.fetch_from_db( + postgres, 'cpu_check') + self.assertEqual(len(result), 1) + # In ConfigMap mode, returns raw dicts from snapshot + self.assertEqual(result[0]['resource'], 'cpu') + + # ------------------------------------------------------------------- + # 409 rejection for all write endpoints + # ------------------------------------------------------------------- + + def test_409_on_patch_service_config(self): + """patch_service_configs returns 409 in ConfigMap mode.""" + self._activate_configmap_mode({}) + with self.assertRaises(osmo_errors.OSMOUserError) as ctx: + config_service.patch_service_configs( + request=config_objects.PatchConfigRequest( + configs_dict={'max_pod_restart_limit': '1h'}, + ), + username='test@nvidia.com', + ) + self.assertEqual(ctx.exception.status_code, 409) + + def test_409_on_put_pod_templates(self): + """put_pod_templates returns 409 in ConfigMap mode.""" + self._activate_configmap_mode({}) + with self.assertRaises(osmo_errors.OSMOUserError) as ctx: + config_service.put_pod_templates( + 
request=config_objects.PutPodTemplatesRequest( + configs={'test': {'spec': {}}}, + description='test', + ), + username='test@nvidia.com', + ) + self.assertEqual(ctx.exception.status_code, 409) + + def test_409_bypass_for_configmap_sync(self): + """configmap-sync user can write even in ConfigMap mode.""" + self._activate_configmap_mode({}) + # Should not raise for configmap-sync username + # (will likely fail for other reasons, but not 409) + try: + config_service.put_pod_templates( + request=config_objects.PutPodTemplatesRequest( + configs={'test': {'spec': {}}}, + description='test', + ), + username=configmap_guard.CONFIGMAP_SYNC_USERNAME, + ) + except osmo_errors.OSMOUserError as error: + # Any error other than 409 is acceptable + self.assertNotEqual(error.status_code, 409) + except (osmo_errors.OSMOUserError, osmo_errors.OSMOBackendError, + osmo_errors.OSMOServerError): + pass # Non-409 errors are fine + + # ------------------------------------------------------------------- + # ConfigMapWatcher loads configs into snapshot + # ------------------------------------------------------------------- + + def test_watcher_load_populates_snapshot(self): + """ConfigMapWatcher._load_and_apply sets the snapshot.""" + postgres = self._get_postgres() + config = { + 'pod_templates': { + 'watcher_tmpl': {'spec': {'test': True}}, + }, + } + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False) as temp_file: + yaml.dump(config, temp_file) + try: + watcher = configmap_loader.ConfigMapWatcher( + temp_file.name, postgres) + result = watcher._load_and_apply() + self.assertTrue(result) + + snapshot = configmap_state.get_snapshot() + assert snapshot is not None + self.assertIn('watcher_tmpl', + snapshot['pod_templates']) + finally: + os.unlink(temp_file.name) + + +if __name__ == '__main__': + runner.run_test() diff --git a/src/service/core/config/tests/test_configmap_loader_unit.py b/src/service/core/config/tests/test_configmap_loader_unit.py new file mode 100644 
index 000000000..bcf4ef57d --- /dev/null +++ b/src/service/core/config/tests/test_configmap_loader_unit.py @@ -0,0 +1,379 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" + +# pylint: disable=protected-access + +import logging +import os +import tempfile +import unittest +from typing import Any, Dict +from unittest import mock + +import yaml + +from src.lib.utils import osmo_errors +from src.service.core.config import configmap_guard, configmap_loader +from src.utils import configmap_state + + +class TestConfigmapGuard(unittest.TestCase): + """Tests for the global ConfigMap mode guard.""" + + def setUp(self): + configmap_state.set_configmap_mode(False) + + def tearDown(self): + configmap_state.set_configmap_mode(False) + + def test_reject_when_configmap_mode_active(self): + configmap_state.set_configmap_mode(True) + with self.assertRaises(osmo_errors.OSMOUserError) as context: + configmap_guard.reject_if_configmap_mode('some-user') + self.assertEqual(context.exception.status_code, 409) + self.assertIn('ConfigMap', str(context.exception)) + + def test_allow_when_configmap_mode_inactive(self): + configmap_guard.reject_if_configmap_mode('some-user') + + def test_bypass_for_configmap_sync_user(self): + configmap_state.set_configmap_mode(True) + configmap_guard.reject_if_configmap_mode( + 
configmap_guard.CONFIGMAP_SYNC_USERNAME) + + def test_is_configmap_mode(self): + self.assertFalse(configmap_guard.is_configmap_mode()) + configmap_state.set_configmap_mode(True) + self.assertTrue(configmap_guard.is_configmap_mode()) + + +class TestConfigmapState(unittest.TestCase): + """Tests for the module-level config snapshot.""" + + def setUp(self): + configmap_state.set_parsed_configs(None) + + def tearDown(self): + configmap_state.set_parsed_configs(None) + + def test_snapshot_initially_none(self): + self.assertIsNone(configmap_state.get_snapshot()) + + def test_set_and_get_snapshot(self): + configs = {'service': {'key': 'value'}} + configmap_state.set_parsed_configs(configs) + self.assertEqual(configmap_state.get_snapshot(), configs) + + def test_atomic_swap_preserves_old_reference(self): + old: Dict[str, Any] = {'service': {'version': 1}} + configmap_state.set_parsed_configs(old) + snapshot_ref = configmap_state.get_snapshot() + assert snapshot_ref is not None + + new: Dict[str, Any] = {'service': {'version': 2}} + configmap_state.set_parsed_configs(new) + + # Old reference still valid + self.assertEqual(snapshot_ref['service']['version'], 1) + # New snapshot has new data + new_snapshot = configmap_state.get_snapshot() + assert new_snapshot is not None + self.assertEqual(new_snapshot['service']['version'], 2) + + +class TestResolveSecretFileReferences(unittest.TestCase): + """Tests for _resolve_secret_file_references (unchanged logic).""" + + def test_resolve_dataset_secret_files_success(self): + secret_data = { + 'access_key_id': 'AKIAIOSFODNN7EXAMPLE', + 'access_key': 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY', + 'region': 'us-west-2', + } + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False) as secret_file: + yaml.dump(secret_data, secret_file) + secret_path = secret_file.name + try: + config_data: Dict[str, Any] = { + 'buckets': { + 'primary': { + 'dataset_path': 's3://my-bucket', + 'default_credential': { + 'secret_file': 
secret_path, + }, + }, + }, + } + configmap_loader._resolve_secret_file_references(config_data) + + credential = config_data['buckets']['primary']['default_credential'] + self.assertEqual(credential['access_key_id'], 'AKIAIOSFODNN7EXAMPLE') + self.assertNotIn('secret_file', credential) + finally: + os.unlink(secret_path) + + def test_resolve_missing_secret_file(self): + config_data: Dict[str, Any] = { + 'buckets': { + 'primary': { + 'default_credential': { + 'secret_file': '/nonexistent/secret.yaml', + }, + }, + }, + } + with self.assertLogs(level=logging.ERROR): + configmap_loader._resolve_secret_file_references(config_data) + # secret_file key still present (not corrupted) + credential = config_data['buckets']['primary']['default_credential'] + self.assertIn('secret_file', credential) + + def test_resolve_simple_string_secret(self): + secret_data = {'value': 'xoxb-slack-token'} + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False) as secret_file: + yaml.dump(secret_data, secret_file) + secret_path = secret_file.name + try: + config_data: Dict[str, Any] = { + 'alerts': { + 'slack_token': {'secret_file': secret_path}, + }, + } + configmap_loader._resolve_secret_file_references(config_data) + self.assertEqual( + config_data['alerts']['slack_token'], 'xoxb-slack-token') + finally: + os.unlink(secret_path) + + def test_resolve_secret_name_converted_to_path(self): + config_data: Dict[str, Any] = { + 'credential': {'secretName': 'my-cred'}, + } + with self.assertLogs(level=logging.ERROR): + configmap_loader._resolve_secret_file_references(config_data) + + +class TestValidateConfigs(unittest.TestCase): + """Tests for _validate_configs.""" + + def test_valid_named_config_section(self): + configs: Dict[str, Any] = { + 'pod_templates': {'tmpl1': {'spec': {}}}, + } + errors = configmap_loader._validate_configs(configs) + self.assertEqual(errors, []) + + def test_invalid_section_type(self): + configs: Dict[str, Any] = { + 'pod_templates': 'not_a_dict', 
+ } + errors = configmap_loader._validate_configs(configs) + self.assertEqual(len(errors), 1) + self.assertIn('pod_templates', errors[0]) + + def test_unknown_keys_logged(self): + configs: Dict[str, Any] = { + 'unknown_section': {'config': {}}, + } + with self.assertLogs(level=logging.WARNING) as log_context: + configmap_loader._validate_configs(configs) + self.assertTrue( + any('Unknown config key' in msg for msg in log_context.output)) + + def test_empty_configs_valid(self): + errors = configmap_loader._validate_configs({}) + self.assertEqual(errors, []) + + +class TestConfigMapWatcherLoadAndApply(unittest.TestCase): + """Tests for ConfigMapWatcher._load_and_apply.""" + + def setUp(self): + self.mock_postgres = mock.MagicMock() + configmap_state.set_configmap_mode(False) + configmap_state.set_parsed_configs(None) + + def tearDown(self): + configmap_state.set_configmap_mode(False) + configmap_state.set_parsed_configs(None) + + def _write_config_file(self, config: Dict[str, Any]) -> str: + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False) as temp: + yaml.dump(config, temp) + return temp.name + + def test_load_file_not_found(self): + watcher = configmap_loader.ConfigMapWatcher( + '/nonexistent/path.yaml', self.mock_postgres) + result = watcher._load_and_apply() + self.assertFalse(result) + self.assertIsNone(configmap_state.get_snapshot()) + + def test_load_empty_file(self): + with tempfile.NamedTemporaryFile( + mode='w', suffix='.yaml', delete=False) as temp: + temp.write('') + path = temp.name + try: + watcher = configmap_loader.ConfigMapWatcher( + path, self.mock_postgres) + result = watcher._load_and_apply() + self.assertFalse(result) + finally: + os.unlink(path) + + def test_load_valid_config_populates_snapshot(self): + config: Dict[str, Any] = { + 'pod_templates': { + 'default_ctrl': {'spec': {'containers': []}}, + }, + } + path = self._write_config_file(config) + try: + mock_service_config = mock.MagicMock() + 
mock_service_config.plaintext_dict.return_value = { + 'service_auth': {'keys': {}}, + 'service_base_url': 'https://example.com', + } + self.mock_postgres.get_service_configs.return_value = ( + mock_service_config) + + watcher = configmap_loader.ConfigMapWatcher( + path, self.mock_postgres) + result = watcher._load_and_apply() + self.assertTrue(result) + + snapshot = configmap_state.get_snapshot() + assert snapshot is not None + self.assertIn('pod_templates', snapshot) + self.assertIn('default_ctrl', snapshot['pod_templates']) + finally: + os.unlink(path) + + def test_load_injects_runtime_fields(self): + config: Dict[str, Any] = { + 'service': { + 'max_pod_restart_limit': '30m', + }, + } + path = self._write_config_file(config) + try: + mock_service_config = mock.MagicMock() + mock_service_config.plaintext_dict.return_value = { + 'service_auth': {'keys': {'key1': 'value1'}}, + 'service_base_url': 'https://example.com', + } + self.mock_postgres.get_service_configs.return_value = ( + mock_service_config) + + watcher = configmap_loader.ConfigMapWatcher( + path, self.mock_postgres) + result = watcher._load_and_apply() + self.assertTrue(result) + + snapshot = configmap_state.get_snapshot() + assert snapshot is not None + service_config = snapshot['service'] + self.assertEqual( + service_config['max_pod_restart_limit'], '30m') + self.assertIn('service_auth', service_config) + self.assertIn('service_base_url', service_config) + finally: + os.unlink(path) + + def test_validation_failure_keeps_previous_config(self): + # First load succeeds + good_config: Dict[str, Any] = { + 'pod_templates': {'tmpl': {'spec': {}}}, + } + good_path = self._write_config_file(good_config) + mock_service_config = mock.MagicMock() + mock_service_config.plaintext_dict.return_value = {} + self.mock_postgres.get_service_configs.return_value = ( + mock_service_config) + + watcher = configmap_loader.ConfigMapWatcher( + good_path, self.mock_postgres) + watcher._load_and_apply() + + old_snapshot = 
configmap_state.get_snapshot() + self.assertIsNotNone(old_snapshot) + + # Second load with invalid section type + bad_config: Dict[str, Any] = { + 'pod_templates': 'not_a_dict', + } + bad_path = self._write_config_file(bad_config) + try: + watcher2 = configmap_loader.ConfigMapWatcher( + bad_path, self.mock_postgres) + result = watcher2._load_and_apply() + self.assertFalse(result) + + # Previous snapshot preserved + self.assertIs(configmap_state.get_snapshot(), old_snapshot) + finally: + os.unlink(good_path) + os.unlink(bad_path) + + +class TestConfigFileEventHandler(unittest.TestCase): + """Tests for the watchdog event handler.""" + + def test_ignores_unrelated_events(self): + callback = mock.MagicMock() + handler = configmap_loader.ConfigFileEventHandler( + 'config.yaml', callback) + + event = mock.MagicMock() + event.src_path = '/some/other/file.txt' + handler.on_any_event(event) + + callback.assert_not_called() + + def test_reacts_to_config_file_events(self): + callback = mock.MagicMock() + handler = configmap_loader.ConfigFileEventHandler( + 'config.yaml', callback) + handler._debounce_delay = 0.01 # speed up test + + event = mock.MagicMock() + event.src_path = '/etc/osmo/config/config.yaml' + handler.on_any_event(event) + + # Timer should be set + self.assertIsNotNone(handler._debounce_timer) + + def test_reacts_to_data_symlink_events(self): + callback = mock.MagicMock() + handler = configmap_loader.ConfigFileEventHandler( + 'config.yaml', callback) + handler._debounce_delay = 0.01 + + event = mock.MagicMock() + event.src_path = '/etc/osmo/config/..data' + handler.on_any_event(event) + + self.assertIsNotNone(handler._debounce_timer) + + +if __name__ == '__main__': + unittest.main() diff --git a/src/service/core/service.py b/src/service/core/service.py index 208381bd1..63eef04c7 100644 --- a/src/service/core/service.py +++ b/src/service/core/service.py @@ -37,7 +37,7 @@ from src.service.core.app import app_service from src.service.core.auth import auth_service, 
objects as auth_objects from src.service.core.config import ( - config_service, helpers as config_helpers, objects as config_objects + config_service, configmap_loader, helpers as config_helpers, objects as config_objects ) from src.service.core.data import data_service, query from src.service.core.profile import profile_service @@ -466,6 +466,18 @@ def configure_app(target_app: fastapi.FastAPI, config: objects.WorkflowServiceCo set_client_install_url(postgres, config) setup_default_admin(postgres, config) + if config.config_file: + try: + _config_watcher = configmap_loader.ConfigMapWatcher( + config.config_file, postgres) + _config_watcher.start() + # Store on app state to prevent GC from killing the watcher + target_app.state.config_watcher = _config_watcher + except Exception: + logging.exception( + 'Failed to start config watcher — ' + 'service will continue without ConfigMap management') + # Instantiate QueryParser query.QueryParser() diff --git a/src/service/core/workflow/objects.py b/src/service/core/workflow/objects.py index 51a535cc8..2ebc29633 100644 --- a/src/service/core/workflow/objects.py +++ b/src/service/core/workflow/objects.py @@ -96,7 +96,11 @@ class WorkflowServiceConfig(connectors.RedisConfig, connectors.PostgresConfig, default=None, description='The password (access token value) for the default admin user. 
' 'Must be set if default_admin_username is set.') - + config_file: str | None = pydantic.Field( + command_line='config_file', + env='OSMO_DYNAMIC_CONFIG_FILE', + default=None, + description='Path to ConfigMap YAML file to load configs from.') @pydantic.root_validator() @classmethod def validate_default_admin(cls, values): diff --git a/src/tests/common/database/testdata/schema.sql b/src/tests/common/database/testdata/schema.sql index 24474858f..caf58d41f 100644 --- a/src/tests/common/database/testdata/schema.sql +++ b/src/tests/common/database/testdata/schema.sql @@ -61,3 +61,4 @@ CREATE TABLE IF NOT EXISTS workflows ( workflow_id TEXT PRIMARY KEY, pool TEXT NOT NULL DEFAULT '' ); + diff --git a/src/utils/BUILD b/src/utils/BUILD index f674edf6a..5666af24c 100644 --- a/src/utils/BUILD +++ b/src/utils/BUILD @@ -87,6 +87,12 @@ osmo_py_library( visibility = ["//visibility:public"], ) +osmo_py_library( + name = "configmap_state", + srcs = ["configmap_state.py"], + visibility = ["//visibility:public"], +) + osmo_py_library( name = "backend_messages", srcs = ["backend_messages.py"], diff --git a/src/utils/configmap_state.py b/src/utils/configmap_state.py new file mode 100644 index 000000000..434f2fb52 --- /dev/null +++ b/src/utils/configmap_state.py @@ -0,0 +1,52 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # pylint: disable=line-too-long + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +SPDX-License-Identifier: Apache-2.0 +""" + +from typing import Any, Dict + +# Global ConfigMap mode state. Set by ConfigMapWatcher, read by +# postgres.py model methods and configmap_guard.py. +# +# This module is intentionally dependency-free (only stdlib) so it can +# be imported from both the utils layer (connectors/postgres.py) and +# the service layer (config/configmap_guard.py) without circular deps. + +_configmap_mode_active: bool = False +_parsed_configs: Dict[str, Any] | None = None + + +def set_configmap_mode(active: bool) -> None: + global _configmap_mode_active # noqa: PLW0603 + _configmap_mode_active = active + + +def is_configmap_mode() -> bool: + return _configmap_mode_active + + +def set_parsed_configs(configs: Dict[str, Any] | None) -> None: + global _parsed_configs # noqa: PLW0603 + _parsed_configs = configs + + +def get_snapshot() -> Dict[str, Any] | None: + """Return the current parsed config dict. + + Callers should grab this reference once per request and reuse it + for all config lookups to get a consistent snapshot. 
+ """ + return _parsed_configs diff --git a/src/utils/connectors/BUILD b/src/utils/connectors/BUILD index c0387ad00..0a2e10f77 100644 --- a/src/utils/connectors/BUILD +++ b/src/utils/connectors/BUILD @@ -22,6 +22,7 @@ osmo_py_library( "//src/lib/utils:role", "//src/lib/utils:validation", "//src/utils:auth", + "//src/utils:configmap_state", "//src/utils:notify", "//src/utils/secret_manager", ], diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py index d12d68d6f..5413d9961 100644 --- a/src/utils/connectors/postgres.py +++ b/src/utils/connectors/postgres.py @@ -42,6 +42,7 @@ from jwcrypto import jwe # type: ignore from jwcrypto.common import JWException # type: ignore +from src.utils import configmap_state from src.lib.data import storage from src.lib.data.storage import constants from src.lib.utils import (common, credentials, jinja_sandbox, login, @@ -560,6 +561,11 @@ def mogrify(self, entries: List[Tuple]): def get_configs(self, config_type: ConfigType): """ Get all the config values. 
""" + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + return self._get_configs_from_snapshot( + config_type, snapshot) + cmd = 'SELECT * FROM configs WHERE type = %s;' result = self.execute_fetch_command(cmd, (config_type.value,)) if not result: @@ -591,6 +597,18 @@ def get_configs(self, config_type: ConfigType): result_dicts[model.key] = json.loads(model.value) return config_class.deserialize(result_dicts, self) + def _get_configs_from_snapshot(self, config_type: ConfigType, + snapshot: dict): + """Construct a config object from the in-memory ConfigMap snapshot.""" + config_key_map = { + ConfigType.SERVICE: ('service', ServiceConfig), + ConfigType.WORKFLOW: ('workflow', WorkflowConfig), + ConfigType.DATASET: ('dataset', DatasetConfig), + } + key, config_class = config_key_map[config_type] + config_data = snapshot.get(key, {}) + return config_class(**config_data) + def get_service_configs(self) -> 'ServiceConfig': return self.get_configs(ConfigType.SERVICE) @@ -1270,6 +1288,7 @@ def _init_tables(self): ''' self.execute_commit_command(create_cmd, ()) + def _init_configs(self): """ Initializes configs table. """ # Create config objects with deployment values if provided @@ -2441,18 +2460,15 @@ class Backend(pydantic.BaseModel): @classmethod def fetch_from_db(cls, database: PostgresConnector, name: str) -> 'Backend': - """ - Creates a Workflow instance from a database workflow entry. - - Args: - workflow_id (task_common.NamePattern): The workflow id. - - Raises: - OSMODatabaseError: The workflow is not found in the database. + """Fetch a backend by name. - Returns: - Workflow: The workflow. + In ConfigMap mode: config fields from in-memory snapshot, + runtime fields (heartbeat, k8s_uid) from DB. 
""" + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + return cls._fetch_from_snapshot(database, name, snapshot) + fetch_cmd = 'SELECT * FROM backends WHERE name = %s;' backend_rows = database.execute_fetch_command(fetch_cmd, (name,)) try: @@ -2477,32 +2493,93 @@ def fetch_from_db(cls, database: PostgresConnector, router_address=backend_row.router_address, online=common.heartbeat_online(backend_row.last_heartbeat)) + @classmethod + def _fetch_from_snapshot(cls, database: PostgresConnector, + name: str, snapshot: dict) -> 'Backend': + """Build a Backend by merging ConfigMap config + DB runtime.""" + items = snapshot.get('backends', {}) + if name not in items: + raise osmo_errors.OSMOBackendError( + f'Backend {name} is not found.') + config = items[name] + + # Runtime fields from DB (agent writes these) + runtime_cmd = ( + 'SELECT k8s_uid, k8s_namespace, version, ' + 'last_heartbeat, created_date ' + 'FROM backends WHERE name = %s;') + runtime_rows = database.execute_fetch_command( + runtime_cmd, (name,), True) + + if runtime_rows: + row = runtime_rows[0] + runtime = { + 'k8s_uid': row['k8s_uid'], + 'k8s_namespace': row['k8s_namespace'], + 'version': row['version'], + 'last_heartbeat': row['last_heartbeat'], + 'created_date': row['created_date'], + } + else: + # Agent hasn't connected yet — defaults + now = common.current_time() + runtime = { + 'k8s_uid': '', 'k8s_namespace': '', + 'version': '', 'last_heartbeat': now, + 'created_date': now, + } + + scheduler = config.get('scheduler_settings', {}) + node_cond = config.get('node_conditions', {}) + return Backend( + name=name, + description=config.get('description', ''), + version=runtime['version'], + k8s_uid=runtime['k8s_uid'], + k8s_namespace=runtime['k8s_namespace'], + dashboard_url=config.get('dashboard_url', ''), + grafana_url=config.get('grafana_url', ''), + tests=config.get('tests', []), + scheduler_settings=BackendSchedulerSettings(**scheduler), + 
node_conditions=BackendNodeConditions(**node_cond), + last_heartbeat=runtime['last_heartbeat'], + created_date=runtime['created_date'], + router_address=config.get('router_address', ''), + online=common.heartbeat_online(runtime['last_heartbeat']), + ) + @classmethod def list_names_from_db(cls, database: PostgresConnector) -> List[str]: - """ - List all backends in the database + """List all backend names.""" + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('backends', {}) + return sorted(items.keys()) - Returns: - backends: List all backend names in the backend - """ fetch_cmd = 'SELECT name FROM backends ORDER BY name;' backend_rows = database.execute_fetch_command(fetch_cmd, ()) return [backend_row.name for backend_row in backend_rows] @classmethod def list_from_db(cls, database: PostgresConnector) -> List['Backend']: - """ - Creates a backend instance from a database backend entry. + """List all backends. - Args: - database (PostgresConnector): The database to fetch the backend from. - - Raises: - OSMODatabaseError: The backend is not found in the database. - - Returns: - backends: List of all backends in the database. + In ConfigMap mode: iterates snapshot backends, merging + runtime data from DB for each. 
""" + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('backends', {}) + backends = [] + for name in sorted(items.keys()): + try: + backends.append( + cls._fetch_from_snapshot(database, name, snapshot)) + except Exception as error: + logging.warning( + 'Skipping backend %s: %s', name, error) + return backends + fetch_cmd = 'SELECT * FROM backends ORDER BY name;' backend_rows = database.execute_fetch_command(fetch_cmd, ()) @@ -2973,6 +3050,13 @@ class ResourceValidation(pydantic.BaseModel): def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = None) \ -> Dict[str, List[ResourceAssertion]]: """ Fetches the list of resource validations from the resource validation table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('resource_validations', {}) + if names: + items = {k: v for k, v in items.items() if k in names} + return items + list_of_names = '' fetch_input: Tuple = () if names: @@ -2986,6 +3070,13 @@ def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = @classmethod def fetch_from_db(cls, database: PostgresConnector, name: str) -> List[ResourceAssertion]: """ Fetches the resource validations from the resource validation table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('resource_validations', {}) + if name not in items: + raise osmo_errors.OSMOUserError(f'Resource Validation {name} does not exist.') + return items[name] + fetch_cmd = 'SELECT * FROM resource_validations WHERE name = %s;' spec_rows = database.execute_fetch_command(fetch_cmd, (name,), True) if not spec_rows: @@ -3047,6 +3138,13 @@ class PodTemplate(pydantic.BaseModel): def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = None) \ -> Dict[str, Dict]: """ Fetches the list of pod templates from the pod template table """ + snapshot = configmap_state.get_snapshot() + if snapshot is 
not None: + items = snapshot.get('pod_templates', {}) + if names: + items = {k: v for k, v in items.items() if k in names} + return items + list_of_names = '' fetch_input: Tuple = () if names: @@ -3060,6 +3158,14 @@ def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = @classmethod def fetch_from_db(cls, database: PostgresConnector, name: str) -> Dict: """ Fetches the pod template from the pod template table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('pod_templates', {}) + if name not in items: + raise osmo_errors.OSMOUserError( + f'Pod Template {name} does not exist.') + return items[name] + fetch_cmd = 'SELECT * FROM pod_templates WHERE name = %s;' spec_rows = database.execute_fetch_command(fetch_cmd, (name,), True) if not spec_rows: @@ -3136,6 +3242,13 @@ class GroupTemplate(pydantic.BaseModel): def list_from_db(cls, database: PostgresConnector, names: List[str] | None = None) \ -> Dict[str, Dict[str, Any]]: """ Fetches the list of group templates from the group template table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('group_templates', {}) + if names: + items = {k: v for k, v in items.items() if k in names} + return items + name_filter_clause = '' fetch_input: Tuple = () if names: @@ -3149,6 +3262,14 @@ def list_from_db(cls, database: PostgresConnector, names: List[str] | None = Non @classmethod def fetch_from_db(cls, database: PostgresConnector, name: str) -> Dict[str, Any]: """ Fetches the group template from the group template table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('group_templates', {}) + if name not in items: + raise osmo_errors.OSMOUserError( + f'Group Template {name} does not exist.') + return items[name] + fetch_cmd = 'SELECT * FROM group_templates WHERE name = %s;' spec_rows = database.execute_fetch_command(fetch_cmd, (name,), True) if not spec_rows: @@ 
-3406,6 +3527,16 @@ def fetch_from_db(cls, database: PostgresConnector, name: str) -> 'Pool': return pool_info + @classmethod + def _compute_pool_status(cls, pool_data: dict, + heartbeat: datetime.datetime | None) -> PoolStatus: + """Compute pool status from maintenance flag and heartbeat.""" + if pool_data.get('enable_maintenance', False): + return PoolStatus.MAINTENANCE + if heartbeat and common.heartbeat_online(heartbeat): + return PoolStatus.ONLINE + return PoolStatus.OFFLINE + @classmethod def rename(cls, database: PostgresConnector, old_name: str, new_name: str): """ Renames a pool from the pools table """ @@ -3443,6 +3574,11 @@ def fetch_rows_from_db(cls, database: PostgresConnector, pools: List[str] | None = None, all_pools: bool = True) -> Any: """ Fetches the list of pools from the pools table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + return cls._fetch_pool_rows_from_snapshot( + database, snapshot, backend, pools, all_pools) + params : List[str | Tuple] = [] conditions = [] @@ -3476,6 +3612,53 @@ def fetch_rows_from_db(cls, database: PostgresConnector, pool_row['status'] = PoolStatus.OFFLINE return pool_rows + @classmethod + def _fetch_pool_rows_from_snapshot( + cls, database: PostgresConnector, snapshot: dict, + backend: str | None, pools: List[str] | None, + all_pools: bool, + ) -> List[dict]: + """Build pool rows from snapshot + DB heartbeats.""" + items = snapshot.get('pools', {}) + if not pools: + pools = [] + + # Batch-fetch heartbeats for all referenced backends + backend_names = { + v.get('backend', '') for v in items.values() + if isinstance(v, dict) + } + heartbeat_map: Dict[str, datetime.datetime | None] = {} + if backend_names: + hb_cmd = ( + 'SELECT name, last_heartbeat FROM backends ' + 'WHERE name IN %s;') + hb_rows = database.execute_fetch_command( + hb_cmd, (tuple(backend_names),), True) + heartbeat_map = { + r['name']: r['last_heartbeat'] for r in hb_rows + } + + result = [] + for name in 
sorted(items.keys()): + pool_data = items[name] + if not isinstance(pool_data, dict): + continue + pool_backend = pool_data.get('backend', '') + + if backend and pool_backend != backend: + continue + if (pools or not all_pools) and name not in pools: + continue + + heartbeat = heartbeat_map.get(pool_backend) + row = {**pool_data, 'name': name, + 'last_heartbeat': heartbeat, + 'status': cls._compute_pool_status( + pool_data, heartbeat)} + result.append(row) + return result + @classmethod def get_all_pool_names(cls) -> List[str]: """Fetch all pool names from the database.""" @@ -4223,6 +4406,13 @@ def update_pod_template(cls, database: PostgresConnector, name: str): @classmethod def list_from_db(cls, database: 'PostgresConnector', name: str | None = None ) -> Dict[str, dict]: + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('backend_tests', {}) + if name: + items = {k: v for k, v in items.items() if k == name} + return items + list_of_names = '' fetch_input: Tuple = () if name: @@ -4234,6 +4424,14 @@ def list_from_db(cls, database: 'PostgresConnector', name: str | None = None @classmethod def fetch_from_db(cls, database: 'PostgresConnector', name: str) -> 'BackendTests': + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('backend_tests', {}) + if name not in items: + raise osmo_errors.OSMOUserError( + f'Test config {name} does not exist.') + return cls(**items[name]) + fetch_cmd = 'SELECT * FROM backend_tests WHERE name = %s;' spec_rows = database.execute_fetch_command(fetch_cmd, (name,), True) if not spec_rows: @@ -4285,6 +4483,19 @@ class Role(role.Role): def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = None) \ -> List['Role']: """ Fetches the list of roles from the roles table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('roles', {}) + result = [] + for role_name, role_data in 
sorted(items.items()): + if names and role_name not in names: + continue + if not isinstance(role_data, dict): + continue + result.append(cls( + name=role_name, **role_data)) + return result + list_of_names = '' fetch_input: Tuple = () if names: @@ -4310,6 +4521,14 @@ def list_from_db(cls, database: PostgresConnector, names: Optional[List[str]] = @classmethod def fetch_from_db(cls, database: PostgresConnector, name: str) -> 'Role': """ Fetches the role from the role table """ + snapshot = configmap_state.get_snapshot() + if snapshot is not None: + items = snapshot.get('roles', {}) + if name not in items: + raise osmo_errors.OSMOUserError( + f'Role {name} does not exist.') + return cls(name=name, **items[name]) + fetch_cmd = 'SELECT * FROM roles WHERE name = %s;' spec_rows = database.execute_fetch_command(fetch_cmd, (name,), True) if not spec_rows: diff --git a/src/utils/roles/BUILD b/src/utils/roles/BUILD index 8a02fd33a..9a7932b00 100644 --- a/src/utils/roles/BUILD +++ b/src/utils/roles/BUILD @@ -20,6 +20,7 @@ go_library( name = "roles", srcs = [ "action_registry.go", + "file_loader.go", "pool_access.go", "role_cache.go", "roles.go", @@ -31,6 +32,7 @@ go_library( "//src/utils", "//src/utils/postgres", "@com_github_hashicorp_golang_lru_v2//expirable:go_default_library", + "@in_gopkg_yaml_v3//:go_default_library", ], ) diff --git a/src/utils/roles/file_loader.go b/src/utils/roles/file_loader.go new file mode 100644 index 000000000..83d7e0543 --- /dev/null +++ b/src/utils/roles/file_loader.go @@ -0,0 +1,272 @@ +/* +SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package roles + +import ( + "fmt" + "log/slog" + "os" + "sync" + "time" + + "gopkg.in/yaml.v3" +) + +// FileRoleStore loads roles, external role mappings, and pool names from +// a ConfigMap-mounted YAML file. It replaces the PostgreSQL-backed role +// storage for the authz_sidecar in ConfigMap mode. +// +// The file is the same configs YAML mounted for the Python service: +// +// roles: +// osmo-admin: +// policies: [...] +// external_roles: [admin-group] +// pools: +// gpu-large: { ... } +type FileRoleStore struct { + filePath string + logger *slog.Logger + + mu sync.RWMutex + roles map[string]*Role // name -> Role + externalRoleMap map[string][]string // externalRole -> []osmoRoleName + poolNames []string + lastModTime time.Time +} + +// fileConfig mirrors the flat YAML structure of the configs file. +type fileConfig struct { + Roles map[string]fileRole `yaml:"roles"` + Pools map[string]yaml.Node `yaml:"pools"` +} + +type fileRole struct { + Description string `yaml:"description"` + Policies []filePolicy `yaml:"policies"` + ExternalRoles []string `yaml:"external_roles"` + Immutable bool `yaml:"immutable"` +} + +type filePolicy struct { + Effect string `yaml:"effect"` + Actions []any `yaml:"actions"` + Resources []string `yaml:"resources"` +} + +// NewFileRoleStore creates a store that reads from the given YAML file. +// Call Load() to populate, then Start() to begin watching for changes. 
+func NewFileRoleStore(filePath string, logger *slog.Logger) *FileRoleStore { + return &FileRoleStore{ + filePath: filePath, + logger: logger, + roles: make(map[string]*Role), + externalRoleMap: make(map[string][]string), + } +} + +// Load reads and parses the YAML file, populating the in-memory store. +// Returns an error if the file cannot be read or parsed. +func (s *FileRoleStore) Load() error { + data, err := os.ReadFile(s.filePath) + if err != nil { + return fmt.Errorf("read roles file: %w", err) + } + + var config fileConfig + if err := yaml.Unmarshal(data, &config); err != nil { + return fmt.Errorf("parse roles file: %w", err) + } + + roles := make(map[string]*Role, len(config.Roles)) + externalMap := make(map[string][]string) + + for name, fileRole := range config.Roles { + role, err := parseFileRole(name, fileRole) + if err != nil { + s.logger.Error("skipping invalid role", + slog.String("role", name), + slog.String("error", err.Error())) + continue + } + roles[name] = role + + // Build reverse mapping: externalRole -> []osmoRoleName + extRoles := fileRole.ExternalRoles + if len(extRoles) == 0 { + // Default: role name maps to itself + extRoles = []string{name} + } + for _, extRole := range extRoles { + externalMap[extRole] = append(externalMap[extRole], name) + } + } + + // Extract pool names + poolNames := make([]string, 0, len(config.Pools)) + for name := range config.Pools { + poolNames = append(poolNames, name) + } + + // Stat before lock to get modtime + info, _ := os.Stat(s.filePath) + + // Atomic swap (includes lastModTime to avoid race with poll goroutine) + s.mu.Lock() + s.roles = roles + s.externalRoleMap = externalMap + s.poolNames = poolNames + if info != nil { + s.lastModTime = info.ModTime() + } + s.mu.Unlock() + + s.logger.Info("roles loaded from file", + slog.Int("role_count", len(roles)), + slog.Int("external_mappings", len(externalMap)), + slog.Int("pool_count", len(poolNames)), + slog.String("file", s.filePath)) + + return nil +} + +// 
Start begins a background goroutine that polls the file for changes. +func (s *FileRoleStore) Start(pollInterval time.Duration) { + go func() { + ticker := time.NewTicker(pollInterval) + defer ticker.Stop() + for range ticker.C { + info, err := os.Stat(s.filePath) + if err != nil { + continue + } + s.mu.RLock() + changed := info.ModTime().After(s.lastModTime) + s.mu.RUnlock() + if changed { + s.logger.Info("roles file changed, reloading", + slog.String("file", s.filePath)) + if err := s.Load(); err != nil { + s.logger.Error("failed to reload roles file", + slog.String("error", err.Error())) + } + } + } + }() +} + +// GetRoles returns Role objects for the given names. +// Unknown names are silently skipped (same behavior as DB query). +func (s *FileRoleStore) GetRoles(names []string) []*Role { + s.mu.RLock() + defer s.mu.RUnlock() + + var result []*Role + for _, name := range names { + if role, ok := s.roles[name]; ok { + result = append(result, role) + } + } + return result +} + +// ResolveExternalRoles maps external IDP roles (from JWT claims) to +// OSMO role names using the in-memory external_roles mappings. +// This replaces the SyncUserRoles SQL query. +func (s *FileRoleStore) ResolveExternalRoles(externalRoles []string) []string { + s.mu.RLock() + defer s.mu.RUnlock() + + seen := make(map[string]bool) + var result []string + for _, extRole := range externalRoles { + for _, osmoRole := range s.externalRoleMap[extRole] { + if !seen[osmoRole] { + seen[osmoRole] = true + result = append(result, osmoRole) + } + } + } + return result +} + +// GetPoolNames returns all pool names from the ConfigMap. +func (s *FileRoleStore) GetPoolNames() []string { + s.mu.RLock() + defer s.mu.RUnlock() + + result := make([]string, len(s.poolNames)) + copy(result, s.poolNames) + return result +} + +// parseFileRole converts a fileRole (YAML) to a Role (Go struct). 
+func parseFileRole(name string, fr fileRole) (*Role, error) { + role := &Role{ + Name: name, + Description: fr.Description, + Immutable: fr.Immutable, + } + + role.Policies = make([]RolePolicy, 0, len(fr.Policies)) + for i, fp := range fr.Policies { + policy := RolePolicy{ + Resources: fp.Resources, + } + if fp.Effect != "" { + policy.Effect = PolicyEffect(fp.Effect) + } else { + policy.Effect = EffectAllow + } + if policy.Resources == nil { + policy.Resources = []string{} + } + + // Parse actions: each element is either a string (semantic) + // or a map (legacy path-based). + policy.Actions = make(RoleActions, 0, len(fp.Actions)) + for j, action := range fp.Actions { + switch v := action.(type) { + case string: + policy.Actions = append(policy.Actions, RoleAction{Action: v}) + case map[string]any: + ra := RoleAction{} + if s, ok := v["action"].(string); ok { + ra.Action = s + } + if s, ok := v["base"].(string); ok { + ra.Base = s + } + if s, ok := v["path"].(string); ok { + ra.Path = s + } + if s, ok := v["method"].(string); ok { + ra.Method = s + } + policy.Actions = append(policy.Actions, ra) + default: + return nil, fmt.Errorf("policy %d action %d: unexpected type %T", i, j, action) + } + } + + role.Policies = append(role.Policies, policy) + } + + return role, nil +}