diff --git a/deployments/charts/service/migrations/005_v6_2_0_schema.json b/deployments/charts/service/migrations/005_v6_2_0_schema.json new file mode 100644 index 000000000..1e91069ac --- /dev/null +++ b/deployments/charts/service/migrations/005_v6_2_0_schema.json @@ -0,0 +1,14 @@ +{ + "operations": [ + { + "sql": { + "up": "ALTER TABLE workflows ADD COLUMN IF NOT EXISTS log_line_count INTEGER;" + } + }, + { + "sql": { + "up": "ALTER TABLE tasks ADD COLUMN IF NOT EXISTS log_line_count INTEGER;" + } + } + ] +} diff --git a/src/service/core/workflow/objects.py b/src/service/core/workflow/objects.py index 51a535cc8..04799b419 100644 --- a/src/service/core/workflow/objects.py +++ b/src/service/core/workflow/objects.py @@ -1083,6 +1083,7 @@ def send_submit_workflow_to_queue(self, task_obj_spec.resources.memory or '0', 'GiB'), json.dumps(task_obj.exit_actions, default=common.pydantic_encoder), task_obj.lead, + -1, )) task.Task.batch_insert_to_db(postgres, task_entries) diff --git a/src/service/logger/ctrl_websocket.py b/src/service/logger/ctrl_websocket.py index d80e317aa..05e00e056 100644 --- a/src/service/logger/ctrl_websocket.py +++ b/src/service/logger/ctrl_websocket.py @@ -204,6 +204,10 @@ async def get_logs(websocket): workflow_obj.workflow_id, task_name, retry_id), json.loads(logs.json()), maxlen=workflow_config.max_task_log_lines) + await redis_client.incr( + f'{workflow_obj.workflow_id}-log-count') + await redis_client.incr( + f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count') # Set expiration on first log message if first_run: first_run = False @@ -213,6 +217,12 @@ async def get_logs(websocket): common.get_redis_task_log_name( workflow_obj.workflow_id, task_name, retry_id), connectors.MAX_LOG_TTL) + await redis_client.expire( + f'{workflow_obj.workflow_id}-log-count', + connectors.MAX_LOG_TTL) + await redis_client.expire( + f'{workflow_obj.workflow_id}-{task_name}-{retry_id}-log-count', + connectors.MAX_LOG_TTL) # If there is an action request (i.e. exec and port-forward), pull it from the queue # and relay that request to osmo-ctrl through this websocket connection diff --git a/src/tests/common/database/testdata/schema.sql b/src/tests/common/database/testdata/schema.sql index 24474858f..91cbac7d3 100644 --- a/src/tests/common/database/testdata/schema.sql +++ b/src/tests/common/database/testdata/schema.sql @@ -59,5 +59,6 @@ CREATE TABLE IF NOT EXISTS pools ( CREATE TABLE IF NOT EXISTS workflows ( workflow_id TEXT PRIMARY KEY, - pool TEXT NOT NULL DEFAULT '' + pool TEXT NOT NULL DEFAULT '', + log_line_count INTEGER ); diff --git a/src/utils/connectors/postgres.py b/src/utils/connectors/postgres.py index 0f7f7243c..f1fb98413 100644 --- a/src/utils/connectors/postgres.py +++ b/src/utils/connectors/postgres.py @@ -819,6 +819,7 @@ def _init_tables(self): app_version INT, plugins JSONB, priority TEXT DEFAULT 'NORMAL', + log_line_count INTEGER, PRIMARY KEY (workflow_uuid), CONSTRAINT workflows_name_job UNIQUE(workflow_name, job_id), CONSTRAINT workflows_workflow_id UNIQUE(workflow_id) @@ -909,6 +910,7 @@ def _init_tables(self): refresh_token BYTEA, pod_name TEXT, pod_ip TEXT, + log_line_count INTEGER, PRIMARY KEY (task_db_key), CONSTRAINT tasks_uuid_retry UNIQUE(task_uuid, retry_id), CONSTRAINT tasks_id_name UNIQUE(workflow_id, retry_id, name) diff --git a/src/utils/job/jobs.py b/src/utils/job/jobs.py index fc3acefd9..0895b01cf 100644 --- a/src/utils/job/jobs.py +++ b/src/utils/job/jobs.py @@ -246,6 +246,7 @@ def execute(self, context: JobExecutionContext, task_obj_spec.resources.memory or '0', 'GiB'), json.dumps(task_obj.exit_actions, default=common.pydantic_encoder), task_obj.lead, + -1, )) task.Task.batch_insert_to_db(postgres, task_entries) progress_writer.report_progress() @@ -1505,13 +1506,26 @@ async def run_log_migrations(): workflow_obj.update_log_to_db(wf_logs_ss_file_path) workflow_obj.update_events_to_db(wf_events_ss_file_path) + # Read and persist log line counts before deleting Redis keys + workflow_log_count_key = f'{self.workflow_id}-log-count' + workflow_log_count_raw = redis_client.get(workflow_log_count_key) + if workflow_log_count_raw is not None: + workflow_obj.update_log_line_count_to_db(int(workflow_log_count_raw)) + # Remove logs from Redis - redis_keys_to_delete : List[str] = [workflow_logs_redis_key, workflow_events_redis_key] + redis_keys_to_delete : List[str] = [ + workflow_logs_redis_key, workflow_events_redis_key, workflow_log_count_key] for group in workflow_obj.groups: for task_obj in group.tasks: task_redis_path = common.get_redis_task_log_name( self.workflow_id, task_obj.name, task_obj.retry_id) redis_keys_to_delete.append(task_redis_path) + task_log_count_key = ( + f'{self.workflow_id}-{task_obj.name}-{task_obj.retry_id}-log-count') + task_log_count_raw = redis_client.get(task_log_count_key) + if task_log_count_raw is not None: + task_obj.update_log_line_count_to_db(int(task_log_count_raw)) + redis_keys_to_delete.append(task_log_count_key) if task_obj.status.has_error_logs(): prefix = f'{self.workflow_id}-{task_obj.task_uuid}-{task_obj.retry_id}' redis_keys_to_delete.append(f'{prefix}-error-logs') diff --git a/src/utils/job/task.py b/src/utils/job/task.py index 43b94c9d4..f193494fe 100644 --- a/src/utils/job/task.py +++ b/src/utils/job/task.py @@ -1037,6 +1037,9 @@ class Task(pydantic.BaseModel): node_name: str | None pod_ip: str | None lead: bool + # NULL = predates this column; -1 = initialized at submission, not yet finalized; + # >= 0 = finalized by CleanupWorkflow (0 = confirmed zero lines). + log_line_count: int | None = None class Config: arbitrary_types_allowed = True @@ -1064,7 +1067,7 @@ def batch_insert_to_db( for i in range(0, len(task_entries), batch_size): chunk = task_entries[i:i + batch_size] values_clause = ','.join( - ['(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] + ['(%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)'] * len(chunk) ) flat_args: List[Any] = [] @@ -1075,7 +1078,7 @@ def batch_insert_to_db( INSERT INTO tasks (workflow_id, name, group_name, task_db_key, retry_id, task_uuid, status, pod_name, failure_message, gpu_count, cpu_count, - disk_count, memory_count, exit_actions, lead) + disk_count, memory_count, exit_actions, lead, log_line_count) VALUES {values_clause} ON CONFLICT DO NOTHING; ''' database.execute_commit_command(insert_cmd, tuple(flat_args)) @@ -1087,8 +1090,9 @@ def insert_to_db(self, gpu_count: float, cpu_count: float, disk_count: float, insert_cmd = ''' INSERT INTO tasks (workflow_id, name, group_name, task_db_key, retry_id, task_uuid, status, pod_name, - failure_message, gpu_count, cpu_count, disk_count, memory_count, exit_actions, lead) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING; + failure_message, gpu_count, cpu_count, disk_count, memory_count, exit_actions, lead, + log_line_count) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) ON CONFLICT DO NOTHING; ''' workflow_uuid = self.workflow_uuid if self.workflow_uuid else '' self.database.execute_commit_command( @@ -1099,7 +1103,7 @@ def insert_to_db(self, gpu_count: float, cpu_count: float, disk_count: float, failure_message, gpu_count, cpu_count, disk_count, memory_count, json.dumps(self.exit_actions, default=common.pydantic_encoder), - self.lead)) + self.lead, -1)) @property def workflow_id(self) -> str: @@ -1119,6 +1123,14 @@ def workflow_id(self) -> str: return fetched_workflow_id + def update_log_line_count_to_db(self, count: int) -> None: + """ Writes the final log line count. Only updates if the sentinel -1 is still present. """ + cmd = ''' + UPDATE tasks SET log_line_count = %s + WHERE task_db_key = %s AND log_line_count = -1; + ''' + self.database.execute_commit_command(cmd, (count, self.task_db_key)) + def add_refresh_token_to_db(self, refresh_token: str): """ Hash and store refresh token in the database. """ # Hash the refresh token @@ -1180,7 +1192,8 @@ def from_db_row(cls, task_row, database) -> 'Task': database=database, exit_actions=task_row['exit_actions'], node_name=task_row['node_name'], - lead=task_row['lead']) + lead=task_row['lead'], + log_line_count=task_row.get('log_line_count')) @classmethod def fetch_row_from_db(cls, database: connectors.PostgresConnector, diff --git a/src/utils/job/tests/BUILD b/src/utils/job/tests/BUILD index df70621ed..4166f7125 100644 --- a/src/utils/job/tests/BUILD +++ b/src/utils/job/tests/BUILD @@ -106,3 +106,16 @@ osmo_py_test( size = "large", tags = ["requires-network"], ) + +osmo_py_test( + name = "test_log_line_count", + srcs = ["test_log_line_count.py"], + deps = [ + "//src/lib/utils:common", + "//src/tests/common", + "//src/utils/connectors", + "//src/utils/job", + ], + size = "large", + tags = ["requires-network"], +) diff --git a/src/utils/job/tests/test_log_line_count.py b/src/utils/job/tests/test_log_line_count.py new file mode 100644 index 000000000..f3736ae94 --- /dev/null +++ b/src/utils/job/tests/test_log_line_count.py @@ -0,0 +1,285 @@ +""" +SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +SPDX-License-Identifier: Apache-2.0 +""" +import json + +from src.lib.utils import common +from src.tests.common import fixtures, runner +from src.utils.connectors import postgres +from src.utils.job import task, workflow as workflow_module + + +WORKFLOW_ID = 'test-log-count-wf-1' +WORKFLOW_UUID = common.generate_unique_id() +GROUP_NAME = 'test-group' +GROUP_UUID = common.generate_unique_id() + + +class LogLineCountFixture( + fixtures.PostgresFixture, + fixtures.PostgresTestIsolationFixture, + fixtures.OsmoTestFixture, +): + """Postgres fixture for log_line_count tests.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + postgres.PostgresConnector( + postgres.PostgresConfig( + postgres_host=cls.postgres_container.get_container_host_ip(), + postgres_port=cls.postgres_container.get_database_port(), + postgres_password=cls.postgres_container.password, + postgres_database_name=cls.postgres_container.dbname, + postgres_user=cls.postgres_container.username, + method='dev', + ) + ) + + @classmethod + def tearDownClass(cls): + try: + if postgres.PostgresConnector._instance: # pylint: disable=protected-access + postgres.PostgresConnector._instance.close() # pylint: disable=protected-access + postgres.PostgresConnector._instance = None # pylint: disable=protected-access + finally: + super().tearDownClass() + + def _get_db(self) -> postgres.PostgresConnector: + return postgres.PostgresConnector.get_instance() + + def _insert_workflow(self, workflow_id: str = WORKFLOW_ID, + workflow_uuid: str = WORKFLOW_UUID) -> None: + self._get_db().execute_commit_command( + '''INSERT INTO workflows + (workflow_id, workflow_name, workflow_uuid, submitted_by, + backend, logs, exec_timeout, queue_timeout, plugins, status) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + (workflow_id, 'test-wf', workflow_uuid, 'user@nvidia.com', + 'default', '', 100, 100, '{}', 'PENDING')) + + def _insert_workflow_with_sentinel(self, workflow_id: str = WORKFLOW_ID, + workflow_uuid: str = WORKFLOW_UUID) -> None: + """Insert a workflow with log_line_count = -1 (as insert_to_db now does).""" + self._get_db().execute_commit_command( + '''INSERT INTO workflows + (workflow_id, workflow_name, workflow_uuid, submitted_by, + backend, logs, exec_timeout, queue_timeout, plugins, status, log_line_count) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + (workflow_id, 'test-wf', workflow_uuid, 'user@nvidia.com', + 'default', '', 100, 100, '{}', 'PENDING', -1)) + + def _insert_group(self) -> None: + spec = task.TaskGroupSpec( + name=GROUP_NAME, + ignoreNonleadStatus=True, + tasks=[task.TaskSpec(name='lead', image='img', command=['cmd'], lead=True)], + ) + self._get_db().execute_commit_command( + '''INSERT INTO groups + (workflow_id, name, group_uuid, spec, status, cleaned_up, + remaining_upstream_groups, downstream_groups) + VALUES (%s, %s, %s, %s, %s, FALSE, NULL, NULL)''', + (WORKFLOW_ID, GROUP_NAME, GROUP_UUID, spec.json(), 'RUNNING')) + + def _insert_task(self, task_name: str, retry_id: int = 0, + lead: bool = False) -> str: + """Insert a task without log_line_count (simulating pre-change row).""" + task_db_key = common.generate_unique_id() + task_uuid = common.generate_unique_id() + self._get_db().execute_commit_command( + '''INSERT INTO tasks + (workflow_id, name, group_name, task_db_key, retry_id, task_uuid, + status, pod_name, failure_message, gpu_count, cpu_count, + disk_count, memory_count, exit_actions, lead) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + (WORKFLOW_ID, task_name, GROUP_NAME, task_db_key, retry_id, task_uuid, + 'RUNNING', f'pod-{task_name}', None, 0, 1, 0, 1, + json.dumps({}), lead)) + return task_db_key + + def _insert_task_with_sentinel(self, task_name: str, retry_id: int = 0, + lead: bool = False) -> str: + """Insert a task with log_line_count = -1 (as batch_insert_to_db now does).""" + task_db_key = common.generate_unique_id() + task_uuid = common.generate_unique_id() + self._get_db().execute_commit_command( + '''INSERT INTO tasks + (workflow_id, name, group_name, task_db_key, retry_id, task_uuid, + status, pod_name, failure_message, gpu_count, cpu_count, + disk_count, memory_count, exit_actions, lead, log_line_count) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)''', + (WORKFLOW_ID, task_name, GROUP_NAME, task_db_key, retry_id, task_uuid, + 'RUNNING', f'pod-{task_name}', None, 0, 1, 0, 1, + json.dumps({}), lead, -1)) + return task_db_key + + def _fetch_workflow_log_line_count(self, workflow_id: str = WORKFLOW_ID): + rows = self._get_db().execute_fetch_command( + 'SELECT log_line_count FROM workflows WHERE workflow_id = %s', + (workflow_id,), True) + return rows[0]['log_line_count'] + + def _fetch_task_log_line_count(self, task_db_key: str): + rows = self._get_db().execute_fetch_command( + 'SELECT log_line_count FROM tasks WHERE task_db_key = %s', + (task_db_key,), True) + return rows[0]['log_line_count'] + + def _make_workflow_obj(self, workflow_id: str = WORKFLOW_ID, + log_line_count: int | None = -1) -> workflow_module.Workflow: + """Construct a minimal Workflow object pointing at the test DB.""" + return workflow_module.Workflow( + workflow_name='test-wf', + workflow_uuid=WORKFLOW_UUID, + workflow_id_internal=workflow_id, + groups=[], + user='user@nvidia.com', + logs='', + database=self._get_db(), + status=workflow_module.WorkflowStatus.PENDING, + cancelled_by=None, + backend='default', + pool=None, + priority=workflow_module.wf_priority.WorkflowPriority.NORMAL, + log_line_count=log_line_count, + ) + + def _make_task_obj(self, task_db_key: str, + log_line_count: int | None = -1) -> task.Task: + """Construct a minimal Task object pointing at the test DB.""" + return task.Task( + workflow_uuid=WORKFLOW_UUID, + name='task1', + group_name=GROUP_NAME, + task_uuid=common.generate_unique_id(), + task_db_key=task_db_key, + database=self._get_db(), + exit_actions={}, + node_name=None, + pod_ip=None, + lead=True, + log_line_count=log_line_count, + ) + + +class WorkflowLogLineCountTest(LogLineCountFixture): + """Tests for workflow log_line_count sentinel and update behaviour.""" + + def test_new_workflow_insert_sets_sentinel(self): + """Workflows inserted by new code have log_line_count = -1.""" + self._insert_workflow_with_sentinel() + self.assertEqual(self._fetch_workflow_log_line_count(), -1) + + def test_pre_change_workflow_has_null(self): + """Workflows inserted before this change have log_line_count = NULL.""" + self._insert_workflow() + self.assertIsNone(self._fetch_workflow_log_line_count()) + + def test_update_writes_count_when_sentinel_present(self): + """update_log_line_count_to_db writes the count when sentinel -1 is present.""" + self._insert_workflow_with_sentinel() + wf = self._make_workflow_obj() + wf.update_log_line_count_to_db(4200) + self.assertEqual(self._fetch_workflow_log_line_count(), 4200) + + def test_update_writes_zero_when_no_logs(self): + """update_log_line_count_to_db correctly writes 0 for workflows with no logs.""" + self._insert_workflow_with_sentinel() + wf = self._make_workflow_obj() + wf.update_log_line_count_to_db(0) + self.assertEqual(self._fetch_workflow_log_line_count(), 0) + + def test_update_does_not_overwrite_finalized_count(self): + """update_log_line_count_to_db is idempotent: does not overwrite an already-set count.""" + self._insert_workflow_with_sentinel() + wf = self._make_workflow_obj() + wf.update_log_line_count_to_db(4200) + wf.update_log_line_count_to_db(9999) + self.assertEqual(self._fetch_workflow_log_line_count(), 4200) + + def test_update_does_not_touch_null_row(self): + """update_log_line_count_to_db does not write to pre-change rows (NULL).""" + self._insert_workflow() + wf = self._make_workflow_obj(log_line_count=None) + wf.update_log_line_count_to_db(4200) + self.assertIsNone(self._fetch_workflow_log_line_count()) + + +class TaskLogLineCountTest(LogLineCountFixture): + """Tests for task log_line_count sentinel and update behaviour.""" + + def test_new_task_insert_sets_sentinel(self): + """Tasks inserted by new code have log_line_count = -1.""" + task_db_key = self._insert_task_with_sentinel('task1', lead=True) + self.assertEqual(self._fetch_task_log_line_count(task_db_key), -1) + + def test_pre_change_task_has_null(self): + """Tasks inserted before this change have log_line_count = NULL.""" + task_db_key = self._insert_task('task1', lead=True) + self.assertIsNone(self._fetch_task_log_line_count(task_db_key)) + + def test_update_writes_count_when_sentinel_present(self): + """update_log_line_count_to_db writes the count when sentinel -1 is present.""" + task_db_key = self._insert_task_with_sentinel('task1', lead=True) + t = self._make_task_obj(task_db_key) + t.update_log_line_count_to_db(1500) + self.assertEqual(self._fetch_task_log_line_count(task_db_key), 1500) + + def test_update_writes_zero_when_no_logs(self): + """update_log_line_count_to_db correctly writes 0 for tasks with no logs.""" + task_db_key = self._insert_task_with_sentinel('task1', lead=True) + t = self._make_task_obj(task_db_key) + t.update_log_line_count_to_db(0) + self.assertEqual(self._fetch_task_log_line_count(task_db_key), 0) + + def test_update_does_not_overwrite_finalized_count(self): + """update_log_line_count_to_db is idempotent: does not overwrite an already-set count.""" + task_db_key = self._insert_task_with_sentinel('task1', lead=True) + t = self._make_task_obj(task_db_key) + t.update_log_line_count_to_db(1500) + t.update_log_line_count_to_db(9999) + self.assertEqual(self._fetch_task_log_line_count(task_db_key), 1500) + + def test_update_does_not_touch_null_row(self): + """update_log_line_count_to_db does not write to pre-change rows (NULL).""" + task_db_key = self._insert_task('task1', lead=True) + t = self._make_task_obj(task_db_key, log_line_count=None) + t.update_log_line_count_to_db(1500) + self.assertIsNone(self._fetch_task_log_line_count(task_db_key)) + + def test_batch_insert_sets_sentinel_for_all_tasks(self): + """batch_insert_to_db sets log_line_count = -1 for every task in the batch.""" + self._insert_workflow() + self._insert_group() + task_db_key_1 = common.generate_unique_id() + task_db_key_2 = common.generate_unique_id() + entries = [ + (WORKFLOW_ID, 'task1', GROUP_NAME, task_db_key_1, 0, + common.generate_unique_id(), 'WAITING', 'pod-task1', None, + 0, 1, 0, 1, json.dumps({}), True, -1), + (WORKFLOW_ID, 'task2', GROUP_NAME, task_db_key_2, 0, + common.generate_unique_id(), 'WAITING', 'pod-task2', None, + 0, 1, 0, 1, json.dumps({}), False, -1), + ] + task.Task.batch_insert_to_db(self._get_db(), entries) + self.assertEqual(self._fetch_task_log_line_count(task_db_key_1), -1) + self.assertEqual(self._fetch_task_log_line_count(task_db_key_2), -1) + + +if __name__ == '__main__': + runner.run_test() diff --git a/src/utils/job/workflow.py b/src/utils/job/workflow.py index 026c6c314..23121bb7a 100644 --- a/src/utils/job/workflow.py +++ b/src/utils/job/workflow.py @@ -890,6 +890,9 @@ class Workflow(pydantic.BaseModel): app_version: int | None = None parent_job_id: int | None = None plugins: task_common.WorkflowPlugins = task_common.WorkflowPlugins() + # NULL = predates this column; -1 = initialized at submission, not yet finalized; + # >= 0 = finalized by CleanupWorkflow (0 = confirmed zero lines). + log_line_count: int | None = None class Config: arbitrary_types_allowed = True @@ -907,7 +910,7 @@ def insert_to_db(self, version: int = 2): (workflow_name, job_id, workflow_id, workflow_uuid, submitted_by, submit_time, start_time, end_time, status, logs, exec_timeout, queue_timeout, backend, pool, version, failure_message, parent_name, parent_job_id, app_uuid, app_version, - plugins, priority) + plugins, priority, log_line_count) SELECT %s AS workflow_name, (max_job_id + 1) AS job_id, @@ -930,7 +933,8 @@ def insert_to_db(self, version: int = 2): %s AS app_uuid, %s AS app_version, %s AS plugins, - %s AS priority + %s AS priority, + -1 AS log_line_count FROM last_job ON CONFLICT (workflow_uuid) DO NOTHING; ''' @@ -1143,7 +1147,8 @@ def fetch_from_db(cls, database: connectors.PostgresConnector, app_uuid=workflow_row['app_uuid'], app_version=workflow_row['app_version'], plugins=task_common.WorkflowPlugins(**workflow_row['plugins']), - priority=wf_priority.WorkflowPriority(workflow_row['priority'])) + priority=wf_priority.WorkflowPriority(workflow_row['priority']), + log_line_count=workflow_row.get('log_line_count')) def update_groups(self, workflow_spec: WorkflowSpec, group_and_task_uuids: Dict, remaining_upstream_groups: Dict, downstream_groups: Dict, @@ -1191,6 +1196,14 @@ def update_events_to_db(self, events: str): update_cmd.add_field('events', events) self.database.execute_commit_command(*update_cmd.get_args()) + def update_log_line_count_to_db(self, count: int) -> None: + """ Writes the final log line count. Only updates if the sentinel -1 is still present. """ + cmd = ''' + UPDATE workflows SET log_line_count = %s + WHERE workflow_id = %s AND log_line_count = -1; + ''' + self.database.execute_commit_command(cmd, (count, self.workflow_id)) + def update_cancelled_by(self, canceled_by: str): update_cmd = connectors.PostgresUpdateCommand(table='workflows') update_cmd.add_condition('workflow_id = %s', [self.workflow_id])