feat: support parallel job queue tasks, speed up task running (#13614)

Currently, attempting to run job queue tasks in parallel results in DB errors.

## Solution

The problem was caused by inefficient db update calls. After each
task completes, we need to update the log array in the payload-jobs
collection. On Postgres, that log array lives in its own table.

Currently, the update works the following way:
1. Nuke all existing log rows for the job
2. Re-insert every single row, including the new one

If multiple processes do this at the same time, the db throws errors.
Additionally, due to conflicts, new log rows may be lost.
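
For illustration, the old pattern looks roughly like this at the adapter level (a simplified sketch, not the actual Payload internals — the `payload_jobs_log` table handle and `_parentID` column are assumptions based on the error below):

```ts
import { eq } from 'drizzle-orm'

// Simplified sketch of the previous behavior: rewrite the whole log array by
// deleting and re-inserting its rows. Table/column handles are illustrative.
async function rewriteJobLog(
  db: any,
  payloadJobsLog: any,
  jobID: number,
  allLogRows: Record<string, unknown>[],
) {
  // 1. Nuke every existing log row for this job
  await db.delete(payloadJobsLog).where(eq(payloadJobsLog._parentID, jobID))

  // 2. Re-insert every row, including the newly appended one. If two tasks
  //    finish at the same time, both re-insert rows with the same primary keys,
  //    producing the UNIQUE constraint error shown below — and whichever write
  //    lands last silently discards the other task's new log entry.
  await db.insert(payloadJobsLog).values(allLogRows)
}
```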

This PR makes use of the [new db $push operation](https://github.com/payloadcms/payload/pull/13453)
we recently added to atomically push a new log row to the database in a single
round-trip. This not only reduces the number of db round-trips (=> faster job
queue system) but also allows multiple tasks to perform this db operation in
parallel, without conflicts.
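
Conceptually, appending a log entry now boils down to a single atomic operation. The sketch below is illustrative only — the exact adapter method and `$push` argument shape come from #13453 and the real call sites live inside the job runner, so the names here are assumptions:

```ts
import type { Payload } from 'payload'

// Hypothetical helper, for illustration only — the real call and the exact
// `$push` shape may differ from this sketch.
async function appendJobLog(
  payload: Payload,
  jobID: number | string,
  newLogEntry: Record<string, unknown>,
) {
  await payload.db.updateOne({
    collection: 'payload-jobs',
    where: { id: { equals: jobID } },
    data: {
      log: {
        // Atomically append one log row: a single INSERT on the relational
        // adapters instead of delete-everything + re-insert-everything.
        $push: newLogEntry,
      },
    },
  })
}
```

Because each task now issues one independent append instead of rewriting the shared set of rows, concurrent tasks no longer step on each other.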

## Problem

**Example:**

```ts
export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = {
  slug: 'fastParallelTask',
  handler: async ({ inlineTask }) => {
    const taskFunctions = []
    for (let i = 0; i < 20; i++) {
      const idx = i + 1
      taskFunctions.push(async () => {
        return await inlineTask(`parallel task ${idx}`, {
          input: {
            test: idx,
          },
          task: () => {
            return {
              output: {
                taskID: idx.toString(),
              },
            }
          },
        })
      })
    }

    await Promise.all(taskFunctions.map((f) => f()))
  },
}
```
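
Reproducing the failure is just a matter of queueing that workflow and draining the queue — the same two calls the new test uses. The sketch below assumes an initialized `payload` instance; the committed version of the workflow additionally reads an `amount` input:

```ts
// Queue one job for the workflow above, then process the queue. With the old
// delete-and-re-insert log update, the 20 parallel inlineTask calls race while
// appending to the job's log.
const job = await payload.jobs.queue({
  workflow: 'fastParallelTask',
  input: { amount: 20 },
})

await payload.jobs.run({})
```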

On SQLite, this would throw the following error:

```bash
Caught error Error: UNIQUE constraint failed: payload_jobs_log.id
    at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:335:20)
    at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:360:16)
    at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34)
    at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16)
    at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58
    at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18)
    at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21)
    at LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27)
    at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26)
    at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40)
    at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) {
  rawCode: 1555,
  code: 'SQLITE_CONSTRAINT_PRIMARYKEY',
  libsqlError: true
}
```

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1211001438499053
Commit e0ffada80b (parent 303381e049) by Alessio Gravili, 2025-08-27 13:32:42 -07:00, committed by GitHub.
12 changed files with 162 additions and 44 deletions

View File

@@ -84,7 +84,7 @@ export interface Config {
'payload-migrations': PayloadMigrationsSelect<false> | PayloadMigrationsSelect<true>;
};
db: {
defaultIDType: string;
defaultIDType: number;
};
globals: {
menu: Menu;
@@ -124,7 +124,7 @@ export interface UserAuthOperations {
* via the `definition` "posts".
*/
export interface Post {
id: string;
id: number;
title?: string | null;
content?: {
root: {
@@ -149,7 +149,7 @@ export interface Post {
* via the `definition` "media".
*/
export interface Media {
id: string;
id: number;
updatedAt: string;
createdAt: string;
url?: string | null;
@@ -193,7 +193,7 @@ export interface Media {
* via the `definition` "users".
*/
export interface User {
id: string;
id: number;
updatedAt: string;
createdAt: string;
email: string;
@@ -217,24 +217,24 @@ export interface User {
* via the `definition` "payload-locked-documents".
*/
export interface PayloadLockedDocument {
id: string;
id: number;
document?:
| ({
relationTo: 'posts';
value: string | Post;
value: number | Post;
} | null)
| ({
relationTo: 'media';
value: string | Media;
value: number | Media;
} | null)
| ({
relationTo: 'users';
value: string | User;
value: number | User;
} | null);
globalSlug?: string | null;
user: {
relationTo: 'users';
value: string | User;
value: number | User;
};
updatedAt: string;
createdAt: string;
@@ -244,10 +244,10 @@ export interface PayloadLockedDocument {
* via the `definition` "payload-preferences".
*/
export interface PayloadPreference {
id: string;
id: number;
user: {
relationTo: 'users';
value: string | User;
value: number | User;
};
key?: string | null;
value?:
@@ -267,7 +267,7 @@ export interface PayloadPreference {
* via the `definition` "payload-migrations".
*/
export interface PayloadMigration {
id: string;
id: number;
name?: string | null;
batch?: number | null;
updatedAt: string;
@@ -393,7 +393,7 @@ export interface PayloadMigrationsSelect<T extends boolean = true> {
* via the `definition` "menu".
*/
export interface Menu {
id: string;
id: number;
globalText?: string | null;
updatedAt?: string | null;
createdAt?: string | null;

View File

@@ -19,6 +19,7 @@ import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js'
import { UpdatePostTask } from './tasks/UpdatePostTask.js'
import { externalWorkflow } from './workflows/externalWorkflow.js'
import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js'
import { fastParallelTaskWorkflow } from './workflows/fastParallelTaskWorkflow.js'
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js'
import { longRunningWorkflow } from './workflows/longRunning.js'
@@ -157,6 +158,7 @@ export const getConfig: () => Partial<Config> = () => ({
workflowRetries2TasksRetries0Workflow,
inlineTaskTestWorkflow,
failsImmediatelyWorkflow,
fastParallelTaskWorkflow,
inlineTaskTestDelayedWorkflow,
externalWorkflow,
retriesBackoffTestWorkflow,

View File

@@ -1422,6 +1422,8 @@ describe('Queues - Payload', () => {
id: job.id,
})
// error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore.
expect(Boolean(jobAfterRun.error)).toBe(false)
expect(jobAfterRun.hasError).toBe(false)
expect(jobAfterRun.log?.length).toBe(amount)
@@ -1442,6 +1444,30 @@ describe('Queues - Payload', () => {
}
})
it('can reliably run workflows with parallel tasks that complete immediately', async () => {
const amount = 20
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'fastParallelTask',
input: {
amount,
},
})
await payload.jobs.run({ silent: false })
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore.
expect(Boolean(jobAfterRun.error)).toBe(false)
expect(jobAfterRun.hasError).toBe(false)
expect(jobAfterRun.log?.length).toBe(amount)
})
it('can create and autorun jobs', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTest',

View File

@@ -125,6 +125,7 @@ export interface Config {
workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0;
inlineTaskTest: WorkflowInlineTaskTest;
failsImmediately: WorkflowFailsImmediately;
fastParallelTask: WorkflowFastParallelTask;
inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed;
externalWorkflow: WorkflowExternalWorkflow;
retriesBackoffTest: WorkflowRetriesBackoffTest;
@@ -325,6 +326,7 @@ export interface PayloadJob {
| 'workflowRetries2TasksRetries0'
| 'inlineTaskTest'
| 'failsImmediately'
| 'fastParallelTask'
| 'inlineTaskTestDelayed'
| 'externalWorkflow'
| 'retriesBackoffTest'
@@ -760,6 +762,15 @@ export interface WorkflowInlineTaskTest {
export interface WorkflowFailsImmediately {
input?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowFastParallelTask".
*/
export interface WorkflowFastParallelTask {
input: {
amount: number;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowInlineTaskTestDelayed".

View File

@@ -43,7 +43,7 @@ describePostgres('queues - postgres logs', () => {
},
})
// Count every console log (= db call)
const consoleCount = jest.spyOn(console, 'log').mockImplementation(() => {})
const res = await payload.jobs.run({})
@@ -52,7 +52,7 @@ describePostgres('queues - postgres logs', () => {
jobStatus: { '1': { status: 'success' } },
remainingJobsFromQueried: 0,
})
expect(consoleCount).toHaveBeenCalledTimes(17) // Should be 17 sql calls if the optimizations are used. If not, this would be 22 calls
expect(consoleCount).toHaveBeenCalledTimes(14) // Should be 14 sql calls if the optimizations are used. If not, this would be 22 calls
consoleCount.mockRestore()
})
})

View File

@@ -0,0 +1,34 @@
import type { WorkflowConfig } from 'payload'
export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = {
slug: 'fastParallelTask',
inputSchema: [
{
name: 'amount',
type: 'number',
required: true,
},
],
handler: async ({ job, inlineTask }) => {
const taskFunctions = []
for (let i = 0; i < job.input.amount; i++) {
const idx = i + 1
taskFunctions.push(async () => {
return await inlineTask(`fast parallel task ${idx}`, {
input: {
test: idx,
},
task: () => {
return {
output: {
taskID: idx.toString(),
},
}
},
})
})
}
await Promise.all(taskFunctions.map((f) => f()))
},
}