refactor: simplify job queue error handling (#12845)

This simplifies workflow / task error handling, as well as cancelling
jobs. Previously, we were handling errors when they occur and passing
through error state using a `state` object - errors were then handled in
multiple areas of the code.

This PR adds new, clean `TaskError`, `WorkflowError` and
`JobCancelledError` errors that are thrown when they occur and are
handled **in one single place**, massively cleaning up complex functions
like
[payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts](https://github.com/payloadcms/payload/compare/refactor/jobs-errors?expand=1#diff-53dc7ccb7c8e023c9ba63fdd2e78c32ad0be606a2c64a3512abad87893f5fd21)

Performance is also improved by this change —
previously, a task / workflow failure or cancellation would have
resulted in multiple, separate `updateJob` db calls, as the data
modifications to the job object required for storing failure state were
done multiple times in multiple areas of the codebase. Most notably,
task error state was handled and updated separately from workflow error
state.
Now, it's just a single, clean `updateJob` call.

This PR also does the following:
- adds a new test for `deleteJobOnComplete` behavior
- cleans up test suite
- ensures `deleteJobOnComplete` does not delete definitively failed jobs

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210553277813320
This commit is contained in:
Alessio Gravili
2025-06-17 15:24:53 -07:00
committed by GitHub
parent dffdee89d8
commit 59f536c2c9
20 changed files with 676 additions and 496 deletions

View File

@@ -1,4 +1,4 @@
import type { RetryConfig } from '../../../config/types/taskTypes.js'
import type { RetryConfig } from '../config/types/taskTypes.js'
export function calculateBackoffWaitUntil({
retriesConfig,

View File

@@ -0,0 +1,63 @@
import type { Job } from '../../index.js'
import type { RetryConfig } from '../config/types/taskTypes.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
/**
 * Determines whether a job's workflow may still be retried and, if so,
 * when the next attempt should run.
 *
 * Assumes no individual task has already exhausted its own retries —
 * task-level limits are expected to be checked by the caller first.
 */
export function getWorkflowRetryBehavior({
  job,
  retriesConfig,
}: {
  job: Job
  retriesConfig?: number | RetryConfig
}):
  | {
      hasFinalError: false
      maxWorkflowRetries?: number
      waitUntil?: Date
    }
  | {
      hasFinalError: true
      maxWorkflowRetries?: number
      waitUntil?: Date
    } {
  // A plain number is shorthand for { attempts: number }
  const maxWorkflowRetries = (
    typeof retriesConfig === 'object' ? retriesConfig.attempts : retriesConfig
  )!

  const retriesExhausted =
    maxWorkflowRetries !== undefined &&
    maxWorkflowRetries !== null &&
    job.totalTried >= maxWorkflowRetries

  if (retriesExhausted) {
    return {
      hasFinalError: true,
      maxWorkflowRetries,
    }
  }

  if (!retriesConfig) {
    // No retry config at all => assuming no task reached max retries, the job can retry immediately
    return {
      hasFinalError: false,
      maxWorkflowRetries: undefined,
      waitUntil: undefined,
    }
  }

  // The job will be retried — compute the backoff delay before the next attempt
  const retryAt: Date = calculateBackoffWaitUntil({
    retriesConfig,
    totalTried: job.totalTried ?? 0,
  })

  return {
    hasFinalError: false,
    maxWorkflowRetries,
    waitUntil: retryAt,
  }
}

View File

@@ -0,0 +1,164 @@
import ObjectIdImport from 'bson-objectid'
import type { PayloadRequest } from '../../index.js'
import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js'
import type { TaskError } from './index.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js'
const ObjectId = (ObjectIdImport.default ||
ObjectIdImport) as unknown as typeof ObjectIdImport.default
/**
 * Handles a `TaskError` thrown while running a task: runs the task's `onFail`
 * hook, records the failure in the job log, decides whether the task (and,
 * if the task may still retry, the workflow) can be retried, and persists
 * all resulting job state in a single `updateJob` call.
 *
 * @returns `hasFinalError: true` when the job will not be retried again.
 */
export async function handleTaskError({
  error,
  req,
  updateJob,
}: {
  error: TaskError
  req: PayloadRequest
  updateJob: UpdateJobFunction
}): Promise<{
  hasFinalError: boolean
}> {
  // Everything needed to log the failure and decide retry behavior is
  // captured on the error at throw time
  const {
    executedAt,
    input,
    job,
    output,
    parent,
    retriesConfig,
    taskConfig,
    taskID,
    taskSlug,
    taskStatus,
    workflowConfig,
  } = error.args

  // Task-level failure hook, if configured
  if (taskConfig?.onFail) {
    await taskConfig.onFail()
  }

  // Serializable snapshot of the error that is stored on the job document
  const errorJSON = {
    name: error.name,
    cancelled: Boolean('cancelled' in error && error.cancelled),
    message: error.message,
    stack: error.stack,
  }

  const currentDate = new Date()

  // Append a failed entry to the job log (initializing the log if needed)
  ;(job.log ??= []).push({
    id: new ObjectId().toHexString(),
    completedAt: currentDate.toISOString(),
    error: errorJSON,
    executedAt: executedAt.toISOString(),
    input,
    output: output ?? {},
    parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined,
    state: 'failed',
    taskID,
    taskSlug,
  })

  if (job.waitUntil) {
    // Check if waitUntil is in the past
    const waitUntil = new Date(job.waitUntil)
    if (waitUntil < currentDate) {
      // Outdated waitUntil, remove it
      delete job.waitUntil
    }
  }

  // Resolve the task's max retries, falling back to the workflow's retry
  // config when the task itself has none configured
  let maxRetries: number = 0

  if (retriesConfig?.attempts === undefined || retriesConfig?.attempts === null) {
    // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
    if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
      maxRetries =
        typeof workflowConfig.retries === 'object'
          ? typeof workflowConfig.retries.attempts === 'number'
            ? workflowConfig.retries.attempts
            : 0
          : workflowConfig.retries
    } else {
      maxRetries = 0
    }
  } else {
    maxRetries = retriesConfig.attempts
  }

  if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) {
    /**
     * Task reached max retries => workflow will not retry
     */
    await updateJob({
      error: errorJSON,
      hasError: true,
      log: job.log,
      processing: false,
      totalTried: (job.totalTried ?? 0) + 1,
      waitUntil: job.waitUntil,
    })

    req.payload.logger.error({
      err: error,
      job,
      msg: `Error running task ${taskID}. Attempt ${job.totalTried} - max retries reached`,
      taskSlug,
    })

    return {
      hasFinalError: true,
    }
  }

  /**
   * Task can retry:
   * - If workflow can retry, allow it to retry
   * - If workflow reached max retries, do not retry and set final error
   */

  // First set task waitUntil - if the workflow waitUntil is later, it will be updated later
  const taskWaitUntil: Date = calculateBackoffWaitUntil({
    retriesConfig,
    totalTried: taskStatus?.totalTried ?? 0,
  })

  // Update job's waitUntil only if this waitUntil is later than the current one
  if (!job.waitUntil || taskWaitUntil > new Date(job.waitUntil)) {
    job.waitUntil = taskWaitUntil.toISOString()
  }

  const { hasFinalError, maxWorkflowRetries, waitUntil } = getWorkflowRetryBehavior({
    job,
    retriesConfig: workflowConfig.retries,
  })

  req.payload.logger.error({
    err: error,
    job,
    msg: `Error running task ${taskID}. Attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`,
    taskSlug,
  })

  // Update job's waitUntil only if this waitUntil is later than the current one
  if (waitUntil && (!job.waitUntil || waitUntil > new Date(job.waitUntil))) {
    job.waitUntil = waitUntil.toISOString()
  }

  // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
  // we need to ensure the job is updated to reflect the error
  await updateJob({
    error: hasFinalError ? errorJSON : undefined,
    hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
    log: job.log,
    processing: false,
    totalTried: (job.totalTried ?? 0) + 1,
    waitUntil: job.waitUntil,
  })

  return {
    hasFinalError,
  }
}

View File

@@ -0,0 +1,77 @@
import type { PayloadRequest } from '../../index.js'
import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js'
import type { WorkflowError } from './index.js'
import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js'
/**
 * Called when a workflow catches an error outside of a task. Decides whether
 * the failure is final, pushes the job's `waitUntil` out for the next attempt
 * when a retry is still allowed, logs the failure, and persists everything in
 * a single `updateJob` call.
 *
 * A workflow error = an error that happens anywhere in between running tasks.
 * Task failures should be thrown as `TaskError` (handled by `handleTaskError`),
 * never as `WorkflowError`.
 */
export async function handleWorkflowError({
  error,
  req,
  updateJob,
}: {
  error: WorkflowError
  req: PayloadRequest
  updateJob: UpdateJobFunction
}): Promise<{
  hasFinalError: boolean
}> {
  const { job, workflowConfig } = error.args

  // Serializable snapshot of the error, stored on the job document
  const serializedError = {
    name: error.name,
    cancelled: Boolean('cancelled' in error && error.cancelled),
    message: error.message,
    stack: error.stack,
  }

  const retryBehavior = getWorkflowRetryBehavior({
    job,
    retriesConfig: workflowConfig.retries!,
  })
  const { hasFinalError, maxWorkflowRetries } = retryBehavior

  if (!hasFinalError) {
    // Drop a stale waitUntil that is already in the past
    if (job.waitUntil && new Date(job.waitUntil) < new Date()) {
      delete job.waitUntil
    }

    // Only ever push the job's waitUntil later, never earlier
    const retryAt = retryBehavior.waitUntil
    if (retryAt && (!job.waitUntil || retryAt > new Date(job.waitUntil))) {
      job.waitUntil = retryAt.toISOString()
    }
  }

  const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`
  req.payload.logger.error({
    err: error,
    msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`,
  })

  // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
  // we need to ensure the job is updated to reflect the error
  await updateJob({
    error: serializedError,
    hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
    log: job.log,
    processing: false,
    totalTried: (job.totalTried ?? 0) + 1,
    waitUntil: job.waitUntil,
  })

  return {
    hasFinalError,
  }
}

View File

@@ -0,0 +1,51 @@
import type { Job, SingleTaskStatus, WorkflowConfig } from '../../index.js'
import type { RetryConfig, TaskConfig } from '../config/types/taskTypes.js'
import type { TaskParent } from '../operations/runJobs/runJob/getRunTaskFunction.js'
/**
 * Arguments captured when a task fails — everything `handleTaskError` needs
 * to log the failure and decide on retry behavior.
 */
export type TaskErrorArgs = {
  executedAt: Date
  input?: object
  job: Job
  message: string
  output?: object
  parent?: TaskParent
  retriesConfig: RetryConfig
  taskConfig?: TaskConfig<string>
  taskID: string
  taskSlug: string
  taskStatus: null | SingleTaskStatus<string>
  workflowConfig: WorkflowConfig
}

/**
 * Arguments captured when a workflow (not an individual task) fails.
 */
export type WorkflowErrorArgs = {
  job: Job
  message: string
  workflowConfig: WorkflowConfig
}

/**
 * Thrown when a task fails; handled in one single place (`handleTaskError`).
 */
export class TaskError extends Error {
  args: TaskErrorArgs

  constructor(args: TaskErrorArgs) {
    super(args.message)
    // Set a distinct name so serialized error snapshots (which store
    // `error.name`) identify the error type instead of the generic 'Error'
    this.name = 'TaskError'
    this.args = args
  }
}

/**
 * Thrown when an error occurs anywhere in a workflow outside of a task;
 * handled by `handleWorkflowError`.
 */
export class WorkflowError extends Error {
  args: WorkflowErrorArgs

  constructor(args: WorkflowErrorArgs) {
    super(args.message)
    // Distinct name for serialized error snapshots (see TaskError)
    this.name = 'WorkflowError'
    this.args = args
  }
}

/**
 * Thrown when a job is cancelled while it is running.
 */
export class JobCancelledError extends Error {
  args: {
    job: Job
  }

  constructor(args: { job: Job }) {
    super(`Job ${args.job.id} was cancelled`)
    // Distinct name for serialized error snapshots (see TaskError)
    this.name = 'JobCancelledError'
    this.args = args
  }
}

View File

@@ -7,6 +7,7 @@ import type { RunJobResult } from './runJob/index.js'
import { Forbidden } from '../../../errors/Forbidden.js'
import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js'
import { jobsCollectionSlug } from '../../config/index.js'
import { JobCancelledError } from '../../errors/index.js'
import { updateJob, updateJobs } from '../../utilities/updateJob.js'
import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js'
import { importHandlerPath } from './runJob/importHandlerPath.js'
@@ -226,7 +227,12 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
const successfullyCompletedJobs: (number | string)[] = []
const runSingleJob = async (job: Job) => {
const runSingleJob = async (
job: Job,
): Promise<{
id: number | string
result: RunJobResult
}> => {
if (!job.workflowSlug && !job.taskSlug) {
throw new Error('Job must have either a workflowSlug or a taskSlug')
}
@@ -245,68 +251,90 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
if (!workflowConfig) {
return null // Skip jobs with no workflow configuration
return {
id: job.id,
result: {
status: 'error',
},
} // Skip jobs with no workflow configuration
}
const updateJob = getUpdateJobFunction(job, jobReq)
try {
const updateJob = getUpdateJobFunction(job, jobReq)
// the runner will either be passed to the config
// OR it will be a path, which we will need to import via eval to avoid
// Next.js compiler dynamic import expression errors
let workflowHandler: WorkflowHandler | WorkflowJSON
if (
typeof workflowConfig.handler === 'function' ||
(typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler))
) {
workflowHandler = workflowConfig.handler
} else {
workflowHandler = await importHandlerPath<typeof workflowHandler>(workflowConfig.handler)
// the runner will either be passed to the config
// OR it will be a path, which we will need to import via eval to avoid
// Next.js compiler dynamic import expression errors
let workflowHandler: WorkflowHandler | WorkflowJSON
if (
typeof workflowConfig.handler === 'function' ||
(typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler))
) {
workflowHandler = workflowConfig.handler
} else {
workflowHandler = await importHandlerPath<typeof workflowHandler>(workflowConfig.handler)
if (!workflowHandler) {
const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`
const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.`
payload.logger.error(errorMessage)
if (!workflowHandler) {
const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`
const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.`
payload.logger.error(errorMessage)
await updateJob({
error: {
error: errorMessage,
},
hasError: true,
processing: false,
await updateJob({
error: {
error: errorMessage,
},
hasError: true,
processing: false,
})
return {
id: job.id,
result: {
status: 'error-reached-max-retries',
},
}
}
}
if (typeof workflowHandler === 'function') {
const result = await runJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
return
if (result.status === 'success') {
successfullyCompletedJobs.push(job.id)
}
return { id: job.id, result }
} else {
const result = await runJSONJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
if (result.status === 'success') {
successfullyCompletedJobs.push(job.id)
}
return { id: job.id, result }
}
}
if (typeof workflowHandler === 'function') {
const result = await runJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
if (result.status !== 'error') {
successfullyCompletedJobs.push(job.id)
} catch (error) {
if (error instanceof JobCancelledError) {
return {
id: job.id,
result: {
status: 'error-reached-max-retries',
},
}
}
return { id: job.id, result }
} else {
const result = await runJSONJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
if (result.status !== 'error') {
successfullyCompletedJobs.push(job.id)
}
return { id: job.id, result }
throw error
}
}

View File

@@ -5,8 +5,9 @@ import type { WorkflowConfig } from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js'
import type { JobRunStatus } from '../runJob/index.js'
import { getRunTaskFunction, type RunTaskFunctionState } from '../runJob/getRunTaskFunction.js'
import { handleWorkflowError } from '../runJob/handleWorkflowError.js'
import { handleWorkflowError } from '../../../errors/handleWorkflowError.js'
import { WorkflowError } from '../../../errors/index.js'
import { getRunTaskFunction } from '../runJob/getRunTaskFunction.js'
type Args = {
job: Job
@@ -27,12 +28,6 @@ export const runJSONJob = async ({
workflowConfig,
workflowHandler,
}: Args): Promise<RunJSONJobResult> => {
// Object so that we can pass contents by reference, not value.
// We want any mutations to be reflected in here.
const state: RunTaskFunctionState = {
reachedMaxRetries: false,
}
const stepsToRun: WorkflowStep<string>[] = []
for (const step of workflowHandler) {
@@ -51,12 +46,10 @@ export const runJSONJob = async ({
stepsToRun.push(step)
}
const tasks = getRunTaskFunction(state, job, workflowConfig, req, false, updateJob)
const inlineTask = getRunTaskFunction(state, job, workflowConfig, req, true, updateJob)
const tasks = getRunTaskFunction(job, workflowConfig, req, false, updateJob)
const inlineTask = getRunTaskFunction(job, workflowConfig, req, true, updateJob)
// Run the job
let hasFinalError = false
let error: Error | undefined
try {
await Promise.all(
stepsToRun.map(async (step) => {
@@ -73,17 +66,27 @@ export const runJSONJob = async ({
}
}),
)
} catch (_err) {
const err = _err as Error
const errorResult = handleWorkflowError({
error: err,
job,
} catch (error) {
const { hasFinalError } = await handleWorkflowError({
error:
error instanceof WorkflowError
? error
: new WorkflowError({
job,
message:
typeof error === 'object' && error && 'message' in error
? (error.message as string)
: 'An unhandled error occurred',
workflowConfig,
}),
req,
state,
workflowConfig,
updateJob,
})
error = err
hasFinalError = errorResult.hasFinalError
return {
status: hasFinalError ? 'error-reached-max-retries' : 'error',
}
}
// Check if workflow has completed
@@ -107,49 +110,23 @@ export const runJSONJob = async ({
}
if (workflowCompleted) {
if (error) {
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
completedAt: new Date().toISOString(),
error: hasFinalError ? error : undefined,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
} else {
await updateJob({
completedAt: new Date().toISOString(),
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
}
await updateJob({
completedAt: new Date().toISOString(),
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
return {
status: 'success',
}
} else {
if (error) {
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
error: hasFinalError ? error : undefined,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
return {
status: hasFinalError ? 'error-reached-max-retries' : 'error',
}
} else {
// Retry the job - no need to bump processing or totalTried as this does not count as a retry. A condition of a different task might have just opened up!
return await runJSONJob({
job,
req,
updateJob,
workflowConfig,
workflowHandler,
})
}
// Retry the job - no need to bump processing or totalTried as this does not count as a retry. A condition of a different task might have just opened up!
return await runJSONJob({
job,
req,
updateJob,
workflowConfig,
workflowHandler,
})
}
}

View File

@@ -19,148 +19,18 @@ import type {
} from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
import { importHandlerPath } from './importHandlerPath.js'
import { TaskError } from '../../../errors/index.js'
import { getTaskHandlerFromConfig } from './importHandlerPath.js'
const ObjectId = (ObjectIdImport.default ||
ObjectIdImport) as unknown as typeof ObjectIdImport.default
// Helper object type to force being passed by reference
export type RunTaskFunctionState = {
reachedMaxRetries: boolean
}
async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) {
if (!taskConfig) {
throw new Error('Task config is required to get the task handler')
}
if (typeof taskConfig.handler === 'function') {
return taskConfig.handler
} else {
return await importHandlerPath<TaskHandler<TaskType>>(taskConfig.handler)
}
}
export async function handleTaskFailed({
error,
executedAt,
input,
job,
maxRetries,
output,
parent,
req,
retriesConfig,
state,
taskConfig,
taskHandlerResult,
taskID,
taskSlug,
taskStatus,
updateJob,
}: {
error?: Error
executedAt: Date
input: object
job: Job
maxRetries: number
output: object
parent?: TaskParent
req: PayloadRequest
retriesConfig: number | RetryConfig
state: RunTaskFunctionState
taskConfig?: TaskConfig<string>
taskHandlerResult?: TaskHandlerResult<string>
taskID: string
taskSlug: string
taskStatus: null | SingleTaskStatus<string>
updateJob: UpdateJobFunction
}): Promise<never> {
req.payload.logger.error({ err: error, job, msg: `Error running task ${taskID}`, taskSlug })
if (taskConfig?.onFail) {
await taskConfig.onFail()
}
const errorJSON = error
? {
name: error.name,
message: error.message,
stack: error.stack,
}
: {
message:
taskHandlerResult?.state === 'failed'
? (taskHandlerResult.errorMessage ?? taskHandlerResult.state)
: 'failed',
}
const currentDate = new Date()
;(job.log ??= []).push({
id: new ObjectId().toHexString(),
completedAt: currentDate.toISOString(),
error: errorJSON,
executedAt: executedAt.toISOString(),
input,
output,
parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
})
if (job.waitUntil) {
// Check if waitUntil is in the past
const waitUntil = new Date(job.waitUntil)
if (waitUntil < currentDate) {
// Outdated waitUntil, remove it
delete job.waitUntil
}
}
if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) {
state.reachedMaxRetries = true
await updateJob({
error,
hasError: true,
log: job.log,
processing: false,
waitUntil: job.waitUntil,
})
throw new Error(
`Task ${taskSlug} has failed more than the allowed retries in workflow ${job.workflowSlug}${error ? `. Error: ${String(error)}` : ''}`,
)
} else {
// Job will retry. Let's determine when!
const waitUntil: Date = calculateBackoffWaitUntil({
retriesConfig,
totalTried: taskStatus?.totalTried ?? 0,
})
// Update job's waitUntil only if this waitUntil is later than the current one
if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) {
job.waitUntil = waitUntil.toISOString()
}
await updateJob({
log: job.log,
processing: false,
waitUntil: job.waitUntil,
})
throw error ?? new Error('Task failed')
}
}
export type TaskParent = {
taskID: string
taskSlug: string
}
export const getRunTaskFunction = <TIsInline extends boolean>(
state: RunTaskFunctionState,
job: Job,
workflowConfig: WorkflowConfig,
req: PayloadRequest,
@@ -240,47 +110,21 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
: await getTaskHandlerFromConfig(taskConfig)
if (!runner || typeof runner !== 'function') {
const errorMessage = isInline
? `Can't find runner for inline task with ID ${taskID}`
: `Can't find runner while importing with the path ${typeof workflowConfig.handler === 'string' ? workflowConfig.handler : 'unknown - no string path'} in job type ${job.workflowSlug} for task ${taskSlug}.`
req.payload.logger.error(errorMessage)
await updateJob({
error: {
error: errorMessage,
},
hasError: true,
log: [
...(job.log || []),
{
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
error: errorMessage,
executedAt: executedAt.toISOString(),
parent: jobConfig.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
},
],
processing: false,
throw new TaskError({
executedAt,
input,
job,
message: isInline
? `Inline task with ID ${taskID} does not have a valid handler.`
: `Task with slug ${taskSlug} in workflow ${job.workflowSlug} does not have a valid handler.`,
parent,
retriesConfig: finalRetriesConfig,
taskConfig,
taskID,
taskSlug,
taskStatus,
workflowConfig,
})
throw new Error(errorMessage)
}
let maxRetries: number | undefined = finalRetriesConfig?.attempts
if (maxRetries === undefined || maxRetries === null) {
// Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
maxRetries =
typeof workflowConfig.retries === 'object'
? workflowConfig.retries.attempts
: workflowConfig.retries
} else {
maxRetries = 0
}
}
let taskHandlerResult: TaskHandlerResult<string>
@@ -288,58 +132,50 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
try {
taskHandlerResult = await runner({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, {
inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob, {
taskID,
taskSlug,
}),
input,
job: job as unknown as Job<WorkflowTypes>,
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, {
tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob, {
taskID,
taskSlug,
}),
})
} catch (err) {
await handleTaskFailed({
error: err as Error | undefined,
} catch (err: any) {
throw new TaskError({
executedAt,
input: input!,
job,
maxRetries: maxRetries!,
message: err.message || 'Task handler threw an error',
output,
parent,
req,
retriesConfig: finalRetriesConfig,
state,
taskConfig,
taskID,
taskSlug,
taskStatus,
updateJob,
workflowConfig,
})
throw new Error('Task failed')
}
if (taskHandlerResult.state === 'failed') {
await handleTaskFailed({
throw new TaskError({
executedAt,
input: input!,
job,
maxRetries: maxRetries!,
message: taskHandlerResult.errorMessage ?? 'Task handler returned a failed state',
output,
parent,
req,
retriesConfig: finalRetriesConfig,
state,
taskConfig,
taskHandlerResult,
taskID,
taskSlug,
taskStatus,
updateJob,
workflowConfig,
})
throw new Error('Task failed')
} else {
output = taskHandlerResult.output
}

View File

@@ -1,10 +1,16 @@
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import { JobCancelledError } from '../../../errors/index.js'
import { updateJob } from '../../../utilities/updateJob.js'
export type UpdateJobFunction = (jobData: Partial<Job>) => Promise<Job>
/**
* Helper for updating a job that does the following, additionally to updating the job:
* - Merges incoming data from the updated job into the original job object
* - Handles job cancellation by throwing a `JobCancelledError` if the job was cancelled.
*/
export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFunction {
return async (jobData) => {
const updatedJob = await updateJob({
@@ -15,6 +21,10 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu
req,
})
if (!updatedJob) {
return job
}
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
for (const key in updatedJob) {
if (key === 'log') {
@@ -31,11 +41,9 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu
}
if ((updatedJob?.error as Record<string, unknown>)?.cancelled) {
const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error
cancelledError.cancelled = true
throw cancelledError
throw new JobCancelledError({ job })
}
return updatedJob!
return updatedJob
}
}

View File

@@ -1,76 +0,0 @@
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type { WorkflowConfig } from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
/**
 * Invoked when a workflow catches an error. Decides whether the failure is
 * final (no more retries) and, when a retry is still allowed, pushes the
 * job's `waitUntil` out according to the configured backoff. Also logs the
 * failure.
 */
export function handleWorkflowError({
  error,
  job,
  req,
  state,
  workflowConfig,
}: {
  error: Error
  job: Job
  req: PayloadRequest
  state: RunTaskFunctionState
  workflowConfig: WorkflowConfig
}): {
  hasFinalError: boolean
} {
  const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`

  // Final if any TASK already exhausted its retries, or the job was cancelled
  let hasFinalError = state.reachedMaxRetries || !!('cancelled' in error && error.cancelled)

  // A plain number is shorthand for { attempts: number }
  const maxWorkflowRetries: number = (
    typeof workflowConfig.retries === 'object'
      ? workflowConfig.retries.attempts
      : workflowConfig.retries
  )!

  const workflowRetriesExhausted =
    maxWorkflowRetries !== undefined &&
    maxWorkflowRetries !== null &&
    job.totalTried >= maxWorkflowRetries

  if (workflowRetriesExhausted) {
    hasFinalError = true
    state.reachedMaxRetries = true
  }

  // Now let's handle workflow retries
  if (!hasFinalError) {
    // Drop a stale waitUntil that is already in the past
    if (job.waitUntil && new Date(job.waitUntil) < new Date()) {
      delete job.waitUntil
    }

    // Job will retry. Let's determine when!
    const retryAt: Date = calculateBackoffWaitUntil({
      retriesConfig: workflowConfig.retries!,
      totalTried: job.totalTried ?? 0,
    })

    // Only ever push the job's waitUntil later, never earlier
    if (!job.waitUntil || retryAt > new Date(job.waitUntil)) {
      job.waitUntil = retryAt.toISOString()
    }
  }

  req.payload.logger.error({
    err: error,
    msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`,
  })

  return {
    hasFinalError,
  }
}

View File

@@ -1,5 +1,10 @@
import { pathToFileURL } from 'url'
import type { TaskConfig, TaskHandler, TaskType } from '../../../config/types/taskTypes.js'
/**
* Imports a handler function from a given path.
*/
export async function importHandlerPath<T>(path: string): Promise<T> {
let runner!: T
const [runnerPath, runnerImportName] = path.split('#')
@@ -35,3 +40,19 @@ export async function importHandlerPath<T>(path: string): Promise<T> {
return runner
}
/**
 * The `handler` property of a task config can either be a function or a path
 * to a module that exports a function. Resolves the handler to a callable:
 * returns it directly when it is already a function, otherwise imports it
 * from the configured path.
 */
export async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) {
  if (!taskConfig) {
    throw new Error('Task config is required to get the task handler')
  }
  const { handler } = taskConfig
  return typeof handler === 'function'
    ? handler
    : await importHandlerPath<TaskHandler<TaskType>>(handler)
}

View File

@@ -1,12 +1,12 @@
import type { APIError } from '../../../../errors/APIError.js'
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type { WorkflowConfig, WorkflowHandler } from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
import { handleTaskError } from '../../../errors/handleTaskError.js'
import { handleWorkflowError } from '../../../errors/handleWorkflowError.js'
import { JobCancelledError, TaskError, WorkflowError } from '../../../errors/index.js'
import { getRunTaskFunction } from './getRunTaskFunction.js'
import { handleWorkflowError } from './handleWorkflowError.js'
type Args = {
job: Job
@@ -29,46 +29,44 @@ export const runJob = async ({
workflowConfig,
workflowHandler,
}: Args): Promise<RunJobResult> => {
// Object so that we can pass contents by reference, not value.
// We want any mutations to be reflected in here.
const state: RunTaskFunctionState = {
reachedMaxRetries: false,
}
// Run the job
try {
await workflowHandler({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob),
inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob),
job,
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob),
})
} catch (_err) {
const err = _err as APIError
const { hasFinalError } = handleWorkflowError({
error: err,
job,
req,
state,
workflowConfig,
tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob),
})
} catch (error) {
if (error instanceof JobCancelledError) {
throw error // Job cancellation is handled in a top-level error handler, as higher up code may themselves throw this error
}
if (error instanceof TaskError) {
const { hasFinalError } = await handleTaskError({
error,
req,
updateJob,
})
const errorJSON = hasFinalError
? {
name: err.name,
cancelled: Boolean('cancelled' in err && err.cancelled),
message: err.message,
stack: err.stack,
}
: undefined
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
error: errorJSON,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
log: job.log,
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
return {
status: hasFinalError ? 'error-reached-max-retries' : 'error',
}
}
const { hasFinalError } = await handleWorkflowError({
error:
error instanceof WorkflowError
? error
: new WorkflowError({
job,
message:
typeof error === 'object' && error && 'message' in error
? (error.message as string)
: 'An unhandled error occurred',
workflowConfig,
}),
req,
updateJob,
})
return {
@@ -76,7 +74,7 @@ export const runJob = async ({
}
}
// Workflow has completed
// Workflow has completed successfully
await updateJob({
completedAt: new Date().toISOString(),
log: job.log,

View File

@@ -6,6 +6,9 @@ const configHasJobs = (config: SanitizedConfig): boolean => {
return Boolean(config.jobs?.tasks?.length || config.jobs?.workflows?.length)
}
/**
* /api/payload-jobs/run endpoint
*/
export const runJobsEndpoint: Endpoint = {
handler: async (req) => {
if (!configHasJobs(req.payload.config)) {

View File

@@ -40,6 +40,11 @@ export async function updateJob(args: ArgsByID & BaseArgs) {
}
}
/**
* Helper for updating jobs in the most performant way possible.
* Handles deciding whether it can used direct db methods or not, and if so,
* manually runs the afterRead hook that populates the `taskStatus` property.
*/
export async function updateJobs({
id,
data,

View File

@@ -9,18 +9,22 @@ import { NextRESTClient } from './NextRESTClient.js'
/**
* Initialize Payload configured for integration tests
*/
export async function initPayloadInt(
export async function initPayloadInt<TInitializePayload extends boolean | undefined = true>(
dirname: string,
testSuiteNameOverride?: string,
initializePayload = true,
): Promise<{ config: SanitizedConfig; payload?: Payload; restClient?: NextRESTClient }> {
initializePayload?: TInitializePayload,
): Promise<
TInitializePayload extends false
? { config: SanitizedConfig }
: { config: SanitizedConfig; payload: Payload; restClient: NextRESTClient }
> {
const testSuiteName = testSuiteNameOverride ?? path.basename(dirname)
await runInit(testSuiteName, false, true)
console.log('importing config', path.resolve(dirname, 'config.ts'))
const { default: config } = await import(path.resolve(dirname, 'config.ts'))
if (!initializePayload) {
return { config: await config }
if (initializePayload === false) {
return { config: await config } as any
}
console.log('starting payload')
@@ -29,5 +33,5 @@ export async function initPayloadInt(
console.log('initializing rest client')
const restClient = new NextRESTClient(payload.config)
console.log('initPayloadInt done')
return { config: payload.config, payload, restClient }
return { config: payload.config, payload, restClient } as any
}

View File

@@ -9,6 +9,7 @@ import { devUser } from '../credentials.js'
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
import { seed } from './seed.js'
import { externalWorkflow } from './workflows/externalWorkflow.js'
import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js'
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
import { longRunningWorkflow } from './workflows/longRunning.js'
@@ -393,6 +394,7 @@ export default buildConfigWithDefaults({
workflowRetries2TasksRetriesUndefinedWorkflow,
workflowRetries2TasksRetries0Workflow,
inlineTaskTestWorkflow,
failsImmediatelyWorkflow,
inlineTaskTestDelayedWorkflow,
externalWorkflow,
retriesBackoffTestWorkflow,

View File

@@ -41,6 +41,7 @@ describe('Queues', () => {
if (data.token) {
token = data.token
}
payload.config.jobs.deleteJobOnComplete = true
})
it('will run access control on jobs runner', async () => {
@@ -182,7 +183,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(3)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflow-level retries are respected', async () => {
@@ -218,8 +218,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(2)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflows dont limit retries if no retries property is sett', async () => {
@@ -255,8 +253,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(3)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => {
@@ -292,8 +288,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => {
@@ -329,8 +323,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => {
@@ -366,8 +358,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(2)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => {
@@ -403,8 +393,6 @@ describe('Queues', () => {
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
payload.config.jobs.deleteJobOnComplete = true
})
/*
@@ -492,7 +480,7 @@ describe('Queues', () => {
id: job.id,
})
expect(jobAfterRun.totalTried).toBe(5)
expect((jobAfterRun.taskStatus as JobTaskStatus).inline['1'].totalTried).toBe(5)
expect((jobAfterRun.taskStatus as JobTaskStatus).inline?.['1']?.totalTried).toBe(5)
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
@@ -518,7 +506,7 @@ describe('Queues', () => {
if (index === arr.length - 1) {
return null
}
return new Date(arr[index + 1]).getTime() - new Date(time).getTime()
return new Date(arr[index + 1] as string).getTime() - new Date(time).getTime()
})
.filter((p) => p !== null)
@@ -527,8 +515,6 @@ describe('Queues', () => {
expect(durations[1]).toBeGreaterThan(600)
expect(durations[2]).toBeGreaterThan(1200)
expect(durations[3]).toBeGreaterThan(2400)
payload.config.jobs.deleteJobOnComplete = true
})
it('ensure jobs run in FIFO order by default', async () => {
@@ -647,7 +633,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('hello!')
expect(allSimples.docs[0]?.title).toBe('hello!')
})
it('can create and autorun jobs', async () => {
@@ -677,12 +663,12 @@ describe('Queues', () => {
const { id } = await payload.jobs.queue({
workflow: 'inlineTaskTest',
input: {
message: 'hello!',
message: 'deleteJobOnComplete test',
},
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before.id).toBe(id)
expect(before?.id).toBe(id)
await payload.jobs.run()
@@ -690,6 +676,21 @@ describe('Queues', () => {
expect(after).toBeNull()
})
it('should not delete failed jobs if deleteJobOnComplete is true', async () => {
const { id } = await payload.jobs.queue({
workflow: 'failsImmediately',
input: {},
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before?.id).toBe(id)
await payload.jobs.run()
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(after?.id).toBe(id)
})
it('should respect deleteJobOnComplete false configuration', async () => {
payload.config.jobs.deleteJobOnComplete = false
const { id } = await payload.jobs.queue({
@@ -700,14 +701,12 @@ describe('Queues', () => {
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before.id).toBe(id)
expect(before?.id).toBe(id)
await payload.jobs.run()
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(after.id).toBe(id)
payload.config.jobs.deleteJobOnComplete = true
expect(after?.id).toBe(id)
})
it('can queue single tasks', async () => {
@@ -726,7 +725,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
})
it('can queue and run via the endpoint single tasks without workflows', async () => {
@@ -751,7 +750,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
payload.config.jobs.workflows = workflowsRef
})
@@ -885,8 +884,8 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(8)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[7].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[7]?.title).toBe('from single task')
})
it('can queue single tasks hundreds of times', async () => {
@@ -912,9 +911,8 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(numberOfTasks) // Default limit: 10
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[numberOfTasks - 1].title).toBe('from single task')
payload.config.jobs.deleteJobOnComplete = true
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[numberOfTasks - 1]?.title).toBe('from single task')
})
it('ensure default jobs run limit of 10 works', async () => {
@@ -935,8 +933,8 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(10) // Default limit: 10
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[9].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[9]?.title).toBe('from single task')
})
it('ensure jobs run limit can be customized', async () => {
@@ -959,9 +957,9 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(42) // Default limit: 10
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[30].title).toBe('from single task')
expect(allSimples.docs[41].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[30]?.title).toBe('from single task')
expect(allSimples.docs[41]?.title).toBe('from single task')
})
it('can queue different kinds of single tasks multiple times', async () => {
@@ -1026,7 +1024,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('external')
expect(allSimples.docs[0]?.title).toBe('external')
})
it('can queue external workflow that is running external task', async () => {
@@ -1045,13 +1043,13 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('externalWorkflow')
expect(allSimples.docs[0]?.title).toBe('externalWorkflow')
})
it('ensure payload.jobs.runByID works and only runs the specified job', async () => {
payload.config.jobs.deleteJobOnComplete = false
let lastJobID: string = null
let lastJobID: null | string = null
for (let i = 0; i < 3; i++) {
const job = await payload.jobs.queue({
task: 'CreateSimple',
@@ -1061,6 +1059,9 @@ describe('Queues', () => {
})
lastJobID = job.id
}
if (!lastJobID) {
throw new Error('No job ID found after queuing jobs')
}
await payload.jobs.runByID({
id: lastJobID,
@@ -1072,7 +1073,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
@@ -1085,13 +1086,13 @@ describe('Queues', () => {
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect(allCompletedJobs.docs[0].id).toBe(lastJobID)
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => {
payload.config.jobs.deleteJobOnComplete = false
let lastJobID: string = null
let lastJobID: null | string = null
for (let i = 0; i < 3; i++) {
const job = await payload.jobs.queue({
task: 'CreateSimple',
@@ -1101,6 +1102,9 @@ describe('Queues', () => {
})
lastJobID = job.id
}
if (!lastJobID) {
throw new Error('No job ID found after queuing jobs')
}
await payload.jobs.run({
where: {
@@ -1116,7 +1120,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[0]?.title).toBe('from single task')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
@@ -1129,7 +1133,7 @@ describe('Queues', () => {
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect(allCompletedJobs.docs[0].id).toBe(lastJobID)
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => {
@@ -1158,7 +1162,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task 2')
expect(allSimples.docs[0]?.title).toBe('from single task 2')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
@@ -1171,7 +1175,7 @@ describe('Queues', () => {
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect((allCompletedJobs.docs[0].input as any).message).toBe('from single task 2')
expect((allCompletedJobs.docs[0]?.input as any).message).toBe('from single task 2')
})
it('can run sub-tasks', async () => {
@@ -1191,24 +1195,24 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs[0].title).toBe('hello!')
expect(allSimples.docs[1].title).toBe('hello!')
expect(allSimples.docs[0]?.title).toBe('hello!')
expect(allSimples.docs[1]?.title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.log[0].taskID).toBe('create doc 1')
expect(jobAfterRun?.log?.[0]?.taskID).toBe('create doc 1')
//expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs')
// jobAfterRun.log[0].parent should not exist
expect(jobAfterRun.log[0].parent).toBeUndefined()
expect(jobAfterRun?.log?.[0]?.parent).toBeUndefined()
expect(jobAfterRun.log[1].taskID).toBe('create doc 2')
expect(jobAfterRun?.log?.[1]?.taskID).toBe('create doc 2')
//expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs')
expect(jobAfterRun.log[1].parent).toBeUndefined()
expect(jobAfterRun?.log?.[1]?.parent).toBeUndefined()
expect(jobAfterRun.log[2].taskID).toBe('create two docs')
expect(jobAfterRun?.log?.[2]?.taskID).toBe('create two docs')
})
it('ensure successful sub-tasks are not retried', async () => {
@@ -1237,7 +1241,7 @@ describe('Queues', () => {
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('hello!')
expect(allSimples?.docs?.[0]?.title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
@@ -1339,8 +1343,8 @@ describe('Queues', () => {
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('failed')
expect(jobAfterRun.log[0].state).toBe('failed')
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('failed')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can tasks return error', async () => {
@@ -1360,8 +1364,8 @@ describe('Queues', () => {
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('failed')
expect(jobAfterRun.log[0].state).toBe('failed')
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Task handler returned a failed state')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can tasks return error with custom error message', async () => {
@@ -1383,8 +1387,8 @@ describe('Queues', () => {
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun.log[0].error.message).toBe('custom error message')
expect(jobAfterRun.log[0].state).toBe('failed')
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('custom error message')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can reliably run workflows with parallel tasks', async () => {

View File

@@ -123,6 +123,7 @@ export interface Config {
workflowRetries2TasksRetriesUndefined: WorkflowWorkflowRetries2TasksRetriesUndefined;
workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0;
inlineTaskTest: WorkflowInlineTaskTest;
failsImmediately: WorkflowFailsImmediately;
inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed;
externalWorkflow: WorkflowExternalWorkflow;
retriesBackoffTest: WorkflowRetriesBackoffTest;
@@ -314,6 +315,7 @@ export interface PayloadJob {
| 'workflowRetries2TasksRetriesUndefined'
| 'workflowRetries2TasksRetries0'
| 'inlineTaskTest'
| 'failsImmediately'
| 'inlineTaskTestDelayed'
| 'externalWorkflow'
| 'retriesBackoffTest'
@@ -724,6 +726,13 @@ export interface WorkflowInlineTaskTest {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowFailsImmediately".
*/
export interface WorkflowFailsImmediately {
input?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowInlineTaskTestDelayed".

View File

@@ -1,14 +1,8 @@
import type { Payload } from 'payload'
import path from 'path'
import { fileURLToPath } from 'url'
import { devUser } from '../credentials.js'
import { seedDB } from '../helpers/seed.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
export const seed = async (_payload: Payload) => {
await _payload.create({
collection: 'users',
@@ -22,9 +16,11 @@ export const seed = async (_payload: Payload) => {
export async function clearAndSeedEverything(_payload: Payload) {
return await seedDB({
_payload,
collectionSlugs: _payload.config.collections.map((collection) => collection.slug),
collectionSlugs: [
..._payload.config.collections.map((collection) => collection.slug),
'payload-jobs',
],
seedFunction: seed,
snapshotKey: 'fieldsTest',
uploadsDir: path.resolve(dirname, './collections/Upload/uploads'),
snapshotKey: 'queuesTest',
})
}

View File

@@ -0,0 +1,10 @@
import type { WorkflowConfig } from 'payload'
/**
 * Test-only workflow whose handler always throws.
 *
 * With `retries: 0` the job fails definitively on its very first run,
 * which lets the test suite assert that `deleteJobOnComplete` does NOT
 * delete jobs that ended in a final error state.
 */
export const failsImmediatelyWorkflow: WorkflowConfig<'failsImmediately'> = {
  slug: 'failsImmediately',
  inputSchema: [],
  // No retries: the first thrown error is the final error.
  retries: 0,
  handler: () => {
    throw new Error('This workflow fails immediately')
  },
}
}