Closed
27 commits
bd4c9cc
Added new feature to execute workflows without a kubernetes cluster, …
maufrancom Apr 2, 2026
cfbba1d
Add local.py and update dependencies in BUILD file
maufrancom Apr 3, 2026
7f16d4f
Add GPU passthrough support in LocalExecutor
maufrancom Apr 3, 2026
1c71133
Update Docker command construction in LocalExecutor
maufrancom Apr 3, 2026
77fa2ab
Add resume functionality to local workflow execution
maufrancom Apr 3, 2026
c44dbfb
Update .gitignore to include .venv directory
maufrancom Apr 3, 2026
ffcdd72
Enhance local workflow execution with Docker command support
maufrancom Apr 3, 2026
0bf8bd5
Enhance documentation and comments in local execution modules
maufrancom Apr 4, 2026
a79bca8
Refactor file handling in LocalExecutor for UTF-8 encoding
maufrancom Apr 4, 2026
27424cd
Enhance error handling and update documentation in local execution mo…
maufrancom Apr 4, 2026
313466b
Update copyright line in test_local_executor.py to comply with pylint…
maufrancom Apr 4, 2026
9459fec
Add shared memory size support for GPU tasks in local execution
maufrancom Apr 4, 2026
e5adf29
Add tutorial specs filegroup and enhance local executor tests
maufrancom Apr 4, 2026
429aa84
Implement file path validation in LocalExecutor to prevent directory …
maufrancom Apr 4, 2026
d08bf9b
Clear GPU device specification in Docker arguments for LocalExecutor
maufrancom Apr 4, 2026
c59a65d
Refactor shared memory size handling in LocalExecutor
maufrancom Apr 4, 2026
b50140f
Add spec_includes library and integrate into local_executor
maufrancom Apr 4, 2026
72c1990
Merge branch 'NVIDIA:main' into mfranco/spec-composition
maufrancom Apr 6, 2026
688ecc6
Enhance local execution capabilities and documentation
maufrancom Apr 6, 2026
ba9b994
Refactor default-values handling in local execution
maufrancom Apr 6, 2026
34936e2
Add compose command to CLI for workflow spec resolution
maufrancom Apr 6, 2026
37e5aa4
Update project configuration and enhance .gitignore
maufrancom Apr 6, 2026
1f37243
Refactor and enhance code documentation in local execution
maufrancom Apr 6, 2026
16f410a
Enhance LocalExecutor to support Docker Compose for workflow execution
maufrancom Apr 6, 2026
524f7cb
Enhance LocalExecutor with additional volume support and command vali…
maufrancom Apr 7, 2026
17d73e0
Refactor task dependency handling and enhance test coverage
maufrancom Apr 7, 2026
e707b27
Enhance workflow specification handling with environment variable res…
maufrancom Apr 7, 2026
4 changes: 4 additions & 0 deletions .gitignore
@@ -29,3 +29,7 @@ docs/**/domain_config.js
.ruff_cache

.lycheecache

.venv/
build/
*.egg-info/
3 changes: 3 additions & 0 deletions AGENTS.md
@@ -120,6 +120,8 @@ Entry point: `service/core/service.py`. Framework: FastAPI + Uvicorn + OpenTelem
| `utils/job/` | `Task`, `FrontendJob`, `K8sObjectFactory`, `PodGroupTopologyBuilder` | Workflow execution framework. Task → K8s spec generation. Gang scheduling via PodGroup. Topology constraints. Backend job definitions. |
| `utils/connectors/` | `ClusterConnector`, `PostgresConnector`, `RedisConnector` | K8s API wrapper, PostgreSQL operations, Redis job queue management. |
| `utils/secret_manager/` | `SecretManager` | JWE-based secret encryption/decryption. MEK/UEK key management. |
| `utils/local_executor.py` | `LocalExecutor`, `run_workflow_locally` | Local Docker Compose-based workflow execution. Generates a `docker-compose.yml` from workflow specs and runs `docker compose up`, providing on-cluster container paths (`/osmo/data/output`, `/osmo/data/input/N`), real parallel execution via `depends_on`, cycle detection, DNS-addressable `{{host:taskname}}`, resume (`--from-step`), and GPU passthrough. |
| `utils/spec_includes.py` | `resolve_includes` | Helpers to resolve and merge workflow spec `includes` directives into fully composed specs. Supports recursive inclusion, cycle detection, deep-merging, and `default-values` variable expansion. |
| `utils/progress_check/` | — | Liveness/progress tracking for long-running services. |
| `utils/metrics/` | — | Prometheus metrics collection and export. |
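The `spec_includes` behavior summarized in the table above (recursive inclusion, cycle detection, deep-merging) can be sketched with a small stand-alone model. Note this is an illustration only: the real `resolve_includes` takes spec text and a base directory, while this sketch loads specs from an in-memory dict, and all spec names and values below are invented:

```python
import copy

def deep_merge(dst: dict, src: dict) -> dict:
    """Recursively merge src into dst; on conflicts, src wins."""
    for key, value in src.items():
        if isinstance(value, dict) and isinstance(dst.get(key), dict):
            deep_merge(dst[key], value)
        else:
            dst[key] = value
    return dst

def resolve_includes(name: str, load, _stack=()) -> dict:
    """Flatten a spec's `includes` list into one composed dict."""
    if name in _stack:
        raise ValueError('include cycle: ' + ' -> '.join(_stack + (name,)))
    spec = copy.deepcopy(load(name))  # deepcopy so merging never mutates sources
    merged: dict = {}
    for child in spec.pop('includes', []):
        deep_merge(merged, resolve_includes(child, load, _stack + (name,)))
    return deep_merge(merged, spec)  # the including spec takes precedence

SPECS = {
    'base.yaml': {'default-values': {'image': 'nvcr.io/demo:1.0', 'gpus': 0}},
    'train.yaml': {'includes': ['base.yaml'],
                   'default-values': {'gpus': 8},
                   'tasks': [{'name': 'train'}]},
}
print(resolve_includes('train.yaml', SPECS.__getitem__))
# gpus from train.yaml overrides base.yaml; image is inherited
```

The key design point mirrored here is precedence: included specs form the base, and the including spec's own keys deep-merge over them last.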

@@ -139,6 +141,7 @@ Entry point: `cli.py` → `main_parser.py` (argparse). Subcommand modules:
| `login.py` | Authentication |
| `pool.py`, `resources.py`, `user.py`, `credential.py`, `access_token.py`, `bucket.py`, `task.py`, `version.py` | Supporting commands |
| `backend.py` | Backend cluster management |
| `local.py` | Local workflow execution via Docker (`osmo local run`) |

Features: Tab completion (shtab), response formatting (`formatters.py`), spec editor (`editor.py`), PyInstaller packaging (`cli_builder.py`, `packaging/`).

5 changes: 5 additions & 0 deletions cookbook/tutorials/BUILD
@@ -0,0 +1,5 @@
filegroup(
name = "tutorial_specs",
srcs = glob(["*.yaml"]),
visibility = ["//src/utils/tests:__pkg__"],
)
44 changes: 44 additions & 0 deletions pyproject.toml
@@ -0,0 +1,44 @@
[build-system]
requires = ["setuptools>=64"]
build-backend = "setuptools.build_meta"

[project]
name = "nvidia-osmo"
version = "0.1.0"
requires-python = ">=3.10"
dependencies = [
"pydantic>=1.10,<2",
"pyyaml>=6.0",
"requests>=2.28",
"urllib3>=1.26",
"typing_extensions>=4.0",
"boto3>=1.26",
"botocore>=1.29",
"mypy-boto3-iam>=1.26",
"mypy-boto3-s3>=1.26",
"mypy-boto3-sts>=1.26",
"azure-storage-blob>=12.14",
"azure-identity>=1.12",
"psycopg2-binary>=2.9",
"pyjwt>=2.6",
"jwcrypto>=1.5",
"jinja2>=3.1",
"pytz>=2023.3",
"texttable>=1.6",
"tqdm>=4.64",
"aiofiles>=23.0",
"kombu>=5.2",
"redis>=4.4",
"kubernetes>=24.2",
"fastapi>=0.100",
"slack_sdk>=3.20",
"shtab>=1.5",
]

[project.scripts]
osmo = "src.cli.cli:main"

[tool.setuptools.packages.find]
include = ["src*"]
exclude = ["src.ui*", "src.tests*"]
namespaces = true
3 changes: 3 additions & 0 deletions src/cli/BUILD
@@ -37,6 +37,7 @@ osmo_py_library(
"dataset.py",
"editor.py",
"formatters.py",
"local.py",
"login.py",
"main_parser.py",
"pool.py",
@@ -73,6 +74,8 @@
"//src/lib/utils:validation",
"//src/lib/utils:version",
"//src/lib/utils:workflow",
"//src/utils:local_executor",
"//src/utils:spec_includes",
],
)

209 changes: 209 additions & 0 deletions src/cli/local.py
@@ -0,0 +1,209 @@
# pylint: disable=line-too-long
"""
SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

SPDX-License-Identifier: Apache-2.0
"""

import argparse
import os
import re
import sys

import shtab
import yaml

from src.utils import local_executor, spec_includes


def setup_parser(parser: argparse._SubParsersAction):
"""Register the 'local' subcommand and its nested actions with the CLI argument parser."""
local_parser = parser.add_parser(
'local',
help='Run workflows locally using Docker (no Kubernetes cluster required).')
subparsers = local_parser.add_subparsers(dest='command')
subparsers.required = True

run_parser = subparsers.add_parser(
'run',
help='Execute a workflow spec locally using Docker containers.')
run_parser.add_argument(
'-f', '--file',
required=True,
dest='workflow_file',
help='Path to the workflow YAML spec file.').complete = shtab.FILE
run_parser.add_argument(
'--work-dir',
dest='work_dir',
default=None,
help='Directory for task inputs/outputs. Defaults to a temporary directory.')
run_parser.add_argument(
'--keep',
action='store_true',
default=False,
help='Keep the work directory after execution (always kept on failure).')
run_parser.add_argument(
'--docker',
dest='docker_cmd',
default='docker',
help='Docker-compatible command to use (e.g. podman). Default: docker.')
run_parser.add_argument(
'--resume',
action='store_true',
default=False,
help='Resume a previous run, skipping tasks that already completed successfully. '
'Requires --work-dir pointing to the previous run directory.')
run_parser.add_argument(
'--from-step',
dest='from_step',
default=None,
help='Resume from a specific task, re-running it and all downstream tasks. '
'Tasks upstream of the specified step are skipped if they completed '
'successfully. Requires --work-dir pointing to the previous run directory.')
run_parser.add_argument(
'--shm-size',
dest='shm_size',
default=None,
help='Shared memory size for GPU containers (e.g. 16g, 32g). '
'Defaults to 16g for tasks that request GPUs. '
'PyTorch DataLoader workers require large shared memory.')
run_parser.set_defaults(func=_run_local)

compose_parser = subparsers.add_parser(
'compose',
help='Flatten includes and expand task refs into a single spec. Variables '
'resolvable from the environment are substituted; unresolved ones are '
'kept in a default-values map.')
compose_parser.add_argument(
'-f', '--file',
required=True,
dest='workflow_file',
help='Path to the workflow YAML spec file.').complete = shtab.FILE
compose_parser.add_argument(
'-o', '--output',
dest='output_file',
default=None,
help='Write the composed spec to a file instead of stdout.').complete = shtab.FILE
compose_parser.set_defaults(func=_compose)
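The `set_defaults(func=...)` calls above use the standard argparse dispatch idiom: each subparser attaches its handler, and the caller invokes `args.func(args)` after parsing. A minimal self-contained version (the `osmo-demo` program name and `run_local` handler are made up for illustration):

```python
import argparse

def run_local(args: argparse.Namespace) -> str:
    # Stand-in for a real handler; just reports what it would execute.
    return f'would run {args.workflow_file}'

parser = argparse.ArgumentParser(prog='osmo-demo')
subparsers = parser.add_subparsers(dest='command')
subparsers.required = True

run_parser = subparsers.add_parser('run')
run_parser.add_argument('-f', '--file', required=True, dest='workflow_file')
run_parser.set_defaults(func=run_local)  # attach the handler to this subcommand

args = parser.parse_args(['run', '-f', 'workflow.yaml'])
print(args.func(args))  # dispatch via the parser-attached callback
# → would run workflow.yaml
```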


def _run_local(service_client, args: argparse.Namespace): # pylint: disable=unused-argument
"""Execute a workflow locally via Docker using the parsed CLI arguments."""
try:
success = local_executor.run_workflow_locally(
spec_path=args.workflow_file,
work_dir=args.work_dir,
keep_work_dir=args.keep,
resume=args.resume,
from_step=args.from_step,
docker_cmd=args.docker_cmd,
shm_size=args.shm_size,
)
except (ValueError, FileNotFoundError, PermissionError) as error:
print(f'Error: {error}', file=sys.stderr)
sys.exit(1)

if not success:
sys.exit(1)


_ENV_REF_RE = re.compile(r'\$\{env:([^}]+)\}')


def _resolve_set_env_refs(value: str) -> str:
"""Replace ``${env:VAR}`` patterns only when VAR is present in ``os.environ``."""
def _replacer(match: re.Match) -> str:
env_var = match.group(1)
if env_var in os.environ:
return os.environ[env_var]
return match.group(0)
return _ENV_REF_RE.sub(_replacer, value)
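To illustrate the replacement behavior of `_resolve_set_env_refs` above, here is a self-contained copy of the same logic with invented variable names (`OSMO_DEMO_*` are not real OSMO variables):

```python
import os
import re

ENV_REF_RE = re.compile(r'\$\{env:([^}]+)\}')

def resolve_set_env_refs(value: str) -> str:
    """Expand ${env:VAR} only when VAR is set; leave unset refs untouched."""
    def replacer(match: re.Match) -> str:
        var = match.group(1)
        return os.environ[var] if var in os.environ else match.group(0)
    return ENV_REF_RE.sub(replacer, value)

os.environ['OSMO_DEMO_BUCKET'] = 's3://demo-bucket'
os.environ.pop('OSMO_DEMO_UNSET', None)  # make sure the second ref stays unset
line = 'out: ${env:OSMO_DEMO_BUCKET}/run1 key: ${env:OSMO_DEMO_UNSET}'
print(resolve_set_env_refs(line))
# → out: s3://demo-bucket/run1 key: ${env:OSMO_DEMO_UNSET}
```

Leaving unset references intact is what lets `compose` emit a spec the user can finish later rather than failing outright.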


def _compose(service_client, args: argparse.Namespace): # pylint: disable=unused-argument
"""Flatten includes, resolve variables, and output a submittable spec.

When all ``${env:VAR}`` references can be resolved the output is fully
flat: no ``default-values`` section, no ``{variable}`` references —
ready to submit to the OSMO server or run locally.

When environment variables are missing the output keeps a
``default-values`` section with the unresolvable entries so the user
can fill them in and re-compose.
"""
unresolved_env: dict = {}
try:
abs_path = os.path.abspath(args.workflow_file)
with open(abs_path, encoding='utf-8') as f:
spec_text = f.read()

spec_text = spec_includes.resolve_includes(
spec_text, os.path.dirname(abs_path), source_path=abs_path)

unresolved_env = spec_includes.find_unresolved_env_variables(spec_text)

if unresolved_env:
spec_text = _compose_with_unresolved(spec_text, unresolved_env)
else:
spec_text = _compose_fully_resolved(spec_text)
except (ValueError, FileNotFoundError, PermissionError) as error:
print(f'Error: {error}', file=sys.stderr)
sys.exit(1)

if unresolved_env:
env_list = ', '.join(
f'${v}' for v in sorted(set(unresolved_env.values())))
print(
f'Warning: environment variables not set: {env_list}\n'
'Set them and re-compose, or edit the default-values section '
'in the output.',
file=sys.stderr)

if args.output_file:
with open(args.output_file, 'w', encoding='utf-8') as f:
f.write(spec_text)
print(f'Composed spec written to {args.output_file}', file=sys.stderr)
else:
print(spec_text, end='')


def _compose_fully_resolved(spec_text: str) -> str:
"""Resolve all variables and produce a submittable spec."""
return spec_includes.resolve_default_values(spec_text)


def _compose_with_unresolved(spec_text: str,
unresolved_env: dict) -> str:
"""Keep a ``default-values`` map for variables that cannot be resolved."""
parsed = yaml.safe_load(spec_text)
raw_defaults = parsed.pop('default-values', None) or {}

scalar_defaults: dict = {}
for key in sorted(raw_defaults):
value = raw_defaults[key]
if isinstance(value, (str, int, float, bool)) or value is None:
scalar_defaults[key] = value

for key, value in scalar_defaults.items():
if isinstance(value, str):
scalar_defaults[key] = _resolve_set_env_refs(value)

output: dict = {}
if scalar_defaults:
output['default-values'] = scalar_defaults
output.update(parsed)

return yaml.safe_dump(output, default_flow_style=False, sort_keys=False)
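The filtering loop in `_compose_with_unresolved` keeps only scalar (or null) defaults and drops nested structures. The same selection can be written as one comprehension; the sample keys and values below are invented:

```python
raw_defaults = {
    'gpus': 8,
    'bucket': '${env:OSMO_BUCKET}',
    'debug': None,
    'volumes': ['/data', '/cache'],   # non-scalar: dropped
    'resources': {'cpu': 4},          # non-scalar: dropped
}

scalar_defaults = {
    key: raw_defaults[key]
    for key in sorted(raw_defaults)
    if isinstance(raw_defaults[key], (str, int, float, bool))
    or raw_defaults[key] is None
}
print(scalar_defaults)
# → {'bucket': '${env:OSMO_BUCKET}', 'debug': None, 'gpus': 8}
```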
4 changes: 3 additions & 1 deletion src/cli/main_parser.py
@@ -28,6 +28,7 @@
credential,
data,
dataset,
local,
login,
pool,
profile,
@@ -55,7 +56,8 @@
profile.setup_parser,
pool.setup_parser,
user.setup_parser,
config.setup_parser
config.setup_parser,
local.setup_parser,
)


6 changes: 5 additions & 1 deletion src/cli/workflow.py
@@ -48,6 +48,7 @@
from src.lib.data import storage
from src.lib.utils import (client, common, osmo_errors, paths, port_forward, priority as wf_priority,
validation, workflow as workflow_utils)
from src.utils import spec_includes


INTERACTIVE_COMMANDS = ['bash', 'sh', 'zsh', 'fish', 'tcsh', 'csh', 'ksh']
@@ -588,8 +589,11 @@ def parse_file_for_template(workflow_contents: str, set_variables: List[str],

def _load_wf_file(workflow_path: str, set_variables: List[str],
set_string_variables: List[str]) -> TemplateData:
with open(workflow_path, 'r', encoding='utf-8') as file:
abs_path = os.path.abspath(workflow_path)
with open(abs_path, 'r', encoding='utf-8') as file:
full_file_text = file.read()
full_file_text = spec_includes.resolve_includes(
full_file_text, os.path.dirname(abs_path), source_path=abs_path)
return parse_file_for_template(full_file_text, set_variables, set_string_variables)


21 changes: 21 additions & 0 deletions src/utils/BUILD
@@ -98,3 +98,24 @@
],
visibility = ["//visibility:public"],
)

osmo_py_library(
name = "spec_includes",
srcs = ["spec_includes.py"],
deps = [
requirement("pyyaml"),
"//src/lib/utils:osmo_errors",
],
visibility = ["//visibility:public"],
)

osmo_py_library(
name = "local_executor",
srcs = ["local_executor.py"],
deps = [
requirement("pyyaml"),
"//src/utils:spec_includes",
"//src/utils/job",
],
visibility = ["//visibility:public"],
)
10 changes: 4 additions & 6 deletions src/utils/job/task.py
@@ -598,7 +598,7 @@ class TaskSpec(pydantic.BaseModel, extra=pydantic.Extra.forbid):
""" Represents the container spec in a task spec. """
name: task_common.NamePattern
image: str
command: List[str]
command: List[str] = []
inputs: List[InputType] = []
outputs: List[OutputType] = []
kpis: List[TaskKPI] = []
@@ -663,12 +663,10 @@ def validate_command(cls, command: List[str], values: Dict) -> List[str]:
"""
Validates command. Returns the value of command if valid.

Raises:
ValueError: Containers fails validation.
An empty command list means "use the container image's default
ENTRYPOINT", which is a valid and common pattern (e.g. NRE images
with pycena_run as the built-in entrypoint).
"""
name = values.get('name', '')
if not command:
raise ValueError(f'Container {name} should have at least one command.')
return command

@pydantic.validator('files')
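The `task.py` diff above relaxes `validate_command` so an empty `command` list passes and falls through to the image's default ENTRYPOINT. A plain-Python sketch of the before/after behavior, outside pydantic (function names here are illustrative, not the real validator):

```python
from typing import List

def validate_command_old(name: str, command: List[str]) -> List[str]:
    # Pre-change behavior: an empty command was a hard validation error.
    if not command:
        raise ValueError(f'Container {name} should have at least one command.')
    return command

def validate_command_new(name: str, command: List[str]) -> List[str]:
    # Post-change behavior: [] means "use the image's default ENTRYPOINT".
    return command

try:
    validate_command_old('train', [])
except ValueError as err:
    print(f'old: {err}')
print('new:', validate_command_new('train', []))
```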