diff --git a/apps/sim/app/api/chat/[identifier]/route.test.ts b/apps/sim/app/api/chat/[identifier]/route.test.ts index a754356c45..dac5048fc8 100644 --- a/apps/sim/app/api/chat/[identifier]/route.test.ts +++ b/apps/sim/app/api/chat/[identifier]/route.test.ts @@ -140,6 +140,10 @@ vi.mock('@/lib/workflows/streaming/streaming', () => ({ createStreamingResponse: vi.fn().mockImplementation(async () => createMockStream()), })) +vi.mock('@/lib/workflows/executor/execute-workflow', () => ({ + executeWorkflow: vi.fn().mockResolvedValue({ success: true, output: {} }), +})) + vi.mock('@/lib/core/utils/sse', () => ({ SSE_HEADERS: { 'Content-Type': 'text/event-stream', diff --git a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts index 7a407480e3..8c060d9d13 100644 --- a/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts +++ b/apps/sim/app/api/resume/[workflowId]/[executionId]/[contextId]/route.ts @@ -5,6 +5,7 @@ import { getJobQueue, shouldUseBullMQ } from '@/lib/core/async-jobs' import { createBullMQJobData } from '@/lib/core/bullmq' import { generateRequestId } from '@/lib/core/utils/request' import { SSE_HEADERS } from '@/lib/core/utils/sse' +import { getBaseUrl } from '@/lib/core/utils/urls' import { generateId } from '@/lib/core/utils/uuid' import { enqueueWorkspaceDispatch } from '@/lib/core/workspace-dispatch' import { setExecutionMeta } from '@/lib/execution/event-buffer' @@ -224,10 +225,11 @@ export async function POST( parentExecutionId: executionId, } + let jobId: string try { const useBullMQ = shouldUseBullMQ() if (useBullMQ) { - await enqueueWorkspaceDispatch({ + jobId = await enqueueWorkspaceDispatch({ id: enqueueResult.resumeExecutionId, workspaceId: workflow.workspaceId, lane: 'runtime', @@ -241,28 +243,33 @@ export async function POST( }) } else { const jobQueue = await getJobQueue() - const jobId = await jobQueue.enqueue('resume-execution', resumePayload, { + jobId = await jobQueue.enqueue('resume-execution', resumePayload, { metadata: { workflowId, workspaceId: workflow.workspaceId, userId }, }) - logger.info('Enqueued resume execution job', { - jobId, - resumeExecutionId: enqueueResult.resumeExecutionId, - }) } + logger.info('Enqueued async resume execution', { + jobId, + resumeExecutionId: enqueueResult.resumeExecutionId, + }) } catch (dispatchError) { - logger.error('Failed to dispatch async resume, falling back to in-process', { + logger.error('Failed to dispatch async resume execution', { error: dispatchError instanceof Error ? dispatchError.message : String(dispatchError), + resumeExecutionId: enqueueResult.resumeExecutionId, }) - PauseResumeManager.startResumeExecution(resumeArgs).catch((error) => { - logger.error('Fallback resume execution also failed', { error }) - }) + return NextResponse.json( + { error: 'Failed to queue resume execution. Please try again.' }, + { status: 503 } + ) } return NextResponse.json( { - status: 'started', + success: true, + async: true, + jobId, executionId: enqueueResult.resumeExecutionId, - message: 'Resume execution started asynchronously.', + message: 'Resume execution queued', + statusUrl: `${getBaseUrl()}/api/jobs/${jobId}`, }, { status: 202 } )