feat: configurable job queue processing order (LIFO/FIFO), allow sequential execution of jobs (#11897)
Previously, jobs were executed in FIFO order on MongoDB and LIFO order on Postgres, with no way to configure this behavior. This PR makes FIFO the default on both MongoDB and Postgres and introduces the following new options to configure the processing order globally or on a queue-by-queue basis:

- a `processingOrder` property in the jobs config
- a `processingOrder` argument to `payload.jobs.run()` to override what's set in the jobs config

It also adds a new `sequential` option to `payload.jobs.run()`, which can be useful for debugging.
@@ -28,7 +28,7 @@ Then, you could configure two different runner strategies:

 As mentioned above, you can queue jobs, but the jobs won't run unless a worker picks up your jobs and runs them. This can be done in four ways:

-#### Cron jobs
+### Cron jobs

 You can use the `jobs.autoRun` property to configure cron jobs:
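The `autoRun` example itself is elided from this hunk. A minimal sketch of a cron setup, assuming the `cron`, `limit`, and `queue` options from Payload's autoRun documentation:

```ts
export default buildConfig({
  // Other configurations...
  jobs: {
    tasks: [
      // your tasks here
    ],
    autoRun: [
      {
        cron: '*/5 * * * *', // run every 5 minutes
        limit: 100, // maximum number of jobs to process per run
        queue: 'default', // which queue to pick jobs from
      },
    ],
  },
})
```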
@@ -63,7 +63,7 @@ export default buildConfig({

 and should not be used on serverless platforms like Vercel.
 </Banner>

-#### Endpoint
+### Endpoint

 You can execute jobs by making a fetch request to the `/api/payload-jobs/run` endpoint:
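The request example is elided from this diff. A minimal sketch, assuming access is granted via a bearer token as in the Vercel `CRON_SECRET` setup referenced below (`limit` and `queue` are optional query parameters):

```ts
// Trigger up to 10 queued jobs from the 'default' queue over HTTP.
// The URL is a placeholder; auth must satisfy your jobs.access.run configuration.
await fetch('https://your-app.example.com/api/payload-jobs/run?limit=10&queue=default', {
  headers: {
    Authorization: `Bearer ${process.env.CRON_SECRET}`,
  },
})
```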
@@ -130,7 +130,7 @@ This works because Vercel automatically makes the `CRON_SECRET` environment variable

 After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint on the specified schedule, running the queued jobs in the background.

-#### Local API
+### Local API

 If you want to process jobs programmatically from your server-side code, you can use the Local API:
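A minimal sketch of the basic call (the hunk below shows the related `runByID` example from the same docs page):

```ts
// Pick up and run queued jobs from server-side code.
const results = await payload.jobs.run({
  queue: 'default', // optional: only run jobs from this queue
  limit: 10, // optional: maximum number of jobs to process
})
```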
@@ -156,7 +156,7 @@ const results = await payload.jobs.runByID({
 })
 ```

-#### Bin script
+### Bin script

 Finally, you can process jobs via the bin script that comes with Payload out of the box.

@@ -169,3 +169,76 @@ In addition, the bin script allows you to pass a `--cron` flag to the `jobs:run`
 ```sh
 npx payload jobs:run --cron "*/5 * * * *"
 ```
+
+## Processing Order
+
+By default, jobs are processed first in, first out (FIFO): the first job added to the queue is the first one processed. You can also configure the order in which jobs are processed.
+
+### Jobs Configuration
+
+You can configure the order in which jobs are processed by passing the `processingOrder` property in the jobs configuration. This mirrors the Payload [sort](../queries/sort) property used for functionality such as `payload.find()`.
+
+```ts
+export default buildConfig({
+  // Other configurations...
+  jobs: {
+    tasks: [
+      // your tasks here
+    ],
+    processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO
+  },
+})
+```
+
+You can also set this on a queue-by-queue basis:
+
+```ts
+export default buildConfig({
+  // Other configurations...
+  jobs: {
+    tasks: [
+      // your tasks here
+    ],
+    processingOrder: {
+      default: 'createdAt', // FIFO
+      queues: {
+        nightly: '-createdAt', // LIFO
+        myQueue: '-createdAt', // LIFO
+      },
+    },
+  },
+})
+```
+
+If you need even more control over the processing order, you can pass a function that returns it. This function is called every time a queue starts processing jobs.
+
+```ts
+export default buildConfig({
+  // Other configurations...
+  jobs: {
+    tasks: [
+      // your tasks here
+    ],
+    processingOrder: ({ queue }) => {
+      if (queue === 'myQueue') {
+        return '-createdAt' // LIFO
+      }
+      return 'createdAt' // FIFO
+    },
+  },
+})
+```
+
+### Local API
+
+You can override the processing order for a single run by passing the `processingOrder` property to the `payload.jobs.run` method:
+
+```ts
+const results = await payload.jobs.run({
+  queue: 'myQueue',
+  processingOrder: '-createdAt', // Process jobs in reverse order of creation = LIFO
+})
+```
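Note that the `sequential` option this PR adds to `payload.jobs.run()` (see the type changes below) is not covered by this docs diff. A short sketch of how it combines with `processingOrder`, mirroring the tests added further down:

```ts
// Run jobs one at a time instead of in parallel. Useful when debugging,
// since job logs and results no longer interleave.
const results = await payload.jobs.run({
  processingOrder: '-createdAt', // LIFO
  sequential: true,
})
```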
@@ -4,6 +4,7 @@ import type { BaseJob, UpdateJobs, Where } from 'payload'
 import type { MongooseAdapter } from './index.js'

 import { buildQuery } from './queries/buildQuery.js'
+import { buildSortParam } from './queries/buildSortParam.js'
 import { getCollection } from './utilities/getEntity.js'
 import { getSession } from './utilities/getSession.js'
 import { handleError } from './utilities/handleError.js'
@@ -11,7 +12,7 @@ import { transform } from './utilities/transform.js'

 export const updateJobs: UpdateJobs = async function updateMany(
   this: MongooseAdapter,
-  { id, data, limit, req, returning, where: whereArg },
+  { id, data, limit, req, returning, sort: sortArg, where: whereArg },
 ) {
   if (!(data?.log as object[])?.length) {
     delete data.log
@@ -23,6 +24,14 @@ export const updateJobs: UpdateJobs = async function updateMany(
     collectionSlug: 'payload-jobs',
   })

+  const sort: Record<string, unknown> | undefined = buildSortParam({
+    adapter: this,
+    config: this.payload.config,
+    fields: collectionConfig.flattenedFields,
+    sort: sortArg || collectionConfig.defaultSort,
+    timestamps: true,
+  })
+
   const options: MongooseUpdateQueryOptions = {
     lean: true,
     new: true,
@@ -54,7 +63,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
   const documentsToUpdate = await Model.find(
     query,
     {},
-    { ...options, limit, projection: { _id: 1 } },
+    { ...options, limit, projection: { _id: 1 }, sort },
   )
   if (documentsToUpdate.length === 0) {
     return null
@@ -69,7 +78,14 @@ export const updateJobs: UpdateJobs = async function updateMany(
       return null
     }

-    result = await Model.find(query, {}, options)
+    result = await Model.find(
+      query,
+      {},
+      {
+        ...options,
+        sort,
+      },
+    )
   }
 } catch (error) {
   handleError({ collection: collectionConfig.slug, error, req })
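For intuition, `buildSortParam` converts a Payload sort string into the sort object Mongoose expects. Conceptually it behaves like this hypothetical simplification, ignoring details such as localized and nested fields:

```ts
// Hypothetical simplification: '-createdAt' -> { createdAt: 'desc' },
// 'queue,-createdAt' -> { queue: 'asc', createdAt: 'desc' }
const toMongooseSort = (sort: string): Record<string, 'asc' | 'desc'> => {
  const entries = sort
    .split(',')
    .map((field): [string, 'asc' | 'desc'] =>
      field.startsWith('-') ? [field.slice(1), 'desc'] : [field, 'asc'],
    )
  return Object.fromEntries(entries)
}
```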
@@ -1,5 +1,6 @@
 import type { CollectionConfig } from '../../../index.js'
-import type { Payload, PayloadRequest } from '../../../types/index.js'
+import type { Payload, PayloadRequest, Sort } from '../../../types/index.js'
 import type { RunJobsArgs } from '../../operations/runJobs/index.js'
 import type { TaskConfig } from './taskTypes.js'
 import type { WorkflowConfig } from './workflowTypes.js'

@@ -80,6 +81,22 @@ export type JobsConfig = {
    * a new collection.
    */
   jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig
+  /**
+   * Adjust the job processing order using a Payload sort string. This can be set globally or per queue.
+   *
+   * FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
+   *
+   * @default all jobs for all queues will be executed in FIFO order.
+   */
+  processingOrder?:
+    | ((args: RunJobsArgs) => Promise<Sort> | Sort)
+    | {
+        default?: Sort
+        queues: {
+          [queue: string]: Sort
+        }
+      }
+    | Sort
   /**
    * By default, the job system uses direct database calls for optimal performance.
    * If you added custom hooks to your jobs collection, you can set this to true to
@@ -5,6 +5,7 @@ import {
   type Payload,
   type PayloadRequest,
   type RunningJob,
+  type Sort,
   type TypedJobs,
   type Where,
 } from '../index.js'
@@ -99,8 +100,19 @@ export const getJobsLocalAPI = (payload: Payload) => ({
   run: async (args?: {
     limit?: number
     overrideAccess?: boolean
+    /**
+     * Adjust the job processing order using a Payload sort string.
+     *
+     * FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
+     */
+    processingOrder?: Sort
     queue?: string
     req?: PayloadRequest
+    /**
+     * By default, jobs are run in parallel.
+     * If you want to run them in sequence, set this to true.
+     */
+    sequential?: boolean
     where?: Where
   }): Promise<ReturnType<typeof runJobs>> => {
     const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
@@ -108,8 +120,10 @@ export const getJobsLocalAPI = (payload: Payload) => ({
     return await runJobs({
       limit: args?.limit,
       overrideAccess: args?.overrideAccess !== false,
+      processingOrder: args?.processingOrder,
       queue: args?.queue,
       req: newReq,
+      sequential: args?.sequential,
       where: args?.where,
     })
   },
@@ -1,6 +1,5 @@
-// @ts-strict-ignore
 import type { PaginatedDocs } from '../../../database/types.js'
-import type { PayloadRequest, Where } from '../../../types/index.js'
+import type { PayloadRequest, Sort, Where } from '../../../types/index.js'
 import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js'
 import type {
   BaseJob,
@@ -26,8 +25,21 @@ export type RunJobsArgs = {
   id?: number | string
   limit?: number
   overrideAccess?: boolean
+  /**
+   * Adjust the job processing order
+   *
+   * FIFO would equal `createdAt` and LIFO would equal `-createdAt`.
+   *
+   * @default all jobs for all queues will be executed in FIFO order.
+   */
+  processingOrder?: Sort
   queue?: string
   req: PayloadRequest
+  /**
+   * By default, jobs are run in parallel.
+   * If you want to run them in sequence, set this to true.
+   */
+  sequential?: boolean
   where?: Where
 }

@@ -43,14 +55,18 @@ export type RunJobsResult = {
   remainingJobsFromQueried: number
 }

-export const runJobs = async ({
-  id,
-  limit = 10,
-  overrideAccess,
-  queue,
-  req,
-  where: whereFromProps,
-}: RunJobsArgs): Promise<RunJobsResult> => {
+export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
+  const {
+    id,
+    limit = 10,
+    overrideAccess,
+    processingOrder,
+    queue,
+    req,
+    sequential,
+    where: whereFromProps,
+  } = args
+
   if (!overrideAccess) {
     const hasAccess = await req.payload.config.jobs.access.run({ req })
     if (!hasAccess) {
@@ -124,6 +140,21 @@ export const runJobs = async ({
       }),
     ]
   } else {
+    let defaultProcessingOrder: Sort =
+      req.payload.collections[jobsCollectionSlug].config.defaultSort ?? 'createdAt'
+
+    const processingOrderConfig = req.payload.config.jobs?.processingOrder
+    if (typeof processingOrderConfig === 'function') {
+      defaultProcessingOrder = await processingOrderConfig(args)
+    } else if (typeof processingOrderConfig === 'object' && !Array.isArray(processingOrderConfig)) {
+      if (queue && processingOrderConfig.queues && processingOrderConfig.queues[queue]) {
+        defaultProcessingOrder = processingOrderConfig.queues[queue]
+      } else if (processingOrderConfig.default) {
+        defaultProcessingOrder = processingOrderConfig.default
+      }
+    } else if (typeof processingOrderConfig === 'string') {
+      defaultProcessingOrder = processingOrderConfig
+    }
     const updatedDocs = await updateJobs({
       data: {
         processing: true,
@@ -133,6 +164,7 @@ export const runJobs = async ({
       limit,
       req,
       returning: true,
+      sort: processingOrder ?? defaultProcessingOrder,
       where,
     })

@@ -175,7 +207,7 @@ export const runJobs = async ({
       ? []
       : undefined

-  const jobPromises = jobsQuery.docs.map(async (job) => {
+  const runSingleJob = async (job) => {
     if (!job.workflowSlug && !job.taskSlug) {
       throw new Error('Job must have either a workflowSlug or a taskSlug')
     }
@@ -257,9 +289,20 @@ export const runJobs = async ({

       return { id: job.id, result }
     }
-  })
+  }

-  const resultsArray = await Promise.all(jobPromises)
+  let resultsArray: { id: number | string; result: RunJobResult }[] = []
+  if (sequential) {
+    for (const job of jobsQuery.docs) {
+      const result = await runSingleJob(job)
+      if (result !== null) {
+        resultsArray.push(result)
+      }
+    }
+  } else {
+    const jobPromises = jobsQuery.docs.map(runSingleJob)
+    resultsArray = await Promise.all(jobPromises)
+  }

   if (jobsToDelete && jobsToDelete.length > 0) {
     try {
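The precedence implemented above, from highest to lowest: the `processingOrder` argument passed to the run, then the jobs config's `processingOrder` (function, per-queue object, or sort string), then the jobs collection's `defaultSort`, falling back to `createdAt` (FIFO). For example:

```ts
// Jobs config (assumed): processingOrder: { queues: { nightly: '-createdAt' } }
// The per-call argument overrides the config's LIFO setting for this run only:
const results = await payload.jobs.run({
  queue: 'nightly',
  processingOrder: 'createdAt', // force FIFO for this invocation
})
```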
@@ -24,6 +24,7 @@ import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
 import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
 import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
 import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
+import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
 import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js'

 const filename = fileURLToPath(import.meta.url)
@@ -104,6 +105,11 @@ export default buildConfigWithDefaults({
         },
       }
     },
+    processingOrder: {
+      queues: {
+        lifo: '-createdAt',
+      },
+    },
     tasks: [
       {
         retries: 2,
@@ -376,6 +382,7 @@ export default buildConfigWithDefaults({
       workflowRetries2TasksRetriesUndefinedWorkflow,
       workflowRetries2TasksRetries0Workflow,
       inlineTaskTestWorkflow,
+      inlineTaskTestDelayedWorkflow,
       externalWorkflow,
       retriesBackoffTestWorkflow,
       subTaskWorkflow,
@@ -533,6 +533,106 @@ describe('Queues', () => {
     payload.config.jobs.deleteJobOnComplete = true
   })

+  it('ensure jobs run in FIFO order by default', async () => {
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      input: {
+        message: 'task 1',
+      },
+    })
+
+    await new Promise((resolve) => setTimeout(resolve, 100))
+
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      input: {
+        message: 'task 2',
+      },
+    })
+
+    await payload.jobs.run({
+      sequential: true,
+    })
+
+    const allSimples = await payload.find({
+      collection: 'simple',
+      limit: 100,
+      sort: 'createdAt',
+    })
+
+    expect(allSimples.totalDocs).toBe(2)
+    expect(allSimples.docs?.[0]?.title).toBe('task 1')
+    expect(allSimples.docs?.[1]?.title).toBe('task 2')
+  })
+
+  it('ensure jobs can run LIFO if processingOrder is passed', async () => {
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      input: {
+        message: 'task 1',
+      },
+    })
+
+    await new Promise((resolve) => setTimeout(resolve, 100))
+
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      input: {
+        message: 'task 2',
+      },
+    })
+
+    await payload.jobs.run({
+      sequential: true,
+      processingOrder: '-createdAt',
+    })
+
+    const allSimples = await payload.find({
+      collection: 'simple',
+      limit: 100,
+      sort: 'createdAt',
+    })
+
+    expect(allSimples.totalDocs).toBe(2)
+    expect(allSimples.docs?.[0]?.title).toBe('task 2')
+    expect(allSimples.docs?.[1]?.title).toBe('task 1')
+  })
+
+  it('ensure job config processingOrder using queues object is respected', async () => {
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      queue: 'lifo',
+      input: {
+        message: 'task 1',
+      },
+    })
+
+    await new Promise((resolve) => setTimeout(resolve, 100))
+
+    await payload.jobs.queue({
+      workflow: 'inlineTaskTestDelayed',
+      queue: 'lifo',
+      input: {
+        message: 'task 2',
+      },
+    })
+
+    await payload.jobs.run({
+      sequential: true,
+      queue: 'lifo',
+    })
+
+    const allSimples = await payload.find({
+      collection: 'simple',
+      limit: 100,
+      sort: 'createdAt',
+    })
+
+    expect(allSimples.totalDocs).toBe(2)
+    expect(allSimples.docs?.[0]?.title).toBe('task 2')
+    expect(allSimples.docs?.[1]?.title).toBe('task 1')
+  })
+
   it('can create new inline jobs', async () => {
     await payload.jobs.queue({
       workflow: 'inlineTaskTest',
@@ -123,6 +123,7 @@ export interface Config {
   workflowRetries2TasksRetriesUndefined: WorkflowWorkflowRetries2TasksRetriesUndefined;
   workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0;
   inlineTaskTest: WorkflowInlineTaskTest;
+  inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed;
   externalWorkflow: WorkflowExternalWorkflow;
   retriesBackoffTest: WorkflowRetriesBackoffTest;
   subTask: WorkflowSubTask;
@@ -313,6 +314,7 @@ export interface PayloadJob {
   | 'workflowRetries2TasksRetriesUndefined'
   | 'workflowRetries2TasksRetries0'
   | 'inlineTaskTest'
+  | 'inlineTaskTestDelayed'
   | 'externalWorkflow'
   | 'retriesBackoffTest'
   | 'subTask'
@@ -722,6 +724,15 @@ export interface WorkflowInlineTaskTest {
     message: string;
   };
 }
+/**
+ * This interface was referenced by `Config`'s JSON-Schema
+ * via the `definition` "WorkflowInlineTaskTestDelayed".
+ */
+export interface WorkflowInlineTaskTestDelayed {
+  input: {
+    message: string;
+  };
+}
 /**
  * This interface was referenced by `Config`'s JSON-Schema
  * via the `definition` "WorkflowExternalWorkflow".
test/queues/workflows/inlineTaskTestDelayed.ts (new file, 38 lines)
@@ -0,0 +1,38 @@
+import type { WorkflowConfig } from 'payload'
+
+export const inlineTaskTestDelayedWorkflow: WorkflowConfig<'inlineTaskTestDelayed'> = {
+  slug: 'inlineTaskTestDelayed',
+  inputSchema: [
+    {
+      name: 'message',
+      type: 'text',
+      required: true,
+    },
+  ],
+  handler: async ({ job, inlineTask }) => {
+    await inlineTask('1', {
+      task: async ({ input, req }) => {
+        // Wait 100ms
+        await new Promise((resolve) => setTimeout(resolve, 100))
+
+        const newSimple = await req.payload.create({
+          collection: 'simple',
+          req,
+          data: {
+            title: input.message,
+          },
+        })
+        await new Promise((resolve) => setTimeout(resolve, 100))
+
+        return {
+          output: {
+            simpleID: newSimple.id,
+          },
+        }
+      },
+      input: {
+        message: job.input.message,
+      },
+    })
+  },
+}
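The artificial 100ms waits in this workflow are what make the ordering tests above deterministic: combined with `sequential: true`, each `simple` document is created, and timestamped, strictly after the previous job finishes, so sorting by `createdAt` reveals the order in which the jobs were processed.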