Previously, our job queue system relied on `payload.*` operations, which ran very frequently: - whenever job execution starts, as all jobs need to be set to `processing: true` - every single time a task completes or fails, as the job log needs to be updated - whenever job execution stops, to mark it as completed and to delete it (if `deleteJobOnComplete` is set) This PR replaces these with direct `payload.db.*` calls, which are significantly faster than payload operations. Given how often the job queue system communicates with the database, this should be a massive performance improvement. ## How it affects running hooks To generate the task status, we previously used an `afterRead` hook. Since direct db adapter calls no longer execute hooks, this PR introduces new `updateJob` and `updateJobs` helpers to handle task status generation outside the normal payload hook lifecycle. Additionally, a new `runHooks` property has been added to the global job configuration. While setting this to `true` can be useful if custom hooks were added to the `payload-jobs` collection config, this will revert the job system to use normal payload operations. This should be avoided as it degrades performance. In most cases, the `onSuccess` or `onFail` properties in the job config will be sufficient and much faster. Furthermore, if the `depth` property is set in the global job configuration, the job queue system will also fall back to the slower, normal payload operations. --------- Co-authored-by: Dan Ribbens <DanRibbens@users.noreply.github.com>
83 lines
2.2 KiB
TypeScript
83 lines
2.2 KiB
TypeScript
import type { WorkflowConfig } from 'payload'
|
|
|
|
export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> = {
|
|
slug: 'retriesBackoffTest',
|
|
inputSchema: [
|
|
{
|
|
name: 'message',
|
|
type: 'text',
|
|
required: true,
|
|
},
|
|
],
|
|
handler: async ({ job, inlineTask, req }) => {
|
|
const newJob = await req.payload.update({
|
|
collection: 'payload-jobs',
|
|
data: {
|
|
input: {
|
|
...job.input,
|
|
amountRetried:
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
|
|
},
|
|
},
|
|
id: job.id,
|
|
})
|
|
job.input = newJob.input as any
|
|
|
|
await inlineTask('1', {
|
|
task: async ({ req }) => {
|
|
const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0
|
|
|
|
const { id } = await req.payload.create({
|
|
collection: 'simple',
|
|
req,
|
|
data: {
|
|
title: 'should not exist',
|
|
},
|
|
})
|
|
|
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
if (!job.input.timeTried) {
|
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
job.input.timeTried = {}
|
|
}
|
|
|
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
job.input.timeTried[totalTried] = new Date().toISOString()
|
|
|
|
const updated = await req.payload.update({
|
|
collection: 'payload-jobs',
|
|
data: {
|
|
input: job.input,
|
|
},
|
|
id: job.id,
|
|
})
|
|
job.input = updated.input as any
|
|
|
|
if (totalTried < 4) {
|
|
// Cleanup the post
|
|
await req.payload.delete({
|
|
collection: 'simple',
|
|
id,
|
|
req,
|
|
})
|
|
|
|
// Last try it should succeed
|
|
throw new Error('Failed on purpose')
|
|
}
|
|
return {
|
|
output: {},
|
|
}
|
|
},
|
|
retries: {
|
|
attempts: 4,
|
|
backoff: {
|
|
type: 'exponential',
|
|
// Should retry in 300ms, then 600, then 1200, then 2400, then succeed
|
|
delay: 300,
|
|
},
|
|
},
|
|
})
|
|
},
|
|
}
|