Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions deployments/charts/service/migrations/005_v6_2_0_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"operations": [
{
"sql": {
"up": "ALTER TABLE workflows ADD COLUMN IF NOT EXISTS log_line_count INTEGER;"
}
},
{
"sql": {
"up": "ALTER TABLE tasks ADD COLUMN IF NOT EXISTS log_line_count INTEGER;"
}
}
]
}
1 change: 1 addition & 0 deletions src/service/core/workflow/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -1083,6 +1083,7 @@ def send_submit_workflow_to_queue(self,
task_obj_spec.resources.memory or '0', 'GiB'),
json.dumps(task_obj.exit_actions, default=common.pydantic_encoder),
task_obj.lead,
-1,
))
task.Task.batch_insert_to_db(postgres, task_entries)

Expand Down
10 changes: 10 additions & 0 deletions src/service/logger/ctrl_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,10 @@ async def get_logs(websocket):
workflow_obj.workflow_id, task_name, retry_id),
json.loads(logs.json()),
maxlen=workflow_config.max_task_log_lines)
await redis_client.incr(
f'{workflow_obj.workflow_id}-log-count')
await redis_client.incr(
f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count')
# Set expiration on first log message
if first_run:
first_run = False
Expand All @@ -213,6 +217,12 @@ async def get_logs(websocket):
common.get_redis_task_log_name(
workflow_obj.workflow_id, task_name, retry_id),
connectors.MAX_LOG_TTL)
await redis_client.expire(
f'{workflow_obj.workflow_id}-log-count',
connectors.MAX_LOG_TTL)
await redis_client.expire(
f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count',
connectors.MAX_LOG_TTL)

# If there is an action request (i.e. exec and port-forward), pull it from the queue
# and relay that request to osmo-ctrl through this websocket connection
Expand Down
3 changes: 2 additions & 1 deletion src/tests/common/database/testdata/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ CREATE TABLE IF NOT EXISTS pools (

CREATE TABLE IF NOT EXISTS workflows (
workflow_id TEXT PRIMARY KEY,
pool TEXT NOT NULL DEFAULT ''
pool TEXT NOT NULL DEFAULT '',
log_line_count INTEGER
Comment on lines 60 to +63
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Match the fixture's workflows.pool definition to production.

src/utils/connectors/postgres.py still defines this column as nullable, but this fixture makes it NOT NULL DEFAULT ''. That changes NULL vs empty-string behavior and can hide bugs that only show up against the real schema.

🔧 Suggested change
 CREATE TABLE IF NOT EXISTS workflows (
     workflow_id TEXT PRIMARY KEY,
-    pool TEXT NOT NULL DEFAULT '',
+    pool TEXT,
     log_line_count INTEGER
 );
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
CREATE TABLE IF NOT EXISTS workflows (
workflow_id TEXT PRIMARY KEY,
pool TEXT NOT NULL DEFAULT ''
pool TEXT NOT NULL DEFAULT '',
log_line_count INTEGER
CREATE TABLE IF NOT EXISTS workflows (
workflow_id TEXT PRIMARY KEY,
pool TEXT,
log_line_count INTEGER
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/tests/common/database/testdata/schema.sql` around lines 60 - 63, The test
fixture defines the workflows table column "pool" as NOT NULL DEFAULT '', which
differs from production where "pool" is nullable; update the CREATE TABLE for
workflows to declare pool as nullable text (e.g., just "pool TEXT" with no NOT
NULL or DEFAULT) so the test schema matches production, and adjust any tests
that currently rely on empty-string behavior to account for NULLs instead;
target the workflows table definition (column "pool") in the schema.sql used by
tests.

);
2 changes: 2 additions & 0 deletions src/utils/connectors/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -819,6 +819,7 @@ def _init_tables(self):
app_version INT,
plugins JSONB,
priority TEXT DEFAULT 'NORMAL',
log_line_count INTEGER,
PRIMARY KEY (workflow_uuid),
CONSTRAINT workflows_name_job UNIQUE(workflow_name, job_id),
CONSTRAINT workflows_workflow_id UNIQUE(workflow_id)
Expand Down Expand Up @@ -909,6 +910,7 @@ def _init_tables(self):
refresh_token BYTEA,
pod_name TEXT,
pod_ip TEXT,
log_line_count INTEGER,
PRIMARY KEY (task_db_key),
CONSTRAINT tasks_uuid_retry UNIQUE(task_uuid, retry_id),
CONSTRAINT tasks_id_name UNIQUE(workflow_id, retry_id, name)
Expand Down
16 changes: 15 additions & 1 deletion src/utils/job/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ def execute(self, context: JobExecutionContext,
task_obj_spec.resources.memory or '0', 'GiB'),
json.dumps(task_obj.exit_actions, default=common.pydantic_encoder),
task_obj.lead,
-1,
))
task.Task.batch_insert_to_db(postgres, task_entries)
progress_writer.report_progress()
Expand Down Expand Up @@ -1505,13 +1506,26 @@ async def run_log_migrations():
workflow_obj.update_log_to_db(wf_logs_ss_file_path)
workflow_obj.update_events_to_db(wf_events_ss_file_path)

# Read and persist log line counts before deleting Redis keys
workflow_log_count_key = f'{self.workflow_id}-log-count'
workflow_log_count_raw = redis_client.get(workflow_log_count_key)
if workflow_log_count_raw is not None:
workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw))

# Remove logs from Redis
redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key]
redis_keys_to_delete : List[str] = [
workflow_logs_redis_key, workflow_events_redis_key, workflow_log_count_key]
for group in workflow_obj.groups:
for task_obj in group.tasks:
task_redis_path = common.get_redis_task_log_name(
self.workflow_id, task_obj.name, task_obj.retry_id)
redis_keys_to_delete.append(task_redis_path)
task_log_count_key = (
f'{self.workflow_id}-{task_obj.name}-{task_obj.retry_id}-log-count')
task_log_count_raw = redis_client.get(task_log_count_key)
if task_log_count_raw is not None:
task_obj.update_log_line_count_to_db(int(task_log_count_raw))
redis_keys_to_delete.append(task_log_count_key)
Comment on lines +1509 to +1528
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Finalize missing Redis counters as 0, not -1.

Lines 1512-1513 and Lines 1526-1527 only persist log_line_count when the Redis key exists. A quiet workflow/task can finish without ever creating that key, so cleanup leaves the row at -1 forever even though cleanup already completed. That breaks the sentinel contract (-1 = unfinalized, 0 = finalized with no lines) and will skew any downstream SQL/Grafana analysis. Since update_log_line_count_to_db() already ignores legacy NULL rows, this path can safely default a missing key to 0 and update unconditionally.

💡 Suggested fix
-        workflow_log_count_raw = redis_client.get(workflow_log_count_key)
-        if workflow_log_count_raw is not None:
-            workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw))
+        workflow_log_count_raw = redis_client.get(workflow_log_count_key)
+        workflow_log_count = 0 if workflow_log_count_raw is None else int(workflow_log_count_raw)
+        workflow_obj.update_log_line_count_to_db(workflow_log_count)
@@
-                task_log_count_raw = redis_client.get(task_log_count_key)
-                if task_log_count_raw is not None:
-                    task_obj.update_log_line_count_to_db(int(task_log_count_raw))
+                task_log_count_raw = redis_client.get(task_log_count_key)
+                task_log_count = 0 if task_log_count_raw is None else int(task_log_count_raw)
+                task_obj.update_log_line_count_to_db(task_log_count)
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
# Read and persist log line counts before deleting Redis keys
workflow_log_count_key = f'{self.workflow_id}-log-count'
workflow_log_count_raw = redis_client.get(workflow_log_count_key)
if workflow_log_count_raw is not None:
workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw))
# Remove logs from Redis
redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key]
redis_keys_to_delete : List[str] = [
workflow_logs_redis_key, workflow_events_redis_key, workflow_log_count_key]
for group in workflow_obj.groups:
for task_obj in group.tasks:
task_redis_path = common.get_redis_task_log_name(
self.workflow_id, task_obj.name, task_obj.retry_id)
redis_keys_to_delete.append(task_redis_path)
task_log_count_key = (
f'{self.workflow_id}-{task_obj.name}-{task_obj.retry_id}-log-count')
task_log_count_raw = redis_client.get(task_log_count_key)
if task_log_count_raw is not None:
task_obj.update_log_line_count_to_db(int(task_log_count_raw))
redis_keys_to_delete.append(task_log_count_key)
# Read and persist log line counts before deleting Redis keys
workflow_log_count_key = f'{self.workflow_id}-log-count'
workflow_log_count_raw = redis_client.get(workflow_log_count_key)
workflow_log_count = 0 if workflow_log_count_raw is None else int(workflow_log_count_raw)
workflow_obj.update_log_line_count_to_db(workflow_log_count)
# Remove logs from Redis
redis_keys_to_delete : List[str] = [
workflow_logs_redis_key, workflow_events_redis_key, workflow_log_count_key]
for group in workflow_obj.groups:
for task_obj in group.tasks:
task_redis_path = common.get_redis_task_log_name(
self.workflow_id, task_obj.name, task_obj.retry_id)
redis_keys_to_delete.append(task_redis_path)
task_log_count_key = (
f'{self.workflow_id}-{task_obj.name}-{task_obj.retry_id}-log-count')
task_log_count_raw = redis_client.get(task_log_count_key)
task_log_count = 0 if task_log_count_raw is None else int(task_log_count_raw)
task_obj.update_log_line_count_to_db(task_log_count)
redis_keys_to_delete.append(task_log_count_key)
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/job/jobs.py` around lines 1509 - 1528, The code only updates DB
log_line_count when Redis keys exist, leaving rows at -1 for quiet workflows;
change the logic around workflow_log_count_key and task_log_count_key so that
after reading redis_client.get(...) you treat a missing value as 0 and call
workflow_obj.update_log_line_count_to_db(int_value) /
task_obj.update_log_line_count_to_db(int_value) unconditionally (i.e., compute
int_value = int(raw) if raw is not None else 0), while still appending the keys
to redis_keys_to_delete and keeping the current calls to
common.get_redis_task_log_name and redis_keys_to_delete management.

if task_obj.status.has_error_logs():
prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}'
redis_keys_to_delete.append(f'{prefix}-error-logs')
Expand Down
25 changes: 19 additions & 6 deletions src/utils/job/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,9 @@ class Task(pydantic.BaseModel):
node_name: str | None
pod_ip: str | None
lead: bool
# NULL = predates this column; -1 = initialized at submission, not yet finalized;
# >= 0 = finalized by CleanupWorkflow (0 = confirmed zero lines).
log_line_count: int | None = None

class Config:
arbitrary_types_allowed = True
Expand Down Expand Up @@ -1064,7 +1067,7 @@ def batch_insert_to_db(
for i in range(0, len(task_entries), batch_size):
chunk = task_entries[i:i + batch_size]
values_clause = ','.join(
['(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)']
['(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)']
* len(chunk)
)
flat_args: List[Any] = []
Expand All @@ -1075,7 +1078,7 @@ def batch_insert_to_db(
INSERT INTO tasks
(workflow_id, name, group_name, task_db_key, retry_id, task_uuid,
status, pod_name, failure_message, gpu_count, cpu_count,
disk_count, memory_count, exit_actions, lead)
disk_count, memory_count, exit_actions, lead, log_line_count)
VALUES {values_clause} ON CONFLICT DO NOTHING;
'''
database.execute_commit_command(insert_cmd, tuple(flat_args))
Expand All @@ -1087,8 +1090,9 @@ def insert_to_db(self, gpu_count: float, cpu_count: float, disk_count: float,
insert_cmd = '''
INSERT INTO tasks
(workflow_id, name, group_name, task_db_key, retry_id, task_uuid, status, pod_name,
failure_message, gpu_count, cpu_count, disk_count, memory_count, exit_actions, lead)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;
failure_message, gpu_count, cpu_count, disk_count, memory_count, exit_actions, lead,
log_line_count)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING;
'''
workflow_uuid = self.workflow_uuid if self.workflow_uuid else ''
self.database.execute_commit_command(
Expand All @@ -1099,7 +1103,7 @@ def insert_to_db(self, gpu_count: float, cpu_count: float, disk_count: float,
failure_message, gpu_count, cpu_count,
disk_count, memory_count,
json.dumps(self.exit_actions, default=common.pydantic_encoder),
self.lead))
self.lead, -1))

@property
def workflow_id(self) -> str:
Expand All @@ -1119,6 +1123,14 @@ def workflow_id(self) -> str:
return fetched_workflow_id


def update_log_line_count_to_db(self, count: int) -> None:
    """ Writes the final log line count. Only updates if the sentinel -1 is still present.

    Args:
        count: Final number of log lines recorded for this task retry.

    The ``log_line_count = -1`` guard in the WHERE clause means:
      * legacy rows (NULL, created before this column existed) are left untouched, and
      * a row that was already finalized (>= 0) is never overwritten by a late or
        duplicate cleanup call — the update is effectively write-once.
    """
    # Parameterized %s placeholders — values are passed separately, never interpolated.
    cmd = '''
        UPDATE tasks SET log_line_count = %s
        WHERE task_db_key = %s AND log_line_count = -1;
    '''
    self.database.execute_commit_command(cmd, (count, self.task_db_key))

def add_refresh_token_to_db(self, refresh_token: str):
""" Hash and store refresh token in the database. """
# Hash the refresh token
Expand Down Expand Up @@ -1180,7 +1192,8 @@ def from_db_row(cls, task_row, database) -> 'Task':
database=database,
exit_actions=task_row['exit_actions'],
node_name=task_row['node_name'],
lead=task_row['lead'])
lead=task_row['lead'],
log_line_count=task_row.get('log_line_count'))

@classmethod
def fetch_row_from_db(cls, database: connectors.PostgresConnector,
Expand Down
13 changes: 13 additions & 0 deletions src/utils/job/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -106,3 +106,16 @@ osmo_py_test(
size = "large",
tags = ["requires-network"],
)

# Test target for the log_line_count column plumbing (presumably exercises the
# Redis counter -> Postgres persistence path — confirm against test contents).
# Tagged "requires-network" because it talks to live backing services.
osmo_py_test(
    name = "test_log_line_count",
    srcs = ["test_log_line_count.py"],
    deps = [
        "//src/lib/utils:common",
        "//src/tests/common",
        "//src/utils/connectors",
        "//src/utils/job",
    ],
    size = "large",
    tags = ["requires-network"],
)
Loading
Loading