801 - Enable Specification Composition #808
…for faster iteration when developing workflows
- Implemented GPU resource handling in LocalExecutor to allow tasks to request GPU resources.
- Added a new method to determine the GPU count for tasks based on their resource specifications.
- Updated Docker run command to include GPU options when applicable.

- Adjusted the handling of the resolved_command to correctly set the entrypoint and append arguments.
- Ensured that the first element of resolved_command is used as the entrypoint while the rest are appended to the Docker command.

- Introduced `--resume` and `--from-step` options in the CLI to allow resuming previous runs.
- Implemented state management in `LocalExecutor` to save and restore task results.
- Enhanced logging to provide feedback on skipped tasks and remaining tasks during resumption.
- Added GPU detection improvements to handle scenarios where requested GPUs are unavailable.
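The resume flow described above hinges on persisting task results between runs. A minimal sketch of that state-file round trip; the function names here are illustrative, and only the `.osmo-state.json` filename is confirmed by the walkthrough diagram below:

```python
import json
import os
import tempfile

STATE_FILE = '.osmo-state.json'  # filename taken from the walkthrough diagram

def save_state(work_dir: str, results: dict) -> None:
    """Persist completed task results so a later --resume run can skip them."""
    with open(os.path.join(work_dir, STATE_FILE), 'w', encoding='utf-8') as f:
        json.dump(results, f)

def load_completed(work_dir: str) -> dict:
    """Return previously saved results, or an empty dict on a fresh run."""
    path = os.path.join(work_dir, STATE_FILE)
    if not os.path.exists(path):
        return {}
    with open(path, encoding='utf-8') as f:
        return json.load(f)

work_dir = tempfile.mkdtemp()
save_state(work_dir, {'task1': {'exit_code': 0}})
resumed = load_completed(work_dir)  # tasks in here would be skipped on resume
```

A real executor would also record enough metadata (outputs, spec hash) to decide whether a cached result is still valid.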
- Added `docker_cmd` parameter to `run_workflow_locally` for customizable Docker command execution.
- Improved logging to redact sensitive information in Docker command arguments.
- Implemented error handling for unexecuted tasks in `LocalExecutor` to detect potential workflow stalls.

- Added detailed docstrings to functions and classes in `local.py` and `local_executor.py` to improve code readability and maintainability.
- Updated test cases in `test_local_executor.py` with descriptive comments to clarify the purpose of each test.
- Ensured consistency in documentation style across the codebase.

- Updated file operations in `local_executor.py` to explicitly use UTF-8 encoding when reading and writing files, ensuring better compatibility with various text formats.
- Adjusted exception handling in `test_local_executor.py` to raise `ValueError` instead of a generic `Exception` for clearer error reporting.
- Modified test documentation to reflect the correct Jinja block syntax in error messages.

…dules
- Expanded exception handling in `local.py` to include `FileNotFoundError` and `PermissionError` for improved robustness.
- Updated comments in `local_executor.py` to clarify unsupported features in local mode, specifically regarding privileged containers and host networking.
- Modified test case in `test_local_executor.py` to ensure caller-supplied work directories are preserved on success, enhancing test accuracy.

- Introduced `--shm-size` argument in the CLI for specifying shared memory size for GPU containers, defaulting to 16g.
- Updated `LocalExecutor` to accept and utilize the shared memory size during Docker command construction.
- Added unit tests to verify correct handling of shared memory size for both default and custom values in GPU tasks, ensuring no shared memory argument is included for non-GPU tasks.
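The `--shm-size` behavior described here (and refined by a later commit to also honor explicit values for CPU-only tasks) can be sketched as a small argument builder. `shm_size_args` is a hypothetical helper, not the executor's actual API:

```python
from typing import List, Optional

def shm_size_args(gpu_count: int, shm_size: Optional[str],
                  default: str = '16g') -> List[str]:
    """Build the --shm-size portion of a docker run command.

    GPU tasks always get a shared-memory size (defaulting to 16g);
    CPU-only tasks get one only when the user set it explicitly.
    """
    if gpu_count > 0:
        return ['--shm-size', shm_size or default]
    if shm_size is not None:
        return ['--shm-size', shm_size]
    return []

gpu_default = shm_size_args(1, None)      # GPU task, no override -> 16g
cpu_explicit = shm_size_args(0, '8g')     # CPU task, user asked for 8g
cpu_default = shm_size_args(0, None)      # CPU task, nothing requested
```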
- Created a new `tutorial_specs` filegroup in the `cookbook/tutorials/BUILD` to include YAML specifications.
- Updated the `BUILD` file in `src/utils/tests` to include the new `tutorial_specs` as data for local tests.
- Added a new test class in `test_local_executor.py` to validate unsupported features in cookbook specifications, ensuring proper error handling for unsupported fields.
- Implemented additional tests to verify that specific unsupported features are correctly rejected during local execution.

…traversal
- Enhanced the `LocalExecutor` class to validate file paths, ensuring they do not escape the task directory. This prevents potential security risks associated with directory traversal attacks.
- Added unit tests in `test_local_executor.py` to verify that invalid file paths raise appropriate exceptions, while valid paths are accepted without errors.
- Updated documentation in `AGENTS.md` to include the new local executor functionality for Docker-based workflow execution.
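The containment check described above can be sketched as follows; `resolve_inside` is an illustrative name, but the realpath-plus-prefix test mirrors the validation the review excerpts show later:

```python
import os
import tempfile

def resolve_inside(base_dir: str, rel_path: str) -> str:
    """Resolve rel_path under base_dir, rejecting paths that escape it."""
    base = os.path.realpath(base_dir)
    target = os.path.realpath(os.path.join(base, rel_path.lstrip('/')))
    # realpath collapses '..' and symlinks, so a simple prefix check suffices.
    if not target.startswith(base + os.sep):
        raise ValueError(f'path {rel_path!r} escapes {base_dir!r}')
    return target

demo_dir = tempfile.mkdtemp()
inside = resolve_inside(demo_dir, 'sub/notes.txt')   # accepted
try:
    resolve_inside(demo_dir, '../escape.txt')        # rejected
    escaped_rejected = False
except ValueError:
    escaped_rejected = True
```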
- Updated the `LocalExecutor` class to remove unnecessary quotes around GPU device specifications in Docker command arguments, ensuring correct formatting.
- Cleared previous results at the start of the `execute` method to prevent data carryover between executions.

- Updated the `LocalExecutor` class to ensure the `--shm-size` argument is included for CPU-only tasks when explicitly specified by the user.
- Adjusted the logic for setting the shared memory size to improve clarity and maintainability.
- Enhanced unit tests in `test_local_executor.py` to verify correct behavior for tasks with and without GPU resources regarding shared memory size.

- Introduced a new `spec_includes` library to handle YAML file includes.
- Updated `local_executor.py` to utilize `spec_includes` for resolving includes and stripping default-values during local execution.
- Enhanced unit tests in `test_local_executor.py` to verify correct behavior when handling default-values in included YAML files.
- Updated BUILD files to include the new library and corresponding tests.
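Merging `default-values` across included files comes down to a later-wins deep merge. A minimal sketch, assuming `deep_merge_dicts` semantics like those the review excerpts reference later (the real implementation in `spec_includes.py` may differ):

```python
def deep_merge_dicts(base: dict, override: dict) -> dict:
    """Recursively merge override into base; override wins on conflicts."""
    merged = dict(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge_dicts(merged[key], value)
        else:
            merged[key] = value
    return merged

# Defaults from an included file, then the main spec's overrides on top.
included_defaults = {'train': {'epochs': 10, 'lr': 0.001}}
spec_defaults = {'train': {'epochs': 20}}
resolved = deep_merge_dicts(included_defaults, spec_defaults)
```

The main spec's values take precedence, while keys it does not mention (here `lr`) survive from the include.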
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
📝 Walkthrough

Adds a local execution feature.
```mermaid
sequenceDiagram
    participant User as User (CLI)
    participant CLI as "osmo local run"
    participant SpecProc as SpecIncludes
    participant Exec as LocalExecutor
    participant DAG as DAG Builder
    participant Docker as Docker Engine

    User->>CLI: invoke `osmo local run` with spec & options
    CLI->>SpecProc: load spec text, resolve includes & defaults
    SpecProc->>SpecProc: merge includes, expand default-values
    SpecProc-->>CLI: merged YAML text
    CLI->>Exec: init executor (work_dir, docker_cmd, shm_size)
    Exec->>DAG: parse spec and build task DAG
    DAG-->>Exec: ready tasks order
    loop execute tasks
        Exec->>Exec: validate task, substitute tokens, prepare mounts
        Exec->>Docker: run container (mounts, env, --gpus, --shm-size)
        Docker-->>Exec: task exit & outputs
        Exec->>Exec: persist .osmo-state.json
    end
    Exec-->>CLI: return success/failure
    CLI->>User: exit code
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 4
🧹 Nitpick comments (4)
src/utils/spec_includes.py (2)
264-269: Chain exceptions with `from` for included file parse errors. Same pattern as above; preserve the exception chain for debugging.
♻️ Proposed fix
```diff
 try:
     included_dict = yaml.safe_load(included_text)
 except yaml.YAMLError as yaml_err:
     raise osmo_errors.OSMOUserError(
-        f'Failed to parse included file "{include_path}": {yaml_err}')
+        f'Failed to parse included file "{include_path}": {yaml_err}') from yaml_err
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/spec_includes.py` around lines 264 - 269, When catching yaml.YAMLError from yaml.safe_load in the code that parses included_text (the block that raises osmo_errors.OSMOUserError with the message f'Failed to parse included file "{include_path}": {yaml_err}'), re-raise the OSMOUserError using exception chaining (raise osmo_errors.OSMOUserError(...) from yaml_err) so the original yaml_err is preserved on the trace; update the raise in that try/except to use "from yaml_err".
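The chaining pattern these comments ask for is standard Python (`raise ... from`). A self-contained sketch, using `json` instead of PyYAML to stay dependency-free; `UserError` stands in for `osmo_errors.OSMOUserError`:

```python
import json

class UserError(Exception):
    """Stand-in for an application-level error type such as OSMOUserError."""

def parse_included(text: str) -> dict:
    try:
        return json.loads(text)
    except json.JSONDecodeError as err:
        # "from err" records the parse error on __cause__, so the full
        # traceback shows both the user-facing and the original error.
        raise UserError(f'Failed to parse included file: {err}') from err

try:
    parse_included('{not valid json')
except UserError as user_err:
    cause = user_err.__cause__  # the original JSONDecodeError is preserved
```

Without `from err`, Python would still show the original error as implicit context, but explicit chaining signals intent and survives `raise ... from None` audits.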
209-218: Chain exceptions with `from` for better tracebacks. Static analysis correctly identifies that re-raised exceptions should preserve the original exception chain.
♻️ Proposed fix
```diff
 try:
     spec_dict = yaml.safe_load(spec_text)
 except yaml.YAMLError as yaml_err:
     if re.search(r'^includes:', spec_text, re.MULTILINE):
         raise osmo_errors.OSMOUserError(
             'Failed to parse workflow spec for includes resolution. '
             'Specs using "includes" must be valid YAML — Jinja template '
             'variables like {{ }} must be in quoted strings. '
-            f'Parse error: {yaml_err}')
+            f'Parse error: {yaml_err}') from yaml_err
 return spec_text
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/spec_includes.py` around lines 209 - 218, The except block that raises osmo_errors.OSMOUserError when yaml.safe_load fails should chain the original YAML exception to preserve tracebacks; update the raise in the except handler (the block that checks re.search(r'^includes:', spec_text, re.MULTILINE)) to re-raise the OSMOUserError using "from yaml_err" so the original yaml.YAMLError is attached (keep the same message and conditions around spec_text and includes resolution).

src/utils/local_executor.py (1)
431-436: Subprocess execution inherits trust from user-supplied spec.The subprocess call assembles Docker arguments from user-provided YAML spec values (image, command, args, environment). This is acceptable for a local development tool where users execute their own trusted specs. The path traversal check (lines 363-366) properly guards against file escape attacks.
Consider documenting in the module docstring that specs should only be executed from trusted sources, similar to how `docker run` itself trusts its inputs.
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 431 - 436, Add a module-level docstring to src/utils/local_executor.py stating that the module executes user-provided specs via subprocess.run using assembled docker_args and thus those specs must be trusted (similar to how `docker run` trusts its inputs); reference that subprocess.run is used to invoke Docker, docker_args and TaskResult are involved in execution/return, and note that the existing path traversal protection remains in place so that callers should only pass specs from trusted sources.src/utils/tests/test_local_executor.py (1)
751-817: Consider annotating class-level test data with `ClassVar`.

The `_UNSUPPORTED_SPECS` dict is flagged by static analysis as a mutable class attribute. Since it's constant test data that's never modified, this is a minor concern, but adding a type annotation clarifies intent.

♻️ Optional fix
```diff
+from typing import ClassVar, Dict, Any
+
 class TestValidateForLocalRemainingBranches(unittest.TestCase):
     """Verify that _validate_for_local rejects credentials, checkpoint, volumeMounts, privileged, and hostNetwork."""

-    _UNSUPPORTED_SPECS = {
+    _UNSUPPORTED_SPECS: ClassVar[Dict[str, Dict[str, Any]]] = {
         'credentials': {
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/tests/test_local_executor.py` around lines 751 - 817, Annotate the class-level test data _UNSUPPORTED_SPECS with typing.ClassVar to indicate it's an immutable class constant; import ClassVar (and optionally more specific types like Mapping/Dict from typing) and change the declaration to include a ClassVar[...] type annotation (e.g., _UNSUPPORTED_SPECS: ClassVar[Dict[str, Any]] = {...}) so static analyzers no longer treat it as a mutable class attribute.
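The effect of the suggested annotation can be shown in isolation; `SpecTests` and its contents are illustrative stand-ins for the real test class:

```python
from typing import Any, ClassVar, Dict

class SpecTests:
    # ClassVar marks this as shared class-level data, so static analyzers
    # stop flagging it as a mutable per-instance attribute.
    _UNSUPPORTED_SPECS: ClassVar[Dict[str, Dict[str, Any]]] = {
        'privileged': {'tasks': [{'privileged': True}]},
    }

# The annotation is recorded on the class, where type checkers read it.
annotation = SpecTests.__annotations__['_UNSUPPORTED_SPECS']
```

`ClassVar` has no runtime effect on attribute access; it only documents intent for type checkers and linters.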
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@AGENTS.md`:
- Around line 123-124: Add a new table row in the "Codebase Structure" section
of AGENTS.md describing the newly added module utils/spec_includes.py
(referenced by the build target in src/utils/BUILD); list the file name
`utils/spec_includes.py`, mention its purpose as "helpers to resolve and
include/merge workflow spec fragments into full specs" and include the exported
symbols or entry points (e.g., the module name `spec_includes` or any main
functions/classes if present) so the table documents this new major module
alongside `utils/local_executor.py`.
In `@src/cli/local.py`:
- Line 2: The pylint disable is currently inside the module docstring string, so
move the "# pylint: disable=line-too-long" out of the docstring and place it on
its own line immediately before the module-level docstring in src/cli/local.py;
ensure the docstring starts on the next line and is unchanged, leaving the
standalone comment as the first non-shebang/non-license comment so pylint
recognizes it as a file-level directive.
In `@src/utils/tests/BUILD`:
- Around line 34-44: Replace the py_test macro for the test target named
"test_local_executor" with the osmo_py_test macro (i.e., change py_test(...) to
osmo_py_test(...)); ensure you keep the same attrs (srcs, deps, data, local =
True) because osmo_py_test forwards **kwargs to native.py_test so linting and
local=True behavior remain intact.
In `@src/utils/tests/test_local_executor.py`:
- Around line 1113-1127: The test contains local imports (import os as _os, from
src.lib.utils import workflow as workflow_utils, import yaml as _yaml) inside
the test body; move these to the module top-level imports (after the module
docstring) and update the test code to use the top-level names (e.g., replace
_os with os, _yaml with yaml, and use workflow_utils.resolve_includes) so
resolve_includes, main_path and spec_text usage remains unchanged and the module
now imports os, yaml, and src.lib.utils.workflow at top-level.
---
Nitpick comments:
In `@src/utils/local_executor.py`:
- Around line 431-436: Add a module-level docstring to
src/utils/local_executor.py stating that the module executes user-provided specs
via subprocess.run using assembled docker_args and thus those specs must be
trusted (similar to how `docker run` trusts its inputs); reference that
subprocess.run is used to invoke Docker, docker_args and TaskResult are involved
in execution/return, and note that the existing path traversal protection
remains in place so that callers should only pass specs from trusted sources.
In `@src/utils/spec_includes.py`:
- Around line 264-269: When catching yaml.YAMLError from yaml.safe_load in the
code that parses included_text (the block that raises osmo_errors.OSMOUserError
with the message f'Failed to parse included file "{include_path}": {yaml_err}'),
re-raise the OSMOUserError using exception chaining (raise
osmo_errors.OSMOUserError(...) from yaml_err) so the original yaml_err is
preserved on the trace; update the raise in that try/except to use "from
yaml_err".
- Around line 209-218: The except block that raises osmo_errors.OSMOUserError
when yaml.safe_load fails should chain the original YAML exception to preserve
tracebacks; update the raise in the except handler (the block that checks
re.search(r'^includes:', spec_text, re.MULTILINE)) to re-raise the OSMOUserError
using "from yaml_err" so the original yaml.YAMLError is attached (keep the same
message and conditions around spec_text and includes resolution).
In `@src/utils/tests/test_local_executor.py`:
- Around line 751-817: Annotate the class-level test data _UNSUPPORTED_SPECS
with typing.ClassVar to indicate it's an immutable class constant; import
ClassVar (and optionally more specific types like Mapping/Dict from typing) and
change the declaration to include a ClassVar[...] type annotation (e.g.,
_UNSUPPORTED_SPECS: ClassVar[Dict[str, Any]] = {...}) so static analyzers no
longer treat it as a mutable class attribute.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f52d60f8-8caf-4417-abff-187ab72dbbe8
📒 Files selected for processing (12)
- .gitignore
- AGENTS.md
- cookbook/tutorials/BUILD
- src/cli/BUILD
- src/cli/local.py
- src/cli/main_parser.py
- src/utils/BUILD
- src/utils/local_executor.py
- src/utils/spec_includes.py
- src/utils/tests/BUILD
- src/utils/tests/test_local_executor.py
- src/utils/tests/test_spec_includes.py
- Added a new `workflow_local.yaml` file for local execution, providing a detailed guide on usage and modifications from the original `workflow.yaml`.
- Updated `AGENTS.md` to include the new `spec_includes.py` utility for resolving workflow spec includes.
- Improved documentation in `local_executor.py` to highlight security considerations regarding workflow spec execution.
- Refined error handling in `spec_includes.py` to provide clearer error messages for YAML parsing issues.
- Updated test cases in `test_local_executor.py` to ensure compatibility with the new local execution features and improved error handling.

- Updated `local_executor.py` to resolve `default-values` directly within the spec text, improving local execution capabilities.
- Enhanced `spec_includes.py` with new functions to extract and resolve `default-values` and environment variable references.
- Modified unit tests in `test_local_executor.py` to verify that `default-values` are resolved correctly and do not interfere with Jinja template detection.
- Adjusted test cases to ensure that specs using `default-values` are processed correctly during local execution.

- Introduced a new `compose` subcommand in the CLI to resolve includes and default-values in workflow specifications.
- Enhanced `local.py` to support the new command, allowing users to output a flat workflow spec.
- Updated `workflow.py` to integrate `spec_includes` for resolving includes during file loading.
- Added unit tests in `test_local_executor.py` to verify the functionality of the `compose` command, ensuring correct resolution of includes and default-values.
- Modified BUILD files to include necessary dependencies for the new functionality.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 480-485: The code only checks for Jinja markers in
template_markers but misses unresolved double-brace placeholders like '{{ ...
}}'; add a second check after the existing "if any(marker in spec_text for
marker in template_markers):" that scans spec_text for '{{' '}}' patterns and
raises the same ValueError unless the placeholder matches the executor's allowed
runtime tokens (implement an allowlist of supported tokens and match against
it), using the same error message and referencing spec_text and template_markers
to locate the change.
- Around line 355-373: The task runner currently reuses existing task
directories which can leave stale artifacts; in _run_task, before creating
files_dir/output_dir and writing files, delete any existing task_dir (or at
least output_dir and files_dir) contents and recreate those directories to
ensure a clean run; update the logic around task_dir, output_dir, and files_dir
to remove existing paths (e.g., via shutil.rmtree or by removing contents) and
then os.makedirs(files_dir, exist_ok=True) so the subsequent token substitution
and file writes always operate on fresh directories.
In `@src/utils/spec_includes.py`:
- Around line 293-299: The code merges 'default-values' without checking its
type, causing raw runtime errors for non-mapping values; before calling
deep_merge_dicts, validate that each included.get('default-values') and
spec_dict.get('default-values') is a mapping (e.g., isinstance(..., Mapping))
and if not raise an OSMOUserError with a clear message identifying the offending
include (use included or spec context), then proceed to merge into all_defaults
using deep_merge_dicts; reference variables/function names: included_dicts,
spec_dict, all_defaults, deep_merge_dicts, and OSMOUserError.
- Around line 209-230: The code currently returns raw spec_text when 'includes:'
isn't present which skips blueprint expansion; instead, after successfully
parsing YAML into spec_dict (i.e., inside the try block after yaml.safe_load and
before the early return), run the blueprint expansion function (e.g., call
_expand_refs_in_workflow(spec_dict) or whatever function is used to expand task
blueprints) so that documents using default-values blueprints like "- \"{{ train
}}\"" get expanded into concrete tasks; keep the existing behavior of returning
raw spec_text only when YAML parsing fails due to unquoted Jinja (the existing
yaml.YAMLError branch), and still only call _resolve_includes(spec_dict,
base_directory, ancestors) when 'includes' exists.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: ead60847-40cc-4253-a6ab-0180f7a97b6c
📒 Files selected for processing (7)
- AGENTS.md
- src/cli/local.py
- src/utils/local_executor.py
- src/utils/spec_includes.py
- src/utils/tests/BUILD
- src/utils/tests/test_local_executor.py
- src/utils/tests/test_spec_includes.py
✅ Files skipped from review due to trivial changes (1)
- src/utils/tests/BUILD
🚧 Files skipped from review as they are similar to previous changes (2)
- AGENTS.md
- src/cli/local.py
src/utils/local_executor.py
Outdated
```python
def _run_task(self, node: TaskNode, spec: workflow_module.WorkflowSpec) -> TaskResult:
    """Execute a single task as a Docker container, mounting inputs/outputs/files and returning the result."""
    task_spec = node.spec
    task_dir = os.path.join(self._work_dir, node.name)
    output_dir = os.path.join(task_dir, 'output')
    files_dir = os.path.join(task_dir, 'files')
    os.makedirs(files_dir, exist_ok=True)

    token_map = self._build_token_map(node, output_dir)

    for file_spec in task_spec.files:
        resolved_contents = self._substitute_tokens(file_spec.contents, token_map)
        host_path = os.path.realpath(os.path.join(files_dir, file_spec.path.lstrip('/')))
        if not host_path.startswith(os.path.realpath(files_dir) + os.sep):
            raise ValueError(
                f'Task "{node.name}": file path "{file_spec.path}" escapes the task directory')
        os.makedirs(os.path.dirname(host_path), exist_ok=True)
        with open(host_path, 'w', encoding='utf-8') as f:
            f.write(resolved_contents)
```
Recreate the task directories before each rerun.
Lines 358-361 reuse any existing output/ and files/ directories. If the caller reuses work_dir, resumes after a failure, or uses --from-step, stale artifacts survive and can be consumed by the rerun. Clear these directories before writing files and launching Docker.
🔧 Proposed fix
```diff
 def _run_task(self, node: TaskNode, spec: workflow_module.WorkflowSpec) -> TaskResult:
     """Execute a single task as a Docker container, mounting inputs/outputs/files and returning the result."""
     task_spec = node.spec
     task_dir = os.path.join(self._work_dir, node.name)
     output_dir = os.path.join(task_dir, 'output')
     files_dir = os.path.join(task_dir, 'files')
-    os.makedirs(files_dir, exist_ok=True)
+    shutil.rmtree(output_dir, ignore_errors=True)
+    shutil.rmtree(files_dir, ignore_errors=True)
+    os.makedirs(output_dir, exist_ok=True)
+    os.makedirs(files_dir, exist_ok=True)

     token_map = self._build_token_map(node, output_dir)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/local_executor.py` around lines 355 - 373, The task runner
currently reuses existing task directories which can leave stale artifacts; in
_run_task, before creating files_dir/output_dir and writing files, delete any
existing task_dir (or at least output_dir and files_dir) contents and recreate
those directories to ensure a clean run; update the logic around task_dir,
output_dir, and files_dir to remove existing paths (e.g., via shutil.rmtree or
by removing contents) and then os.makedirs(files_dir, exist_ok=True) so the
subsequent token substitution and file writes always operate on fresh
directories.
```python
if 'includes:' not in spec_text:
    return spec_text

ancestors: FrozenSet[str] = frozenset()
if source_path is not None:
    ancestors = frozenset({os.path.normpath(os.path.abspath(source_path))})

try:
    spec_dict = yaml.safe_load(spec_text)
except yaml.YAMLError as yaml_err:
    if re.search(r'^includes:', spec_text, re.MULTILINE):
        raise osmo_errors.OSMOUserError(
            'Failed to parse workflow spec for includes resolution. '
            'Specs using "includes" must be valid YAML — Jinja template '
            'variables like {{ }} must be in quoted strings. '
            f'Parse error: {yaml_err}') from yaml_err
    return spec_text

if not isinstance(spec_dict, dict) or 'includes' not in spec_dict:
    return spec_text

return _resolve_includes(spec_dict, base_directory, ancestors)
```
Don't skip blueprint expansion when includes is absent.
Line 209 returns the raw text before _expand_refs_in_workflow() ever runs. A spec that only uses default-values task blueprints like - "{{ train }}" therefore reaches LocalExecutor.load_spec() with string tasks and fails, even though that syntax is part of the new feature. Parse-and-expand whenever the document is valid YAML, and keep the current raw-text fallback only for the unquoted-Jinja case.
🔧 Proposed fix
```diff
 def resolve_includes(spec_text: str, base_directory: str,
                      source_path: str | None = None) -> str:
 @@
-    if 'includes:' not in spec_text:
-        return spec_text
-
     ancestors: FrozenSet[str] = frozenset()
     if source_path is not None:
         ancestors = frozenset({os.path.normpath(os.path.abspath(source_path))})
 @@
-    if not isinstance(spec_dict, dict) or 'includes' not in spec_dict:
+    if not isinstance(spec_dict, dict):
         return spec_text
+
+    if 'includes' not in spec_dict:
+        default_values = spec_dict.get('default-values', {})
+        if isinstance(default_values, dict):
+            _expand_refs_in_workflow(spec_dict, default_values)
+            return yaml.safe_dump(spec_dict, default_flow_style=False, sort_keys=False)
+        return spec_text
     return _resolve_includes(spec_dict, base_directory, ancestors)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/spec_includes.py` around lines 209 - 230, The code currently
returns raw spec_text when 'includes:' isn't present which skips blueprint
expansion; instead, after successfully parsing YAML into spec_dict (i.e., inside
the try block after yaml.safe_load and before the early return), run the
blueprint expansion function (e.g., call _expand_refs_in_workflow(spec_dict) or
whatever function is used to expand task blueprints) so that documents using
default-values blueprints like "- \"{{ train }}\"" get expanded into concrete
tasks; keep the existing behavior of returning raw spec_text only when YAML
parsing fails due to unquoted Jinja (the existing yaml.YAMLError branch), and
still only call _resolve_includes(spec_dict, base_directory, ancestors) when
'includes' exists.
```python
all_defaults: Dict[str, Any] = {}
for included in included_dicts:
    all_defaults = deep_merge_dicts(
        all_defaults, included.get('default-values', {}))
all_defaults = deep_merge_dicts(
    all_defaults, spec_dict.get('default-values', {}))
```
Validate default-values before merging it.
Lines 295-298 pass whatever is under default-values straight into deep_merge_dicts(). If a user writes a scalar or list there, this path raises a raw runtime error instead of an OSMOUserError, and it happens before schema validation can help. Please reject non-mapping default-values explicitly here.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/spec_includes.py` around lines 293 - 299, The code merges
'default-values' without checking its type, causing raw runtime errors for
non-mapping values; before calling deep_merge_dicts, validate that each
included.get('default-values') and spec_dict.get('default-values') is a mapping
(e.g., isinstance(..., Mapping)) and if not raise an OSMOUserError with a clear
message identifying the offending include (use included or spec context), then
proceed to merge into all_defaults using deep_merge_dicts; reference
variables/function names: included_dicts, spec_dict, all_defaults,
deep_merge_dicts, and OSMOUserError.
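The validation this comment requests boils down to a mapping type check before the merge. A minimal sketch with a stand-in error type (`checked_defaults` is an illustrative name, not the codebase's API):

```python
from collections.abc import Mapping
from typing import Any

class OSMOUserError(Exception):
    """Stand-in for the real osmo_errors.OSMOUserError."""

def checked_defaults(defaults: Any, source: str) -> Mapping:
    """Return default-values if it is a mapping, else raise a user-facing error."""
    if defaults is None:
        return {}
    if not isinstance(defaults, Mapping):
        raise OSMOUserError(
            f'"default-values" in {source} must be a mapping, '
            f'got {type(defaults).__name__}')
    return defaults

ok = checked_defaults({'lr': 0.01}, 'main.yaml')
try:
    checked_defaults(['not', 'a', 'mapping'], 'base.yaml')  # a YAML list
    rejected = False
except OSMOUserError:
    rejected = True
```

Running each include's `default-values` through a guard like this before `deep_merge_dicts` turns a confusing `AttributeError` deep in the merge into a clear message naming the offending file.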
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/tests/test_local_executor.py`:
- Around line 1395-1451: The tests import _load_wf_file inside two test
functions (test_load_wf_file_flattens_includes and
test_load_wf_file_preserves_template_markers); move the line "from
src.cli.workflow import _load_wf_file" to the top of the test module (after the
module docstring) so the import is module-level, then remove the in-function
imports in both test methods and run tests to ensure nothing else relies on
local import behavior.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: f2926122-35ca-45ba-8957-5d54955061fa
📒 Files selected for processing (5)
- src/cli/BUILD
- src/cli/local.py
- src/cli/workflow.py
- src/utils/tests/BUILD
- src/utils/tests/test_local_executor.py
✅ Files skipped from review due to trivial changes (1)
- src/utils/tests/BUILD
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/BUILD
```python
def test_load_wf_file_flattens_includes(self):
    """_load_wf_file merges included files so the server receives a flat spec."""
    from src.cli.workflow import _load_wf_file

    self._write_file('base.yaml', '''\
workflow:
  resources:
    default:
      cpu: 8
''')
    main_path = self._write_file('main.yaml', '''\
includes:
  - base.yaml
workflow:
  name: test-wf
  tasks:
    - name: task1
      image: alpine:3.18
      command: ["echo"]
''')

    template_data = _load_wf_file(main_path, [], [])
    parsed = yaml.safe_load(template_data.file)

    self.assertNotIn('includes', parsed)
    self.assertEqual(parsed['workflow']['resources']['default']['cpu'], 8)
    self.assertEqual(parsed['workflow']['name'], 'test-wf')

def test_load_wf_file_preserves_template_markers(self):
    """Includes are resolved but Jinja/default-values markers are preserved for the server."""
    from src.cli.workflow import _load_wf_file

    self._write_file('base.yaml', '''\
workflow:
  resources:
    default:
      cpu: 4
''')
    main_path = self._write_file('main.yaml', '''\
includes:
  - base.yaml
workflow:
  name: "{{experiment}}"
  tasks:
    - name: task1
      image: alpine:3.18
      command: ["echo"]
default-values:
  experiment: my-exp
''')

    template_data = _load_wf_file(main_path, [], [])
    self.assertTrue(template_data.is_templated)
    self.assertNotIn('includes', template_data.file)
    self.assertIn('default-values', template_data.file)
    self.assertIn('{{experiment}}', template_data.file)
```
🛠️ Refactor suggestion | 🟠 Major
Move _load_wf_file import to module top level.
Lines 1397 and 1425 contain in-function imports which violate the coding guidelines. Move the import to the module level.
```diff
 from src.utils import spec_includes
 from src.utils.job import task as task_module
 from src.utils.local_executor import LocalExecutor, TaskNode, TaskResult, run_workflow_locally
+from src.cli.workflow import _load_wf_file
```

Then update the test methods:
```diff
 def test_load_wf_file_flattens_includes(self):
     """_load_wf_file merges included files so the server receives a flat spec."""
-    from src.cli.workflow import _load_wf_file
     self._write_file('base.yaml', '''\

 def test_load_wf_file_preserves_template_markers(self):
     """Includes are resolved but Jinja/default-values markers are preserved for the server."""
-    from src.cli.workflow import _load_wf_file
     self._write_file('base.yaml', '''\
```

As per coding guidelines: "All imports must be at the top level of the module after the module docstring. No exceptions: imports inside functions are not allowed."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/tests/test_local_executor.py` around lines 1395 - 1451, The tests
import _load_wf_file inside two test functions
(test_load_wf_file_flattens_includes and
test_load_wf_file_preserves_template_markers); move the line "from
src.cli.workflow import _load_wf_file" to the top of the test module (after the
module docstring) so the import is module-level, then remove the in-function
imports in both test methods and run tests to ensure nothing else relies on
local import behavior.
- Added a new `pyproject.toml` file to define project metadata and dependencies for the `nvidia-osmo` package.
- Updated `.gitignore` to include `build/` and `*.egg-info/` directories, improving the management of build artifacts and Python package metadata.
- Modified `spec_includes.py` to use `copy.deepcopy` for task dictionary handling, ensuring that task references are expanded correctly without unintended side effects.
- Updated function definitions in `local.py` and `local_executor.py` to disable pylint warnings for unused arguments, improving code clarity.
- Reformatted docstrings across multiple methods in `local_executor.py` for better readability, ensuring they adhere to PEP 257 standards.
- Enhanced logging messages for clarity and consistency, particularly in error handling and GPU detection scenarios.
Actionable comments posted: 2
♻️ Duplicate comments (2)
src/utils/local_executor.py (2)
507-512: ⚠️ Potential issue | 🟠 Major
Reject leftover {{ ... }} placeholders here too.
This only blocks {% and {#. Unsupported double-brace placeholders can still survive include/default-value resolution and either get passed to Docker literally or fail much later. Add a second check here with an allowlist for the runtime tokens this executor actually supports (output and input:*).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 507 - 512, The current Jinja-detection only checks for '{%' and '{#' in spec_text (template_markers) but misses unsupported '{{ ... }}' placeholders; update the logic in the same function handling spec_text to also scan for double-brace tokens and reject any that are not in the executor's allowlist (exactly 'output' or 'input:<name>' patterns). Specifically, after the existing template_markers check, parse occurrences of '{{...}}' in spec_text, validate the inner token against the allowed forms ('output' or 'input:*'), and if any token is disallowed raise the same ValueError with the user-facing instruction about running server-side expansion (mirroring the existing error message).
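The allowlist check described above can be sketched as follows. The token forms ('output' and 'input:<name>') come from the review comment; the function name, regex, and error wording are illustrative assumptions, not the project's actual code.

```python
import re

# Captures the inner token of a {{ ... }} placeholder, e.g. 'output' or 'experiment'.
_PLACEHOLDER_RE = re.compile(r'\{\{\s*([^{}]*?)\s*\}\}')

def reject_unsupported_placeholders(spec_text: str) -> None:
    """Raise ValueError for any {{ ... }} token outside the runtime allowlist."""
    for token in _PLACEHOLDER_RE.findall(spec_text):
        if token == 'output' or token.startswith('input:'):
            continue  # tokens the local executor substitutes itself at run time
        raise ValueError(
            f'Unsupported placeholder "{{{{{token}}}}}" in spec; '
            'expand templates server-side before running locally')
```

A spec containing only `{{output}}` or `{{input:task1}}` passes; anything else (for example a leftover `{{experiment}}`) fails fast instead of reaching Docker literally.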
379-395: ⚠️ Potential issue | 🟠 Major
Recreate output/ and files/ on each rerun.
Reusing a work_dir, --resume, or --from-step can leave stale artifacts in both directories. The next run can then read old inline files or mix old outputs with new ones.
🔧 Suggested fix
         task_spec = node.spec
         task_dir = os.path.join(self._work_dir, node.name)
         output_dir = os.path.join(task_dir, 'output')
         files_dir = os.path.join(task_dir, 'files')
-        os.makedirs(files_dir, exist_ok=True)
+        shutil.rmtree(output_dir, ignore_errors=True)
+        shutil.rmtree(files_dir, ignore_errors=True)
+        os.makedirs(output_dir, exist_ok=True)
+        os.makedirs(files_dir, exist_ok=True)
         token_map = self._build_token_map(node, output_dir)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/utils/local_executor.py` around lines 379 - 395, The code currently reuses existing output_dir and files_dir which can leave stale artifacts; before building token_map and writing task files in _exec_task (or the function containing task_spec, task_dir, output_dir, files_dir), ensure you recreate/clean both output_dir and files_dir (e.g., remove existing directories if present then os.makedirs) so outputs and inline files are not mixed with previous runs; update the logic around output_dir/files_dir creation (the block using os.makedirs(files_dir, exist_ok=True) and subsequent file writes, and functions _build_token_map/_substitute_tokens where applicable) to delete and recreate these dirs safely (use shutil.rmtree or equivalent) prior to writing.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/utils/local_executor.py`:
- Around line 141-145: The resume/restore logic must run before creating task
directories: move the call to _restore_completed_tasks(from_step) to occur
immediately after _validate_for_local(spec) and before _setup_directories() so
that previously completed tasks remain detected (do not have their output_dir
artificially created); update the method containing these calls (the function
that currently calls _validate_for_local, _setup_directories, and
_restore_completed_tasks) so it invokes _restore_completed_tasks when resume or
from_step is set prior to calling _setup_directories().
- Around line 214-231: In _restore_completed_tasks, validate the provided
from_step before bailing on a missing state: either check at the start (if
from_step and from_step not in self._task_nodes: raise ValueError(...)) or after
calling _load_state() if state is None then raise the same ValueError when
from_step is set; reference the _restore_completed_tasks method, the from_step
parameter, self._load_state(), and self._task_nodes to locate where to add the
validation so a typoed step or wrong work-dir fails fast instead of silently
starting from scratch.
---
Duplicate comments:
In `@src/utils/local_executor.py`:
- Around line 507-512: The current Jinja-detection only checks for '{%' and '{#'
in spec_text (template_markers) but misses unsupported '{{ ... }}' placeholders;
update the logic in the same function handling spec_text to also scan for
double-brace tokens and reject any that are not in the executor's allowlist
(exactly 'output' or 'input:<name>' patterns). Specifically, after the existing
template_markers check, parse occurrences of '{{...}}' in spec_text, validate
the inner token against the allowed forms ('output' or 'input:*'), and if any
token is disallowed raise the same ValueError with the user-facing instruction
about running server-side expansion (mirroring the existing error message).
- Around line 379-395: The code currently reuses existing output_dir and
files_dir which can leave stale artifacts; before building token_map and writing
task files in _exec_task (or the function containing task_spec, task_dir,
output_dir, files_dir), ensure you recreate/clean both output_dir and files_dir
(e.g., remove existing directories if present then os.makedirs) so outputs and
inline files are not mixed with previous runs; update the logic around
output_dir/files_dir creation (the block using os.makedirs(files_dir,
exist_ok=True) and subsequent file writes, and functions
_build_token_map/_substitute_tokens where applicable) to delete and recreate
these dirs safely (use shutil.rmtree or equivalent) prior to writing.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 44cee926-630f-4de1-a538-06ed160c9041
📒 Files selected for processing (2)
- src/cli/local.py
- src/utils/local_executor.py
🚧 Files skipped from review as they are similar to previous changes (1)
- src/cli/local.py
        self._validate_for_local(spec)
        self._setup_directories()

        if resume or from_step:
            self._restore_completed_tasks(from_step)
Restore resume state before creating task directories.
_setup_directories() creates every <task>/output directory before the resume logic runs. That makes the later os.path.isdir(info['output_dir']) check effectively always true for tasks in the current spec, so a partially cleaned work dir can skip tasks even though their prior outputs are gone.
🔧 Suggested fix
         self._results.clear()
         self._build_dag(spec)
         self._validate_for_local(spec)
-        self._setup_directories()
         if resume or from_step:
             self._restore_completed_tasks(from_step)
+        self._setup_directories()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/local_executor.py` around lines 141 - 145, The resume/restore logic
must run before creating task directories: move the call to
_restore_completed_tasks(from_step) to occur immediately after
_validate_for_local(spec) and before _setup_directories() so that previously
completed tasks remain detected (do not have their output_dir artificially
created); update the method containing these calls (the function that currently
calls _validate_for_local, _setup_directories, and _restore_completed_tasks) so
it invokes _restore_completed_tasks when resume or from_step is set prior to
calling _setup_directories().
    def _restore_completed_tasks(self, from_step: str | None = None):
        """Reload completed tasks from a previous run, optionally
        invalidating from a given step onward."""
        state = self._load_state()
        if state is None:
            logger.info('No previous state found — starting from scratch')
            return

        completed: Dict[str, Dict] = {}
        for name, info in state.get('tasks', {}).items():
            if name not in self._task_nodes:
                continue
            if info['exit_code'] == 0 and os.path.isdir(info['output_dir']):
                completed[name] = info

        if from_step:
            if from_step not in self._task_nodes:
                raise ValueError(f'Task "{from_step}" not found in workflow')
Don't silently ignore from_step when the state file is missing.
This returns early on missing state before from_step is validated. A typoed step name or wrong --work-dir falls back to a full run instead of failing fast.
🔧 Suggested fix
     def _restore_completed_tasks(self, from_step: str | None = None):
         """Reload completed tasks from a previous run, optionally
         invalidating from a given step onward."""
+        if from_step and from_step not in self._task_nodes:
+            raise ValueError(f'Task "{from_step}" not found in workflow')
+
         state = self._load_state()
         if state is None:
-            logger.info('No previous state found — starting from scratch')
-            return
+            if from_step:
+                raise ValueError(
+                    '--from-step requires an existing local execution state file')
+            logger.info('No previous state found — starting from scratch')
+            return
         completed: Dict[str, Dict] = {}
         for name, info in state.get('tasks', {}).items():
             if name not in self._task_nodes:
                 continue
@@
-        if from_step:
-            if from_step not in self._task_nodes:
-                raise ValueError(f'Task "{from_step}" not found in workflow')
+        if from_step:
             to_invalidate = self._get_downstream_tasks(from_step)
Verify each finding against the current code and only fix it if needed.
In `@src/utils/local_executor.py` around lines 214 - 231, In
_restore_completed_tasks, validate the provided from_step before bailing on a
missing state: either check at the start (if from_step and from_step not in
self._task_nodes: raise ValueError(...)) or after calling _load_state() if state
is None then raise the same ValueError when from_step is set; reference the
_restore_completed_tasks method, the from_step parameter, self._load_state(),
and self._task_nodes to locate where to add the validation so a typoed step or
wrong work-dir fails fast instead of silently starting from scratch.
- Refactored `LocalExecutor` to generate a `docker-compose.yml` for local workflow execution, replacing direct Docker commands.
- Improved parallel execution capabilities with native dependency handling and cycle detection.
- Updated documentation in `AGENTS.md` to reflect changes in local execution methodology.
- Enhanced unit tests to verify cycle detection and ensure correct behavior of the new Docker Compose integration.
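The cycle detection this commit mentions is typically a depth-first search with a coloring scheme over the task dependency graph. The sketch below is an illustrative assumption (function name and graph representation are invented, not the actual `LocalExecutor` code); it returns one offending cycle so the error message can name the tasks involved.

```python
from typing import Dict, List, Optional

def detect_cycle(deps: Dict[str, List[str]]) -> Optional[List[str]]:
    """Return one cycle as a list of task names, or None if the graph is acyclic."""
    WHITE, GRAY, BLACK = 0, 1, 2          # unvisited / on current path / finished
    color = {name: WHITE for name in deps}
    path: List[str] = []

    def visit(node: str) -> Optional[List[str]]:
        color[node] = GRAY
        path.append(node)
        for dep in deps.get(node, []):
            if color.get(dep, WHITE) == GRAY:      # back edge: dep is on the path
                return path[path.index(dep):] + [dep]
            if color.get(dep, WHITE) == WHITE:
                cycle = visit(dep)
                if cycle:
                    return cycle
        path.pop()
        color[node] = BLACK
        return None

    for name in list(deps):
        if color[name] == WHITE:
            cycle = visit(name)
            if cycle:
                return cycle
    return None
```

Running this before generating the compose file lets the executor fail with a readable cycle path instead of deadlocking on `depends_on` at runtime.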
…dation
- Introduced support for extra Docker volume mounts in the `LocalExecutor` class, allowing users to specify additional host paths for container visibility.
- Updated command handling to validate entrypoint commands, ensuring proper execution of specified commands within containers.
- Improved task dependency management for flat task lists, ensuring sequential execution aligns with on-cluster behavior.
- Adjusted the `TaskSpec` model to allow for an empty command list, enabling the use of default container entrypoints.
- Updated the `LocalExecutor` class to simplify the exit code checks for task failures.
- Enhanced unit tests to verify behavior for tasks in separate groups with no input dependencies, ensuring correct upstream and downstream relationships.
- Adjusted test cases for independent parallel tasks to reflect the new grouping structure, improving clarity and accuracy in dependency management.
…olution
- Introduced functionality to resolve `${env:VAR}` references in `default-values`, allowing for dynamic substitution based on environment variables.
- Updated the `compose` command in `local.py` to handle unresolved environment variables, providing warnings to users when variables are not set.
- Enhanced `LocalExecutor` to raise informative errors when required environment variables are missing during local execution.
- Added unit tests to verify the behavior of environment variable resolution and ensure proper error handling for unset variables.
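The `${env:VAR}` resolution described in these commits can be sketched as a regex-based substitution with a strict mode that raises on unset variables. The function name, the lenient pass-through behavior, and the error text are illustrative assumptions:

```python
import os
import re

# Matches ${env:VAR} where VAR is a conventional environment variable name.
_ENV_REF = re.compile(r'\$\{env:([A-Za-z_][A-Za-z0-9_]*)\}')

def resolve_env_refs(value: str, strict: bool = True) -> str:
    """Substitute ${env:VAR} references from the environment.

    In strict mode (local execution) an unset variable raises; in lenient
    mode (compose) the reference is left intact so the caller can warn.
    """
    def _sub(match: re.Match) -> str:
        name = match.group(1)
        if name in os.environ:
            return os.environ[name]
        if strict:
            raise ValueError(f'Environment variable "{name}" is not set')
        return match.group(0)   # leave unresolved for a later warning
    return _ENV_REF.sub(_sub, value)
```

This mirrors the described split: `compose` warns about unresolved variables, while local execution fails fast with an informative error.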
Description
Workflow specs tend to duplicate shared configuration (resources, timeouts, default-values) across projects. The includes system enables DRY spec authoring by composing reusable building blocks.
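At its core, this kind of include composition reduces to a recursive dictionary merge. The sketch below is an assumption about the semantics (nested mappings merged key by key, with the including file winning over the included one), not necessarily the project's exact merge rules:

```python
import copy

def deep_merge(base: dict, override: dict) -> dict:
    """Merge override into a deep copy of base.

    Nested mappings are merged recursively; scalars and lists from the
    including file replace those from the included file.
    """
    merged = copy.deepcopy(base)
    for key, value in override.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = deep_merge(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged
```

With a shared `base.yaml` defining `workflow.resources.default.cpu: 4` and a main spec overriding `cpu: 8`, the composed result keeps the shared keys and takes the override, which matches the flattening behavior the tests in this PR assert.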
Checklist
Summary by CodeRabbit
New Features
- osmo local run (Docker-based workflow execution with DAG scheduling, resume/from-step, work-dir preservation) and osmo local compose (resolve/merge YAML includes and default-values; write or print composed spec)

Tests
Chores
Packaging/Docs