fix: ensure errors returned from tasks are properly logged (#11443)

Fixes https://github.com/payloadcms/payload/issues/9767

We allow failing a job queue task by returning `{ state: 'failed' }` from the task, instead of throwing an error. However, previously, this threw an error when trying to update the task in the database. Additionally, it was not possible to customize the error message.

This PR fixes that by letting you return `errorMessage` alongside `{ state: 'failed' }`, and by ensuring the error is transformed into proper json before saving it to the `error` column.
This commit is contained in:
Alessio Gravili
2025-02-28 09:00:56 -07:00
committed by GitHub
parent dfddee2125
commit d53f166476
4 changed files with 160 additions and 40 deletions

View File

@@ -7,14 +7,19 @@ export type TaskInputOutput = {
} }
export type TaskHandlerResult< export type TaskHandlerResult<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput, TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
> = { > =
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | {
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output'] errorMessage?: string
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type state: 'failed'
? TTaskSlugOrInputOutput['output'] }
: never | {
state?: 'failed' | 'succeeded' output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
} ? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['output']
: never
state?: 'succeeded'
}
export type TaskHandlerArgs< export type TaskHandlerArgs<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput, TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
@@ -84,6 +89,8 @@ export type RunTaskFunctions = {
[TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug> [TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug>
} }
type MaybePromise<T> = Promise<T> | T
export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>( export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>(
taskID: string, taskID: string,
taskArgs: { taskArgs: {
@@ -103,12 +110,16 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
job: RunningJob<any> job: RunningJob<any>
req: PayloadRequest req: PayloadRequest
tasks: RunTaskFunctions tasks: RunTaskFunctions
}) => }) => MaybePromise<
| {
errorMessage?: string
state: 'failed'
}
| { | {
output: TTaskOutput output: TTaskOutput
state?: 'failed' | 'succeeded' state?: 'succeeded'
} }
| Promise<{ output: TTaskOutput; state?: 'failed' | 'succeeded' }> >
}, },
) => Promise<TTaskOutput> ) => Promise<TTaskOutput>

View File

@@ -48,9 +48,9 @@ export async function handleTaskFailed({
parent, parent,
req, req,
retriesConfig, retriesConfig,
runnerOutput,
state, state,
taskConfig, taskConfig,
taskHandlerResult,
taskID, taskID,
taskSlug, taskSlug,
taskStatus, taskStatus,
@@ -65,9 +65,9 @@ export async function handleTaskFailed({
parent?: TaskParent parent?: TaskParent
req: PayloadRequest req: PayloadRequest
retriesConfig: number | RetryConfig retriesConfig: number | RetryConfig
runnerOutput?: TaskHandlerResult<string>
state: RunTaskFunctionState state: RunTaskFunctionState
taskConfig?: TaskConfig<string> taskConfig?: TaskConfig<string>
taskHandlerResult?: TaskHandlerResult<string>
taskID: string taskID: string
taskSlug: string taskSlug: string
taskStatus: null | SingleTaskStatus<string> taskStatus: null | SingleTaskStatus<string>
@@ -88,7 +88,12 @@ export async function handleTaskFailed({
message: error.message, message: error.message,
stack: error.stack, stack: error.stack,
} }
: runnerOutput.state : {
message:
taskHandlerResult.state === 'failed'
? (taskHandlerResult.errorMessage ?? taskHandlerResult.state)
: 'failed',
}
job.log.push({ job.log.push({
completedAt: new Date().toISOString(), completedAt: new Date().toISOString(),
@@ -262,8 +267,6 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
return return
} }
let output: object = {}
let maxRetries: number | undefined = finalRetriesConfig?.attempts let maxRetries: number | undefined = finalRetriesConfig?.attempts
if (maxRetries === undefined || maxRetries === null) { if (maxRetries === undefined || maxRetries === null) {
@@ -278,8 +281,11 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
} }
} }
let taskHandlerResult: TaskHandlerResult<string>
let output: object = {}
try { try {
const runnerOutput = await runner({ taskHandlerResult = await runner({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, { inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, {
taskID, taskID,
taskSlug, taskSlug,
@@ -292,29 +298,6 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
taskSlug, taskSlug,
}), }),
}) })
if (runnerOutput.state === 'failed') {
await handleTaskFailed({
executedAt,
input,
job,
maxRetries,
output,
parent,
req,
retriesConfig: finalRetriesConfig,
runnerOutput,
state,
taskConfig,
taskID,
taskSlug,
taskStatus,
updateJob,
})
throw new Error('Task failed')
} else {
output = runnerOutput.output
}
} catch (err) { } catch (err) {
await handleTaskFailed({ await handleTaskFailed({
error: err, error: err,
@@ -336,6 +319,29 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
throw new Error('Task failed') throw new Error('Task failed')
} }
if (taskHandlerResult.state === 'failed') {
await handleTaskFailed({
executedAt,
input,
job,
maxRetries,
output,
parent,
req,
retriesConfig: finalRetriesConfig,
state,
taskConfig,
taskHandlerResult,
taskID,
taskSlug,
taskStatus,
updateJob,
})
throw new Error('Task failed')
} else {
output = taskHandlerResult.output
}
if (taskConfig?.onSuccess) { if (taskConfig?.onSuccess) {
await taskConfig.onSuccess() await taskConfig.onSuccess()
} }

View File

@@ -323,6 +323,44 @@ export default buildConfigWithDefaults({
], ],
handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler', handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler',
} as TaskConfig<'ExternalTask'>, } as TaskConfig<'ExternalTask'>,
{
retries: 0,
slug: 'ThrowError',
inputSchema: [],
outputSchema: [],
handler: () => {
throw new Error('failed')
},
} as TaskConfig<'ThrowError'>,
{
retries: 0,
slug: 'ReturnError',
inputSchema: [],
outputSchema: [],
handler: () => {
return {
state: 'failed',
}
},
} as TaskConfig<'ReturnError'>,
{
retries: 0,
slug: 'ReturnCustomError',
inputSchema: [
{
name: 'errorMessage',
type: 'text',
required: true,
},
],
outputSchema: [],
handler: ({ input }) => {
return {
state: 'failed',
errorMessage: input.errorMessage,
}
},
} as TaskConfig<'ReturnCustomError'>,
], ],
workflows: [ workflows: [
updatePostWorkflow, updatePostWorkflow,

View File

@@ -1128,4 +1128,69 @@ describe('Queues', () => {
// @ts-expect-error // @ts-expect-error
expect(jobAfterRun.input.amountTask1Retried).toBe(0) expect(jobAfterRun.input.amountTask1Retried).toBe(0)
}) })
it('can tasks throw error', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ThrowError',
input: {},
})
await payload.jobs.run()
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('failed')
expect(jobAfterRun.log[0].state).toBe('failed')
})
it('can tasks return error', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ReturnError',
input: {},
})
await payload.jobs.run()
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('failed')
expect(jobAfterRun.log[0].state).toBe('failed')
})
it('can tasks return error with custom error message', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ReturnCustomError',
input: {
errorMessage: 'custom error message',
},
})
await payload.jobs.run()
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('custom error message')
expect(jobAfterRun.log[0].state).toBe('failed')
})
}) })