chore: cleanup queues test suite (#11410)
This PR extracts each workflow of our queues test suite into its own file
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
import type { TaskConfig, WorkflowConfig } from 'payload'
|
import type { TaskConfig } from 'payload'
|
||||||
|
|
||||||
import { lexicalEditor } from '@payloadcms/richtext-lexical'
|
import { lexicalEditor } from '@payloadcms/richtext-lexical'
|
||||||
import { fileURLToPath } from 'node:url'
|
import { fileURLToPath } from 'node:url'
|
||||||
@@ -8,6 +8,21 @@ import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js'
|
|||||||
import { devUser } from '../credentials.js'
|
import { devUser } from '../credentials.js'
|
||||||
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
|
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
|
||||||
import { clearAndSeedEverything } from './seed.js'
|
import { clearAndSeedEverything } from './seed.js'
|
||||||
|
import { externalWorkflow } from './workflows/externalWorkflow.js'
|
||||||
|
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
|
||||||
|
import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js'
|
||||||
|
import { retries0Workflow } from './workflows/retries0.js'
|
||||||
|
import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js'
|
||||||
|
import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js'
|
||||||
|
import { retriesTestWorkflow } from './workflows/retriesTest.js'
|
||||||
|
import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js'
|
||||||
|
import { subTaskWorkflow } from './workflows/subTask.js'
|
||||||
|
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
|
||||||
|
import { updatePostWorkflow } from './workflows/updatePost.js'
|
||||||
|
import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
|
||||||
|
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
|
||||||
|
import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
|
||||||
|
import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
|
||||||
|
|
||||||
const filename = fileURLToPath(import.meta.url)
|
const filename = fileURLToPath(import.meta.url)
|
||||||
const dirname = path.dirname(filename)
|
const dirname = path.dirname(filename)
|
||||||
@@ -310,678 +325,21 @@ export default buildConfigWithDefaults({
|
|||||||
} as TaskConfig<'ExternalTask'>,
|
} as TaskConfig<'ExternalTask'>,
|
||||||
],
|
],
|
||||||
workflows: [
|
workflows: [
|
||||||
{
|
updatePostWorkflow,
|
||||||
slug: 'updatePost',
|
updatePostJSONWorkflow,
|
||||||
interfaceName: 'MyUpdatePostWorkflowType',
|
retriesTestWorkflow,
|
||||||
inputSchema: [
|
retriesRollbackTestWorkflow,
|
||||||
{
|
retriesWorkflowLevelTestWorkflow,
|
||||||
name: 'post',
|
noRetriesSetWorkflow,
|
||||||
type: 'relationship',
|
retries0Workflow,
|
||||||
relationTo: 'posts',
|
workflowAndTasksRetriesUndefinedWorkflow,
|
||||||
maxDepth: 0,
|
workflowRetries2TasksRetriesUndefinedWorkflow,
|
||||||
required: true,
|
workflowRetries2TasksRetries0Workflow,
|
||||||
},
|
inlineTaskTestWorkflow,
|
||||||
{
|
externalWorkflow,
|
||||||
name: 'message',
|
retriesBackoffTestWorkflow,
|
||||||
type: 'text',
|
subTaskWorkflow,
|
||||||
required: true,
|
subTaskFailsWorkflow,
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks }) => {
|
|
||||||
await tasks.UpdatePost('1', {
|
|
||||||
input: {
|
|
||||||
post: job.input.post,
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.UpdatePostStep2('2', {
|
|
||||||
input: {
|
|
||||||
post: job.taskStatus.UpdatePost['1'].input.post,
|
|
||||||
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'updatePost'>,
|
|
||||||
{
|
|
||||||
slug: 'updatePostJSONWorkflow',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'post',
|
|
||||||
type: 'relationship',
|
|
||||||
relationTo: 'posts',
|
|
||||||
maxDepth: 0,
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: [
|
|
||||||
{
|
|
||||||
task: 'UpdatePost',
|
|
||||||
id: '1',
|
|
||||||
input: ({ job }) => ({
|
|
||||||
post: job.input.post,
|
|
||||||
message: job.input.message,
|
|
||||||
}),
|
|
||||||
},
|
|
||||||
{
|
|
||||||
task: 'UpdatePostStep2',
|
|
||||||
id: '2',
|
|
||||||
input: ({ job }) => ({
|
|
||||||
post: job.taskStatus.UpdatePost['1'].input.post,
|
|
||||||
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
|
|
||||||
}),
|
|
||||||
condition({ job }) {
|
|
||||||
return job?.taskStatus?.UpdatePost?.['1']?.complete
|
|
||||||
},
|
|
||||||
completesJob: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
} as WorkflowConfig<'updatePostJSONWorkflow'>,
|
|
||||||
{
|
|
||||||
slug: 'retriesTest',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimple('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimple('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'retriesTest'>,
|
|
||||||
{
|
|
||||||
slug: 'retriesRollbackTest',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, inlineTask, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await inlineTask('1', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
const newSimple = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
simpleID: newSimple.id,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
await inlineTask('2', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: 'should not exist',
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// Fail afterwards, so that we can also test that transactions work (i.e. the job is rolled back)
|
|
||||||
|
|
||||||
throw new Error('Failed on purpose')
|
|
||||||
},
|
|
||||||
retries: {
|
|
||||||
attempts: 4,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'retriesRollbackTest'>,
|
|
||||||
{
|
|
||||||
slug: 'retriesWorkflowLevelTest',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimple('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimple('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'retriesWorkflowLevelTest'>,
|
|
||||||
{
|
|
||||||
slug: 'workflowNoRetriesSet',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimple('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimple('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'workflowNoRetriesSet'>,
|
|
||||||
{
|
|
||||||
slug: 'workflowRetries0',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
retries: 0,
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimple('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimple('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'workflowRetries0'>,
|
|
||||||
{
|
|
||||||
slug: 'workflowAndTasksRetriesUndefined',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimpleRetriesUndefined('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimpleRetriesUndefined('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'workflowAndTasksRetriesUndefined'>,
|
|
||||||
{
|
|
||||||
slug: 'workflowRetries2TasksRetriesUndefined',
|
|
||||||
retries: 2,
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimpleRetriesUndefined('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimpleRetriesUndefined('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'workflowRetries2TasksRetriesUndefined'>,
|
|
||||||
{
|
|
||||||
slug: 'workflowRetries2TasksRetries0',
|
|
||||||
retries: 2,
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, tasks, req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
await tasks.CreateSimpleRetries0('1', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// At this point there should always be one post created.
|
|
||||||
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
|
||||||
await tasks.CreateSimpleRetries0('2', {
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
shouldFail: true,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
// This will never be reached
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'workflowRetries2TasksRetries0'>,
|
|
||||||
{
|
|
||||||
slug: 'inlineTaskTest',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, inlineTask }) => {
|
|
||||||
await inlineTask('1', {
|
|
||||||
task: async ({ input, req }) => {
|
|
||||||
const newSimple = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
simpleID: newSimple.id,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'inlineTaskTest'>,
|
|
||||||
{
|
|
||||||
slug: 'externalWorkflow',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: path.resolve(dirname, 'runners/externalWorkflow.ts') + '#externalWorkflowHandler',
|
|
||||||
} as WorkflowConfig<'externalWorkflow'>,
|
|
||||||
{
|
|
||||||
slug: 'retriesBackoffTest',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, inlineTask, req }) => {
|
|
||||||
const newJob = await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountRetried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
job.input = newJob.input as any
|
|
||||||
|
|
||||||
await inlineTask('1', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0
|
|
||||||
|
|
||||||
const { id } = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: 'should not exist',
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
||||||
if (!job.input.timeTried) {
|
|
||||||
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
||||||
job.input.timeTried = {}
|
|
||||||
}
|
|
||||||
|
|
||||||
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
||||||
job.input.timeTried[totalTried] = new Date().toISOString()
|
|
||||||
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: job.input,
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
|
|
||||||
if (totalTried < 4) {
|
|
||||||
// Cleanup the post
|
|
||||||
await req.payload.delete({
|
|
||||||
collection: 'simple',
|
|
||||||
id,
|
|
||||||
req,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Last try it should succeed
|
|
||||||
throw new Error('Failed on purpose')
|
|
||||||
}
|
|
||||||
return {
|
|
||||||
output: {},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
retries: {
|
|
||||||
attempts: 4,
|
|
||||||
backoff: {
|
|
||||||
type: 'exponential',
|
|
||||||
// Should retry in 300ms, then 600, then 1200, then 2400, then succeed
|
|
||||||
delay: 300,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'retriesBackoffTest'>,
|
|
||||||
{
|
|
||||||
slug: 'subTask',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
handler: async ({ job, inlineTask }) => {
|
|
||||||
await inlineTask('create two docs', {
|
|
||||||
task: async ({ input, inlineTask }) => {
|
|
||||||
const { newSimple } = await inlineTask('create doc 1', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
const newSimple = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
newSimple,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
const { newSimple2 } = await inlineTask('create doc 2', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
const newSimple2 = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
newSimple2,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
simpleID1: newSimple.id,
|
|
||||||
simpleID2: newSimple2.id,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'subTask'>,
|
|
||||||
{
|
|
||||||
slug: 'subTaskFails',
|
|
||||||
inputSchema: [
|
|
||||||
{
|
|
||||||
name: 'message',
|
|
||||||
type: 'text',
|
|
||||||
required: true,
|
|
||||||
},
|
|
||||||
],
|
|
||||||
retries: 3,
|
|
||||||
handler: async ({ job, inlineTask }) => {
|
|
||||||
await inlineTask('create two docs', {
|
|
||||||
task: async ({ input, inlineTask }) => {
|
|
||||||
const { newSimple } = await inlineTask('create doc 1 - succeeds', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
const newSimple = await req.payload.create({
|
|
||||||
collection: 'simple',
|
|
||||||
req,
|
|
||||||
data: {
|
|
||||||
title: input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountTask1Retried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountTask1Retried !== undefined
|
|
||||||
? // @ts-expect-error
|
|
||||||
job.input.amountTask1Retried + 1
|
|
||||||
: 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
newSimple,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
})
|
|
||||||
|
|
||||||
await inlineTask('create doc 2 - fails', {
|
|
||||||
task: async ({ req }) => {
|
|
||||||
await req.payload.update({
|
|
||||||
collection: 'payload-jobs',
|
|
||||||
data: {
|
|
||||||
input: {
|
|
||||||
...job.input,
|
|
||||||
amountTask2Retried:
|
|
||||||
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
||||||
job.input.amountTask2Retried !== undefined
|
|
||||||
? // @ts-expect-error
|
|
||||||
job.input.amountTask2Retried + 1
|
|
||||||
: 0,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
id: job.id,
|
|
||||||
})
|
|
||||||
throw new Error('Failed on purpose')
|
|
||||||
},
|
|
||||||
})
|
|
||||||
return {
|
|
||||||
output: {
|
|
||||||
simpleID1: newSimple.id,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
},
|
|
||||||
input: {
|
|
||||||
message: job.input.message,
|
|
||||||
},
|
|
||||||
})
|
|
||||||
},
|
|
||||||
} as WorkflowConfig<'subTaskFails'>,
|
|
||||||
],
|
],
|
||||||
},
|
},
|
||||||
editor: lexicalEditor(),
|
editor: lexicalEditor(),
|
||||||
|
|||||||
19
test/queues/workflows/externalWorkflow.ts
Normal file
19
test/queues/workflows/externalWorkflow.ts
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
import path from 'path'
|
||||||
|
import { fileURLToPath } from 'url'
|
||||||
|
|
||||||
|
const filename = fileURLToPath(import.meta.url)
|
||||||
|
const dirname = path.dirname(filename)
|
||||||
|
|
||||||
|
export const externalWorkflow: WorkflowConfig<'externalWorkflow'> = {
|
||||||
|
slug: 'externalWorkflow',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: path.resolve(dirname, '../runners/externalWorkflow.ts') + '#externalWorkflowHandler',
|
||||||
|
}
|
||||||
33
test/queues/workflows/inlineTaskTest.ts
Normal file
33
test/queues/workflows/inlineTaskTest.ts
Normal file
@@ -0,0 +1,33 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const inlineTaskTestWorkflow: WorkflowConfig<'inlineTaskTest'> = {
|
||||||
|
slug: 'inlineTaskTest',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, inlineTask }) => {
|
||||||
|
await inlineTask('1', {
|
||||||
|
task: async ({ input, req }) => {
|
||||||
|
const newSimple = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
simpleID: newSimple.id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
42
test/queues/workflows/noRetriesSet.ts
Normal file
42
test/queues/workflows/noRetriesSet.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = {
|
||||||
|
slug: 'workflowNoRetriesSet',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimple('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimple('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
43
test/queues/workflows/retries0.ts
Normal file
43
test/queues/workflows/retries0.ts
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = {
|
||||||
|
slug: 'workflowRetries0',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
retries: 0,
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimple('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimple('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
81
test/queues/workflows/retriesBackoffTest.ts
Normal file
81
test/queues/workflows/retriesBackoffTest.ts
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> = {
|
||||||
|
slug: 'retriesBackoffTest',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, inlineTask, req }) => {
|
||||||
|
const newJob = await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
job.input = newJob.input as any
|
||||||
|
|
||||||
|
await inlineTask('1', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0
|
||||||
|
|
||||||
|
const { id } = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: 'should not exist',
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
||||||
|
if (!job.input.timeTried) {
|
||||||
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
||||||
|
job.input.timeTried = {}
|
||||||
|
}
|
||||||
|
|
||||||
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
||||||
|
job.input.timeTried[totalTried] = new Date().toISOString()
|
||||||
|
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: job.input,
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
if (totalTried < 4) {
|
||||||
|
// Cleanup the post
|
||||||
|
await req.payload.delete({
|
||||||
|
collection: 'simple',
|
||||||
|
id,
|
||||||
|
req,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Last try it should succeed
|
||||||
|
throw new Error('Failed on purpose')
|
||||||
|
}
|
||||||
|
return {
|
||||||
|
output: {},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
retries: {
|
||||||
|
attempts: 4,
|
||||||
|
backoff: {
|
||||||
|
type: 'exponential',
|
||||||
|
// Should retry in 300ms, then 600, then 1200, then 2400, then succeed
|
||||||
|
delay: 300,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
61
test/queues/workflows/retriesRollbackTest.ts
Normal file
61
test/queues/workflows/retriesRollbackTest.ts
Normal file
@@ -0,0 +1,61 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const retriesRollbackTestWorkflow: WorkflowConfig<'retriesRollbackTest'> = {
|
||||||
|
slug: 'retriesRollbackTest',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, inlineTask, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await inlineTask('1', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
const newSimple = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
simpleID: newSimple.id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
await inlineTask('2', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: 'should not exist',
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// Fail afterwards, so that we can also test that transactions work (i.e. the job is rolled back)
|
||||||
|
|
||||||
|
throw new Error('Failed on purpose')
|
||||||
|
},
|
||||||
|
retries: {
|
||||||
|
attempts: 4,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
42
test/queues/workflows/retriesTest.ts
Normal file
42
test/queues/workflows/retriesTest.ts
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = {
|
||||||
|
slug: 'retriesTest',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimple('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimple('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
43
test/queues/workflows/retriesWorkflowLevelTest.ts
Normal file
43
test/queues/workflows/retriesWorkflowLevelTest.ts
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLevelTest'> = {
|
||||||
|
slug: 'retriesWorkflowLevelTest',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimple('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimple('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
60
test/queues/workflows/subTask.ts
Normal file
60
test/queues/workflows/subTask.ts
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const subTaskWorkflow: WorkflowConfig<'subTask'> = {
|
||||||
|
slug: 'subTask',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, inlineTask }) => {
|
||||||
|
await inlineTask('create two docs', {
|
||||||
|
task: async ({ input, inlineTask }) => {
|
||||||
|
const { newSimple } = await inlineTask('create doc 1', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
const newSimple = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
newSimple,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
const { newSimple2 } = await inlineTask('create doc 2', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
const newSimple2 = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
newSimple2,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
simpleID1: newSimple.id,
|
||||||
|
simpleID2: newSimple2.id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
80
test/queues/workflows/subTaskFails.ts
Normal file
80
test/queues/workflows/subTaskFails.ts
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = {
|
||||||
|
slug: 'subTaskFails',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
retries: 3,
|
||||||
|
handler: async ({ job, inlineTask }) => {
|
||||||
|
await inlineTask('create two docs', {
|
||||||
|
task: async ({ input, inlineTask }) => {
|
||||||
|
const { newSimple } = await inlineTask('create doc 1 - succeeds', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
const newSimple = await req.payload.create({
|
||||||
|
collection: 'simple',
|
||||||
|
req,
|
||||||
|
data: {
|
||||||
|
title: input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountTask1Retried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountTask1Retried !== undefined
|
||||||
|
? // @ts-expect-error
|
||||||
|
job.input.amountTask1Retried + 1
|
||||||
|
: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
newSimple,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
await inlineTask('create doc 2 - fails', {
|
||||||
|
task: async ({ req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountTask2Retried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountTask2Retried !== undefined
|
||||||
|
? // @ts-expect-error
|
||||||
|
job.input.amountTask2Retried + 1
|
||||||
|
: 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
throw new Error('Failed on purpose')
|
||||||
|
},
|
||||||
|
})
|
||||||
|
return {
|
||||||
|
output: {
|
||||||
|
simpleID1: newSimple.id,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
37
test/queues/workflows/updatePost.ts
Normal file
37
test/queues/workflows/updatePost.ts
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const updatePostWorkflow: WorkflowConfig<'updatePost'> = {
|
||||||
|
slug: 'updatePost',
|
||||||
|
interfaceName: 'MyUpdatePostWorkflowType',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'post',
|
||||||
|
type: 'relationship',
|
||||||
|
relationTo: 'posts',
|
||||||
|
maxDepth: 0,
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks }) => {
|
||||||
|
await tasks.UpdatePost('1', {
|
||||||
|
input: {
|
||||||
|
post: job.input.post,
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.UpdatePostStep2('2', {
|
||||||
|
input: {
|
||||||
|
// @ts-expect-error
|
||||||
|
post: job.taskStatus.UpdatePost['1'].input.post,
|
||||||
|
// @ts-expect-error
|
||||||
|
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
},
|
||||||
|
}
|
||||||
41
test/queues/workflows/updatePostJSON.ts
Normal file
41
test/queues/workflows/updatePostJSON.ts
Normal file
@@ -0,0 +1,41 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const updatePostJSONWorkflow: WorkflowConfig<'updatePostJSONWorkflow'> = {
|
||||||
|
slug: 'updatePostJSONWorkflow',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'post',
|
||||||
|
type: 'relationship',
|
||||||
|
relationTo: 'posts',
|
||||||
|
maxDepth: 0,
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: [
|
||||||
|
{
|
||||||
|
task: 'UpdatePost',
|
||||||
|
id: '1',
|
||||||
|
input: ({ job }) => ({
|
||||||
|
post: job.input.post,
|
||||||
|
message: job.input.message,
|
||||||
|
}),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
task: 'UpdatePostStep2',
|
||||||
|
id: '2',
|
||||||
|
input: ({ job }) => ({
|
||||||
|
post: job.taskStatus.UpdatePost['1'].input.post,
|
||||||
|
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
|
||||||
|
}),
|
||||||
|
condition({ job }) {
|
||||||
|
return !!job?.taskStatus?.UpdatePost?.['1']?.complete
|
||||||
|
},
|
||||||
|
completesJob: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}
|
||||||
43
test/queues/workflows/workflowAndTasksRetriesUndefined.ts
Normal file
43
test/queues/workflows/workflowAndTasksRetriesUndefined.ts
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowAndTasksRetriesUndefined'> =
|
||||||
|
{
|
||||||
|
slug: 'workflowAndTasksRetriesUndefined',
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimpleRetriesUndefined('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimpleRetriesUndefined('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
44
test/queues/workflows/workflowRetries2TasksRetries0.ts
Normal file
44
test/queues/workflows/workflowRetries2TasksRetries0.ts
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetries2TasksRetries0'> =
|
||||||
|
{
|
||||||
|
slug: 'workflowRetries2TasksRetries0',
|
||||||
|
retries: 2,
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimpleRetries0('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimpleRetries0('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
@@ -0,0 +1,44 @@
|
|||||||
|
import type { WorkflowConfig } from 'payload'
|
||||||
|
|
||||||
|
export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowRetries2TasksRetriesUndefined'> =
|
||||||
|
{
|
||||||
|
slug: 'workflowRetries2TasksRetriesUndefined',
|
||||||
|
retries: 2,
|
||||||
|
inputSchema: [
|
||||||
|
{
|
||||||
|
name: 'message',
|
||||||
|
type: 'text',
|
||||||
|
required: true,
|
||||||
|
},
|
||||||
|
],
|
||||||
|
handler: async ({ job, tasks, req }) => {
|
||||||
|
await req.payload.update({
|
||||||
|
collection: 'payload-jobs',
|
||||||
|
data: {
|
||||||
|
input: {
|
||||||
|
...job.input,
|
||||||
|
amountRetried:
|
||||||
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
||||||
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
id: job.id,
|
||||||
|
})
|
||||||
|
|
||||||
|
await tasks.CreateSimpleRetriesUndefined('1', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// At this point there should always be one post created.
|
||||||
|
// job.input.amountRetried will go up to 2 as CreatePost has 2 retries
|
||||||
|
await tasks.CreateSimpleRetriesUndefined('2', {
|
||||||
|
input: {
|
||||||
|
message: job.input.message,
|
||||||
|
shouldFail: true,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
// This will never be reached
|
||||||
|
},
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user