This simplifies workflow / task error handling, as well as cancelling jobs. Previously, we were handling errors when they occur and passing through error state using a `state` object - errors were then handled in multiple areas of the code. This PR adds new, clean `TaskError`, `WorkflowError` and `JobCancelledError` errors that are thrown when they occur and are handled **in one single place**, massively cleaning up complex functions like [payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts](https://github.com/payloadcms/payload/compare/refactor/jobs-errors?expand=1#diff-53dc7ccb7c8e023c9ba63fdd2e78c32ad0be606a2c64a3512abad87893f5fd21) Performance will also be positively improved by this change - previously, as task / workflow failure or cancellation would have resulted in multiple, separate `updateJob` db calls, as data modifications to the job object required for storing failure state were done multiple times in multiple areas of the codebase. Most notably, task error state was handled and updated separately from workflow error state. Now, it's just a clean, single `updateJob` call This PR also does the following: - adds a new test for `deleteJobOnComplete` behavior - cleans up test suite - ensures `deleteJobOnComplete` does not delete definitively failed jobs --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1210553277813320
417 lines
11 KiB
TypeScript
417 lines
11 KiB
TypeScript
import type { TaskConfig } from 'payload'
|
|
|
|
import { lexicalEditor } from '@payloadcms/richtext-lexical'
|
|
import { fileURLToPath } from 'node:url'
|
|
import path from 'path'
|
|
|
|
import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js'
|
|
import { devUser } from '../credentials.js'
|
|
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
|
|
import { seed } from './seed.js'
|
|
import { externalWorkflow } from './workflows/externalWorkflow.js'
|
|
import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js'
|
|
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
|
|
import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
|
|
import { longRunningWorkflow } from './workflows/longRunning.js'
|
|
import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js'
|
|
import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js'
|
|
import { retries0Workflow } from './workflows/retries0.js'
|
|
import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js'
|
|
import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js'
|
|
import { retriesTestWorkflow } from './workflows/retriesTest.js'
|
|
import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js'
|
|
import { subTaskWorkflow } from './workflows/subTask.js'
|
|
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
|
|
import { updatePostWorkflow } from './workflows/updatePost.js'
|
|
import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
|
|
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
|
|
import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
|
|
import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
|
|
|
|
const filename = fileURLToPath(import.meta.url)
|
|
const dirname = path.dirname(filename)
|
|
|
|
// eslint-disable-next-line no-restricted-exports
|
|
export default buildConfigWithDefaults({
|
|
collections: [
|
|
{
|
|
slug: 'posts',
|
|
admin: {
|
|
useAsTitle: 'title',
|
|
},
|
|
hooks: {
|
|
afterChange: [
|
|
async ({ req, doc, context }) => {
|
|
await req.payload.jobs.queue({
|
|
workflow: context.useJSONWorkflow ? 'updatePostJSONWorkflow' : 'updatePost',
|
|
input: {
|
|
post: doc.id,
|
|
message: 'hello',
|
|
},
|
|
req,
|
|
})
|
|
},
|
|
],
|
|
},
|
|
fields: [
|
|
{
|
|
name: 'title',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'content',
|
|
type: 'richText',
|
|
},
|
|
{
|
|
name: 'jobStep1Ran',
|
|
type: 'text',
|
|
},
|
|
{
|
|
name: 'jobStep2Ran',
|
|
type: 'text',
|
|
},
|
|
],
|
|
},
|
|
{
|
|
slug: 'simple',
|
|
admin: {
|
|
useAsTitle: 'title',
|
|
},
|
|
fields: [
|
|
{
|
|
name: 'title',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
},
|
|
],
|
|
admin: {
|
|
importMap: {
|
|
baseDir: path.resolve(dirname),
|
|
},
|
|
autoLogin: {
|
|
prefillOnly: true,
|
|
email: devUser.email,
|
|
password: devUser.password,
|
|
},
|
|
},
|
|
jobs: {
|
|
autoRun: [
|
|
{
|
|
// Every second
|
|
cron: '* * * * * *',
|
|
limit: 100,
|
|
queue: 'autorunSecond', // name of the queue
|
|
},
|
|
// add as many cron jobs as you want
|
|
],
|
|
shouldAutoRun: () => true,
|
|
jobsCollectionOverrides: ({ defaultJobsCollection }) => {
|
|
return {
|
|
...defaultJobsCollection,
|
|
admin: {
|
|
...(defaultJobsCollection?.admin || {}),
|
|
hidden: false,
|
|
},
|
|
}
|
|
},
|
|
processingOrder: {
|
|
queues: {
|
|
lifo: '-createdAt',
|
|
},
|
|
},
|
|
tasks: [
|
|
{
|
|
retries: 2,
|
|
slug: 'UpdatePost',
|
|
interfaceName: 'MyUpdatePostType',
|
|
inputSchema: [
|
|
{
|
|
name: 'post',
|
|
type: 'relationship',
|
|
relationTo: 'posts',
|
|
maxDepth: 0,
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'messageTwice',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: updatePostStep1,
|
|
} as TaskConfig<'UpdatePost'>,
|
|
{
|
|
retries: 2,
|
|
slug: 'UpdatePostStep2',
|
|
inputSchema: [
|
|
{
|
|
name: 'post',
|
|
type: 'relationship',
|
|
relationTo: 'posts',
|
|
maxDepth: 0,
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'messageTwice',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: updatePostStep2,
|
|
} as TaskConfig<'UpdatePostStep2'>,
|
|
{
|
|
retries: 3,
|
|
slug: 'CreateSimple',
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'shouldFail',
|
|
type: 'checkbox',
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'simpleID',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: async ({ input, req }) => {
|
|
if (input.shouldFail) {
|
|
throw new Error('Failed on purpose')
|
|
}
|
|
const newSimple = await req.payload.create({
|
|
collection: 'simple',
|
|
req,
|
|
data: {
|
|
title: input.message,
|
|
},
|
|
})
|
|
return {
|
|
output: {
|
|
simpleID: newSimple.id,
|
|
},
|
|
}
|
|
},
|
|
} as TaskConfig<'CreateSimple'>,
|
|
{
|
|
slug: 'CreateSimpleRetriesUndefined',
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'shouldFail',
|
|
type: 'checkbox',
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'simpleID',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: async ({ input, req }) => {
|
|
if (input.shouldFail) {
|
|
throw new Error('Failed on purpose')
|
|
}
|
|
const newSimple = await req.payload.create({
|
|
collection: 'simple',
|
|
req,
|
|
data: {
|
|
title: input.message,
|
|
},
|
|
})
|
|
return {
|
|
output: {
|
|
simpleID: newSimple.id,
|
|
},
|
|
}
|
|
},
|
|
} as TaskConfig<'CreateSimpleRetriesUndefined'>,
|
|
{
|
|
slug: 'CreateSimpleRetries0',
|
|
retries: 0,
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'shouldFail',
|
|
type: 'checkbox',
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'simpleID',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: async ({ input, req }) => {
|
|
if (input.shouldFail) {
|
|
throw new Error('Failed on purpose')
|
|
}
|
|
const newSimple = await req.payload.create({
|
|
collection: 'simple',
|
|
req,
|
|
data: {
|
|
title: input.message,
|
|
},
|
|
})
|
|
return {
|
|
output: {
|
|
simpleID: newSimple.id,
|
|
},
|
|
}
|
|
},
|
|
} as TaskConfig<'CreateSimpleRetries0'>,
|
|
{
|
|
retries: 2,
|
|
slug: 'CreateSimpleWithDuplicateMessage',
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
{
|
|
name: 'shouldFail',
|
|
type: 'checkbox',
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'simpleID',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: async ({ input, req }) => {
|
|
if (input.shouldFail) {
|
|
throw new Error('Failed on purpose')
|
|
}
|
|
const newSimple = await req.payload.create({
|
|
collection: 'simple',
|
|
req,
|
|
data: {
|
|
title: input.message + input.message,
|
|
},
|
|
})
|
|
return {
|
|
output: {
|
|
simpleID: newSimple.id,
|
|
},
|
|
}
|
|
},
|
|
} as TaskConfig<'CreateSimpleWithDuplicateMessage'>,
|
|
{
|
|
retries: 2,
|
|
slug: 'ExternalTask',
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
outputSchema: [
|
|
{
|
|
name: 'simpleID',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler',
|
|
} as TaskConfig<'ExternalTask'>,
|
|
{
|
|
retries: 0,
|
|
slug: 'ThrowError',
|
|
inputSchema: [],
|
|
outputSchema: [],
|
|
handler: () => {
|
|
throw new Error('failed')
|
|
},
|
|
} as TaskConfig<'ThrowError'>,
|
|
{
|
|
retries: 0,
|
|
slug: 'ReturnError',
|
|
inputSchema: [],
|
|
outputSchema: [],
|
|
handler: () => {
|
|
return {
|
|
state: 'failed',
|
|
}
|
|
},
|
|
} as TaskConfig<'ReturnError'>,
|
|
{
|
|
retries: 0,
|
|
slug: 'ReturnCustomError',
|
|
inputSchema: [
|
|
{
|
|
name: 'errorMessage',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
outputSchema: [],
|
|
handler: ({ input }) => {
|
|
return {
|
|
state: 'failed',
|
|
errorMessage: input.errorMessage,
|
|
}
|
|
},
|
|
} as TaskConfig<'ReturnCustomError'>,
|
|
],
|
|
workflows: [
|
|
updatePostWorkflow,
|
|
updatePostJSONWorkflow,
|
|
retriesTestWorkflow,
|
|
retriesRollbackTestWorkflow,
|
|
retriesWorkflowLevelTestWorkflow,
|
|
noRetriesSetWorkflow,
|
|
retries0Workflow,
|
|
workflowAndTasksRetriesUndefinedWorkflow,
|
|
workflowRetries2TasksRetriesUndefinedWorkflow,
|
|
workflowRetries2TasksRetries0Workflow,
|
|
inlineTaskTestWorkflow,
|
|
failsImmediatelyWorkflow,
|
|
inlineTaskTestDelayedWorkflow,
|
|
externalWorkflow,
|
|
retriesBackoffTestWorkflow,
|
|
subTaskWorkflow,
|
|
subTaskFailsWorkflow,
|
|
longRunningWorkflow,
|
|
parallelTaskWorkflow,
|
|
],
|
|
},
|
|
editor: lexicalEditor(),
|
|
onInit: async (payload) => {
|
|
if (process.env.SEED_IN_CONFIG_ONINIT !== 'false') {
|
|
await seed(payload)
|
|
}
|
|
},
|
|
typescript: {
|
|
outputFile: path.resolve(dirname, 'payload-types.ts'),
|
|
},
|
|
})
|