diff --git a/test/queues/config.ts b/test/queues/config.ts index 988d86ea6..67e1c9126 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -1,4 +1,4 @@ -import type { TaskConfig, WorkflowConfig } from 'payload' +import type { TaskConfig } from 'payload' import { lexicalEditor } from '@payloadcms/richtext-lexical' import { fileURLToPath } from 'node:url' @@ -8,6 +8,21 @@ import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js' import { devUser } from '../credentials.js' import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js' import { clearAndSeedEverything } from './seed.js' +import { externalWorkflow } from './workflows/externalWorkflow.js' +import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' +import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js' +import { retries0Workflow } from './workflows/retries0.js' +import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js' +import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js' +import { retriesTestWorkflow } from './workflows/retriesTest.js' +import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js' +import { subTaskWorkflow } from './workflows/subTask.js' +import { subTaskFailsWorkflow } from './workflows/subTaskFails.js' +import { updatePostWorkflow } from './workflows/updatePost.js' +import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js' +import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js' +import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js' +import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js' const filename = fileURLToPath(import.meta.url) const dirname = path.dirname(filename) @@ -310,678 +325,21 @@ export default buildConfigWithDefaults({ } as TaskConfig<'ExternalTask'>, ], workflows: [ - { - slug: 'updatePost', - interfaceName: 'MyUpdatePostWorkflowType', - inputSchema: [ - { - name: 'post', - type: 'relationship', - relationTo: 'posts', - maxDepth: 0, - required: true, - }, - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks }) => { - await tasks.UpdatePost('1', { - input: { - post: job.input.post, - message: job.input.message, - }, - }) - - await tasks.UpdatePostStep2('2', { - input: { - post: job.taskStatus.UpdatePost['1'].input.post, - messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice, - }, - }) - }, - } as WorkflowConfig<'updatePost'>, - { - slug: 'updatePostJSONWorkflow', - inputSchema: [ - { - name: 'post', - type: 'relationship', - relationTo: 'posts', - maxDepth: 0, - required: true, - }, - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: [ - { - task: 'UpdatePost', - id: '1', - input: ({ job }) => ({ - post: job.input.post, - message: job.input.message, - }), - }, - { - task: 'UpdatePostStep2', - id: '2', - input: ({ job }) => ({ - post: job.taskStatus.UpdatePost['1'].input.post, - messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice, - }), - condition({ job }) { - return job?.taskStatus?.UpdatePost?.['1']?.complete - }, - completesJob: true, - }, - ], - } as WorkflowConfig<'updatePostJSONWorkflow'>, - { - slug: 'retriesTest', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimple('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimple('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'retriesTest'>, - { - slug: 'retriesRollbackTest', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, inlineTask, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await inlineTask('1', { - task: async ({ req }) => { - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: job.input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - }) - - await inlineTask('2', { - task: async ({ req }) => { - await req.payload.create({ - collection: 'simple', - req, - data: { - title: 'should not exist', - }, - }) - // Fail afterwards, so that we can also test that transactions work (i.e. the job is rolled back) - - throw new Error('Failed on purpose') - }, - retries: { - attempts: 4, - }, - }) - }, - } as WorkflowConfig<'retriesRollbackTest'>, - { - slug: 'retriesWorkflowLevelTest', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimple('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimple('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'retriesWorkflowLevelTest'>, - { - slug: 'workflowNoRetriesSet', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimple('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimple('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'workflowNoRetriesSet'>, - { - slug: 'workflowRetries0', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - retries: 0, - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimple('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimple('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'workflowRetries0'>, - { - slug: 'workflowAndTasksRetriesUndefined', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimpleRetriesUndefined('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimpleRetriesUndefined('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'workflowAndTasksRetriesUndefined'>, - { - slug: 'workflowRetries2TasksRetriesUndefined', - retries: 2, - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimpleRetriesUndefined('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimpleRetriesUndefined('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'workflowRetries2TasksRetriesUndefined'>, - { - slug: 'workflowRetries2TasksRetries0', - retries: 2, - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, tasks, req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - - await tasks.CreateSimpleRetries0('1', { - input: { - message: job.input.message, - }, - }) - - // At this point there should always be one post created. - // job.input.amountRetried will go up to 2 as CreatePost has 2 retries - await tasks.CreateSimpleRetries0('2', { - input: { - message: job.input.message, - shouldFail: true, - }, - }) - // This will never be reached - }, - } as WorkflowConfig<'workflowRetries2TasksRetries0'>, - { - slug: 'inlineTaskTest', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, inlineTask }) => { - await inlineTask('1', { - task: async ({ input, req }) => { - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - input: { - message: job.input.message, - }, - }) - }, - } as WorkflowConfig<'inlineTaskTest'>, - { - slug: 'externalWorkflow', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: path.resolve(dirname, 'runners/externalWorkflow.ts') + '#externalWorkflowHandler', - } as WorkflowConfig<'externalWorkflow'>, - { - slug: 'retriesBackoffTest', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, inlineTask, req }) => { - const newJob = await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountRetried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, - }, - }, - id: job.id, - }) - job.input = newJob.input as any - - await inlineTask('1', { - task: async ({ req }) => { - const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0 - - const { id } = await req.payload.create({ - collection: 'simple', - req, - data: { - title: 'should not exist', - }, - }) - - // @ts-expect-error timeTried is new arbitrary data and not in the type - if (!job.input.timeTried) { - // @ts-expect-error timeTried is new arbitrary data and not in the type - job.input.timeTried = {} - } - - // @ts-expect-error timeTried is new arbitrary data and not in the type - job.input.timeTried[totalTried] = new Date().toISOString() - - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: job.input, - }, - id: job.id, - }) - - if (totalTried < 4) { - // Cleanup the post - await req.payload.delete({ - collection: 'simple', - id, - req, - }) - - // Last try it should succeed - throw new Error('Failed on purpose') - } - return { - output: {}, - } - }, - retries: { - attempts: 4, - backoff: { - type: 'exponential', - // Should retry in 300ms, then 600, then 1200, then 2400, then succeed - delay: 300, - }, - }, - }) - }, - } as WorkflowConfig<'retriesBackoffTest'>, - { - slug: 'subTask', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - handler: async ({ job, inlineTask }) => { - await inlineTask('create two docs', { - task: async ({ input, inlineTask }) => { - const { newSimple } = await inlineTask('create doc 1', { - task: async ({ req }) => { - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - newSimple, - }, - } - }, - }) - - const { newSimple2 } = await inlineTask('create doc 2', { - task: async ({ req }) => { - const newSimple2 = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - newSimple2, - }, - } - }, - }) - return { - output: { - simpleID1: newSimple.id, - simpleID2: newSimple2.id, - }, - } - }, - input: { - message: job.input.message, - }, - }) - }, - } as WorkflowConfig<'subTask'>, - { - slug: 'subTaskFails', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - retries: 3, - handler: async ({ job, inlineTask }) => { - await inlineTask('create two docs', { - task: async ({ input, inlineTask }) => { - const { newSimple } = await inlineTask('create doc 1 - succeeds', { - task: async ({ req }) => { - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountTask1Retried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountTask1Retried !== undefined - ? // @ts-expect-error - job.input.amountTask1Retried + 1 - : 0, - }, - }, - id: job.id, - }) - return { - output: { - newSimple, - }, - } - }, - }) - - await inlineTask('create doc 2 - fails', { - task: async ({ req }) => { - await req.payload.update({ - collection: 'payload-jobs', - data: { - input: { - ...job.input, - amountTask2Retried: - // @ts-expect-error amountRetried is new arbitrary data and not in the type - job.input.amountTask2Retried !== undefined - ? // @ts-expect-error - job.input.amountTask2Retried + 1 - : 0, - }, - }, - id: job.id, - }) - throw new Error('Failed on purpose') - }, - }) - return { - output: { - simpleID1: newSimple.id, - }, - } - }, - input: { - message: job.input.message, - }, - }) - }, - } as WorkflowConfig<'subTaskFails'>, + updatePostWorkflow, + updatePostJSONWorkflow, + retriesTestWorkflow, + retriesRollbackTestWorkflow, + retriesWorkflowLevelTestWorkflow, + noRetriesSetWorkflow, + retries0Workflow, + workflowAndTasksRetriesUndefinedWorkflow, + workflowRetries2TasksRetriesUndefinedWorkflow, + workflowRetries2TasksRetries0Workflow, + inlineTaskTestWorkflow, + externalWorkflow, + retriesBackoffTestWorkflow, + subTaskWorkflow, + subTaskFailsWorkflow, ], }, editor: lexicalEditor(), diff --git a/test/queues/workflows/externalWorkflow.ts b/test/queues/workflows/externalWorkflow.ts new file mode 100644 index 000000000..15106784d --- /dev/null +++ b/test/queues/workflows/externalWorkflow.ts @@ -0,0 +1,19 @@ +import type { WorkflowConfig } from 'payload' + +import path from 'path' +import { fileURLToPath } from 'url' + +const filename = fileURLToPath(import.meta.url) +const dirname = path.dirname(filename) + +export const externalWorkflow: WorkflowConfig<'externalWorkflow'> = { + slug: 'externalWorkflow', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: path.resolve(dirname, '../runners/externalWorkflow.ts') + '#externalWorkflowHandler', +} diff --git a/test/queues/workflows/inlineTaskTest.ts b/test/queues/workflows/inlineTaskTest.ts new file mode 100644 index 000000000..a0f20afc9 --- /dev/null +++ b/test/queues/workflows/inlineTaskTest.ts @@ -0,0 +1,33 @@ +import type { WorkflowConfig } from 'payload' + +export const inlineTaskTestWorkflow: WorkflowConfig<'inlineTaskTest'> = { + slug: 'inlineTaskTest', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + await inlineTask('1', { + task: async ({ input, req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, +} diff --git a/test/queues/workflows/noRetriesSet.ts b/test/queues/workflows/noRetriesSet.ts new file mode 100644 index 000000000..b549cd0df --- /dev/null +++ b/test/queues/workflows/noRetriesSet.ts @@ -0,0 +1,42 @@ +import type { WorkflowConfig } from 'payload' + +export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = { + slug: 'workflowNoRetriesSet', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, +} diff --git a/test/queues/workflows/retries0.ts b/test/queues/workflows/retries0.ts new file mode 100644 index 000000000..1eb7b06d2 --- /dev/null +++ b/test/queues/workflows/retries0.ts @@ -0,0 +1,43 @@ +import type { WorkflowConfig } from 'payload' + +export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = { + slug: 'workflowRetries0', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + retries: 0, + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, +} diff --git a/test/queues/workflows/retriesBackoffTest.ts b/test/queues/workflows/retriesBackoffTest.ts new file mode 100644 index 000000000..27e6b20b7 --- /dev/null +++ b/test/queues/workflows/retriesBackoffTest.ts @@ -0,0 +1,81 @@ +import type { WorkflowConfig } from 'payload' + +export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> = { + slug: 'retriesBackoffTest', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask, req }) => { + const newJob = await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + job.input = newJob.input as any + + await inlineTask('1', { + task: async ({ req }) => { + const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0 + + const { id } = await req.payload.create({ + collection: 'simple', + req, + data: { + title: 'should not exist', + }, + }) + + // @ts-expect-error timeTried is new arbitrary data and not in the type + if (!job.input.timeTried) { + // @ts-expect-error timeTried is new arbitrary data and not in the type + job.input.timeTried = {} + } + + // @ts-expect-error timeTried is new arbitrary data and not in the type + job.input.timeTried[totalTried] = new Date().toISOString() + + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: job.input, + }, + id: job.id, + }) + + if (totalTried < 4) { + // Cleanup the post + await req.payload.delete({ + collection: 'simple', + id, + req, + }) + + // Last try it should succeed + throw new Error('Failed on purpose') + } + return { + output: {}, + } + }, + retries: { + attempts: 4, + backoff: { + type: 'exponential', + // Should retry in 300ms, then 600, then 1200, then 2400, then succeed + delay: 300, + }, + }, + }) + }, +} diff --git a/test/queues/workflows/retriesRollbackTest.ts b/test/queues/workflows/retriesRollbackTest.ts new file mode 100644 index 000000000..eae88c7e5 --- /dev/null +++ b/test/queues/workflows/retriesRollbackTest.ts @@ -0,0 +1,61 @@ +import type { WorkflowConfig } from 'payload' + +export const retriesRollbackTestWorkflow: WorkflowConfig<'retriesRollbackTest'> = { + slug: 'retriesRollbackTest', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await inlineTask('1', { + task: async ({ req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: job.input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, + }) + + await inlineTask('2', { + task: async ({ req }) => { + await req.payload.create({ + collection: 'simple', + req, + data: { + title: 'should not exist', + }, + }) + // Fail afterwards, so that we can also test that transactions work (i.e. the job is rolled back) + + throw new Error('Failed on purpose') + }, + retries: { + attempts: 4, + }, + }) + }, +} diff --git a/test/queues/workflows/retriesTest.ts b/test/queues/workflows/retriesTest.ts new file mode 100644 index 000000000..f462caf50 --- /dev/null +++ b/test/queues/workflows/retriesTest.ts @@ -0,0 +1,42 @@ +import type { WorkflowConfig } from 'payload' + +export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = { + slug: 'retriesTest', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, +} diff --git a/test/queues/workflows/retriesWorkflowLevelTest.ts b/test/queues/workflows/retriesWorkflowLevelTest.ts new file mode 100644 index 000000000..d34ded952 --- /dev/null +++ b/test/queues/workflows/retriesWorkflowLevelTest.ts @@ -0,0 +1,43 @@ +import type { WorkflowConfig } from 'payload' + +export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLevelTest'> = { + slug: 'retriesWorkflowLevelTest', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, +} diff --git a/test/queues/workflows/subTask.ts b/test/queues/workflows/subTask.ts new file mode 100644 index 000000000..399f6db53 --- /dev/null +++ b/test/queues/workflows/subTask.ts @@ -0,0 +1,60 @@ +import type { WorkflowConfig } from 'payload' + +export const subTaskWorkflow: WorkflowConfig<'subTask'> = { + slug: 'subTask', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + await inlineTask('create two docs', { + task: async ({ input, inlineTask }) => { + const { newSimple } = await inlineTask('create doc 1', { + task: async ({ req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + newSimple, + }, + } + }, + }) + + const { newSimple2 } = await inlineTask('create doc 2', { + task: async ({ req }) => { + const newSimple2 = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + newSimple2, + }, + } + }, + }) + return { + output: { + simpleID1: newSimple.id, + simpleID2: newSimple2.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, +} diff --git a/test/queues/workflows/subTaskFails.ts b/test/queues/workflows/subTaskFails.ts new file mode 100644 index 000000000..1c1f14241 --- /dev/null +++ b/test/queues/workflows/subTaskFails.ts @@ -0,0 +1,80 @@ +import type { WorkflowConfig } from 'payload' + +export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = { + slug: 'subTaskFails', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + retries: 3, + handler: async ({ job, inlineTask }) => { + await inlineTask('create two docs', { + task: async ({ input, inlineTask }) => { + const { newSimple } = await inlineTask('create doc 1 - succeeds', { + task: async ({ req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountTask1Retried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountTask1Retried !== undefined + ? // @ts-expect-error + job.input.amountTask1Retried + 1 + : 0, + }, + }, + id: job.id, + }) + return { + output: { + newSimple, + }, + } + }, + }) + + await inlineTask('create doc 2 - fails', { + task: async ({ req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountTask2Retried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountTask2Retried !== undefined + ? // @ts-expect-error + job.input.amountTask2Retried + 1 + : 0, + }, + }, + id: job.id, + }) + throw new Error('Failed on purpose') + }, + }) + return { + output: { + simpleID1: newSimple.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, +} diff --git a/test/queues/workflows/updatePost.ts b/test/queues/workflows/updatePost.ts new file mode 100644 index 000000000..abf6df140 --- /dev/null +++ b/test/queues/workflows/updatePost.ts @@ -0,0 +1,37 @@ +import type { WorkflowConfig } from 'payload' + +export const updatePostWorkflow: WorkflowConfig<'updatePost'> = { + slug: 'updatePost', + interfaceName: 'MyUpdatePostWorkflowType', + inputSchema: [ + { + name: 'post', + type: 'relationship', + relationTo: 'posts', + maxDepth: 0, + required: true, + }, + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks }) => { + await tasks.UpdatePost('1', { + input: { + post: job.input.post, + message: job.input.message, + }, + }) + + await tasks.UpdatePostStep2('2', { + input: { + // @ts-expect-error + post: job.taskStatus.UpdatePost['1'].input.post, + // @ts-expect-error + messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice, + }, + }) + }, +} diff --git a/test/queues/workflows/updatePostJSON.ts b/test/queues/workflows/updatePostJSON.ts new file mode 100644 index 000000000..6baadfc58 --- /dev/null +++ b/test/queues/workflows/updatePostJSON.ts @@ -0,0 +1,41 @@ +import type { WorkflowConfig } from 'payload' + +export const updatePostJSONWorkflow: WorkflowConfig<'updatePostJSONWorkflow'> = { + slug: 'updatePostJSONWorkflow', + inputSchema: [ + { + name: 'post', + type: 'relationship', + relationTo: 'posts', + maxDepth: 0, + required: true, + }, + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: [ + { + task: 'UpdatePost', + id: '1', + input: ({ job }) => ({ + post: job.input.post, + message: job.input.message, + }), + }, + { + task: 'UpdatePostStep2', + id: '2', + input: ({ job }) => ({ + post: job.taskStatus.UpdatePost['1'].input.post, + messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice, + }), + condition({ job }) { + return !!job?.taskStatus?.UpdatePost?.['1']?.complete + }, + completesJob: true, + }, + ], +} diff --git a/test/queues/workflows/workflowAndTasksRetriesUndefined.ts b/test/queues/workflows/workflowAndTasksRetriesUndefined.ts new file mode 100644 index 000000000..03e89957e --- /dev/null +++ b/test/queues/workflows/workflowAndTasksRetriesUndefined.ts @@ -0,0 +1,43 @@ +import type { WorkflowConfig } from 'payload' + +export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowAndTasksRetriesUndefined'> = + { + slug: 'workflowAndTasksRetriesUndefined', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetriesUndefined('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetriesUndefined('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } diff --git a/test/queues/workflows/workflowRetries2TasksRetries0.ts b/test/queues/workflows/workflowRetries2TasksRetries0.ts new file mode 100644 index 000000000..7cf803dc2 --- /dev/null +++ b/test/queues/workflows/workflowRetries2TasksRetries0.ts @@ -0,0 +1,44 @@ +import type { WorkflowConfig } from 'payload' + +export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetries2TasksRetries0'> = + { + slug: 'workflowRetries2TasksRetries0', + retries: 2, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetries0('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetries0('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } diff --git a/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts b/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts new file mode 100644 index 000000000..3bbd9ee78 --- /dev/null +++ b/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts @@ -0,0 +1,44 @@ +import type { WorkflowConfig } from 'payload' + +export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowRetries2TasksRetriesUndefined'> = + { + slug: 'workflowRetries2TasksRetriesUndefined', + retries: 2, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetriesUndefined('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetriesUndefined('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + }