payloadcms/test/queues/workflows/retriesBackoffTest.ts
Alessio Gravili 032c424244 perf: use direct db calls in job-queue system (#11489)
Previously, our job queue system relied on `payload.*` operations, which
ran very frequently:
- whenever job execution starts, as all jobs need to be set to
`processing: true`
- every single time a task completes or fails, as the job log needs to
be updated
- whenever job execution stops, to mark it as completed and to delete it
(if `deleteJobOnComplete` is set)

This PR replaces these with direct `payload.db.*` calls, which are
significantly faster than payload operations. Given how often the job
queue system communicates with the database, this should be a massive
performance improvement.
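
As a rough illustration of the difference described above, the sketch below models an operation-style update that wraps the database write in a hook lifecycle, next to a direct adapter call that skips it. Everything here (`InMemoryDb`, `updateViaOperation`, `updateDirect`, the `Job` shape) is hypothetical and simplified; it is not Payload's actual implementation or API.

```typescript
// Simplified, hypothetical model: none of these types are Payload's own.
type Job = { id: string; processing: boolean }
type Hook = (doc: Job) => Job

// Stand-in for a database adapter that stores jobs in memory.
class InMemoryDb {
  private docs = new Map<string, Job>()

  constructor(initial: Job[]) {
    for (const doc of initial) this.docs.set(doc.id, doc)
  }

  updateOne(id: string, data: Partial<Job>): Job {
    const existing = this.docs.get(id)
    if (!existing) throw new Error(`Job ${id} not found`)
    const updated: Job = { ...existing, ...data }
    this.docs.set(id, updated)
    return updated
  }
}

// Operation-style update: the write is followed by every registered hook.
function updateViaOperation(
  db: InMemoryDb,
  hooks: Hook[],
  id: string,
  data: Partial<Job>,
): Job {
  return hooks.reduce((doc, hook) => hook(doc), db.updateOne(id, data))
}

// Direct update: only the adapter call, no hooks.
function updateDirect(db: InMemoryDb, id: string, data: Partial<Job>): Job {
  return db.updateOne(id, data)
}

let hookRuns = 0
const countingHook: Hook = (doc) => {
  hookRuns++
  return doc
}

const db = new InMemoryDb([
  { id: 'a', processing: false },
  { id: 'b', processing: false },
])
const viaOperation = updateViaOperation(db, [countingHook], 'a', { processing: true })
const viaDirect = updateDirect(db, 'b', { processing: true })
```

Both calls persist the same change, but only the operation-style call pays for the hook run. Real operations do more than this model shows, so the sketch understates the gap rather than measuring it.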

## How it affects running hooks

To generate the task status, we previously used an `afterRead` hook.
Since direct db adapter calls no longer execute hooks, this PR
introduces new `updateJob` and `updateJobs` helpers to handle task
status generation outside the normal payload hook lifecycle.
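
A minimal sketch of what generating task status outside the hook lifecycle could look like. The log entry shape, `deriveTaskStatus`, and `updateJobSketch` are assumptions made for illustration; the real `updateJob` and `updateJobs` helpers and the real job log types may differ. The sketch also assumes `totalTried` is the number of log entries recorded for a task.

```typescript
// Hypothetical shapes for illustration; the real job log and task status types may differ.
type LogEntry = { taskSlug: string; taskID: string; state: 'failed' | 'succeeded' }
type TaskState = { complete: boolean; totalTried: number }
type TaskStatus = Record<string, Record<string, TaskState>>

// Derive per-task status from the job log instead of relying on an afterRead hook.
function deriveTaskStatus(log: LogEntry[]): TaskStatus {
  const status: TaskStatus = {}
  for (const entry of log) {
    let bySlug = status[entry.taskSlug]
    if (!bySlug) {
      bySlug = {}
      status[entry.taskSlug] = bySlug
    }
    let current = bySlug[entry.taskID]
    if (!current) {
      current = { complete: false, totalTried: 0 }
      bySlug[entry.taskID] = current
    }
    // Assumption: every log entry counts as one attempt.
    current.totalTried += 1
    if (entry.state === 'succeeded') current.complete = true
  }
  return status
}

type StoredJob = { id: string; log: LogEntry[] }
type JobWithStatus = StoredJob & { taskStatus: TaskStatus }

// Hypothetical helper: write through a db-like function, then attach the derived status.
function updateJobSketch(
  write: (id: string, data: Partial<StoredJob>) => StoredJob,
  id: string,
  data: Partial<StoredJob>,
): JobWithStatus {
  const stored = write(id, data)
  return { ...stored, taskStatus: deriveTaskStatus(stored.log) }
}

// Usage: two failed attempts of inline task '1'.
const failedTwice: LogEntry[] = [
  { taskSlug: 'inline', taskID: '1', state: 'failed' },
  { taskSlug: 'inline', taskID: '1', state: 'failed' },
]
const sketchResult = updateJobSketch(
  (id, data) => ({ id, log: data.log ?? [] }),
  'job-1',
  { log: failedTwice },
)
```

Because the status is computed by the helper itself, the caller gets `taskStatus` back even though no collection hook ran.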

Additionally, a new `runHooks` property has been added to the global job
configuration. Setting it to `true` can be useful if custom hooks were
added to the `payload-jobs` collection config, but it reverts the job
system to normal payload operations. This should be avoided because it
degrades performance. In most cases, the `onSuccess` or `onFail`
properties in the job config will be sufficient and much faster.

Furthermore, if the `depth` property is set in the global job
configuration, the job queue system will also fall back to the slower,
normal payload operations.
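
The two fallback conditions above can be summarized in a small predicate. This is a sketch under stated assumptions: `JobsConfigSketch` and `usesDirectDbCalls` are hypothetical names, only the `runHooks` and `depth` properties come from the description, and "set" is read here as "not `undefined`".

```typescript
// Hypothetical subset of the global job configuration; only the two
// properties named in the description are modelled.
type JobsConfigSketch = { depth?: number; runHooks?: boolean }

// Assumption: direct db calls are used unless `runHooks` is true or `depth` is set.
function usesDirectDbCalls(jobs: JobsConfigSketch): boolean {
  return jobs.runHooks !== true && jobs.depth === undefined
}

const defaultPath = usesDirectDbCalls({}) // fast path: direct db calls
const withHooks = usesDirectDbCalls({ runHooks: true }) // falls back to operations
const withDepth = usesDirectDbCalls({ depth: 1 }) // falls back to operations
```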

---------

Co-authored-by: Dan Ribbens <DanRibbens@users.noreply.github.com>
2025-03-20 13:31:14 -04:00


import type { WorkflowConfig } from 'payload'

export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> = {
  slug: 'retriesBackoffTest',
  inputSchema: [
    {
      name: 'message',
      type: 'text',
      required: true,
    },
  ],
  handler: async ({ job, inlineTask, req }) => {
    // Track how many times the workflow handler has been re-run
    const newJob = await req.payload.update({
      collection: 'payload-jobs',
      data: {
        input: {
          ...job.input,
          amountRetried:
            // @ts-expect-error amountRetried is new arbitrary data and not in the type
            job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
        },
      },
      id: job.id,
    })
    job.input = newJob.input as any

    await inlineTask('1', {
      task: async ({ req }) => {
        // Number of attempts already recorded for this task (0 on the first run)
        const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0

        const { id } = await req.payload.create({
          collection: 'simple',
          req,
          data: {
            title: 'should not exist',
          },
        })

        // Record the timestamp of this attempt on the job input
        // @ts-expect-error timeTried is new arbitrary data and not in the type
        if (!job.input.timeTried) {
          // @ts-expect-error timeTried is new arbitrary data and not in the type
          job.input.timeTried = {}
        }
        // @ts-expect-error timeTried is new arbitrary data and not in the type
        job.input.timeTried[totalTried] = new Date().toISOString()

        const updated = await req.payload.update({
          collection: 'payload-jobs',
          data: {
            input: job.input,
          },
          id: job.id,
        })
        job.input = updated.input as any

        if (totalTried < 4) {
          // Cleanup the post
          await req.payload.delete({
            collection: 'simple',
            id,
            req,
          })

          // Fail on purpose for the first four tries; only the last try succeeds
          throw new Error('Failed on purpose')
        }

        return {
          output: {},
        }
      },
      retries: {
        attempts: 4,
        backoff: {
          type: 'exponential',
          // Should retry in 300ms, then 600, then 1200, then 2400, then succeed
          delay: 300,
        },
      },
    })
  },
}
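
The retry schedule named in the `backoff` comment (300ms, then 600, 1200, 2400) follows from doubling the base delay on each retry. The helper below is a hypothetical sketch that reproduces those numbers; it is not the job queue's actual backoff calculation, which is not shown in this file.

```typescript
// Hypothetical helper: delay before retry n (0-based) is baseDelayMs * 2^n.
function exponentialDelays(baseDelayMs: number, attempts: number): number[] {
  return Array.from({ length: attempts }, (_, retry) => baseDelayMs * 2 ** retry)
}

// Matches the workflow's `delay: 300` and `attempts: 4`.
const delays = exponentialDelays(300, 4)
console.log(delays) // [ 300, 600, 1200, 2400 ]
```

With four retries the task runs five times in total, which is why the handler throws while `totalTried < 4` and succeeds on the fifth run.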