Files
payload/test/queues/config.ts
Alessio Gravili 59f536c2c9 refactor: simplify job queue error handling (#12845)
This simplifies workflow / task error handling, as well as cancelling
jobs. Previously, we were handling errors when they occur and passing
through error state using a `state` object - errors were then handled in
multiple areas of the code.

This PR adds new, clean `TaskError`, `WorkflowError` and
`JobCancelledError` errors that are thrown when they occur and are
handled **in one single place**, massively cleaning up complex functions
like
[payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts](https://github.com/payloadcms/payload/compare/refactor/jobs-errors?expand=1#diff-53dc7ccb7c8e023c9ba63fdd2e78c32ad0be606a2c64a3512abad87893f5fd21)

Performance will also be positively improved by this change -
previously, as task / workflow failure or cancellation would have
resulted in multiple, separate `updateJob` db calls, as data
modifications to the job object required for storing failure state were
done multiple times in multiple areas of the codebase. Most notably,
task error state was handled and updated separately from workflow error
state.
Now, it's just a clean, single `updateJob` call

This PR also does the following:
- adds a new test for `deleteJobOnComplete` behavior
- cleans up test suite
- ensures `deleteJobOnComplete` does not delete definitively failed jobs

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210553277813320
2025-06-17 22:24:53 +00:00

417 lines
11 KiB
TypeScript

import type { TaskConfig } from 'payload'
import { lexicalEditor } from '@payloadcms/richtext-lexical'
import { fileURLToPath } from 'node:url'
import path from 'path'
import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js'
import { devUser } from '../credentials.js'
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
import { seed } from './seed.js'
import { externalWorkflow } from './workflows/externalWorkflow.js'
import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js'
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
import { longRunningWorkflow } from './workflows/longRunning.js'
import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js'
import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js'
import { retries0Workflow } from './workflows/retries0.js'
import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js'
import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js'
import { retriesTestWorkflow } from './workflows/retriesTest.js'
import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js'
import { subTaskWorkflow } from './workflows/subTask.js'
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
import { updatePostWorkflow } from './workflows/updatePost.js'
import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
// eslint-disable-next-line no-restricted-exports
export default buildConfigWithDefaults({
collections: [
{
slug: 'posts',
admin: {
useAsTitle: 'title',
},
hooks: {
afterChange: [
async ({ req, doc, context }) => {
await req.payload.jobs.queue({
workflow: context.useJSONWorkflow ? 'updatePostJSONWorkflow' : 'updatePost',
input: {
post: doc.id,
message: 'hello',
},
req,
})
},
],
},
fields: [
{
name: 'title',
type: 'text',
required: true,
},
{
name: 'content',
type: 'richText',
},
{
name: 'jobStep1Ran',
type: 'text',
},
{
name: 'jobStep2Ran',
type: 'text',
},
],
},
{
slug: 'simple',
admin: {
useAsTitle: 'title',
},
fields: [
{
name: 'title',
type: 'text',
required: true,
},
],
},
],
admin: {
importMap: {
baseDir: path.resolve(dirname),
},
autoLogin: {
prefillOnly: true,
email: devUser.email,
password: devUser.password,
},
},
jobs: {
autoRun: [
{
// Every second
cron: '* * * * * *',
limit: 100,
queue: 'autorunSecond', // name of the queue
},
// add as many cron jobs as you want
],
shouldAutoRun: () => true,
jobsCollectionOverrides: ({ defaultJobsCollection }) => {
return {
...defaultJobsCollection,
admin: {
...(defaultJobsCollection?.admin || {}),
hidden: false,
},
}
},
processingOrder: {
queues: {
lifo: '-createdAt',
},
},
tasks: [
{
retries: 2,
slug: 'UpdatePost',
interfaceName: 'MyUpdatePostType',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'message',
type: 'text',
required: true,
},
],
outputSchema: [
{
name: 'messageTwice',
type: 'text',
required: true,
},
],
handler: updatePostStep1,
} as TaskConfig<'UpdatePost'>,
{
retries: 2,
slug: 'UpdatePostStep2',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'messageTwice',
type: 'text',
required: true,
},
],
handler: updatePostStep2,
} as TaskConfig<'UpdatePostStep2'>,
{
retries: 3,
slug: 'CreateSimple',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimple'>,
{
slug: 'CreateSimpleRetriesUndefined',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimpleRetriesUndefined'>,
{
slug: 'CreateSimpleRetries0',
retries: 0,
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimpleRetries0'>,
{
retries: 2,
slug: 'CreateSimpleWithDuplicateMessage',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message + input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimpleWithDuplicateMessage'>,
{
retries: 2,
slug: 'ExternalTask',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler',
} as TaskConfig<'ExternalTask'>,
{
retries: 0,
slug: 'ThrowError',
inputSchema: [],
outputSchema: [],
handler: () => {
throw new Error('failed')
},
} as TaskConfig<'ThrowError'>,
{
retries: 0,
slug: 'ReturnError',
inputSchema: [],
outputSchema: [],
handler: () => {
return {
state: 'failed',
}
},
} as TaskConfig<'ReturnError'>,
{
retries: 0,
slug: 'ReturnCustomError',
inputSchema: [
{
name: 'errorMessage',
type: 'text',
required: true,
},
],
outputSchema: [],
handler: ({ input }) => {
return {
state: 'failed',
errorMessage: input.errorMessage,
}
},
} as TaskConfig<'ReturnCustomError'>,
],
workflows: [
updatePostWorkflow,
updatePostJSONWorkflow,
retriesTestWorkflow,
retriesRollbackTestWorkflow,
retriesWorkflowLevelTestWorkflow,
noRetriesSetWorkflow,
retries0Workflow,
workflowAndTasksRetriesUndefinedWorkflow,
workflowRetries2TasksRetriesUndefinedWorkflow,
workflowRetries2TasksRetries0Workflow,
inlineTaskTestWorkflow,
failsImmediatelyWorkflow,
inlineTaskTestDelayedWorkflow,
externalWorkflow,
retriesBackoffTestWorkflow,
subTaskWorkflow,
subTaskFailsWorkflow,
longRunningWorkflow,
parallelTaskWorkflow,
],
},
editor: lexicalEditor(),
onInit: async (payload) => {
if (process.env.SEED_IN_CONFIG_ONINIT !== 'false') {
await seed(payload)
}
},
typescript: {
outputFile: path.resolve(dirname, 'payload-types.ts'),
},
})