diff --git a/packages/payload/src/queues/operations/runJobs/runJob/calculateBackoffWaitUntil.ts b/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts similarity index 94% rename from packages/payload/src/queues/operations/runJobs/runJob/calculateBackoffWaitUntil.ts rename to packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts index 4e5325878..e8ff239e8 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/calculateBackoffWaitUntil.ts +++ b/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts @@ -1,4 +1,4 @@ -import type { RetryConfig } from '../../../config/types/taskTypes.js' +import type { RetryConfig } from '../config/types/taskTypes.js' export function calculateBackoffWaitUntil({ retriesConfig, diff --git a/packages/payload/src/queues/errors/getWorkflowRetryBehavior.ts b/packages/payload/src/queues/errors/getWorkflowRetryBehavior.ts new file mode 100644 index 000000000..7bfa5218b --- /dev/null +++ b/packages/payload/src/queues/errors/getWorkflowRetryBehavior.ts @@ -0,0 +1,63 @@ +import type { Job } from '../../index.js' +import type { RetryConfig } from '../config/types/taskTypes.js' + +import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' + +/** + * Assuming there is no task that has already reached max retries, + * this function determines if the workflow should retry the job + * and if so, when it should retry. + */ +export function getWorkflowRetryBehavior({ + job, + retriesConfig, +}: { + job: Job + retriesConfig?: number | RetryConfig +}): + | { + hasFinalError: false + maxWorkflowRetries?: number + waitUntil?: Date + } + | { + hasFinalError: true + maxWorkflowRetries?: number + waitUntil?: Date + } { + const maxWorkflowRetries = ( + typeof retriesConfig === 'object' ? retriesConfig.attempts : retriesConfig + )! + + if ( + maxWorkflowRetries !== undefined && + maxWorkflowRetries !== null && + job.totalTried >= maxWorkflowRetries + ) { + return { + hasFinalError: true, + maxWorkflowRetries, + } + } + + if (!retriesConfig) { + // No retries provided => assuming no task reached max retries, we can retry + return { + hasFinalError: false, + maxWorkflowRetries: undefined, + waitUntil: undefined, + } + } + + // Job will retry. Let's determine when! + const waitUntil: Date = calculateBackoffWaitUntil({ + retriesConfig, + totalTried: job.totalTried ?? 0, + }) + + return { + hasFinalError: false, + maxWorkflowRetries, + waitUntil, + } +} diff --git a/packages/payload/src/queues/errors/handleTaskError.ts b/packages/payload/src/queues/errors/handleTaskError.ts new file mode 100644 index 000000000..3d6b491c9 --- /dev/null +++ b/packages/payload/src/queues/errors/handleTaskError.ts @@ -0,0 +1,164 @@ +import ObjectIdImport from 'bson-objectid' + +import type { PayloadRequest } from '../../index.js' +import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' +import type { TaskError } from './index.js' + +import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' +import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js' + +const ObjectId = (ObjectIdImport.default || + ObjectIdImport) as unknown as typeof ObjectIdImport.default + +export async function handleTaskError({ + error, + req, + updateJob, +}: { + error: TaskError + req: PayloadRequest + updateJob: UpdateJobFunction +}): Promise<{ + hasFinalError: boolean +}> { + const { + executedAt, + input, + job, + output, + parent, + retriesConfig, + taskConfig, + taskID, + taskSlug, + taskStatus, + workflowConfig, + } = error.args + + if (taskConfig?.onFail) { + await taskConfig.onFail() + } + + const errorJSON = { + name: error.name, + cancelled: Boolean('cancelled' in error && error.cancelled), + message: error.message, + stack: error.stack, + } + + const currentDate = new Date() + + ;(job.log ??= []).push({ + id: new ObjectId().toHexString(), + completedAt: currentDate.toISOString(), + error: errorJSON, + executedAt: executedAt.toISOString(), + input, + output: output ?? {}, + parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined, + state: 'failed', + taskID, + taskSlug, + }) + + if (job.waitUntil) { + // Check if waitUntil is in the past + const waitUntil = new Date(job.waitUntil) + if (waitUntil < currentDate) { + // Outdated waitUntil, remove it + delete job.waitUntil + } + } + + let maxRetries: number = 0 + + if (retriesConfig?.attempts === undefined || retriesConfig?.attempts === null) { + // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured + if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) { + maxRetries = + typeof workflowConfig.retries === 'object' + ? typeof workflowConfig.retries.attempts === 'number' + ? workflowConfig.retries.attempts + : 0 + : workflowConfig.retries + } else { + maxRetries = 0 + } + } else { + maxRetries = retriesConfig.attempts + } + + if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) { + /** + * Task reached max retries => workflow will not retry + */ + + await updateJob({ + error: errorJSON, + hasError: true, + log: job.log, + processing: false, + totalTried: (job.totalTried ?? 0) + 1, + waitUntil: job.waitUntil, + }) + + req.payload.logger.error({ + err: error, + job, + msg: `Error running task ${taskID}. Attempt ${job.totalTried} - max retries reached`, + taskSlug, + }) + return { + hasFinalError: true, + } + } + + /** + * Task can retry: + * - If workflow can retry, allow it to retry + * - If workflow reached max retries, do not retry and set final error + */ + + // First set task waitUntil - if the workflow waitUntil is later, it will be updated later + const taskWaitUntil: Date = calculateBackoffWaitUntil({ + retriesConfig, + totalTried: taskStatus?.totalTried ?? 0, + }) + + // Update job's waitUntil only if this waitUntil is later than the current one + if (!job.waitUntil || taskWaitUntil > new Date(job.waitUntil)) { + job.waitUntil = taskWaitUntil.toISOString() + } + + const { hasFinalError, maxWorkflowRetries, waitUntil } = getWorkflowRetryBehavior({ + job, + retriesConfig: workflowConfig.retries, + }) + + req.payload.logger.error({ + err: error, + job, + msg: `Error running task ${taskID}. Attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, + taskSlug, + }) + + // Update job's waitUntil only if this waitUntil is later than the current one + if (waitUntil && (!job.waitUntil || waitUntil > new Date(job.waitUntil))) { + job.waitUntil = waitUntil.toISOString() + } + + // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task) + // we need to ensure the job is updated to reflect the error + await updateJob({ + error: hasFinalError ? errorJSON : undefined, + hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried + log: job.log, + processing: false, + totalTried: (job.totalTried ?? 0) + 1, + waitUntil: job.waitUntil, + }) + + return { + hasFinalError, + } +} diff --git a/packages/payload/src/queues/errors/handleWorkflowError.ts b/packages/payload/src/queues/errors/handleWorkflowError.ts new file mode 100644 index 000000000..6c5fed8da --- /dev/null +++ b/packages/payload/src/queues/errors/handleWorkflowError.ts @@ -0,0 +1,77 @@ +import type { PayloadRequest } from '../../index.js' +import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' +import type { WorkflowError } from './index.js' + +import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js' + +/** + * This is called if a workflow catches an error. It determines if it's a final error + * or not and handles logging. + * A Workflow error = error that happens anywhere in between running tasks. + * + * This function assumes that the error is not a TaskError, but a WorkflowError. If a task errors, + * only a TaskError should be thrown, not a WorkflowError. + */ +export async function handleWorkflowError({ + error, + req, + updateJob, +}: { + error: WorkflowError + req: PayloadRequest + updateJob: UpdateJobFunction +}): Promise<{ + hasFinalError: boolean +}> { + const { job, workflowConfig } = error.args + + const errorJSON = { + name: error.name, + cancelled: Boolean('cancelled' in error && error.cancelled), + message: error.message, + stack: error.stack, + } + + const { hasFinalError, maxWorkflowRetries, waitUntil } = getWorkflowRetryBehavior({ + job, + retriesConfig: workflowConfig.retries!, + }) + + if (!hasFinalError) { + if (job.waitUntil) { + // Check if waitUntil is in the past + const waitUntil = new Date(job.waitUntil) + if (waitUntil < new Date()) { + // Outdated waitUntil, remove it + delete job.waitUntil + } + } + + // Update job's waitUntil only if this waitUntil is later than the current one + if (waitUntil && (!job.waitUntil || waitUntil > new Date(job.waitUntil))) { + job.waitUntil = waitUntil.toISOString() + } + } + + const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` + + req.payload.logger.error({ + err: error, + msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, + }) + + // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task) + // we need to ensure the job is updated to reflect the error + await updateJob({ + error: errorJSON, + hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried + log: job.log, + processing: false, + totalTried: (job.totalTried ?? 0) + 1, + waitUntil: job.waitUntil, + }) + + return { + hasFinalError, + } +} diff --git a/packages/payload/src/queues/errors/index.ts b/packages/payload/src/queues/errors/index.ts new file mode 100644 index 000000000..56b4b46ec --- /dev/null +++ b/packages/payload/src/queues/errors/index.ts @@ -0,0 +1,51 @@ +import type { Job, SingleTaskStatus, WorkflowConfig } from '../../index.js' +import type { RetryConfig, TaskConfig } from '../config/types/taskTypes.js' +import type { TaskParent } from '../operations/runJobs/runJob/getRunTaskFunction.js' + +export type TaskErrorArgs = { + executedAt: Date + input?: object + job: Job + message: string + output?: object + parent?: TaskParent + retriesConfig: RetryConfig + taskConfig?: TaskConfig + taskID: string + taskSlug: string + taskStatus: null | SingleTaskStatus + workflowConfig: WorkflowConfig +} + +export type WorkflowErrorArgs = { + job: Job + message: string + workflowConfig: WorkflowConfig +} + +export class TaskError extends Error { + args: TaskErrorArgs + constructor(args: TaskErrorArgs) { + super(args.message) + this.args = args + } +} +export class WorkflowError extends Error { + args: WorkflowErrorArgs + + constructor(args: WorkflowErrorArgs) { + super(args.message) + this.args = args + } +} + +export class JobCancelledError extends Error { + args: { + job: Job + } + + constructor(args: { job: Job }) { + super(`Job ${args.job.id} was cancelled`) + this.args = args + } +} diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index b88b508bd..99f6482e6 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -7,6 +7,7 @@ import type { RunJobResult } from './runJob/index.js' import { Forbidden } from '../../../errors/Forbidden.js' import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js' import { jobsCollectionSlug } from '../../config/index.js' +import { JobCancelledError } from '../../errors/index.js' import { updateJob, updateJobs } from '../../utilities/updateJob.js' import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js' import { importHandlerPath } from './runJob/importHandlerPath.js' @@ -226,7 +227,12 @@ export const runJobs = async (args: RunJobsArgs): Promise => { const successfullyCompletedJobs: (number | string)[] = [] - const runSingleJob = async (job: Job) => { + const runSingleJob = async ( + job: Job, + ): Promise<{ + id: number | string + result: RunJobResult + }> => { if (!job.workflowSlug && !job.taskSlug) { throw new Error('Job must have either a workflowSlug or a taskSlug') } @@ -245,68 +251,90 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } if (!workflowConfig) { - return null // Skip jobs with no workflow configuration + return { + id: job.id, + result: { + status: 'error', + }, + } // Skip jobs with no workflow configuration } - const updateJob = getUpdateJobFunction(job, jobReq) + try { + const updateJob = getUpdateJobFunction(job, jobReq) - // the runner will either be passed to the config - // OR it will be a path, which we will need to import via eval to avoid - // Next.js compiler dynamic import expression errors - let workflowHandler: WorkflowHandler | WorkflowJSON - if ( - typeof workflowConfig.handler === 'function' || - (typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler)) - ) { - workflowHandler = workflowConfig.handler - } else { - workflowHandler = await importHandlerPath(workflowConfig.handler) + // the runner will either be passed to the config + // OR it will be a path, which we will need to import via eval to avoid + // Next.js compiler dynamic import expression errors + let workflowHandler: WorkflowHandler | WorkflowJSON + if ( + typeof workflowConfig.handler === 'function' || + (typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler)) + ) { + workflowHandler = workflowConfig.handler + } else { + workflowHandler = await importHandlerPath(workflowConfig.handler) - if (!workflowHandler) { - const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` - const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.` - payload.logger.error(errorMessage) + if (!workflowHandler) { + const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` + const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.` + payload.logger.error(errorMessage) - await updateJob({ - error: { - error: errorMessage, - }, - hasError: true, - processing: false, + await updateJob({ + error: { + error: errorMessage, + }, + hasError: true, + processing: false, + }) + + return { + id: job.id, + result: { + status: 'error-reached-max-retries', + }, + } + } + } + + if (typeof workflowHandler === 'function') { + const result = await runJob({ + job, + req: jobReq, + updateJob, + workflowConfig, + workflowHandler, }) - return + if (result.status === 'success') { + successfullyCompletedJobs.push(job.id) + } + + return { id: job.id, result } + } else { + const result = await runJSONJob({ + job, + req: jobReq, + updateJob, + workflowConfig, + workflowHandler, + }) + + if (result.status === 'success') { + successfullyCompletedJobs.push(job.id) + } + + return { id: job.id, result } } - } - - if (typeof workflowHandler === 'function') { - const result = await runJob({ - job, - req: jobReq, - updateJob, - workflowConfig, - workflowHandler, - }) - - if (result.status !== 'error') { - successfullyCompletedJobs.push(job.id) + } catch (error) { + if (error instanceof JobCancelledError) { + return { + id: job.id, + result: { + status: 'error-reached-max-retries', + }, + } } - - return { id: job.id, result } - } else { - const result = await runJSONJob({ - job, - req: jobReq, - updateJob, - workflowConfig, - workflowHandler, - }) - - if (result.status !== 'error') { - successfullyCompletedJobs.push(job.id) - } - - return { id: job.id, result } + throw error } } diff --git a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts index 28a117194..66b015672 100644 --- a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts @@ -5,8 +5,9 @@ import type { WorkflowConfig } from '../../../config/types/workflowTypes.js' import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js' import type { JobRunStatus } from '../runJob/index.js' -import { getRunTaskFunction, type RunTaskFunctionState } from '../runJob/getRunTaskFunction.js' -import { handleWorkflowError } from '../runJob/handleWorkflowError.js' +import { handleWorkflowError } from '../../../errors/handleWorkflowError.js' +import { WorkflowError } from '../../../errors/index.js' +import { getRunTaskFunction } from '../runJob/getRunTaskFunction.js' type Args = { job: Job @@ -27,12 +28,6 @@ export const runJSONJob = async ({ workflowConfig, workflowHandler, }: Args): Promise => { - // Object so that we can pass contents by reference, not value. - // We want any mutations to be reflected in here. - const state: RunTaskFunctionState = { - reachedMaxRetries: false, - } - const stepsToRun: WorkflowStep[] = [] for (const step of workflowHandler) { @@ -51,12 +46,10 @@ export const runJSONJob = async ({ stepsToRun.push(step) } - const tasks = getRunTaskFunction(state, job, workflowConfig, req, false, updateJob) - const inlineTask = getRunTaskFunction(state, job, workflowConfig, req, true, updateJob) + const tasks = getRunTaskFunction(job, workflowConfig, req, false, updateJob) + const inlineTask = getRunTaskFunction(job, workflowConfig, req, true, updateJob) // Run the job - let hasFinalError = false - let error: Error | undefined try { await Promise.all( stepsToRun.map(async (step) => { @@ -73,17 +66,27 @@ export const runJSONJob = async ({ } }), ) - } catch (_err) { - const err = _err as Error - const errorResult = handleWorkflowError({ - error: err, - job, + } catch (error) { + const { hasFinalError } = await handleWorkflowError({ + error: + error instanceof WorkflowError + ? error + : new WorkflowError({ + job, + message: + typeof error === 'object' && error && 'message' in error + ? (error.message as string) + : 'An unhandled error occurred', + workflowConfig, + }), + req, - state, - workflowConfig, + updateJob, }) - error = err - hasFinalError = errorResult.hasFinalError + + return { + status: hasFinalError ? 'error-reached-max-retries' : 'error', + } } // Check if workflow has completed @@ -107,49 +110,23 @@ export const runJSONJob = async ({ } if (workflowCompleted) { - if (error) { - // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task) - // we need to ensure the job is updated to reflect the error - await updateJob({ - completedAt: new Date().toISOString(), - error: hasFinalError ? error : undefined, - hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - processing: false, - totalTried: (job.totalTried ?? 0) + 1, - }) - } else { - await updateJob({ - completedAt: new Date().toISOString(), - processing: false, - totalTried: (job.totalTried ?? 0) + 1, - }) - } + await updateJob({ + completedAt: new Date().toISOString(), + processing: false, + totalTried: (job.totalTried ?? 0) + 1, + }) return { status: 'success', } } else { - if (error) { - // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task) - // we need to ensure the job is updated to reflect the error - await updateJob({ - error: hasFinalError ? error : undefined, - hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - processing: false, - totalTried: (job.totalTried ?? 0) + 1, - }) - return { - status: hasFinalError ? 'error-reached-max-retries' : 'error', - } - } else { - // Retry the job - no need to bump processing or totalTried as this does not count as a retry. A condition of a different task might have just opened up! - return await runJSONJob({ - job, - req, - updateJob, - workflowConfig, - workflowHandler, - }) - } + // Retry the job - no need to bump processing or totalTried as this does not count as a retry. A condition of a different task might have just opened up! + return await runJSONJob({ + job, + req, + updateJob, + workflowConfig, + workflowHandler, + }) } } diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 587c2ac92..868ac5602 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -19,148 +19,18 @@ import type { } from '../../../config/types/workflowTypes.js' import type { UpdateJobFunction } from './getUpdateJobFunction.js' -import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' -import { importHandlerPath } from './importHandlerPath.js' +import { TaskError } from '../../../errors/index.js' +import { getTaskHandlerFromConfig } from './importHandlerPath.js' const ObjectId = (ObjectIdImport.default || ObjectIdImport) as unknown as typeof ObjectIdImport.default -// Helper object type to force being passed by reference -export type RunTaskFunctionState = { - reachedMaxRetries: boolean -} - -async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) { - if (!taskConfig) { - throw new Error('Task config is required to get the task handler') - } - if (typeof taskConfig.handler === 'function') { - return taskConfig.handler - } else { - return await importHandlerPath>(taskConfig.handler) - } -} - -export async function handleTaskFailed({ - error, - executedAt, - input, - job, - maxRetries, - output, - parent, - req, - retriesConfig, - state, - taskConfig, - taskHandlerResult, - taskID, - taskSlug, - taskStatus, - updateJob, -}: { - error?: Error - executedAt: Date - input: object - job: Job - maxRetries: number - output: object - parent?: TaskParent - req: PayloadRequest - retriesConfig: number | RetryConfig - state: RunTaskFunctionState - taskConfig?: TaskConfig - taskHandlerResult?: TaskHandlerResult - taskID: string - taskSlug: string - taskStatus: null | SingleTaskStatus - updateJob: UpdateJobFunction -}): Promise { - req.payload.logger.error({ err: error, job, msg: `Error running task ${taskID}`, taskSlug }) - - if (taskConfig?.onFail) { - await taskConfig.onFail() - } - - const errorJSON = error - ? { - name: error.name, - message: error.message, - stack: error.stack, - } - : { - message: - taskHandlerResult?.state === 'failed' - ? (taskHandlerResult.errorMessage ?? taskHandlerResult.state) - : 'failed', - } - - const currentDate = new Date() - - ;(job.log ??= []).push({ - id: new ObjectId().toHexString(), - completedAt: currentDate.toISOString(), - error: errorJSON, - executedAt: executedAt.toISOString(), - input, - output, - parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined, - state: 'failed', - taskID, - taskSlug, - }) - - if (job.waitUntil) { - // Check if waitUntil is in the past - const waitUntil = new Date(job.waitUntil) - if (waitUntil < currentDate) { - // Outdated waitUntil, remove it - delete job.waitUntil - } - } - - if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) { - state.reachedMaxRetries = true - - await updateJob({ - error, - hasError: true, - log: job.log, - processing: false, - waitUntil: job.waitUntil, - }) - - throw new Error( - `Task ${taskSlug} has failed more than the allowed retries in workflow ${job.workflowSlug}${error ? `. Error: ${String(error)}` : ''}`, - ) - } else { - // Job will retry. Let's determine when! - const waitUntil: Date = calculateBackoffWaitUntil({ - retriesConfig, - totalTried: taskStatus?.totalTried ?? 0, - }) - - // Update job's waitUntil only if this waitUntil is later than the current one - if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) { - job.waitUntil = waitUntil.toISOString() - } - - await updateJob({ - log: job.log, - processing: false, - waitUntil: job.waitUntil, - }) - throw error ?? new Error('Task failed') - } -} - export type TaskParent = { taskID: string taskSlug: string } export const getRunTaskFunction = ( - state: RunTaskFunctionState, job: Job, workflowConfig: WorkflowConfig, req: PayloadRequest, @@ -240,47 +110,21 @@ export const getRunTaskFunction = ( : await getTaskHandlerFromConfig(taskConfig) if (!runner || typeof runner !== 'function') { - const errorMessage = isInline - ? `Can't find runner for inline task with ID ${taskID}` - : `Can't find runner while importing with the path ${typeof workflowConfig.handler === 'string' ? workflowConfig.handler : 'unknown - no string path'} in job type ${job.workflowSlug} for task ${taskSlug}.` - req.payload.logger.error(errorMessage) - - await updateJob({ - error: { - error: errorMessage, - }, - hasError: true, - log: [ - ...(job.log || []), - { - id: new ObjectId().toHexString(), - completedAt: new Date().toISOString(), - error: errorMessage, - executedAt: executedAt.toISOString(), - parent: jobConfig.addParentToTaskLog ? parent : undefined, - state: 'failed', - taskID, - taskSlug, - }, - ], - processing: false, + throw new TaskError({ + executedAt, + input, + job, + message: isInline + ? `Inline task with ID ${taskID} does not have a valid handler.` + : `Task with slug ${taskSlug} in workflow ${job.workflowSlug} does not have a valid handler.`, + parent, + retriesConfig: finalRetriesConfig, + taskConfig, + taskID, + taskSlug, + taskStatus, + workflowConfig, }) - - throw new Error(errorMessage) - } - - let maxRetries: number | undefined = finalRetriesConfig?.attempts - - if (maxRetries === undefined || maxRetries === null) { - // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured - if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) { - maxRetries = - typeof workflowConfig.retries === 'object' - ? workflowConfig.retries.attempts - : workflowConfig.retries - } else { - maxRetries = 0 - } } let taskHandlerResult: TaskHandlerResult @@ -288,58 +132,50 @@ export const getRunTaskFunction = ( try { taskHandlerResult = await runner({ - inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, { + inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob, { taskID, taskSlug, }), input, job: job as unknown as Job, req, - tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, { + tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob, { taskID, taskSlug, }), }) - } catch (err) { - await handleTaskFailed({ - error: err as Error | undefined, + } catch (err: any) { + throw new TaskError({ executedAt, input: input!, job, - maxRetries: maxRetries!, + message: err.message || 'Task handler threw an error', output, parent, - req, retriesConfig: finalRetriesConfig, - state, taskConfig, taskID, taskSlug, taskStatus, - updateJob, + workflowConfig, }) - throw new Error('Task failed') } if (taskHandlerResult.state === 'failed') { - await handleTaskFailed({ + throw new TaskError({ executedAt, input: input!, job, - maxRetries: maxRetries!, + message: taskHandlerResult.errorMessage ?? 'Task handler returned a failed state', output, parent, - req, retriesConfig: finalRetriesConfig, - state, taskConfig, - taskHandlerResult, taskID, taskSlug, taskStatus, - updateJob, + workflowConfig, }) - throw new Error('Task failed') } else { output = taskHandlerResult.output } diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts index bfeba3bfb..0751f1ba0 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts @@ -1,10 +1,16 @@ import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' +import { JobCancelledError } from '../../../errors/index.js' import { updateJob } from '../../../utilities/updateJob.js' export type UpdateJobFunction = (jobData: Partial) => Promise +/** + * Helper for updating a job that does the following, additionally to updating the job: + * - Merges incoming data from the updated job into the original job object + * - Handles job cancellation by throwing a `JobCancelledError` if the job was cancelled. + */ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFunction { return async (jobData) => { const updatedJob = await updateJob({ @@ -15,6 +21,10 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu req, }) + if (!updatedJob) { + return job + } + // Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function for (const key in updatedJob) { if (key === 'log') { @@ -31,11 +41,9 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu } if ((updatedJob?.error as Record)?.cancelled) { - const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error - cancelledError.cancelled = true - throw cancelledError + throw new JobCancelledError({ job }) } - return updatedJob! + return updatedJob } } diff --git a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts deleted file mode 100644 index 2481db7b2..000000000 --- a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts +++ /dev/null @@ -1,76 +0,0 @@ -import type { Job } from '../../../../index.js' -import type { PayloadRequest } from '../../../../types/index.js' -import type { WorkflowConfig } from '../../../config/types/workflowTypes.js' -import type { RunTaskFunctionState } from './getRunTaskFunction.js' - -import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' - -/** - * This is called if a workflow catches an error. It determines if it's a final error - * or not and handles logging. - */ -export function handleWorkflowError({ - error, - job, - req, - state, - workflowConfig, -}: { - error: Error - job: Job - req: PayloadRequest - state: RunTaskFunctionState - workflowConfig: WorkflowConfig -}): { - hasFinalError: boolean -} { - const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` - - let hasFinalError = state.reachedMaxRetries || !!('cancelled' in error && error.cancelled) // If any TASK reached max retries, the job has an error - const maxWorkflowRetries: number = ( - typeof workflowConfig.retries === 'object' - ? workflowConfig.retries.attempts - : workflowConfig.retries - )! - - if ( - maxWorkflowRetries !== undefined && - maxWorkflowRetries !== null && - job.totalTried >= maxWorkflowRetries - ) { - hasFinalError = true - state.reachedMaxRetries = true - } - - // Now let's handle workflow retries - if (!hasFinalError) { - if (job.waitUntil) { - // Check if waitUntil is in the past - const waitUntil = new Date(job.waitUntil) - if (waitUntil < new Date()) { - // Outdated waitUntil, remove it - delete job.waitUntil - } - } - - // Job will retry. Let's determine when! - const waitUntil: Date = calculateBackoffWaitUntil({ - retriesConfig: workflowConfig.retries!, - totalTried: job.totalTried ?? 0, - }) - - // Update job's waitUntil only if this waitUntil is later than the current one - if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) { - job.waitUntil = waitUntil.toISOString() - } - } - - req.payload.logger.error({ - err: error, - msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, - }) - - return { - hasFinalError, - } -} diff --git a/packages/payload/src/queues/operations/runJobs/runJob/importHandlerPath.ts b/packages/payload/src/queues/operations/runJobs/runJob/importHandlerPath.ts index 48c2fc679..b7d23d1f3 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/importHandlerPath.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/importHandlerPath.ts @@ -1,5 +1,10 @@ import { pathToFileURL } from 'url' +import type { TaskConfig, TaskHandler, TaskType } from '../../../config/types/taskTypes.js' + +/** + * Imports a handler function from a given path. + */ export async function importHandlerPath(path: string): Promise { let runner!: T const [runnerPath, runnerImportName] = path.split('#') @@ -35,3 +40,19 @@ export async function importHandlerPath(path: string): Promise { return runner } + +/** + * The `handler` property of a task config can either be a function or a path to a module that exports a function. + * This function resolves the handler to a function, either by importing it from the path or returning the function directly + * if it is already a function. + */ +export async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) { + if (!taskConfig) { + throw new Error('Task config is required to get the task handler') + } + if (typeof taskConfig.handler === 'function') { + return taskConfig.handler + } else { + return await importHandlerPath>(taskConfig.handler) + } +} diff --git a/packages/payload/src/queues/operations/runJobs/runJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJob/index.ts index 7b3b755d7..fe8f6256e 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/index.ts @@ -1,12 +1,12 @@ -import type { APIError } from '../../../../errors/APIError.js' import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' import type { WorkflowConfig, WorkflowHandler } from '../../../config/types/workflowTypes.js' -import type { RunTaskFunctionState } from './getRunTaskFunction.js' import type { UpdateJobFunction } from './getUpdateJobFunction.js' +import { handleTaskError } from '../../../errors/handleTaskError.js' +import { handleWorkflowError } from '../../../errors/handleWorkflowError.js' +import { JobCancelledError, TaskError, WorkflowError } from '../../../errors/index.js' import { getRunTaskFunction } from './getRunTaskFunction.js' -import { handleWorkflowError } from './handleWorkflowError.js' type Args = { job: Job @@ -29,46 +29,44 @@ export const runJob = async ({ workflowConfig, workflowHandler, }: Args): Promise => { - // Object so that we can pass contents by reference, not value. - // We want any mutations to be reflected in here. - const state: RunTaskFunctionState = { - reachedMaxRetries: false, - } - // Run the job try { await workflowHandler({ - inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob), + inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob), job, req, - tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob), - }) - } catch (_err) { - const err = _err as APIError - const { hasFinalError } = handleWorkflowError({ - error: err, - job, - req, - state, - workflowConfig, + tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob), }) + } catch (error) { + if (error instanceof JobCancelledError) { + throw error // Job cancellation is handled in a top-level error handler, as higher up code may themselves throw this error + } + if (error instanceof TaskError) { + const { hasFinalError } = await handleTaskError({ + error, + req, + updateJob, + }) - const errorJSON = hasFinalError - ? { - name: err.name, - cancelled: Boolean('cancelled' in err && err.cancelled), - message: err.message, - stack: err.stack, - } - : undefined - // Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task) - // we need to ensure the job is updated to reflect the error - await updateJob({ - error: errorJSON, - hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - log: job.log, - processing: false, - totalTried: (job.totalTried ?? 0) + 1, + return { + status: hasFinalError ? 'error-reached-max-retries' : 'error', + } + } + + const { hasFinalError } = await handleWorkflowError({ + error: + error instanceof WorkflowError + ? error + : new WorkflowError({ + job, + message: + typeof error === 'object' && error && 'message' in error + ? (error.message as string) + : 'An unhandled error occurred', + workflowConfig, + }), + req, + updateJob, }) return { @@ -76,7 +74,7 @@ export const runJob = async ({ } } - // Workflow has completed + // Workflow has completed successfully await updateJob({ completedAt: new Date().toISOString(), log: job.log, diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts index d4addac8e..14c425a94 100644 --- a/packages/payload/src/queues/restEndpointRun.ts +++ b/packages/payload/src/queues/restEndpointRun.ts @@ -6,6 +6,9 @@ const configHasJobs = (config: SanitizedConfig): boolean => { return Boolean(config.jobs?.tasks?.length || config.jobs?.workflows?.length) } +/** + * /api/payload-jobs/run endpoint + */ export const runJobsEndpoint: Endpoint = { handler: async (req) => { if (!configHasJobs(req.payload.config)) { diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts index 3517e19dd..6ce4479ea 100644 --- a/packages/payload/src/queues/utilities/updateJob.ts +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -40,6 +40,11 @@ export async function updateJob(args: ArgsByID & BaseArgs) { } } +/** + * Helper for updating jobs in the most performant way possible. + * Handles deciding whether it can used direct db methods or not, and if so, + * manually runs the afterRead hook that populates the `taskStatus` property. + */ export async function updateJobs({ id, data, diff --git a/test/helpers/initPayloadInt.ts b/test/helpers/initPayloadInt.ts index 0ed59d95a..184db198a 100644 --- a/test/helpers/initPayloadInt.ts +++ b/test/helpers/initPayloadInt.ts @@ -9,18 +9,22 @@ import { NextRESTClient } from './NextRESTClient.js' /** * Initialize Payload configured for integration tests */ -export async function initPayloadInt( +export async function initPayloadInt( dirname: string, testSuiteNameOverride?: string, - initializePayload = true, -): Promise<{ config: SanitizedConfig; payload?: Payload; restClient?: NextRESTClient }> { + initializePayload?: TInitializePayload, +): Promise< + TInitializePayload extends false + ? { config: SanitizedConfig } + : { config: SanitizedConfig; payload: Payload; restClient: NextRESTClient } +> { const testSuiteName = testSuiteNameOverride ?? path.basename(dirname) await runInit(testSuiteName, false, true) console.log('importing config', path.resolve(dirname, 'config.ts')) const { default: config } = await import(path.resolve(dirname, 'config.ts')) - if (!initializePayload) { - return { config: await config } + if (initializePayload === false) { + return { config: await config } as any } console.log('starting payload') @@ -29,5 +33,5 @@ export async function initPayloadInt( console.log('initializing rest client') const restClient = new NextRESTClient(payload.config) console.log('initPayloadInt done') - return { config: payload.config, payload, restClient } + return { config: payload.config, payload, restClient } as any } diff --git a/test/queues/config.ts b/test/queues/config.ts index a5fb76a96..331d89fd1 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -9,6 +9,7 @@ import { devUser } from '../credentials.js' import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js' import { seed } from './seed.js' import { externalWorkflow } from './workflows/externalWorkflow.js' +import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js' import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' import { longRunningWorkflow } from './workflows/longRunning.js' @@ -393,6 +394,7 @@ export default buildConfigWithDefaults({ workflowRetries2TasksRetriesUndefinedWorkflow, workflowRetries2TasksRetries0Workflow, inlineTaskTestWorkflow, + failsImmediatelyWorkflow, inlineTaskTestDelayedWorkflow, externalWorkflow, retriesBackoffTestWorkflow, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 4a5001154..90c103175 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -41,6 +41,7 @@ describe('Queues', () => { if (data.token) { token = data.token } + payload.config.jobs.deleteJobOnComplete = true }) it('will run access control on jobs runner', async () => { @@ -182,7 +183,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(3) - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflow-level retries are respected', async () => { @@ -218,8 +218,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(2) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflows dont limit retries if no retries property is sett', async () => { @@ -255,8 +253,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(3) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => { @@ -292,8 +288,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(0) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => { @@ -329,8 +323,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(0) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => { @@ -366,8 +358,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(2) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => { @@ -403,8 +393,6 @@ describe('Queues', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(0) - - payload.config.jobs.deleteJobOnComplete = true }) /* @@ -492,7 +480,7 @@ describe('Queues', () => { id: job.id, }) expect(jobAfterRun.totalTried).toBe(5) - expect((jobAfterRun.taskStatus as JobTaskStatus).inline['1'].totalTried).toBe(5) + expect((jobAfterRun.taskStatus as JobTaskStatus).inline?.['1']?.totalTried).toBe(5) // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(4) @@ -518,7 +506,7 @@ describe('Queues', () => { if (index === arr.length - 1) { return null } - return new Date(arr[index + 1]).getTime() - new Date(time).getTime() + return new Date(arr[index + 1] as string).getTime() - new Date(time).getTime() }) .filter((p) => p !== null) @@ -527,8 +515,6 @@ describe('Queues', () => { expect(durations[1]).toBeGreaterThan(600) expect(durations[2]).toBeGreaterThan(1200) expect(durations[3]).toBeGreaterThan(2400) - - payload.config.jobs.deleteJobOnComplete = true }) it('ensure jobs run in FIFO order by default', async () => { @@ -647,7 +633,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('hello!') + expect(allSimples.docs[0]?.title).toBe('hello!') }) it('can create and autorun jobs', async () => { @@ -677,12 +663,12 @@ describe('Queues', () => { const { id } = await payload.jobs.queue({ workflow: 'inlineTaskTest', input: { - message: 'hello!', + message: 'deleteJobOnComplete test', }, }) const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) - expect(before.id).toBe(id) + expect(before?.id).toBe(id) await payload.jobs.run() @@ -690,6 +676,21 @@ describe('Queues', () => { expect(after).toBeNull() }) + it('should not delete failed jobs if deleteJobOnComplete is true', async () => { + const { id } = await payload.jobs.queue({ + workflow: 'failsImmediately', + input: {}, + }) + + const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) + expect(before?.id).toBe(id) + + await payload.jobs.run() + + const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) + expect(after?.id).toBe(id) + }) + it('should respect deleteJobOnComplete false configuration', async () => { payload.config.jobs.deleteJobOnComplete = false const { id } = await payload.jobs.queue({ @@ -700,14 +701,12 @@ describe('Queues', () => { }) const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) - expect(before.id).toBe(id) + expect(before?.id).toBe(id) await payload.jobs.run() const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) - expect(after.id).toBe(id) - - payload.config.jobs.deleteJobOnComplete = true + expect(after?.id).toBe(id) }) it('can queue single tasks', async () => { @@ -726,7 +725,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') }) it('can queue and run via the endpoint single tasks without workflows', async () => { @@ -751,7 +750,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') payload.config.jobs.workflows = workflowsRef }) @@ -885,8 +884,8 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(8) - expect(allSimples.docs[0].title).toBe('from single task') - expect(allSimples.docs[7].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') + expect(allSimples.docs[7]?.title).toBe('from single task') }) it('can queue single tasks hundreds of times', async () => { @@ -912,9 +911,8 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(numberOfTasks) // Default limit: 10 - expect(allSimples.docs[0].title).toBe('from single task') - expect(allSimples.docs[numberOfTasks - 1].title).toBe('from single task') - payload.config.jobs.deleteJobOnComplete = true + expect(allSimples.docs[0]?.title).toBe('from single task') + expect(allSimples.docs[numberOfTasks - 1]?.title).toBe('from single task') }) it('ensure default jobs run limit of 10 works', async () => { @@ -935,8 +933,8 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(10) // Default limit: 10 - expect(allSimples.docs[0].title).toBe('from single task') - expect(allSimples.docs[9].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') + expect(allSimples.docs[9]?.title).toBe('from single task') }) it('ensure jobs run limit can be customized', async () => { @@ -959,9 +957,9 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(42) // Default limit: 10 - expect(allSimples.docs[0].title).toBe('from single task') - expect(allSimples.docs[30].title).toBe('from single task') - expect(allSimples.docs[41].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') + expect(allSimples.docs[30]?.title).toBe('from single task') + expect(allSimples.docs[41]?.title).toBe('from single task') }) it('can queue different kinds of single tasks multiple times', async () => { @@ -1026,7 +1024,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('external') + expect(allSimples.docs[0]?.title).toBe('external') }) it('can queue external workflow that is running external task', async () => { @@ -1045,13 +1043,13 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('externalWorkflow') + expect(allSimples.docs[0]?.title).toBe('externalWorkflow') }) it('ensure payload.jobs.runByID works and only runs the specified job', async () => { payload.config.jobs.deleteJobOnComplete = false - let lastJobID: string = null + let lastJobID: null | string = null for (let i = 0; i < 3; i++) { const job = await payload.jobs.queue({ task: 'CreateSimple', @@ -1061,6 +1059,9 @@ describe('Queues', () => { }) lastJobID = job.id } + if (!lastJobID) { + throw new Error('No job ID found after queuing jobs') + } await payload.jobs.runByID({ id: lastJobID, @@ -1072,7 +1073,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') const allCompletedJobs = await payload.find({ collection: 'payload-jobs', @@ -1085,13 +1086,13 @@ describe('Queues', () => { }) expect(allCompletedJobs.totalDocs).toBe(1) - expect(allCompletedJobs.docs[0].id).toBe(lastJobID) + expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID) }) it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => { payload.config.jobs.deleteJobOnComplete = false - let lastJobID: string = null + let lastJobID: null | string = null for (let i = 0; i < 3; i++) { const job = await payload.jobs.queue({ task: 'CreateSimple', @@ -1101,6 +1102,9 @@ describe('Queues', () => { }) lastJobID = job.id } + if (!lastJobID) { + throw new Error('No job ID found after queuing jobs') + } await payload.jobs.run({ where: { @@ -1116,7 +1120,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('from single task') + expect(allSimples.docs[0]?.title).toBe('from single task') const allCompletedJobs = await payload.find({ collection: 'payload-jobs', @@ -1129,7 +1133,7 @@ describe('Queues', () => { }) expect(allCompletedJobs.totalDocs).toBe(1) - expect(allCompletedJobs.docs[0].id).toBe(lastJobID) + expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID) }) it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => { @@ -1158,7 +1162,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('from single task 2') + expect(allSimples.docs[0]?.title).toBe('from single task 2') const allCompletedJobs = await payload.find({ collection: 'payload-jobs', @@ -1171,7 +1175,7 @@ describe('Queues', () => { }) expect(allCompletedJobs.totalDocs).toBe(1) - expect((allCompletedJobs.docs[0].input as any).message).toBe('from single task 2') + expect((allCompletedJobs.docs[0]?.input as any).message).toBe('from single task 2') }) it('can run sub-tasks', async () => { @@ -1191,24 +1195,24 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(2) - expect(allSimples.docs[0].title).toBe('hello!') - expect(allSimples.docs[1].title).toBe('hello!') + expect(allSimples.docs[0]?.title).toBe('hello!') + expect(allSimples.docs[1]?.title).toBe('hello!') const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: job.id, }) - expect(jobAfterRun.log[0].taskID).toBe('create doc 1') + expect(jobAfterRun?.log?.[0]?.taskID).toBe('create doc 1') //expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs') // jobAfterRun.log[0].parent should not exist - expect(jobAfterRun.log[0].parent).toBeUndefined() + expect(jobAfterRun?.log?.[0]?.parent).toBeUndefined() - expect(jobAfterRun.log[1].taskID).toBe('create doc 2') + expect(jobAfterRun?.log?.[1]?.taskID).toBe('create doc 2') //expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs') - expect(jobAfterRun.log[1].parent).toBeUndefined() + expect(jobAfterRun?.log?.[1]?.parent).toBeUndefined() - expect(jobAfterRun.log[2].taskID).toBe('create two docs') + expect(jobAfterRun?.log?.[2]?.taskID).toBe('create two docs') }) it('ensure successful sub-tasks are not retried', async () => { @@ -1237,7 +1241,7 @@ describe('Queues', () => { }) expect(allSimples.totalDocs).toBe(1) - expect(allSimples.docs[0].title).toBe('hello!') + expect(allSimples?.docs?.[0]?.title).toBe('hello!') const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', @@ -1339,8 +1343,8 @@ describe('Queues', () => { expect(jobAfterRun.hasError).toBe(true) expect(jobAfterRun.log?.length).toBe(1) - expect(jobAfterRun.log[0].error.message).toBe('failed') - expect(jobAfterRun.log[0].state).toBe('failed') + expect(jobAfterRun?.log?.[0]?.error?.message).toBe('failed') + expect(jobAfterRun?.log?.[0]?.state).toBe('failed') }) it('can tasks return error', async () => { @@ -1360,8 +1364,8 @@ describe('Queues', () => { expect(jobAfterRun.hasError).toBe(true) expect(jobAfterRun.log?.length).toBe(1) - expect(jobAfterRun.log[0].error.message).toBe('failed') - expect(jobAfterRun.log[0].state).toBe('failed') + expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Task handler returned a failed state') + expect(jobAfterRun?.log?.[0]?.state).toBe('failed') }) it('can tasks return error with custom error message', async () => { @@ -1383,8 +1387,8 @@ describe('Queues', () => { expect(jobAfterRun.hasError).toBe(true) expect(jobAfterRun.log?.length).toBe(1) - expect(jobAfterRun.log[0].error.message).toBe('custom error message') - expect(jobAfterRun.log[0].state).toBe('failed') + expect(jobAfterRun?.log?.[0]?.error?.message).toBe('custom error message') + expect(jobAfterRun?.log?.[0]?.state).toBe('failed') }) it('can reliably run workflows with parallel tasks', async () => { diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index b6b91c4f5..54945be89 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -123,6 +123,7 @@ export interface Config { workflowRetries2TasksRetriesUndefined: WorkflowWorkflowRetries2TasksRetriesUndefined; workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0; inlineTaskTest: WorkflowInlineTaskTest; + failsImmediately: WorkflowFailsImmediately; inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed; externalWorkflow: WorkflowExternalWorkflow; retriesBackoffTest: WorkflowRetriesBackoffTest; @@ -314,6 +315,7 @@ export interface PayloadJob { | 'workflowRetries2TasksRetriesUndefined' | 'workflowRetries2TasksRetries0' | 'inlineTaskTest' + | 'failsImmediately' | 'inlineTaskTestDelayed' | 'externalWorkflow' | 'retriesBackoffTest' @@ -724,6 +726,13 @@ export interface WorkflowInlineTaskTest { message: string; }; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowFailsImmediately". + */ +export interface WorkflowFailsImmediately { + input?: unknown; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "WorkflowInlineTaskTestDelayed". diff --git a/test/queues/seed.ts b/test/queues/seed.ts index 5c1f1d91e..fb8f56c36 100644 --- a/test/queues/seed.ts +++ b/test/queues/seed.ts @@ -1,14 +1,8 @@ import type { Payload } from 'payload' -import path from 'path' -import { fileURLToPath } from 'url' - import { devUser } from '../credentials.js' import { seedDB } from '../helpers/seed.js' -const filename = fileURLToPath(import.meta.url) -const dirname = path.dirname(filename) - export const seed = async (_payload: Payload) => { await _payload.create({ collection: 'users', @@ -22,9 +16,11 @@ export const seed = async (_payload: Payload) => { export async function clearAndSeedEverything(_payload: Payload) { return await seedDB({ _payload, - collectionSlugs: _payload.config.collections.map((collection) => collection.slug), + collectionSlugs: [ + ..._payload.config.collections.map((collection) => collection.slug), + 'payload-jobs', + ], seedFunction: seed, - snapshotKey: 'fieldsTest', - uploadsDir: path.resolve(dirname, './collections/Upload/uploads'), + snapshotKey: 'queuesTest', }) } diff --git a/test/queues/workflows/failsImmediately.ts b/test/queues/workflows/failsImmediately.ts new file mode 100644 index 000000000..9435e1923 --- /dev/null +++ b/test/queues/workflows/failsImmediately.ts @@ -0,0 +1,10 @@ +import type { WorkflowConfig } from 'payload' + +export const failsImmediatelyWorkflow: WorkflowConfig<'failsImmediately'> = { + slug: 'failsImmediately', + inputSchema: [], + retries: 0, + handler: () => { + throw new Error('This workflow fails immediately') + }, +}