From 90e684f3981fc0e13b9829453b71cc0b7ba6a75c Mon Sep 17 00:00:00 2001 From: Guido Petretto Date: Wed, 21 Jan 2026 15:40:42 +0100 Subject: [PATCH] add store_inputs only if needed --- src/jobflow/core/job.py | 43 ++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/src/jobflow/core/job.py b/src/jobflow/core/job.py index b50812b9..a273a3ec 100644 --- a/src/jobflow/core/job.py +++ b/src/jobflow/core/job.py @@ -1359,7 +1359,7 @@ def prepare_replace( """ Prepare a replacement :obj:`Flow` or :obj:`Job`. - If the replacement is a ``Flow``, then an additional ``Job`` will be inserted + If the replacement is a ``Flow``, then an additional ``Job`` may be inserted that maps the output id of the original job to outputs of the ``Flow``. If the replacement is a ``Flow`` or a ``Job``, then this function pass on @@ -1383,16 +1383,37 @@ def prepare_replace( replace = Flow(jobs=replace) if isinstance(replace, Flow) and replace.output is not None: - # add a job with same UUID as the current job to store the outputs of the - # flow; this job will inherit the metadata and output schema of the current - # job - store_output_job = store_inputs(replace.output) - store_output_job.set_uuid(current_job.uuid) - store_output_job.index = current_job.index + 1 - store_output_job.metadata = current_job.metadata - store_output_job.output_schema = current_job.output_schema - store_output_job._kwargs = current_job._kwargs - replace.add_jobs(store_output_job) + leaf_nodes = [n for n, d in replace.graph.out_degree() if d == 0] + is_last_output_leaf = len( + leaf_nodes + ) == 1 and replace.output == OutputReference(leaf_nodes[0]) + if is_last_output_leaf: + # the last job of the replace inherits UUID and metadata from + # the original job + for j in replace.jobs: + if j.uuid == leaf_nodes[0]: + leaf_job = j + break + leaf_job.set_uuid(current_job.uuid) + leaf_job.index = current_job.index + 1 + + metadata = leaf_job.metadata + metadata.update(current_job.metadata) + leaf_job.metadata = metadata + + if not leaf_job.output_schema: + leaf_job.output_schema = current_job.output_schema + else: + # add a job with same UUID as the current job to store the outputs of the + # flow; this job will inherit the metadata and output schema of the current + # job + store_output_job = store_inputs(replace.output) + store_output_job.set_uuid(current_job.uuid) + store_output_job.index = current_job.index + 1 + store_output_job.metadata = current_job.metadata + store_output_job.output_schema = current_job.output_schema + store_output_job._kwargs = current_job._kwargs + replace.add_jobs(store_output_job) elif isinstance(replace, Job): # replace is a single Job