From e0ffada80b5d0d29ece7380c7680719e514f38f0 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Wed, 27 Aug 2025 13:32:42 -0700 Subject: [PATCH] feat: support parallel job queue tasks, speed up task running (#13614) Currently, attempting to run tasks in parallel will result in DB errors. ## Solution The problem was caused by inefficient db update calls. After each task completes, we need to update the log array in the payload-jobs collection. On postgres, that's a different table. Currently, the update works the following way: 1. Nuke the table 2. Re-insert every single row, including the new one This will throw db errors if multiple processes start doing that. Additionally, due to conflicts, new log rows may be lost. This PR makes use of the [new db $push operation](https://github.com/payloadcms/payload/pull/13453) we recently added to atomically push a new log row to the database in a single round-trip. This not only reduces the amount of db round trips (=> faster job queue system) but allows multiple tasks to perform this db operation in parallel, without conflicts. 
## Problem **Example:** ```ts export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = { slug: 'fastParallelTask', handler: async ({ inlineTask }) => { const taskFunctions = [] for (let i = 0; i < 20; i++) { const idx = i + 1 taskFunctions.push(async () => { return await inlineTask(`parallel task ${idx}`, { input: { test: idx, }, task: () => { return { output: { taskID: idx.toString(), }, } }, }) }) } await Promise.all(taskFunctions.map((f) => f())) }, } ``` On SQLite, this would throw the following error: ```bash Caught error Error: UNIQUE constraint failed: payload_jobs_log.id at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:335:20) at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:360:16) at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34) at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16) at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58 at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18) at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21) at 
LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27) at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26) at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40) at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) { rawCode: 1555, code: 'SQLITE_CONSTRAINT_PRIMARYKEY', libsqlError: true } ``` --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1211001438499053 --- packages/db-mongodb/src/updateJobs.ts | 43 ++++++++++++++++--- packages/drizzle/src/updateJobs.ts | 6 ++- .../src/queues/errors/handleTaskError.ts | 36 +++++++++------- .../src/queues/errors/handleWorkflowError.ts | 1 - .../runJobs/runJob/getRunTaskFunction.ts | 11 +++-- .../payload/src/queues/utilities/updateJob.ts | 6 ++- test/_community/payload-types.ts | 26 +++++------ test/queues/getConfig.ts | 2 + test/queues/int.spec.ts | 26 +++++++++++ test/queues/payload-types.ts | 11 +++++ test/queues/postgres-logs.int.spec.ts | 4 +- .../workflows/fastParallelTaskWorkflow.ts | 34 +++++++++++++++ 12 files changed, 162 insertions(+), 44 deletions(-) create mode 100644 test/queues/workflows/fastParallelTaskWorkflow.ts diff --git 
a/packages/db-mongodb/src/updateJobs.ts b/packages/db-mongodb/src/updateJobs.ts index 5b9dab40b..304ba0627 100644 --- a/packages/db-mongodb/src/updateJobs.ts +++ b/packages/db-mongodb/src/updateJobs.ts @@ -1,4 +1,4 @@ -import type { MongooseUpdateQueryOptions } from 'mongoose' +import type { MongooseUpdateQueryOptions, UpdateQuery } from 'mongoose' import type { Job, UpdateJobs, Where } from 'payload' import type { MongooseAdapter } from './index.js' @@ -14,9 +14,13 @@ export const updateJobs: UpdateJobs = async function updateMany( this: MongooseAdapter, { id, data, limit, req, returning, sort: sortArg, where: whereArg }, ) { - if (!(data?.log as object[])?.length) { + if ( + !(data?.log as object[])?.length && + !(data.log && typeof data.log === 'object' && '$push' in data.log) + ) { delete data.log } + const where = id ? { id: { equals: id } } : (whereArg as Where) const { collectionConfig, Model } = getCollection({ @@ -47,17 +51,44 @@ export const updateJobs: UpdateJobs = async function updateMany( where, }) - transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'write' }) + let updateData: UpdateQuery = data + + const $inc: Record = {} + const $push: Record = {} + + transform({ + $inc, + $push, + adapter: this, + data, + fields: collectionConfig.fields, + operation: 'write', + }) + + const updateOps: UpdateQuery = {} + + if (Object.keys($inc).length) { + updateOps.$inc = $inc + } + if (Object.keys($push).length) { + updateOps.$push = $push + } + if (Object.keys(updateOps).length) { + updateOps.$set = updateData + updateData = updateOps + } let result: Job[] = [] try { if (id) { if (returning === false) { - await Model.updateOne(query, data, options) + await Model.updateOne(query, updateData, options) + transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'read' }) + return null } else { - const doc = await Model.findOneAndUpdate(query, data, options) + const doc = await Model.findOneAndUpdate(query, updateData, 
options) result = doc ? [doc] : [] } } else { @@ -74,7 +105,7 @@ export const updateJobs: UpdateJobs = async function updateMany( query = { _id: { $in: documentsToUpdate.map((doc) => doc._id) } } } - await Model.updateMany(query, data, options) + await Model.updateMany(query, updateData, options) if (returning === false) { return null diff --git a/packages/drizzle/src/updateJobs.ts b/packages/drizzle/src/updateJobs.ts index 6bfec911a..7463dab24 100644 --- a/packages/drizzle/src/updateJobs.ts +++ b/packages/drizzle/src/updateJobs.ts @@ -13,9 +13,13 @@ export const updateJobs: UpdateJobs = async function updateMany( this: DrizzleAdapter, { id, data, limit: limitArg, req, returning, sort: sortArg, where: whereArg }, ) { - if (!(data?.log as object[])?.length) { + if ( + !(data?.log as object[])?.length && + !(data.log && typeof data.log === 'object' && '$push' in data.log) + ) { delete data.log } + const whereToUse: Where = id ? { id: { equals: id } } : whereArg const limit = id ? 1 : limitArg diff --git a/packages/payload/src/queues/errors/handleTaskError.ts b/packages/payload/src/queues/errors/handleTaskError.ts index 194ecc245..ce266f3ed 100644 --- a/packages/payload/src/queues/errors/handleTaskError.ts +++ b/packages/payload/src/queues/errors/handleTaskError.ts @@ -1,6 +1,6 @@ import ObjectIdImport from 'bson-objectid' -import type { PayloadRequest } from '../../index.js' +import type { JobLog, PayloadRequest } from '../../index.js' import type { RunJobsSilent } from '../localAPI.js' import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' import type { TaskError } from './index.js' @@ -59,19 +59,6 @@ export async function handleTaskError({ const currentDate = getCurrentDate() - ;(job.log ??= []).push({ - id: new ObjectId().toHexString(), - completedAt: currentDate.toISOString(), - error: errorJSON, - executedAt: executedAt.toISOString(), - input, - output: output ?? {}, - parent: req.payload.config.jobs.addParentToTaskLog ? 
parent : undefined, - state: 'failed', - taskID, - taskSlug, - }) - if (job.waitUntil) { // Check if waitUntil is in the past const waitUntil = new Date(job.waitUntil) @@ -99,6 +86,19 @@ export async function handleTaskError({ maxRetries = retriesConfig.attempts } + const taskLogToPush: JobLog = { + id: new ObjectId().toHexString(), + completedAt: currentDate.toISOString(), + error: errorJSON, + executedAt: executedAt.toISOString(), + input, + output: output ?? {}, + parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined, + state: 'failed', + taskID, + taskSlug, + } + if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) { /** * Task reached max retries => workflow will not retry @@ -107,7 +107,9 @@ export async function handleTaskError({ await updateJob({ error: errorJSON, hasError: true, - log: job.log, + log: { + $push: taskLogToPush, + } as any, processing: false, totalTried: (job.totalTried ?? 0) + 1, waitUntil: job.waitUntil, @@ -167,7 +169,9 @@ export async function handleTaskError({ await updateJob({ error: hasFinalError ? errorJSON : undefined, hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - log: job.log, + log: { + $push: taskLogToPush, + } as any, processing: false, totalTried: (job.totalTried ?? 0) + 1, waitUntil: job.waitUntil, diff --git a/packages/payload/src/queues/errors/handleWorkflowError.ts b/packages/payload/src/queues/errors/handleWorkflowError.ts index 2716aebde..aa3672e26 100644 --- a/packages/payload/src/queues/errors/handleWorkflowError.ts +++ b/packages/payload/src/queues/errors/handleWorkflowError.ts @@ -79,7 +79,6 @@ export async function handleWorkflowError({ await updateJob({ error: errorJSON, hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried - log: job.log, processing: false, totalTried: (job.totalTried ?? 
0) + 1, waitUntil: job.waitUntil, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 023d05290..6088f9690 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -13,6 +13,7 @@ import type { TaskType, } from '../../../config/types/taskTypes.js' import type { + JobLog, SingleTaskStatus, WorkflowConfig, WorkflowTypes, @@ -184,7 +185,7 @@ export const getRunTaskFunction = ( await taskConfig.onSuccess() } - ;(job.log ??= []).push({ + const newLogItem: JobLog = { id: new ObjectId().toHexString(), completedAt: getCurrentDate().toISOString(), executedAt: executedAt.toISOString(), @@ -194,10 +195,14 @@ export const getRunTaskFunction = ( state: 'succeeded', taskID, taskSlug, - }) + } await updateJob({ - log: job.log, + log: { + $push: newLogItem, + } as any, + // Set to null to skip main row update on postgres. 2 => 1 db round trips + updatedAt: null as any, }) return output diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts index c4ef03343..f512d7a10 100644 --- a/packages/payload/src/queues/utilities/updateJob.ts +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -83,8 +83,10 @@ export async function updateJobs({ : undefined, } - // Ensure updatedAt date is always updated - data.updatedAt = new Date().toISOString() + if (typeof data.updatedAt === 'undefined') { + // Ensure updatedAt date is always updated + data.updatedAt = new Date().toISOString() + } const args: UpdateJobsArgs = id ? 
{ diff --git a/test/_community/payload-types.ts b/test/_community/payload-types.ts index 599c9dec1..6d7c96401 100644 --- a/test/_community/payload-types.ts +++ b/test/_community/payload-types.ts @@ -84,7 +84,7 @@ export interface Config { 'payload-migrations': PayloadMigrationsSelect | PayloadMigrationsSelect; }; db: { - defaultIDType: string; + defaultIDType: number; }; globals: { menu: Menu; @@ -124,7 +124,7 @@ export interface UserAuthOperations { * via the `definition` "posts". */ export interface Post { - id: string; + id: number; title?: string | null; content?: { root: { @@ -149,7 +149,7 @@ export interface Post { * via the `definition` "media". */ export interface Media { - id: string; + id: number; updatedAt: string; createdAt: string; url?: string | null; @@ -193,7 +193,7 @@ export interface Media { * via the `definition` "users". */ export interface User { - id: string; + id: number; updatedAt: string; createdAt: string; email: string; @@ -217,24 +217,24 @@ export interface User { * via the `definition` "payload-locked-documents". */ export interface PayloadLockedDocument { - id: string; + id: number; document?: | ({ relationTo: 'posts'; - value: string | Post; + value: number | Post; } | null) | ({ relationTo: 'media'; - value: string | Media; + value: number | Media; } | null) | ({ relationTo: 'users'; - value: string | User; + value: number | User; } | null); globalSlug?: string | null; user: { relationTo: 'users'; - value: string | User; + value: number | User; }; updatedAt: string; createdAt: string; @@ -244,10 +244,10 @@ export interface PayloadLockedDocument { * via the `definition` "payload-preferences". */ export interface PayloadPreference { - id: string; + id: number; user: { relationTo: 'users'; - value: string | User; + value: number | User; }; key?: string | null; value?: @@ -267,7 +267,7 @@ export interface PayloadPreference { * via the `definition` "payload-migrations". 
*/ export interface PayloadMigration { - id: string; + id: number; name?: string | null; batch?: number | null; updatedAt: string; @@ -393,7 +393,7 @@ export interface PayloadMigrationsSelect { * via the `definition` "menu". */ export interface Menu { - id: string; + id: number; globalText?: string | null; updatedAt?: string | null; createdAt?: string | null; diff --git a/test/queues/getConfig.ts b/test/queues/getConfig.ts index a5edc5912..7630b847a 100644 --- a/test/queues/getConfig.ts +++ b/test/queues/getConfig.ts @@ -19,6 +19,7 @@ import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js' import { UpdatePostTask } from './tasks/UpdatePostTask.js' import { externalWorkflow } from './workflows/externalWorkflow.js' import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js' +import { fastParallelTaskWorkflow } from './workflows/fastParallelTaskWorkflow.js' import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' import { longRunningWorkflow } from './workflows/longRunning.js' @@ -157,6 +158,7 @@ export const getConfig: () => Partial = () => ({ workflowRetries2TasksRetries0Workflow, inlineTaskTestWorkflow, failsImmediatelyWorkflow, + fastParallelTaskWorkflow, inlineTaskTestDelayedWorkflow, externalWorkflow, retriesBackoffTestWorkflow, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index f35071046..edcc30b90 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -1422,6 +1422,8 @@ describe('Queues - Payload', () => { id: job.id, }) + // error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore. 
+ expect(Boolean(jobAfterRun.error)).toBe(false) expect(jobAfterRun.hasError).toBe(false) expect(jobAfterRun.log?.length).toBe(amount) @@ -1442,6 +1444,30 @@ describe('Queues - Payload', () => { } }) + it('can reliably run workflows with parallel tasks that complete immediately', async () => { + const amount = 20 + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + workflow: 'fastParallelTask', + input: { + amount, + }, + }) + + await payload.jobs.run({ silent: false }) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore. + expect(Boolean(jobAfterRun.error)).toBe(false) + expect(jobAfterRun.hasError).toBe(false) + expect(jobAfterRun.log?.length).toBe(amount) + }) + it('can create and autorun jobs', async () => { await payload.jobs.queue({ workflow: 'inlineTaskTest', diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 580bf0b64..e399fb659 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -125,6 +125,7 @@ export interface Config { workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0; inlineTaskTest: WorkflowInlineTaskTest; failsImmediately: WorkflowFailsImmediately; + fastParallelTask: WorkflowFastParallelTask; inlineTaskTestDelayed: WorkflowInlineTaskTestDelayed; externalWorkflow: WorkflowExternalWorkflow; retriesBackoffTest: WorkflowRetriesBackoffTest; @@ -325,6 +326,7 @@ export interface PayloadJob { | 'workflowRetries2TasksRetries0' | 'inlineTaskTest' | 'failsImmediately' + | 'fastParallelTask' | 'inlineTaskTestDelayed' | 'externalWorkflow' | 'retriesBackoffTest' @@ -760,6 +762,15 @@ export interface WorkflowInlineTaskTest { export interface WorkflowFailsImmediately { input?: unknown; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` 
"WorkflowFastParallelTask". + */ +export interface WorkflowFastParallelTask { + input: { + amount: number; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "WorkflowInlineTaskTestDelayed". diff --git a/test/queues/postgres-logs.int.spec.ts b/test/queues/postgres-logs.int.spec.ts index 7a10cb84a..ca82344aa 100644 --- a/test/queues/postgres-logs.int.spec.ts +++ b/test/queues/postgres-logs.int.spec.ts @@ -43,7 +43,7 @@ describePostgres('queues - postgres logs', () => { }, }) - // Count every console log (= db call) + // Count every console log (= db call) const consoleCount = jest.spyOn(console, 'log').mockImplementation(() => {}) const res = await payload.jobs.run({}) @@ -52,7 +52,7 @@ describePostgres('queues - postgres logs', () => { jobStatus: { '1': { status: 'success' } }, remainingJobsFromQueried: 0, }) - expect(consoleCount).toHaveBeenCalledTimes(17) // Should be 17 sql calls if the optimizations are used. If not, this would be 22 calls + expect(consoleCount).toHaveBeenCalledTimes(14) // Should be 14 sql calls if the optimizations are used. 
If not, this would be 22 calls consoleCount.mockRestore() }) }) diff --git a/test/queues/workflows/fastParallelTaskWorkflow.ts b/test/queues/workflows/fastParallelTaskWorkflow.ts new file mode 100644 index 000000000..c7388c2ce --- /dev/null +++ b/test/queues/workflows/fastParallelTaskWorkflow.ts @@ -0,0 +1,34 @@ +import type { WorkflowConfig } from 'payload' + +export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = { + slug: 'fastParallelTask', + inputSchema: [ + { + name: 'amount', + type: 'number', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + const taskFunctions = [] + for (let i = 0; i < job.input.amount; i++) { + const idx = i + 1 + taskFunctions.push(async () => { + return await inlineTask(`fast parallel task ${idx}`, { + input: { + test: idx, + }, + task: () => { + return { + output: { + taskID: idx.toString(), + }, + } + }, + }) + }) + } + + await Promise.all(taskFunctions.map((f) => f())) + }, +}