feat: allow running sub-tasks from tasks (#10373)

Task handlers now receive `inlineTask` as an arg, which can be used to
run inline sub-tasks. In the task log, those inline tasks will have a
`parent` property that points to the parent task.

Example:

```ts
{
        slug: 'subTask',
        inputSchema: [
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, inlineTask }) => {
          await inlineTask('create two docs', {
            task: async ({ input, inlineTask }) => {
            
              const { newSimple } = await inlineTask('create doc 1', {
                task: async ({ req }) => {
                  const newSimple = await req.payload.create({
                    collection: 'simple',
                    req,
                    data: {
                      title: input.message,
                    },
                  })
                  return {
                    output: {
                      newSimple,
                    },
                  }
                },
              })

              const { newSimple2 } = await inlineTask('create doc 2', {
                task: async ({ req }) => {
                  const newSimple2 = await req.payload.create({
                    collection: 'simple',
                    req,
                    data: {
                      title: input.message,
                    },
                  })
                  return {
                    output: {
                      newSimple2,
                    },
                  }
                },
              })
              return {
                output: {
                  simpleID1: newSimple.id,
                  simpleID2: newSimple2.id,
                },
              }
            },
            input: {
              message: job.input.message,
            },
          })
        },
      } as WorkflowConfig<'subTask'>
```

Job log example:

```ts
[
  {
    executedAt: '2025-01-06T03:55:44.682Z',
    completedAt: '2025-01-06T03:55:44.684Z',
    taskSlug: 'inline',
    taskID: 'create doc 1',
    output: { newSimple: [Object] },
    parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1b'
  },
  {
    executedAt: '2025-01-06T03:55:44.690Z',
    completedAt: '2025-01-06T03:55:44.692Z',
    taskSlug: 'inline',
    taskID: 'create doc 2',
    output: { newSimple2: [Object] },
    parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1c'
  },
  {
    executedAt: '2025-01-06T03:55:44.681Z',
    completedAt: '2025-01-06T03:55:44.697Z',
    taskSlug: 'inline',
    taskID: 'create two docs',
    input: { message: 'hello!' },
    output: {
      simpleID1: '677b54401e34772cc63c8693',
      simpleID2: '677b54401e34772cc63c8697'
    },
    parent: {},
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1d'
  }
]
```
This commit is contained in:
Alessio Gravili
2025-01-07 10:24:00 -07:00
committed by GitHub
parent ab53ababc8
commit 08fb159943
10 changed files with 416 additions and 48 deletions

View File

@@ -203,3 +203,49 @@ export default buildConfig({
}
})
```
## Nested tasks
You can run sub-tasks within an existing task, by using the `tasks` or `ìnlineTask` arguments passed to the task `handler` function:
```ts
export default buildConfig({
// ...
jobs: {
// It is recommended to set `addParentToTaskLog` to `true` when using nested tasks, so that the parent task is included in the task log
// This allows for better observability and debugging of the task execution
addParentToTaskLog: true,
tasks: [
{
slug: 'parentTask',
inputSchema: [
{
name: 'text',
type: 'text'
},
],
handler: async ({ input, req, tasks, inlineTask }) => {
await inlineTask('Sub Task 1', {
task: () => {
// Do something
return {
output: {},
}
},
})
await tasks.CreateSimple('Sub Task 2', {
input: { message: 'hello' },
})
return {
output: {},
}
}
} as TaskConfig<'parentTask'>,
]
}
})
```

View File

@@ -1304,6 +1304,7 @@ export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config
export type {
RunInlineTaskFunction,
RunTaskFunction,
RunTaskFunctions,
TaskConfig,
TaskHandler,
TaskHandlerArgs,

View File

@@ -30,6 +30,70 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
})
}
const logFields: Field[] = [
{
name: 'executedAt',
type: 'date',
required: true,
},
{
name: 'completedAt',
type: 'date',
required: true,
},
{
name: 'taskSlug',
type: 'select',
options: [...taskSlugs],
required: true,
},
{
name: 'taskID',
type: 'text',
required: true,
},
{
name: 'input',
type: 'json',
},
{
name: 'output',
type: 'json',
},
{
name: 'state',
type: 'radio',
options: ['failed', 'succeeded'],
required: true,
},
{
name: 'error',
type: 'json',
admin: {
condition: (_, data) => data.state === 'failed',
},
required: true,
},
]
if (config?.jobs?.addParentToTaskLog) {
logFields.push({
name: 'parent',
type: 'group',
fields: [
{
name: 'taskSlug',
type: 'select',
options: [...taskSlugs],
},
{
name: 'taskID',
type: 'text',
},
],
})
}
const jobsCollection: CollectionConfig = {
slug: 'payload-jobs',
admin: {
@@ -89,51 +153,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
admin: {
description: 'Task execution log',
},
fields: [
{
name: 'executedAt',
type: 'date',
required: true,
},
{
name: 'completedAt',
type: 'date',
required: true,
},
{
name: 'taskSlug',
type: 'select',
options: [...taskSlugs],
required: true,
},
{
name: 'taskID',
type: 'text',
required: true,
},
{
name: 'input',
type: 'json',
},
{
name: 'output',
type: 'json',
},
{
name: 'state',
type: 'radio',
options: ['failed', 'succeeded'],
required: true,
},
{
name: 'error',
type: 'json',
admin: {
condition: (_, data) => data.state === 'failed',
},
required: true,
},
],
fields: logFields,
},
],
label: 'Status',
@@ -204,5 +224,6 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
},
lockDocuments: false,
}
return jobsCollection
}

View File

@@ -19,6 +19,14 @@ export type JobsConfig = {
*/
run?: RunJobAccess
}
/**
* Adds information about the parent job to the task log. This is useful for debugging and tracking the flow of tasks.
*
* In 4.0, this will default to `true`.
*
* @default false
*/
addParentToTaskLog?: boolean
/**
* Determine whether or not to delete a job after it has successfully completed.
*/

View File

@@ -20,6 +20,10 @@ export type TaskHandlerArgs<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
TWorkflowSlug extends keyof TypedJobs['workflows'] = string,
> = {
/**
* Use this function to run a sub-task from within another task.
*/
inlineTask: RunInlineTaskFunction
input: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['input']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
@@ -27,6 +31,7 @@ export type TaskHandlerArgs<
: never
job: RunningJob<TWorkflowSlug>
req: PayloadRequest
tasks: RunTaskFunctions
}
/**
@@ -92,7 +97,13 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
*/
retries?: number | RetryConfig | undefined
// This is the same as TaskHandler, but typed out explicitly in order to improve type inference
task: (args: { input: TTaskInput; job: RunningJob<any>; req: PayloadRequest }) =>
task: (args: {
inlineTask: RunInlineTaskFunction
input: TTaskInput
job: RunningJob<any>
req: PayloadRequest
tasks: RunTaskFunctions
}) =>
| {
output: TTaskOutput
state?: 'failed' | 'succeeded'

View File

@@ -1,5 +1,6 @@
import type { Field } from '../../../fields/config/types.js'
import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js'
import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js'
import type {
RetryConfig,
RunInlineTaskFunction,
@@ -18,8 +19,12 @@ export type JobLog = {
* ID added by the array field when the log is saved in the database
*/
id?: string
input?: any
output?: any
input?: Record<string, any>
output?: Record<string, any>
/**
* Sub-tasks (tasks that are run within a task) will have a parent task ID
*/
parent?: TaskParent
state: 'failed' | 'succeeded'
taskID: string
taskSlug: string

View File

@@ -44,6 +44,7 @@ export async function handleTaskFailed({
job,
maxRetries,
output,
parent,
req,
retriesConfig,
runnerOutput,
@@ -60,6 +61,7 @@ export async function handleTaskFailed({
job: BaseJob
maxRetries: number
output: object
parent?: TaskParent
req: PayloadRequest
retriesConfig: number | RetryConfig
runnerOutput?: TaskHandlerResult<string>
@@ -93,6 +95,7 @@ export async function handleTaskFailed({
executedAt: executedAt.toISOString(),
input,
output,
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
@@ -142,6 +145,11 @@ export async function handleTaskFailed({
}
}
export type TaskParent = {
taskID: string
taskSlug: string
}
export const getRunTaskFunction = <TIsInline extends boolean>(
state: RunTaskFunctionState,
job: BaseJob,
@@ -149,6 +157,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
req: PayloadRequest,
isInline: TIsInline,
updateJob: UpdateJobFunction,
parent?: TaskParent,
): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => {
const runTask: <TTaskSlug extends string>(
taskSlug: TTaskSlug,
@@ -240,6 +249,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
completedAt: new Date().toISOString(),
error: errorMessage,
executedAt: executedAt.toISOString(),
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
@@ -269,9 +279,17 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
try {
const runnerOutput = await runner({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, {
taskID,
taskSlug,
}),
input,
job: job as unknown as RunningJob<WorkflowTypes>, // TODO: Type this better
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, {
taskID,
taskSlug,
}),
})
if (runnerOutput.state === 'failed') {
@@ -281,6 +299,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
job,
maxRetries,
output,
parent,
req,
retriesConfig: finalRetriesConfig,
runnerOutput,
@@ -303,6 +322,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
job,
maxRetries,
output,
parent,
req,
retriesConfig: finalRetriesConfig,
state,
@@ -327,6 +347,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
executedAt: executedAt.toISOString(),
input,
output,
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
state: 'succeeded',
taskID,
taskSlug,

View File

@@ -846,6 +846,142 @@ export default buildConfigWithDefaults({
})
},
} as WorkflowConfig<'retriesBackoffTest'>,
{
slug: 'subTask',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, inlineTask }) => {
await inlineTask('create two docs', {
task: async ({ input, inlineTask }) => {
const { newSimple } = await inlineTask('create doc 1', {
task: async ({ req }) => {
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
newSimple,
},
}
},
})
const { newSimple2 } = await inlineTask('create doc 2', {
task: async ({ req }) => {
const newSimple2 = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
newSimple2,
},
}
},
})
return {
output: {
simpleID1: newSimple.id,
simpleID2: newSimple2.id,
},
}
},
input: {
message: job.input.message,
},
})
},
} as WorkflowConfig<'subTask'>,
{
slug: 'subTaskFails',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
retries: 3,
handler: async ({ job, inlineTask }) => {
await inlineTask('create two docs', {
task: async ({ input, inlineTask }) => {
const { newSimple } = await inlineTask('create doc 1 - succeeds', {
task: async ({ req }) => {
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountTask1Retried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountTask1Retried !== undefined
? // @ts-expect-error
job.input.amountTask1Retried + 1
: 0,
},
},
id: job.id,
})
return {
output: {
newSimple,
},
}
},
})
await inlineTask('create doc 2 - fails', {
task: async ({ req }) => {
await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountTask2Retried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountTask2Retried !== undefined
? // @ts-expect-error
job.input.amountTask2Retried + 1
: 0,
},
},
id: job.id,
})
throw new Error('Failed on purpose')
},
})
return {
output: {
simpleID1: newSimple.id,
},
}
},
input: {
message: job.input.message,
},
})
},
} as WorkflowConfig<'subTaskFails'>,
],
},
editor: lexicalEditor(),

View File

@@ -1051,4 +1051,80 @@ describe('Queues', () => {
expect(allCompletedJobs.totalDocs).toBe(1)
expect((allCompletedJobs.docs[0].input as any).message).toBe('from single task 2')
})
it('can run sub-tasks', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'subTask',
input: {
message: 'hello!',
},
})
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs[0].title).toBe('hello!')
expect(allSimples.docs[1].title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.log[0].taskID).toBe('create doc 1')
//expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs')
// jobAfterRun.log[0].parent should not exist
expect(jobAfterRun.log[0].parent).toBeUndefined()
expect(jobAfterRun.log[1].taskID).toBe('create doc 2')
//expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs')
expect(jobAfterRun.log[1].parent).toBeUndefined()
expect(jobAfterRun.log[2].taskID).toBe('create two docs')
})
it('ensure successful sub-tasks are not retried', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'subTaskFails',
input: {
message: 'hello!',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run()
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error
expect(jobAfterRun.input.amountTask2Retried).toBe(3)
// @ts-expect-error
expect(jobAfterRun.input.amountTask1Retried).toBe(0)
})
})

View File

@@ -66,6 +66,8 @@ export interface Config {
inlineTaskTest: WorkflowInlineTaskTest;
externalWorkflow: WorkflowExternalWorkflow;
retriesBackoffTest: WorkflowRetriesBackoffTest;
subTask: WorkflowSubTask;
subTaskFails: WorkflowSubTaskFails;
};
};
}
@@ -221,6 +223,21 @@ export interface PayloadJob {
| number
| boolean
| null;
parent?: {
taskSlug?:
| (
| 'inline'
| 'UpdatePost'
| 'UpdatePostStep2'
| 'CreateSimple'
| 'CreateSimpleRetriesUndefined'
| 'CreateSimpleRetries0'
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask'
)
| null;
taskID?: string | null;
};
state: 'failed' | 'succeeded';
error?:
| {
@@ -249,6 +266,8 @@ export interface PayloadJob {
| 'inlineTaskTest'
| 'externalWorkflow'
| 'retriesBackoffTest'
| 'subTask'
| 'subTaskFails'
)
| null;
taskSlug?:
@@ -390,6 +409,12 @@ export interface PayloadJobsSelect<T extends boolean = true> {
taskID?: T;
input?: T;
output?: T;
parent?:
| T
| {
taskSlug?: T;
taskID?: T;
};
state?: T;
error?: T;
id?: T;
@@ -641,6 +666,24 @@ export interface WorkflowRetriesBackoffTest {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowSubTask".
*/
export interface WorkflowSubTask {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowSubTaskFails".
*/
export interface WorkflowSubTaskFails {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "auth".