diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index e225bc8bcb..0a698905a0 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -7,14 +7,19 @@ export type TaskInputOutput = { } export type TaskHandlerResult< TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput, -> = { - output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] - ? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output'] - : TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type - ? TTaskSlugOrInputOutput['output'] - : never - state?: 'failed' | 'succeeded' -} +> = + | { + errorMessage?: string + state: 'failed' + } + | { + output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] + ? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output'] + : TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type + ? TTaskSlugOrInputOutput['output'] + : never + state?: 'succeeded' + } export type TaskHandlerArgs< TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput, @@ -84,6 +89,8 @@ export type RunTaskFunctions = { [TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction } +type MaybePromise = Promise | T + export type RunInlineTaskFunction = ( taskID: string, taskArgs: { @@ -103,12 +110,16 @@ export type RunInlineTaskFunction = req: PayloadRequest tasks: RunTaskFunctions - }) => + }) => MaybePromise< + | { + errorMessage?: string + state: 'failed' + } | { output: TTaskOutput - state?: 'failed' | 'succeeded' + state?: 'succeeded' } - | Promise<{ output: TTaskOutput; state?: 'failed' | 'succeeded' }> + > }, ) => Promise diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index f2b9f6ad39..9feea2fa13 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -48,9 +48,9 @@ export async function handleTaskFailed({ parent, req, retriesConfig, - runnerOutput, state, taskConfig, + taskHandlerResult, taskID, taskSlug, taskStatus, @@ -65,9 +65,9 @@ export async function handleTaskFailed({ parent?: TaskParent req: PayloadRequest retriesConfig: number | RetryConfig - runnerOutput?: TaskHandlerResult state: RunTaskFunctionState taskConfig?: TaskConfig + taskHandlerResult?: TaskHandlerResult taskID: string taskSlug: string taskStatus: null | SingleTaskStatus @@ -88,7 +88,12 @@ export async function handleTaskFailed({ message: error.message, stack: error.stack, } - : runnerOutput.state + : { + message: + taskHandlerResult.state === 'failed' + ? (taskHandlerResult.errorMessage ?? taskHandlerResult.state) + : 'failed', + } job.log.push({ completedAt: new Date().toISOString(), @@ -262,8 +267,6 @@ export const getRunTaskFunction = ( return } - let output: object = {} - let maxRetries: number | undefined = finalRetriesConfig?.attempts if (maxRetries === undefined || maxRetries === null) { @@ -278,8 +281,11 @@ export const getRunTaskFunction = ( } } + let taskHandlerResult: TaskHandlerResult + let output: object = {} + try { - const runnerOutput = await runner({ + taskHandlerResult = await runner({ inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, { taskID, taskSlug, @@ -292,29 +298,6 @@ export const getRunTaskFunction = ( taskSlug, }), }) - - if (runnerOutput.state === 'failed') { - await handleTaskFailed({ - executedAt, - input, - job, - maxRetries, - output, - parent, - req, - retriesConfig: finalRetriesConfig, - runnerOutput, - state, - taskConfig, - taskID, - taskSlug, - taskStatus, - updateJob, - }) - throw new Error('Task failed') - } else { - output = runnerOutput.output - } } catch (err) { await handleTaskFailed({ error: err, @@ -336,6 +319,29 @@ export const getRunTaskFunction = ( throw new Error('Task failed') } + if (taskHandlerResult.state === 'failed') { + await handleTaskFailed({ + executedAt, + input, + job, + maxRetries, + output, + parent, + req, + retriesConfig: finalRetriesConfig, + state, + taskConfig, + taskHandlerResult, + taskID, + taskSlug, + taskStatus, + updateJob, + }) + throw new Error('Task failed') + } else { + output = taskHandlerResult.output + } + if (taskConfig?.onSuccess) { await taskConfig.onSuccess() } diff --git a/test/queues/config.ts b/test/queues/config.ts index 67e1c91265..ac886e2bf4 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -323,6 +323,44 @@ export default buildConfigWithDefaults({ ], handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler', } as TaskConfig<'ExternalTask'>, + { + retries: 0, + slug: 'ThrowError', + inputSchema: [], + outputSchema: [], + handler: () => { + throw new Error('failed') + }, + } as TaskConfig<'ThrowError'>, + { + retries: 0, + slug: 'ReturnError', + inputSchema: [], + outputSchema: [], + handler: () => { + return { + state: 'failed', + } + }, + } as TaskConfig<'ReturnError'>, + { + retries: 0, + slug: 'ReturnCustomError', + inputSchema: [ + { + name: 'errorMessage', + type: 'text', + required: true, + }, + ], + outputSchema: [], + handler: ({ input }) => { + return { + state: 'failed', + errorMessage: input.errorMessage, + } + }, + } as TaskConfig<'ReturnCustomError'>, ], workflows: [ updatePostWorkflow, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 8193988c53..7dd030b9c4 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -1128,4 +1128,69 @@ describe('Queues', () => { // @ts-expect-error expect(jobAfterRun.input.amountTask1Retried).toBe(0) }) + + it('can tasks throw error', async () => { + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + task: 'ThrowError', + input: {}, + }) + + await payload.jobs.run() + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + expect(jobAfterRun.hasError).toBe(true) + expect(jobAfterRun.log?.length).toBe(1) + expect(jobAfterRun.log[0].error.message).toBe('failed') + expect(jobAfterRun.log[0].state).toBe('failed') + }) + + it('can tasks return error', async () => { + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + task: 'ReturnError', + input: {}, + }) + + await payload.jobs.run() + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + expect(jobAfterRun.hasError).toBe(true) + expect(jobAfterRun.log?.length).toBe(1) + expect(jobAfterRun.log[0].error.message).toBe('failed') + expect(jobAfterRun.log[0].state).toBe('failed') + }) + + it('can tasks return error with custom error message', async () => { + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + task: 'ReturnCustomError', + input: { + errorMessage: 'custom error message', + }, + }) + + await payload.jobs.run() + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + expect(jobAfterRun.hasError).toBe(true) + expect(jobAfterRun.log?.length).toBe(1) + expect(jobAfterRun.log[0].error.message).toBe('custom error message') + expect(jobAfterRun.log[0].state).toBe('failed') + }) })