From c844b4c848c21bc5c73634bf328f98bcc05c0bb3 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Mon, 31 Mar 2025 15:00:36 -0600 Subject: [PATCH] feat: configurable job queue processing order (LIFO/FIFO), allow sequential execution of jobs (#11897) Previously, jobs were executed in FIFO order on MongoDB, and LIFO on Postgres, with no way to configure this behavior. This PR makes FIFO the default on both MongoDB and Postgres and introduces the following new options to configure the processing order globally or on a queue-by-queue basis: - a `processingOrder` property to the jobs config - a `processingOrder` argument to `payload.jobs.run()` to override what's set in the jobs config It also adds a new `sequential` option to `payload.jobs.run()`, which can be useful for debugging. --- docs/jobs-queue/queues.mdx | 81 +++++++++++++- packages/db-mongodb/src/updateJobs.ts | 22 +++- .../payload/src/queues/config/types/index.ts | 19 +++- packages/payload/src/queues/localAPI.ts | 14 +++ .../src/queues/operations/runJobs/index.ts | 69 +++++++++--- test/queues/config.ts | 7 ++ test/queues/int.spec.ts | 100 ++++++++++++++++++ test/queues/payload-types.ts | 11 ++ .../queues/workflows/inlineTaskTestDelayed.ts | 38 +++++++ 9 files changed, 340 insertions(+), 21 deletions(-) create mode 100644 test/queues/workflows/inlineTaskTestDelayed.ts diff --git a/docs/jobs-queue/queues.mdx b/docs/jobs-queue/queues.mdx index 43c2ab6ef..e255a4f00 100644 --- a/docs/jobs-queue/queues.mdx +++ b/docs/jobs-queue/queues.mdx @@ -28,7 +28,7 @@ Then, you could configure two different runner strategies: As mentioned above, you can queue jobs, but the jobs won't run unless a worker picks up your jobs and runs them. This can be done in four ways: -#### Cron jobs +### Cron jobs You can use the `jobs.autoRun` property to configure cron jobs: @@ -63,7 +63,7 @@ export default buildConfig({ and should not be used on serverless platforms like Vercel. -#### Endpoint +### Endpoint You can execute jobs by making a fetch request to the `/api/payload-jobs/run` endpoint: @@ -130,7 +130,7 @@ This works because Vercel automatically makes the `CRON_SECRET` environment vari After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint in the specified schedule, running the queued jobs in the background. -#### Local API +### Local API If you want to process jobs programmatically from your server-side code, you can use the Local API: @@ -156,7 +156,7 @@ const results = await payload.jobs.runByID({ }) ``` -#### Bin script +### Bin script Finally, you can process jobs via the bin script that comes with Payload out of the box. @@ -169,3 +169,76 @@ In addition, the bin script allows you to pass a `--cron` flag to the `jobs:run` ```sh npx payload jobs:run --cron "*/5 * * * *" ``` + +## Processing Order + +By default, jobs are processed first in, first out (FIFO). This means that the first job added to the queue will be the first one processed. However, you can also configure the order in which jobs are processed. + +### Jobs Configuration + +You can configure the order in which jobs are processed in the jobs configuration by passing the `processingOrder` property. This mimics the Payload [sort](../queries/sort) property that's used for functionality such as `payload.find()`. + +```ts +export default buildConfig({ + // Other configurations... + jobs: { + tasks: [ + // your tasks here + ], + processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO + }, +}) +``` + +You can also set this on a queue-by-queue basis: + +```ts +export default buildConfig({ + // Other configurations... + jobs: { + tasks: [ + // your tasks here + ], + processingOrder: { + default: 'createdAt', // FIFO + queues: { + nightly: '-createdAt', // LIFO + myQueue: '-createdAt', // LIFO + }, + }, + }, +}) +``` + +If you need even more control over the processing order, you can pass a function that returns the processing order - this function will be called every time a queue starts processing jobs. + +```ts +export default buildConfig({ + // Other configurations... + jobs: { + tasks: [ + // your tasks here + ], + processingOrder: ({ queue }) => { + if (queue === 'myQueue') { + return '-createdAt' // LIFO + } + return 'createdAt' // FIFO + }, + }, +}) +``` + +### Local API + +You can configure the order in which jobs are processed in the `payload.jobs.queue` method by passing the `processingOrder` property. + +```ts +const createdJob = await payload.jobs.queue({ + workflow: 'createPostAndUpdate', + input: { + title: 'my title', + }, + processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO +}) +``` diff --git a/packages/db-mongodb/src/updateJobs.ts b/packages/db-mongodb/src/updateJobs.ts index ba72887c0..cac9c0dc2 100644 --- a/packages/db-mongodb/src/updateJobs.ts +++ b/packages/db-mongodb/src/updateJobs.ts @@ -4,6 +4,7 @@ import type { BaseJob, UpdateJobs, Where } from 'payload' import type { MongooseAdapter } from './index.js' import { buildQuery } from './queries/buildQuery.js' +import { buildSortParam } from './queries/buildSortParam.js' import { getCollection } from './utilities/getEntity.js' import { getSession } from './utilities/getSession.js' import { handleError } from './utilities/handleError.js' @@ -11,7 +12,7 @@ import { transform } from './utilities/transform.js' export const updateJobs: UpdateJobs = async function updateMany( this: MongooseAdapter, - { id, data, limit, req, returning, where: whereArg }, + { id, data, limit, req, returning, sort: sortArg, where: whereArg }, ) { if (!(data?.log as object[])?.length) { delete data.log @@ -23,6 +24,14 @@ export const updateJobs: UpdateJobs = async function updateMany( collectionSlug: 'payload-jobs', }) + const sort: Record | undefined = buildSortParam({ + adapter: this, + config: this.payload.config, + fields: collectionConfig.flattenedFields, + sort: sortArg || collectionConfig.defaultSort, + timestamps: true, + }) + const options: MongooseUpdateQueryOptions = { lean: true, new: true, @@ -54,7 +63,7 @@ export const updateJobs: UpdateJobs = async function updateMany( const documentsToUpdate = await Model.find( query, {}, - { ...options, limit, projection: { _id: 1 } }, + { ...options, limit, projection: { _id: 1 }, sort }, ) if (documentsToUpdate.length === 0) { return null @@ -69,7 +78,14 @@ export const updateJobs: UpdateJobs = async function updateMany( return null } - result = await Model.find(query, {}, options) + result = await Model.find( + query, + {}, + { + ...options, + sort, + }, + ) } } catch (error) { handleError({ collection: collectionConfig.slug, error, req }) diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index d41fbe646..916724fd8 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -1,5 +1,6 @@ import type { CollectionConfig } from '../../../index.js' -import type { Payload, PayloadRequest } from '../../../types/index.js' +import type { Payload, PayloadRequest, Sort } from '../../../types/index.js' +import type { RunJobsArgs } from '../../operations/runJobs/index.js' import type { TaskConfig } from './taskTypes.js' import type { WorkflowConfig } from './workflowTypes.js' @@ -80,6 +81,22 @@ export type JobsConfig = { * a new collection. */ jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig + /** + * Adjust the job processing order using a Payload sort string. This can be set globally or per queue. + * + * FIFO would equal `createdAt` and LIFO would equal `-createdAt`. + * + * @default all jobs for all queues will be executed in FIFO order. + */ + processingOrder?: + | ((args: RunJobsArgs) => Promise | Sort) + | { + default?: Sort + queues: { + [queue: string]: Sort + } + } + | Sort /** * By default, the job system uses direct database calls for optimal performance. * If you added custom hooks to your jobs collection, you can set this to true to diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index 01d3b0037..b23e68936 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -5,6 +5,7 @@ import { type Payload, type PayloadRequest, type RunningJob, + type Sort, type TypedJobs, type Where, } from '../index.js' @@ -99,8 +100,19 @@ export const getJobsLocalAPI = (payload: Payload) => ({ run: async (args?: { limit?: number overrideAccess?: boolean + /** + * Adjust the job processing order using a Payload sort string. + * + * FIFO would equal `createdAt` and LIFO would equal `-createdAt`. + */ + processingOrder?: Sort queue?: string req?: PayloadRequest + /** + * By default, jobs are run in parallel. + * If you want to run them in sequence, set this to true. + */ + sequential?: boolean where?: Where }): Promise> => { const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload)) @@ -108,8 +120,10 @@ export const getJobsLocalAPI = (payload: Payload) => ({ return await runJobs({ limit: args?.limit, overrideAccess: args?.overrideAccess !== false, + processingOrder: args?.processingOrder, queue: args?.queue, req: newReq, + sequential: args?.sequential, where: args?.where, }) }, diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index 5da73361d..c8ba056a0 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -1,6 +1,5 @@ // @ts-strict-ignore -import type { PaginatedDocs } from '../../../database/types.js' -import type { PayloadRequest, Where } from '../../../types/index.js' +import type { PayloadRequest, Sort, Where } from '../../../types/index.js' import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js' import type { BaseJob, @@ -26,8 +25,21 @@ export type RunJobsArgs = { id?: number | string limit?: number overrideAccess?: boolean + /** + * Adjust the job processing order + * + * FIFO would equal `createdAt` and LIFO would equal `-createdAt`. + * + * @default all jobs for all queues will be executed in FIFO order. + */ + processingOrder?: Sort queue?: string req: PayloadRequest + /** + * By default, jobs are run in parallel. + * If you want to run them in sequence, set this to true. + */ + sequential?: boolean where?: Where } @@ -43,14 +55,18 @@ export type RunJobsResult = { remainingJobsFromQueried: number } -export const runJobs = async ({ - id, - limit = 10, - overrideAccess, - queue, - req, - where: whereFromProps, -}: RunJobsArgs): Promise => { +export const runJobs = async (args: RunJobsArgs): Promise => { + const { + id, + limit = 10, + overrideAccess, + processingOrder, + queue, + req, + sequential, + where: whereFromProps, + } = args + if (!overrideAccess) { const hasAccess = await req.payload.config.jobs.access.run({ req }) if (!hasAccess) { @@ -124,6 +140,21 @@ export const runJobs = async ({ }), ] } else { + let defaultProcessingOrder: Sort = + req.payload.collections[jobsCollectionSlug].config.defaultSort ?? 'createdAt' + + const processingOrderConfig = req.payload.config.jobs?.processingOrder + if (typeof processingOrderConfig === 'function') { + defaultProcessingOrder = await processingOrderConfig(args) + } else if (typeof processingOrderConfig === 'object' && !Array.isArray(processingOrderConfig)) { + if (queue && processingOrderConfig.queues && processingOrderConfig.queues[queue]) { + defaultProcessingOrder = processingOrderConfig.queues[queue] + } else if (processingOrderConfig.default) { + defaultProcessingOrder = processingOrderConfig.default + } + } else if (typeof processingOrderConfig === 'string') { + defaultProcessingOrder = processingOrderConfig + } const updatedDocs = await updateJobs({ data: { processing: true, @@ -133,6 +164,7 @@ export const runJobs = async ({ limit, req, returning: true, + sort: processingOrder ?? defaultProcessingOrder, where, }) @@ -175,7 +207,7 @@ export const runJobs = async ({ ? [] : undefined - const jobPromises = jobsQuery.docs.map(async (job) => { + const runSingleJob = async (job) => { if (!job.workflowSlug && !job.taskSlug) { throw new Error('Job must have either a workflowSlug or a taskSlug') } @@ -257,9 +289,20 @@ export const runJobs = async ({ return { id: job.id, result } } - }) + } - const resultsArray = await Promise.all(jobPromises) + let resultsArray: { id: number | string; result: RunJobResult }[] = [] + if (sequential) { + for (const job of jobsQuery.docs) { + const result = await runSingleJob(job) + if (result !== null) { + resultsArray.push(result) + } + } + } else { + const jobPromises = jobsQuery.docs.map(runSingleJob) + resultsArray = await Promise.all(jobPromises) + } if (jobsToDelete && jobsToDelete.length > 0) { try { diff --git a/test/queues/config.ts b/test/queues/config.ts index 5cbd83fbc..5bdc9c8c4 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -24,6 +24,7 @@ import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js' import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js' import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js' import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js' +import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js' const filename = fileURLToPath(import.meta.url) @@ -104,6 +105,11 @@ export default buildConfigWithDefaults({ }, } }, + processingOrder: { + queues: { + lifo: '-createdAt', + }, + }, tasks: [ { retries: 2, @@ -376,6 +382,7 @@ export default buildConfigWithDefaults({ workflowRetries2TasksRetriesUndefinedWorkflow, workflowRetries2TasksRetries0Workflow, inlineTaskTestWorkflow, + inlineTaskTestDelayedWorkflow, externalWorkflow, retriesBackoffTestWorkflow, subTaskWorkflow, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index d196075d4..315e6581c 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -533,6 +533,106 @@ describe('Queues', () => { payload.config.jobs.deleteJobOnComplete = true }) + it('ensure jobs run in FIFO order by default', async () => { + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + input: { + message: 'task 1', + }, + }) + + await new Promise((resolve) => setTimeout(resolve, 100)) + + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + input: { + message: 'task 2', + }, + }) + + await payload.jobs.run({ + sequential: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + sort: 'createdAt', + }) + + expect(allSimples.totalDocs).toBe(2) + expect(allSimples.docs?.[0]?.title).toBe('task 1') + expect(allSimples.docs?.[1]?.title).toBe('task 2') + }) + + it('ensure jobs can run LIFO if processingOrder is passed', async () => { + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + input: { + message: 'task 1', + }, + }) + + await new Promise((resolve) => setTimeout(resolve, 100)) + + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + input: { + message: 'task 2', + }, + }) + + await payload.jobs.run({ + sequential: true, + processingOrder: '-createdAt', + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + sort: 'createdAt', + }) + + expect(allSimples.totalDocs).toBe(2) + expect(allSimples.docs?.[0]?.title).toBe('task 2') + expect(allSimples.docs?.[1]?.title).toBe('task 1') + }) + + it('ensure job config processingOrder using queues object is respected', async () => { + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + queue: 'lifo', + input: { + message: 'task 1', + }, + }) + + await new Promise((resolve) => setTimeout(resolve, 100)) + + await payload.jobs.queue({ + workflow: 'inlineTaskTestDelayed', + queue: 'lifo', + input: { + message: 'task 2', + }, + }) + + await payload.jobs.run({ + sequential: true, + queue: 'lifo', + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + sort: 'createdAt', + }) + + expect(allSimples.totalDocs).toBe(2) + expect(allSimples.docs?.[0]?.title).toBe('task 2') + expect(allSimples.docs?.[1]?.title).toBe('task 1') + }) + it('can create new inline jobs', async () => { await payload.jobs.queue({ workflow: 'inlineTaskTest', diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index ba1be1786..eb7892fa4 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -123,6 +123,7 @@ export interface Config { workflowRetries2TasksRetriesUndefined: WorkflowWorkflowRetries2TasksRetriesUndefined; workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0; inlineTaskTest: WorkflowInlineTaskTest; + inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed; externalWorkflow: WorkflowExternalWorkflow; retriesBackoffTest: WorkflowRetriesBackoffTest; subTask: WorkflowSubTask; @@ -313,6 +314,7 @@ export interface PayloadJob { | 'workflowRetries2TasksRetriesUndefined' | 'workflowRetries2TasksRetries0' | 'inlineTaskTest' + | 'inlineTaskTestDelayed' | 'externalWorkflow' | 'retriesBackoffTest' | 'subTask' @@ -722,6 +724,15 @@ export interface WorkflowInlineTaskTest { message: string; }; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowInlineTaskTestDelayed". + */ +export interface WorkflowInlineTaskTestDelayed { + input: { + message: string; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "WorkflowExternalWorkflow". diff --git a/test/queues/workflows/inlineTaskTestDelayed.ts b/test/queues/workflows/inlineTaskTestDelayed.ts new file mode 100644 index 000000000..5d81a7b7c --- /dev/null +++ b/test/queues/workflows/inlineTaskTestDelayed.ts @@ -0,0 +1,38 @@ +import type { WorkflowConfig } from 'payload' + +export const inlineTaskTestDelayedWorkflow: WorkflowConfig<'inlineTaskTestDelayed'> = { + slug: 'inlineTaskTestDelayed', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + await inlineTask('1', { + task: async ({ input, req }) => { + // Wait 100ms + await new Promise((resolve) => setTimeout(resolve, 100)) + + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + await new Promise((resolve) => setTimeout(resolve, 100)) + + return { + output: { + simpleID: newSimple.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, +}