From 032c4242447de1a2a63387b3c766daa3b633f90d Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Thu, 20 Mar 2025 11:31:14 -0600 Subject: [PATCH] perf: use direct db calls in job-queue system (#11489) Previously, our job queue system relied on `payload.*` operations, which ran very frequently: - whenever job execution starts, as all jobs need to be set to `processing: true` - every single time a task completes or fails, as the job log needs to be updated - whenever job execution stops, to mark it as completed and to delete it (if `deleteJobOnComplete` is set) This PR replaces these with direct `payload.db.*` calls, which are significantly faster than payload operations. Given how often the job queue system communicates with the database, this should be a massive performance improvement. ## How it affects running hooks To generate the task status, we previously used an `afterRead` hook. Since direct db adapter calls no longer execute hooks, this PR introduces new `updateJob` and `updateJobs` helpers to handle task status generation outside the normal payload hook lifecycle. Additionally, a new `runHooks` property has been added to the global job configuration. While setting this to `true` can be useful if custom hooks were added to the `payload-jobs` collection config, this will revert the job system to use normal payload operations. This should be avoided as it degrades performance. In most cases, the `onSuccess` or `onFail` properties in the job config will be sufficient and much faster. Furthermore, if the `depth` property is set in the global job configuration, the job queue system will also fall back to the slower, normal payload operations. --------- Co-authored-by: Dan Ribbens --- packages/payload/src/config/sanitize.ts | 14 +++ packages/payload/src/index.ts | 2 + packages/payload/src/queues/config/index.ts | 18 +-- .../payload/src/queues/config/types/index.ts | 15 ++- packages/payload/src/queues/localAPI.ts | 91 ++++++++------ .../src/queues/operations/runJobs/index.ts | 78 +++++++----- .../runJobs/runJob/getUpdateJobFunction.ts | 16 +-- .../runJobs/runJob/handleWorkflowError.ts | 2 +- .../queues/operations/runJobs/runJob/index.ts | 1 + .../queues/utilities/sanitizeUpdateData.ts | 28 +++++ .../payload/src/queues/utilities/updateJob.ts | 115 ++++++++++++++++++ test/queues/workflows/noRetriesSet.ts | 4 +- test/queues/workflows/retries0.ts | 3 +- test/queues/workflows/retriesBackoffTest.ts | 3 +- test/queues/workflows/retriesTest.ts | 3 +- .../workflows/retriesWorkflowLevelTest.ts | 3 +- test/queues/workflows/subTaskFails.ts | 8 +- .../workflowAndTasksRetriesUndefined.ts | 3 +- .../workflowRetries2TasksRetries0.ts | 3 +- .../workflowRetries2TasksRetriesUndefined.ts | 3 +- 20 files changed, 320 insertions(+), 93 deletions(-) create mode 100644 packages/payload/src/queues/utilities/sanitizeUpdateData.ts create mode 100644 packages/payload/src/queues/utilities/updateJob.ts diff --git a/packages/payload/src/config/sanitize.ts b/packages/payload/src/config/sanitize.ts index aaff0a1271..46ae83ad9e 100644 --- a/packages/payload/src/config/sanitize.ts +++ b/packages/payload/src/config/sanitize.ts @@ -286,6 +286,20 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise defaultAmount) { + console.warn( + `The jobsCollectionOverrides function is returning a collection with an additional ${hook} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`, + ) + break + } + } + } } const sanitizedJobsCollection = await sanitizeCollection( config as unknown as Config, diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index 0dad27afa9..f67e1af1e2 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -1390,7 +1390,9 @@ export type { PreferenceUpdateRequest, TabsPreferences, } from './preferences/types.js' +export { jobAfterRead } from './queues/config/index.js' export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js' + export type { RunInlineTaskFunction, RunTaskFunction, diff --git a/packages/payload/src/queues/config/index.ts b/packages/payload/src/queues/config/index.ts index a4220b9ad0..f4cb07924d 100644 --- a/packages/payload/src/queues/config/index.ts +++ b/packages/payload/src/queues/config/index.ts @@ -1,6 +1,7 @@ import type { CollectionConfig } from '../../collections/config/types.js' -import type { Config } from '../../config/types.js' +import type { Config, SanitizedConfig } from '../../config/types.js' import type { Field } from '../../fields/config/types.js' +import type { BaseJob } from './types/workflowTypes.js' import { runJobsEndpoint } from '../restEndpointRun.js' import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' @@ -211,12 +212,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu ({ doc, req }) => { // This hook is used to add the virtual `tasks` field to the document, that is computed from the `log` field - doc.taskStatus = getJobTaskStatus({ - jobLog: doc.log, - tasksConfig: req.payload.config.jobs.tasks, - }) - - return doc + return jobAfterRead({ config: req.payload.config, doc }) }, ], /** @@ -240,3 +236,11 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu return jobsCollection } + +export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: BaseJob }): BaseJob { + doc.taskStatus = getJobTaskStatus({ + jobLog: doc.log || [], + tasksConfig: config.jobs.tasks, + }) + return doc +} diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index d3b1836c54..d41fbe646b 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -68,7 +68,11 @@ export type JobsConfig = { /** * Specify depth for retrieving jobs from the queue. * This should be as low as possible in order for job retrieval - * to be as efficient as possible. Defaults to 0. + * to be as efficient as possible. Setting it to anything higher than + * 0 will drastically affect performance, as less efficient database + * queries will be used. + * + * @default 0 */ depth?: number /** @@ -76,6 +80,15 @@ export type JobsConfig = { * a new collection. */ jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig + /** + * By default, the job system uses direct database calls for optimal performance. + * If you added custom hooks to your jobs collection, you can set this to true to + * use the standard Payload API for all job operations. This is discouraged, as it will + * drastically affect performance. + * + * @default false + */ + runHooks?: boolean /** * A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function. * If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run. diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index d2cb3c55bd..01d3b0037b 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -8,8 +8,9 @@ import { type TypedJobs, type Where, } from '../index.js' -import { jobsCollectionSlug } from './config/index.js' +import { jobAfterRead, jobsCollectionSlug } from './config/index.js' import { runJobs } from './operations/runJobs/index.js' +import { updateJob, updateJobs } from './utilities/updateJob.js' export const getJobsLocalAPI = (payload: Payload) => ({ queue: async < @@ -72,13 +73,27 @@ export const getJobsLocalAPI = (payload: Payload) => ({ data.taskSlug = args.task as string } - return (await payload.create({ - collection: jobsCollectionSlug, - data, - req: args.req, - })) as TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] + type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] ? RunningJob : RunningJobFromTask // Type assertion is still needed here + + if (payload?.config?.jobs?.depth || payload?.config?.jobs?.runHooks) { + return (await payload.create({ + collection: jobsCollectionSlug, + data, + depth: payload.config.jobs.depth ?? 0, + req: args.req, + })) as ReturnType + } else { + return jobAfterRead({ + config: payload.config, + doc: await payload.db.create({ + collection: jobsCollectionSlug, + data, + req: args.req, + }), + }) as unknown as ReturnType + } }, run: async (args?: { @@ -143,37 +158,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ }) } - await payload.db.updateMany({ - collection: jobsCollectionSlug, - data: { - completedAt: null, - error: { - cancelled: true, - }, - hasError: true, - processing: false, - waitUntil: null, - } as Partial< - { - completedAt: null - waitUntil: null - } & BaseJob - >, - req: newReq, - where: { and }, - }) - }, - - cancelByID: async (args: { - id: number | string - overrideAccess?: boolean - req?: PayloadRequest - }): Promise => { - const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload)) - - await payload.db.updateOne({ - id: args.id, - collection: jobsCollectionSlug, + await updateJobs({ data: { completedAt: null, error: { @@ -186,7 +171,39 @@ export const getJobsLocalAPI = (payload: Payload) => ({ completedAt: null waitUntil: null } & BaseJob, + depth: 0, // No depth, since we're not returning + disableTransaction: true, req: newReq, + returning: false, + where: { and }, + }) + }, + + cancelByID: async (args: { + id: number | string + overrideAccess?: boolean + req?: PayloadRequest + }): Promise => { + const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload)) + + await updateJob({ + id: args.id, + data: { + completedAt: null, + error: { + cancelled: true, + }, + hasError: true, + processing: false, + waitUntil: null, + } as { + completedAt: null + waitUntil: null + } & BaseJob, + depth: 0, // No depth, since we're not returning + disableTransaction: true, + req: newReq, + returning: false, }) }, }) diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index e474401b6c..5da73361db 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -13,6 +13,7 @@ import type { RunJobResult } from './runJob/index.js' import { Forbidden } from '../../../errors/Forbidden.js' import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js' import { jobsCollectionSlug } from '../../config/index.js' +import { updateJob, updateJobs } from '../../utilities/updateJob.js' import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js' import { importHandlerPath } from './runJob/importHandlerPath.js' import { runJob } from './runJob/index.js' @@ -106,40 +107,45 @@ export const runJobs = async ({ // the same job being picked up by another worker const jobsQuery: { docs: BaseJob[] - } = id - ? { - docs: [ - (await req.payload.update({ - id, - collection: jobsCollectionSlug, - data: { - processing: true, - seenByWorker: true, - }, - depth: req.payload.config.jobs.depth, - disableTransaction: true, - showHiddenFields: true, - })) as BaseJob, - ], - } - : ((await req.payload.update({ - collection: jobsCollectionSlug, + } = { docs: [] } + + if (id) { + // Only one job to run + jobsQuery.docs = [ + await updateJob({ + id, data: { processing: true, - seenByWorker: true, }, depth: req.payload.config.jobs.depth, disableTransaction: true, - limit, - showHiddenFields: true, - where, - })) as unknown as PaginatedDocs) + req, + returning: true, + }), + ] + } else { + const updatedDocs = await updateJobs({ + data: { + processing: true, + }, + depth: req.payload.config.jobs.depth, + disableTransaction: true, + limit, + req, + returning: true, + where, + }) + + if (updatedDocs) { + jobsQuery.docs = updatedDocs + } + } /** * Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried). * This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing. */ - const { newJobs } = jobsQuery.docs.reduce( + const { existingJobs, newJobs } = jobsQuery.docs.reduce( (acc, job) => { if (job.totalTried > 0) { acc.existingJobs.push(job) @@ -159,7 +165,11 @@ export const runJobs = async ({ } if (jobsQuery?.docs?.length) { - req.payload.logger.info(`Running ${jobsQuery.docs.length} jobs.`) + req.payload.logger.info({ + msg: `Running ${jobsQuery.docs.length} jobs.`, + new: newJobs?.length, + retrying: existingJobs?.length, + }) } const jobsToDelete: (number | string)[] | undefined = req.payload.config.jobs.deleteJobOnComplete ? [] @@ -253,11 +263,19 @@ export const runJobs = async ({ if (jobsToDelete && jobsToDelete.length > 0) { try { - await req.payload.delete({ - collection: jobsCollectionSlug, - req, - where: { id: { in: jobsToDelete } }, - }) + if (req.payload.config.jobs.runHooks) { + await req.payload.delete({ + collection: jobsCollectionSlug, + depth: 0, // can be 0 since we're not returning anything + disableTransaction: true, + where: { id: { in: jobsToDelete } }, + }) + } else { + await req.payload.db.deleteMany({ + collection: jobsCollectionSlug, + where: { id: { in: jobsToDelete } }, + }) + } } catch (err) { req.payload.logger.error({ err, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts index 5967792ec2..a4b97f88b2 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts @@ -2,27 +2,29 @@ import type { PayloadRequest } from '../../../../types/index.js' import type { BaseJob } from '../../../config/types/workflowTypes.js' -import { jobsCollectionSlug } from '../../../config/index.js' +import { updateJob } from '../../../utilities/updateJob.js' export type UpdateJobFunction = (jobData: Partial) => Promise export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction { return async (jobData) => { - const updatedJob = (await req.payload.update({ + const updatedJob = await updateJob({ id: job.id, - collection: jobsCollectionSlug, data: jobData, - depth: 0, + depth: req.payload.config.jobs.depth, disableTransaction: true, - })) as BaseJob + req, + }) // Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function for (const key in updatedJob) { job[key] = updatedJob[key] } - if ((updatedJob.error as any)?.cancelled) { - throw new Error('Job cancelled') + if ((updatedJob.error as Record)?.cancelled) { + const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error + cancelledError.cancelled = true + throw cancelledError } return updatedJob diff --git a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts index 2e0ebadb80..0defe0de07 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts @@ -26,7 +26,7 @@ export function handleWorkflowError({ } { const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` - let hasFinalError = state.reachedMaxRetries // If any TASK reached max retries, the job has an error + let hasFinalError = state.reachedMaxRetries || !!('cancelled' in error && error.cancelled) // If any TASK reached max retries, the job has an error const maxWorkflowRetries: number = (typeof workflowConfig.retries === 'object' ? workflowConfig.retries.attempts diff --git a/packages/payload/src/queues/operations/runJobs/runJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJob/index.ts index 176ca11695..cbc3dce169 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/index.ts @@ -60,6 +60,7 @@ export const runJob = async ({ const errorJSON = hasFinalError ? { name: err.name, + cancelled: Boolean('cancelled' in err && err.cancelled), message: err.message, stack: err.stack, } diff --git a/packages/payload/src/queues/utilities/sanitizeUpdateData.ts b/packages/payload/src/queues/utilities/sanitizeUpdateData.ts new file mode 100644 index 0000000000..9eb7fce2d7 --- /dev/null +++ b/packages/payload/src/queues/utilities/sanitizeUpdateData.ts @@ -0,0 +1,28 @@ +import ObjectIdImport from 'bson-objectid' + +import type { BaseJob } from '../config/types/workflowTypes.js' + +const ObjectId = (ObjectIdImport.default || + ObjectIdImport) as unknown as typeof ObjectIdImport.default + +/** + * Our payload operations sanitize the input data to, for example, add missing IDs to array rows. + * This function is used to manually sanitize the data for direct db adapter operations + */ +export function sanitizeUpdateData({ data }: { data: Partial }): Partial { + if (data.log) { + const sanitizedData = { ...data } + sanitizedData.log = sanitizedData?.log?.map((log) => { + if (log.id) { + return log + } + return { + ...log, + id: new ObjectId().toHexString(), + } + }) + return sanitizedData + } + + return data +} diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts new file mode 100644 index 0000000000..90867620c6 --- /dev/null +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -0,0 +1,115 @@ +import type { ManyOptions } from '../../collections/operations/local/update.js' +import type { PayloadRequest, Where } from '../../types/index.js' +import type { BaseJob } from '../config/types/workflowTypes.js' + +import { jobAfterRead, jobsCollectionSlug } from '../config/index.js' +import { sanitizeUpdateData } from './sanitizeUpdateData.js' + +type BaseArgs = { + data: Partial + depth?: number + disableTransaction?: boolean + limit?: number + req: PayloadRequest + returning?: boolean +} + +type ArgsByID = { + id: number | string + limit?: never + where?: never +} + +/** + * Convenience method for updateJobs by id + */ +export async function updateJob(args: ArgsByID & BaseArgs) { + const result = await updateJobs({ + ...args, + id: undefined, + limit: 1, + where: { id: { equals: args.id } }, + }) + if (result) { + return result[0] + } +} + +type ArgsWhere = { + id?: never | undefined + limit?: number + where: Where +} + +type RunJobsArgs = (ArgsByID | ArgsWhere) & BaseArgs + +export async function updateJobs({ + id, + data, + depth, + disableTransaction, + limit: limitArg, + req, + returning, + where, +}: RunJobsArgs): Promise { + const limit = id ? 1 : limitArg + if (depth || req.payload.config?.jobs?.runHooks) { + const result = await req.payload.update({ + id, + collection: jobsCollectionSlug, + data, + depth, + disableTransaction, + limit, + req, + where, + } as ManyOptions) + if (returning === false || !result) { + return null + } + return result.docs as BaseJob[] + } + + const updatedJobs = [] + + // TODO: this can be optimized in the future - partial updates are supported in mongodb. In postgres, + // we can support this by manually constructing the sql query. We should use req.payload.db.updateMany instead + // of req.payload.db.updateOne once this is supported + const jobsToUpdate = await req.payload.db.find({ + collection: jobsCollectionSlug, + limit, + pagination: false, + req: disableTransaction === true ? undefined : req, + where, + }) + if (!jobsToUpdate?.docs) { + return null + } + + for (const job of jobsToUpdate.docs) { + const updateData = { + ...job, + ...data, + } + const updatedJob = await req.payload.db.updateOne({ + id: job.id, + collection: jobsCollectionSlug, + data: sanitizeUpdateData({ data: updateData }), + req: disableTransaction === true ? undefined : req, + returning, + }) + updatedJobs.push(updatedJob) + } + + if (returning === false || !updatedJobs?.length) { + return null + } + + return updatedJobs.map((updatedJob) => { + return jobAfterRead({ + config: req.payload.config, + doc: updatedJob, + }) + }) +} diff --git a/test/queues/workflows/noRetriesSet.ts b/test/queues/workflows/noRetriesSet.ts index b549cd0df8..2639dcaf27 100644 --- a/test/queues/workflows/noRetriesSet.ts +++ b/test/queues/workflows/noRetriesSet.ts @@ -10,7 +10,7 @@ export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = { }, ], handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -23,6 +23,8 @@ export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = { id: job.id, }) + job.input = updatedJob.input as any + await tasks.CreateSimple('1', { input: { message: job.input.message, diff --git a/test/queues/workflows/retries0.ts b/test/queues/workflows/retries0.ts index 1eb7b06d2d..3f1f700ad0 100644 --- a/test/queues/workflows/retries0.ts +++ b/test/queues/workflows/retries0.ts @@ -11,7 +11,7 @@ export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = { ], retries: 0, handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -23,6 +23,7 @@ export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = { }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimple('1', { input: { diff --git a/test/queues/workflows/retriesBackoffTest.ts b/test/queues/workflows/retriesBackoffTest.ts index 27e6b20b70..7fb03d1cb4 100644 --- a/test/queues/workflows/retriesBackoffTest.ts +++ b/test/queues/workflows/retriesBackoffTest.ts @@ -45,13 +45,14 @@ export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> = // @ts-expect-error timeTried is new arbitrary data and not in the type job.input.timeTried[totalTried] = new Date().toISOString() - await req.payload.update({ + const updated = await req.payload.update({ collection: 'payload-jobs', data: { input: job.input, }, id: job.id, }) + job.input = updated.input as any if (totalTried < 4) { // Cleanup the post diff --git a/test/queues/workflows/retriesTest.ts b/test/queues/workflows/retriesTest.ts index f462caf501..50b47993fa 100644 --- a/test/queues/workflows/retriesTest.ts +++ b/test/queues/workflows/retriesTest.ts @@ -10,7 +10,7 @@ export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = { }, ], handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -22,6 +22,7 @@ export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = { }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimple('1', { input: { diff --git a/test/queues/workflows/retriesWorkflowLevelTest.ts b/test/queues/workflows/retriesWorkflowLevelTest.ts index d34ded952b..2b653db16f 100644 --- a/test/queues/workflows/retriesWorkflowLevelTest.ts +++ b/test/queues/workflows/retriesWorkflowLevelTest.ts @@ -11,7 +11,7 @@ export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLe ], retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -23,6 +23,7 @@ export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLe }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimple('1', { input: { diff --git a/test/queues/workflows/subTaskFails.ts b/test/queues/workflows/subTaskFails.ts index 1c1f142415..16df107114 100644 --- a/test/queues/workflows/subTaskFails.ts +++ b/test/queues/workflows/subTaskFails.ts @@ -23,7 +23,7 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = { }, }) - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -38,6 +38,8 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = { }, id: job.id, }) + job.input = updatedJob.input as any + return { output: { newSimple, @@ -48,7 +50,7 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = { await inlineTask('create doc 2 - fails', { task: async ({ req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -63,6 +65,8 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = { }, id: job.id, }) + job.input = updatedJob.input as any + throw new Error('Failed on purpose') }, }) diff --git a/test/queues/workflows/workflowAndTasksRetriesUndefined.ts b/test/queues/workflows/workflowAndTasksRetriesUndefined.ts index 03e89957ed..9b4c21e248 100644 --- a/test/queues/workflows/workflowAndTasksRetriesUndefined.ts +++ b/test/queues/workflows/workflowAndTasksRetriesUndefined.ts @@ -11,7 +11,7 @@ export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowA }, ], handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -23,6 +23,7 @@ export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowA }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimpleRetriesUndefined('1', { input: { diff --git a/test/queues/workflows/workflowRetries2TasksRetries0.ts b/test/queues/workflows/workflowRetries2TasksRetries0.ts index 7cf803dc26..936c97a62a 100644 --- a/test/queues/workflows/workflowRetries2TasksRetries0.ts +++ b/test/queues/workflows/workflowRetries2TasksRetries0.ts @@ -12,7 +12,7 @@ export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetr }, ], handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -24,6 +24,7 @@ export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetr }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimpleRetries0('1', { input: { diff --git a/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts b/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts index 3bbd9ee78e..785fc6ccdd 100644 --- a/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts +++ b/test/queues/workflows/workflowRetries2TasksRetriesUndefined.ts @@ -12,7 +12,7 @@ export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'work }, ], handler: async ({ job, tasks, req }) => { - await req.payload.update({ + const updatedJob = await req.payload.update({ collection: 'payload-jobs', data: { input: { @@ -24,6 +24,7 @@ export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'work }, id: job.id, }) + job.input = updatedJob.input as any await tasks.CreateSimpleRetriesUndefined('1', { input: {