Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions apps/sim/executor/execution/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ export interface ParallelScope {
parallelId: string
totalBranches: number
branchOutputs: Map<number, NormalizedBlockOutput[]>
completedCount: number
totalExpectedNodes: number
items?: any[]
/** Error message if parallel validation failed (e.g., exceeded max branches) */
validationError?: string
Expand Down
22 changes: 4 additions & 18 deletions apps/sim/executor/orchestrators/node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,7 @@ export class NodeExecutionOrchestrator {

const parallelId = node.metadata.parallelId
if (parallelId && !this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = parallelConfig?.nodes?.length || 1
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId)
}

if (node.metadata.isSentinel) {
Expand Down Expand Up @@ -157,8 +155,7 @@ export class NodeExecutionOrchestrator {
if (!this.parallelOrchestrator.getParallelScope(ctx, parallelId)) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (parallelConfig) {
const nodesInParallel = parallelConfig.nodes?.length || 1
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId)
}
}

Expand Down Expand Up @@ -237,20 +234,9 @@ export class NodeExecutionOrchestrator {
): Promise<void> {
const scope = this.parallelOrchestrator.getParallelScope(ctx, parallelId)
if (!scope) {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
const nodesInParallel = parallelConfig?.nodes?.length || 1
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId, nodesInParallel)
await this.parallelOrchestrator.initializeParallelScope(ctx, parallelId)
}
const allComplete = this.parallelOrchestrator.handleParallelBranchCompletion(
ctx,
parallelId,
node.id,
output
)
if (allComplete) {
await this.parallelOrchestrator.aggregateParallelResults(ctx, parallelId)
}

this.parallelOrchestrator.handleParallelBranchCompletion(ctx, parallelId, node.id, output)
this.state.setBlockOutput(node.id, output)
}

Expand Down
2 changes: 1 addition & 1 deletion apps/sim/executor/orchestrators/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ describe('ParallelOrchestrator', () => {
)
const ctx = createContext()

const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1', 1)
const initializePromise = orchestrator.initializeParallelScope(ctx, 'parallel-1')
await Promise.resolve()

expect(onBlockStart).toHaveBeenCalledTimes(1)
Expand Down
30 changes: 11 additions & 19 deletions apps/sim/executor/orchestrators/parallel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,17 +47,13 @@ export class ParallelOrchestrator {
private contextExtensions: ContextExtensions | null = null
) {}

async initializeParallelScope(
ctx: ExecutionContext,
parallelId: string,
terminalNodesCount = 1
): Promise<ParallelScope> {
async initializeParallelScope(ctx: ExecutionContext, parallelId: string): Promise<ParallelScope> {
const parallelConfig = this.dag.parallelConfigs.get(parallelId)
if (!parallelConfig) {
throw new Error(`Parallel config not found: ${parallelId}`)
}

if (terminalNodesCount === 0 || parallelConfig.nodes.length === 0) {
if (parallelConfig.nodes.length === 0) {
const errorMessage =
'Parallel has no executable blocks inside. Add or enable at least one block in the parallel.'
logger.error(errorMessage, { parallelId })
Expand Down Expand Up @@ -108,8 +104,6 @@ export class ParallelOrchestrator {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
isEmpty: true,
}
Expand Down Expand Up @@ -186,8 +180,6 @@ export class ParallelOrchestrator {
parallelId,
totalBranches: branchCount,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: branchCount * terminalNodesCount,
items,
}

Expand Down Expand Up @@ -253,8 +245,6 @@ export class ParallelOrchestrator {
parallelId,
totalBranches: 0,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 0,
items: [],
validationError: errorMessage,
}
Expand All @@ -277,32 +267,34 @@ export class ParallelOrchestrator {
return resolveArrayInput(ctx, config.distribution, this.resolver)
}

/**
* Stores a node's output in the branch outputs for later aggregation.
* Aggregation is triggered by the sentinel-end node via the edge mechanism,
* not by counting individual node completions. This avoids incorrect completion
* detection when branches have conditional paths (error edges, conditions).
*/
handleParallelBranchCompletion(
ctx: ExecutionContext,
parallelId: string,
nodeId: string,
output: NormalizedBlockOutput
): boolean {
): void {
const scope = ctx.parallelExecutions?.get(parallelId)
if (!scope) {
logger.warn('Parallel scope not found for branch completion', { parallelId, nodeId })
return false
return
}

const branchIndex = extractBranchIndex(nodeId)
if (branchIndex === null) {
logger.warn('Could not extract branch index from node ID', { nodeId })
return false
return
}

if (!scope.branchOutputs.has(branchIndex)) {
scope.branchOutputs.set(branchIndex, [])
}
scope.branchOutputs.get(branchIndex)!.push(output)
scope.completedCount++

const allComplete = scope.completedCount >= scope.totalExpectedNodes
return allComplete
}

async aggregateParallelResults(
Expand Down
2 changes: 0 additions & 2 deletions apps/sim/executor/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,6 @@ export interface ExecutionContext {
parallelId: string
totalBranches: number
branchOutputs: Map<number, any[]>
completedCount: number
totalExpectedNodes: number
parallelType?: 'count' | 'collection'
items?: any[]
}
Expand Down
24 changes: 0 additions & 24 deletions apps/sim/executor/utils/iteration-context.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ describe('getIterationContext', () => {
parallelId: 'p1',
totalBranches: 3,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 3,
},
],
]),
Expand Down Expand Up @@ -135,8 +133,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'outer-p',
totalBranches: 4,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 4,
},
],
]),
Expand Down Expand Up @@ -164,8 +160,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'parallel-1',
totalBranches: 5,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 5,
},
],
]),
Expand Down Expand Up @@ -232,8 +226,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'outer-p',
totalBranches: 3,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 3,
},
],
]),
Expand Down Expand Up @@ -275,8 +267,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'P1',
totalBranches: 2,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 2,
},
],
[
Expand All @@ -285,8 +275,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'P2',
totalBranches: 2,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 2,
},
],
[
Expand All @@ -295,8 +283,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'P2__obranch-1',
totalBranches: 2,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 2,
},
],
]),
Expand Down Expand Up @@ -363,8 +349,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'parallel-1',
totalBranches: 5,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 5,
},
],
]),
Expand Down Expand Up @@ -423,8 +407,6 @@ describe('buildUnifiedParentIterations', () => {
parallelId: 'parallel-1',
totalBranches: 3,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 3,
},
],
]),
Expand Down Expand Up @@ -478,8 +460,6 @@ describe('buildContainerIterationContext', () => {
parallelId: 'parallel-1',
totalBranches: 5,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 5,
},
],
]),
Expand Down Expand Up @@ -541,8 +521,6 @@ describe('buildContainerIterationContext', () => {
parallelId: 'P2__obranch-1',
totalBranches: 5,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 5,
},
],
]),
Expand All @@ -568,8 +546,6 @@ describe('buildContainerIterationContext', () => {
parallelId: 'outer-parallel',
totalBranches: 3,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 3,
},
],
]),
Expand Down
2 changes: 0 additions & 2 deletions apps/sim/executor/variables/resolvers/parallel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ function createParallelScope(items: any[]) {
parallelId: 'parallel-1',
totalBranches: items.length,
branchOutputs: new Map(),
completedCount: 0,
totalExpectedNodes: 1,
items,
}
}
Expand Down
Loading