feat: track log line count per workflow and task to enable log truncation observability #758

elookpotts-nvidia wants to merge 1 commit into main
Conversation
📝 Walkthrough
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as WebSocket Client
    participant Logger as Log Ingestion Service
    participant Redis
    participant JobCleanup as Job Cleanup Service
    participant Database
    Client->>Logger: Stream log messages
    Logger->>Redis: Increment {workflow_id}-log-count
    Logger->>Redis: Increment {workflow_id}-{task}-{retry}-log-count
    Logger->>Redis: Set TTL expiration (first message)
    Note over Logger,Redis: Per each log message received
    JobCleanup->>Redis: Read {workflow_id}-log-count
    JobCleanup->>Redis: Read all {workflow_id}-{task}-{retry}-log-count keys
    JobCleanup->>Database: Call update_log_line_count_to_db() for workflows
    JobCleanup->>Database: Call update_log_line_count_to_db() for tasks
    JobCleanup->>Database: Update persisted counts (if sentinel -1)
    JobCleanup->>Redis: Delete all log count keys
    JobCleanup->>Redis: Delete workflow logs and task logs
    Note over JobCleanup,Database: During workflow cleanup phase
```
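The counter flow in the diagram above can be sketched with a plain dict standing in for Redis. The key names follow the PR's naming scheme; `FakeRedis`, `ingest`, and `cleanup` are illustrative stand-ins, not the project's real client or handlers:

```python
# Minimal sketch of the log-count flow: one INCR per counter per log line
# during ingestion, then a single read + delete during cleanup.

class FakeRedis:
    """Dict-backed stand-in for the Redis calls used in the PR."""
    def __init__(self):
        self.store = {}

    def incr(self, key):
        self.store[key] = self.store.get(key, 0) + 1
        return self.store[key]

    def get(self, key):
        return self.store.get(key)

    def delete(self, *keys):
        for key in keys:
            self.store.pop(key, None)


def ingest(redis, workflow_id, task, retry, n_lines):
    """Log ingestion: increment the workflow and per-task counters per line."""
    for _ in range(n_lines):
        redis.incr(f'{workflow_id}-log-count')
        redis.incr(f'{workflow_id}-{task}-{retry}-log-count')


def cleanup(redis, workflow_id, tasks):
    """Cleanup: read counters once, return values to persist, delete keys."""
    persisted = {}
    keys = [f'{workflow_id}-log-count']
    persisted['workflow'] = int(redis.get(keys[0]) or 0)
    for task, retry in tasks:
        key = f'{workflow_id}-{task}-{retry}-log-count'
        keys.append(key)
        persisted[(task, retry)] = int(redis.get(key) or 0)
    redis.delete(*keys)
    return persisted


r = FakeRedis()
ingest(r, 'wf1', 'build', 0, 3)
counts = cleanup(r, 'wf1', [('build', 0)])
print(counts)   # {'workflow': 3, ('build', 0): 3}
print(r.store)  # {} (all counter keys removed)
```

The single-consumer shape matters: only cleanup reads the counters, so there is no read/modify race with the ingestion side, which only ever issues increments.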
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~35 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (1 warning)
Codecov Report: ❌ Patch coverage is
Additional details and impacted files

```
@@           Coverage Diff            @@
##             main     #758    +/-  ##
==========================================
- Coverage   42.84%   42.73%   -0.11%
==========================================
  Files         203      203
  Lines       26844    26865      +21
  Branches     7603     7607       +4
==========================================
- Hits        11500    11480      -20
- Misses      15233    15278      +45
+ Partials      111      107       -4
==========================================
```
Flags with carried forward coverage won't be shown.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/service/logger/ctrl_websocket.py (1)
207-225: ⚠️ Potential issue | 🟠 Major

Don't consume `first_run` before the first actual log write.

If the first websocket frame is `METRICS`, `BARRIER`, or `LOG_DONE`, these `EXPIRE` calls are all no-ops because the keys do not exist yet. `first_run` still flips to `False`, so the log streams and the new count keys never get a TTL afterward.

🛠️ Suggested fix

```diff
 else:
     if io_type.workflow_logs() and (first_run or
             datetime.datetime.now() - last_heartbeat_check > heartbeat_freq_dt):
         last_heartbeat_check = datetime.datetime.now()
         cmd = '''
@@
         await redis_client.incr(
             f'{workflow_obj.workflow_id}-log-count')
         await redis_client.incr(
             f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count')
-        # Set expiration on first log message
-        if first_run:
-            first_run = False
-            await redis_client.expire(f'{workflow_obj.workflow_id}-logs',
-                connectors.MAX_LOG_TTL)
-            await redis_client.expire(
-                common.get_redis_task_log_name(
-                    workflow_obj.workflow_id, task_name, retry_id),
-                connectors.MAX_LOG_TTL)
-            await redis_client.expire(
-                f'{workflow_obj.workflow_id}-log-count',
-                connectors.MAX_LOG_TTL)
-            await redis_client.expire(
-                f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count',
-                connectors.MAX_LOG_TTL)
+        # Set expiration on the first actual log message.
+        if first_run:
+            first_run = False
+            await redis_client.expire(
+                f'{workflow_obj.workflow_id}-logs',
+                connectors.MAX_LOG_TTL)
+            await redis_client.expire(
+                common.get_redis_task_log_name(
+                    workflow_obj.workflow_id, task_name, retry_id),
+                connectors.MAX_LOG_TTL)
+            await redis_client.expire(
+                f'{workflow_obj.workflow_id}-log-count',
+                connectors.MAX_LOG_TTL)
+            await redis_client.expire(
+                f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count',
+                connectors.MAX_LOG_TTL)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/service/logger/ctrl_websocket.py` around lines 207 - 225, The code flips the first_run flag before any actual log keys are created, causing the expire() calls to be skipped for later log writes when the first websocket frames are METRICS/BARRIER/LOG_DONE; update the logic around first_run (the first_run boolean, the redis_client.incr calls, and the redis_client.expire calls for f'{workflow_obj.workflow_id}-logs', common.get_redis_task_log_name(workflow_obj.workflow_id, task_name, retry_id), f'{workflow_obj.workflow_id}-log-count', and f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count') so that first_run is only set to False after you have actually created/updated the log keys (e.g., after performing the incr/write for a LOG frame or after confirming the relevant keys exist), or alternatively guard the expire block to run only when the current frame is a log-producing frame, ensuring the TTLs are applied on the first real log write.
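The control-flow bug above can be isolated synchronously. This is a hedged sketch of the suggested fix, not the project's handler: the real code is async and talks to Redis, while here the frame names and the TTL bookkeeping are simplified stand-ins:

```python
# Sketch: flip first_run only on a frame that actually writes log keys,
# so the expire() calls land on the first real log write.

LOG_PRODUCING = {'LOG'}  # METRICS / BARRIER / LOG_DONE create no keys

def handle_frames(frames):
    first_run = True
    ttl_set_after_frame = None  # stand-in for the redis expire() calls
    for i, frame in enumerate(frames):
        if frame not in LOG_PRODUCING:
            continue  # crucially, first_run is NOT consumed here
        # the incr() calls for the count keys would happen at this point
        if first_run:
            first_run = False          # flipped only after a real write
            ttl_set_after_frame = i    # TTLs applied here
    return ttl_set_after_frame

# TTLs land on the first real log frame, not the leading METRICS frame:
print(handle_frames(['METRICS', 'BARRIER', 'LOG', 'LOG']))  # 2
```

With the buggy ordering, the METRICS frame at index 0 would have consumed `first_run`, and `ttl_set_after_frame` would stay `None` forever, which is exactly the missing-TTL failure the comment describes.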
🧹 Nitpick comments (1)
src/service/core/workflow/objects.py (1)
1084-1086: Replace the raw `-1` with a shared sentinel constant.

This tuple is already positional and easy to misalign. Inlining the `NULL / -1 / >= 0` state marker here makes it harder to keep the insert path, cleanup predicate, and tests in sync.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/service/core/workflow/objects.py` around lines 1084 - 1086, Replace the raw -1 used as a sentinel in the tuple with a shared constant to avoid positional misalignment; define a clearly named sentinel (e.g., NO_LEAD or UNSET_INDEX) at module scope in objects.py (or in the existing module constants) and use that constant instead of the literal -1 where the tuple is constructed (around task_obj.exit_actions, task_obj.lead, -1) and ensure any other code paths—insert path, cleanup predicate, and tests—that check for -1 are updated to reference the new constant.
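A minimal sketch of what the nitpick asks for, with the constant name and tuple shape chosen for illustration rather than taken from the project's code:

```python
# Name the sentinel once at module scope and reuse it everywhere the
# -1 "initialized but not finalized" state is written or checked.

LOG_COUNT_UNFINALIZED = -1  # -1 = initialized, cleanup has not run yet

def insert_row(workflow_id):
    # insert path writes the named sentinel instead of a bare -1
    return (workflow_id, LOG_COUNT_UNFINALIZED)

def needs_finalizing(log_line_count):
    # cleanup predicate compares against the same constant
    return log_line_count == LOG_COUNT_UNFINALIZED

row = insert_row('wf1')
print(needs_finalizing(row[1]))  # True
```

If the sentinel value ever changes, the insert path, the cleanup predicate, and the tests all track it from one definition instead of three scattered literals.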
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
Run ID: 7e607f2e-f2ea-46bc-baa5-e0aa474ba4a3
📒 Files selected for processing (10)
- deployments/charts/service/migrations/005_v6_2_0_schema.json
- src/service/core/workflow/objects.py
- src/service/logger/ctrl_websocket.py
- src/tests/common/database/testdata/schema.sql
- src/utils/connectors/postgres.py
- src/utils/job/jobs.py
- src/utils/job/task.py
- src/utils/job/tests/BUILD
- src/utils/job/tests/test_log_line_count.py
- src/utils/job/workflow.py
```diff
 CREATE TABLE IF NOT EXISTS workflows (
     workflow_id TEXT PRIMARY KEY,
-    pool TEXT NOT NULL DEFAULT ''
+    pool TEXT NOT NULL DEFAULT '',
+    log_line_count INTEGER
```
Match the fixture's workflows.pool definition to production.
src/utils/connectors/postgres.py still defines this column as nullable, but this fixture makes it NOT NULL DEFAULT ''. That changes NULL vs empty-string behavior and can hide bugs that only show up against the real schema.
🔧 Suggested change

```diff
 CREATE TABLE IF NOT EXISTS workflows (
     workflow_id TEXT PRIMARY KEY,
-    pool TEXT NOT NULL DEFAULT '',
+    pool TEXT,
     log_line_count INTEGER
 );
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/tests/common/database/testdata/schema.sql` around lines 60-63: The test
fixture defines the workflows table column `pool` as `NOT NULL DEFAULT ''`, which
differs from production where `pool` is nullable; update the `CREATE TABLE` for
workflows to declare `pool` as nullable text (e.g., just `pool TEXT` with no
`NOT NULL` or `DEFAULT`) so the test schema matches production, and adjust any
tests that currently rely on empty-string behavior to account for NULLs instead.
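The behavioral difference the comment warns about is easy to demonstrate. In this sketch `sqlite3` stands in for PostgreSQL purely to show the NULL-versus-empty-string semantics; the table and column names mirror the fixture but the schemas are simplified:

```python
# With NOT NULL DEFAULT '', an omitted pool silently becomes '', so the
# test schema can never surface the NULLs the production schema allows.
import sqlite3

con = sqlite3.connect(':memory:')
con.execute("CREATE TABLE prod_like (workflow_id TEXT PRIMARY KEY, pool TEXT)")
con.execute("CREATE TABLE fixture_like (workflow_id TEXT PRIMARY KEY, "
            "pool TEXT NOT NULL DEFAULT '')")

# Neither insert mentions pool at all.
con.execute("INSERT INTO prod_like (workflow_id) VALUES ('wf1')")
con.execute("INSERT INTO fixture_like (workflow_id) VALUES ('wf1')")

print(con.execute("SELECT pool FROM prod_like").fetchone())     # (None,)
print(con.execute("SELECT pool FROM fixture_like").fetchone())  # ('',)
```

Any code path that distinguishes `pool IS NULL` from `pool = ''` passes against the fixture and fails against production, which is exactly the class of bug the review comment flags.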
```diff
+# Read and persist log line counts before deleting Redis keys
+workflow_log_count_key = f'{self.workflow_id}-log-count'
+workflow_log_count_raw = redis_client.get(workflow_log_count_key)
+if workflow_log_count_raw is not None:
+    workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw))
+
 # Remove logs from Redis
-redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key]
+redis_keys_to_delete : List[str] = [
+    workflow_logs_redis_key, workflow_events_redis_key, workflow_log_count_key]
 for group in workflow_obj.groups:
     for task_obj in group.tasks:
         task_redis_path = common.get_redis_task_log_name(
             self.workflow_id, task_obj.name, task_obj.retry_id)
         redis_keys_to_delete.append(task_redis_path)
+        task_log_count_key = (
+            f'{self.workflow_id}-{task_obj.name}-{task_obj.retry_id}-log-count')
+        task_log_count_raw = redis_client.get(task_log_count_key)
+        if task_log_count_raw is not None:
+            task_obj.update_log_line_count_to_db(int(task_log_count_raw))
+        redis_keys_to_delete.append(task_log_count_key)
```
Finalize missing Redis counters as 0, not -1.
Lines 1512-1513 and Lines 1526-1527 only persist log_line_count when the Redis key exists. A quiet workflow/task can finish without ever creating that key, so cleanup leaves the row at -1 forever even though cleanup already completed. That breaks the sentinel contract (-1 = unfinalized, 0 = finalized with no lines) and will skew any downstream SQL/Grafana analysis. Since update_log_line_count_to_db() already ignores legacy NULL rows, this path can safely default a missing key to 0 and update unconditionally.
💡 Suggested fix

```diff
-            workflow_log_count_raw = redis_client.get(workflow_log_count_key)
-            if workflow_log_count_raw is not None:
-                workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw))
+            workflow_log_count_raw = redis_client.get(workflow_log_count_key)
+            workflow_log_count = 0 if workflow_log_count_raw is None else int(workflow_log_count_raw)
+            workflow_obj.update_log_line_count_to_db(workflow_log_count)
@@
-            task_log_count_raw = redis_client.get(task_log_count_key)
-            if task_log_count_raw is not None:
-                task_obj.update_log_line_count_to_db(int(task_log_count_raw))
+            task_log_count_raw = redis_client.get(task_log_count_key)
+            task_log_count = 0 if task_log_count_raw is None else int(task_log_count_raw)
+            task_obj.update_log_line_count_to_db(task_log_count)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/utils/job/jobs.py` around lines 1509 - 1528, The code only updates DB
log_line_count when Redis keys exist, leaving rows at -1 for quiet workflows;
change the logic around workflow_log_count_key and task_log_count_key so that
after reading redis_client.get(...) you treat a missing value as 0 and call
workflow_obj.update_log_line_count_to_db(int_value) /
task_obj.update_log_line_count_to_db(int_value) unconditionally (i.e., compute
int_value = int(raw) if raw is not None else 0), while still appending the keys
to redis_keys_to_delete and keeping the current calls to
common.get_redis_task_log_name and redis_keys_to_delete management.
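Putting the suggested behavior together with the PR's `-1` guard, the finalization path can be sketched as below. The dict-backed "db" and "redis" are stand-ins for the real stores, and `finalize` is an illustrative helper, not a function from the codebase:

```python
# Treat a missing Redis counter as 0 and finalize unconditionally; the
# db-side guard (mirroring WHERE log_line_count = -1) keeps a cleanup
# retry from overwriting an already-finalized value.

def finalize(db, redis, key, row_id):
    raw = redis.get(key)                 # None when the key never existed
    count = 0 if raw is None else int(raw)
    if db.get(row_id) == -1:             # only unfinalized rows are updated
        db[row_id] = count
    redis.pop(key, None)                 # counter key deleted either way
    return db[row_id]

db = {'wf1': -1, 'wf2': -1}
finalize(db, {'wf1-log-count': '7'}, 'wf1-log-count', 'wf1')
finalize(db, {}, 'wf2-log-count', 'wf2')   # quiet workflow, key never created
print(db)                                  # {'wf1': 7, 'wf2': 0}

# A retry with a stale counter does not overwrite the finalized value.
finalize(db, {'wf1-log-count': '99'}, 'wf1-log-count', 'wf1')
print(db['wf1'])                           # 7
```

This preserves the sentinel contract end to end: `-1` means cleanup has not run, and `0` means cleanup ran and the workflow genuinely produced no log lines.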
Summary

Adds a `log_line_count` column to the `workflows` and `tasks` tables so that the total number of log lines produced by each workflow and task is recorded persistently for the first time, enabling truncation analysis and capacity planning via SQL.

Design Changes

The three-state sentinel (`NULL` = predates change, `-1` = initialized but not yet finalized, `>= 0` = finalized) makes it unambiguous whether a completed workflow genuinely produced zero log lines versus whether the counter key expired before cleanup ran. `CleanupWorkflow` is the single consumer of the Redis counter: it reads it once at workflow completion, writes to PostgreSQL, and deletes the key; no Prometheus metrics are needed since Grafana queries PostgreSQL directly.

Change Log
- Database migration (`005_v6_2_0_schema.json`) adding `log_line_count INTEGER` to the `workflows` and `tasks` tables for existing deployments.
- Updated `CREATE TABLE` definitions in `_init_tables` so fresh deployments and test environments also have the column.
- Extended the `Workflow` and `Task` Pydantic models with the `log_line_count` field and the three-state sentinel semantics documented inline.
- Updated `insert_to_db` and `batch_insert_to_db` on both models to write `-1` at submission time as the initialization sentinel.
- Added `update_log_line_count_to_db` to both models, guarded with `WHERE log_line_count = -1` to prevent overwriting a finalized count on CleanupWorkflow retry.
- Added Redis `INCR` calls in `ctrl_websocket.py` after each XADD to the workflow and per-task log streams, with TTL set via the existing `first_run` guard.
- Extended `CleanupWorkflow` to read both workflow-level and per-task Redis counters, write final counts to PostgreSQL, and include the counter keys in the existing bulk Redis DELETE.

Issue #None
Testing:
`bazel test //src/utils/job/tests:test_log_line_count` passes all 11 tests.

Checklist
Summary by CodeRabbit
New Features
Tests