fix: support parallel job queue tasks (#11917)

This adds support for running multiple job queue tasks in parallel within the same workflow without conflicts. Previously, running tasks in parallel caused the following issues:
- Job log entries were lost: the final job log was incomplete, even though all tasks had executed
- Write conflicts in Postgres, leading to unique constraint violation errors

The fix handles job log updates so that parallel updates no longer overwrite each other, and ensures the final update reflects the complete job log. Each job log entry now initializes its own ID at creation time, so a given entry's ID stays the same across multiple parallel task executions and can be used to de-duplicate entries when merging.
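
Condensed from the task-runner diffs further down (`job` and `executedAt` come from the surrounding runner code), every log entry is now pushed with a pre-generated BSON ObjectId instead of having the database layer assign one later:

```ts
import ObjectIdImport from 'bson-objectid'

// bson-objectid ships a CommonJS default export; unwrap it the way the runner code does.
const ObjectId = (ObjectIdImport.default ||
  ObjectIdImport) as unknown as typeof ObjectIdImport.default

// The ID is created together with the entry, so every parallel update that
// carries this entry refers to it by the same ID.
job.log.push({
  id: new ObjectId().toHexString(),
  completedAt: new Date().toISOString(),
  executedAt: executedAt.toISOString(),
  // ...remaining task result fields
})
```

Because IDs are now assigned up front, the `sanitizeUpdateData` helper that previously patched missing IDs into log rows before direct db adapter calls is no longer needed and is removed in this commit.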

## Postgres

In Postgres, we need to enable transactions for the `payload.db.updateJobs` operation; otherwise, two tasks updating the same job in parallel can conflict. This happens because the Postgres adapter updates array rows by deleting them all and then re-inserting them (rather than upserting). The rows are stored in a separate table, so the following scenario can occur:

- Op 1: deletes all job log rows
- Op 2: deletes all job log rows
- Op 1: inserts 200 job log rows
- Op 2: inserts the same 200 job log rows again => `error: duplicate key value violates unique constraint "payload_jobs_log_pkey"`

Because transactions were not used, the rows inserted by Op 1 immediately became visible to Op 2, causing the conflict. Enabling transactions fixes this. In theory the conflict can still occur if Op 1 commits before Op 2 starts inserting (due to the read committed isolation level), but it should happen far less frequently.
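
Concretely, the queue's `updateJobs` utility now wraps the `payload.db.updateJobs` call in its own transaction on non-Mongo adapters. Condensed from the diff further down (`req` is the incoming `PayloadRequest`; `id`, `data`, and `returning` are the update arguments):

```ts
// Open a transaction for SQL adapters so the delete + re-insert of the log
// rows from two parallel updates cannot interleave.
const jobReq = {
  transactionID:
    req.payload.db.name !== 'mongoose'
      ? ((await req.payload.db.beginTransaction()) as string)
      : undefined,
}

const updatedJobs = await req.payload.db.updateJobs({ id, data, req: jobReq, returning })

// Commit once the update (including the re-inserted log rows) is complete.
if (req.payload.db.name !== 'mongoose' && jobReq.transactionID) {
  await req.payload.db.commitTransaction(jobReq.transactionID)
}
```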

Alongside this change, we should consider inserting the rows with an upsert (update on conflict), which would eliminate this error entirely: if the rows inserted by Op 1 are visible to Op 2, Op 2 simply overwrites them instead of erroring. Individual job log entries are immutable and are never deleted, so this shouldn't corrupt any data.
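
As an illustration only (not part of this commit), such an upsert could look roughly like the following with Drizzle's `onConflictDoUpdate`. The table shape, the `payloadJobsLog` name, and the `db`/`logRows` values are assumptions made for the sketch:

```ts
import { sql } from 'drizzle-orm'
import type { NodePgDatabase } from 'drizzle-orm/node-postgres'
import { jsonb, pgTable, varchar } from 'drizzle-orm/pg-core'

// Hypothetical, simplified shape of the jobs-log table used by the adapter.
const payloadJobsLog = pgTable('payload_jobs_log', {
  id: varchar('id').primaryKey(),
  state: varchar('state'),
  output: jsonb('output'),
})

declare const db: NodePgDatabase
declare const logRows: (typeof payloadJobsLog.$inferInsert)[]

// Insert the log rows, but if a concurrent operation already inserted a row
// with the same ID, overwrite it instead of throwing a duplicate-key error.
// Log entries are immutable, so the overwritten values are identical anyway.
await db
  .insert(payloadJobsLog)
  .values(logRows)
  .onConflictDoUpdate({
    target: payloadJobsLog.id,
    set: { state: sql`excluded.state`, output: sql`excluded.output` },
  })
```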

## Mongo

In Mongo, the issue is addressed by ensuring that log entries missing from a concurrent operation's result (because that operation was working from a stale log state) are not treated as deletions when merged back into the in-memory job log, and by making sure the final update includes the complete job log.
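
The merge in `getUpdateJobFunction` (see the diff further down) boils down to an append-only update keyed by the stable entry ID; `job` is the in-memory job object and `updatedJob` is the document returned by the database update:

```ts
// Existing log entries are immutable, so only append entries whose ID is not
// yet in the in-memory log, and never drop entries that a stale concurrent
// result happens to be missing.
for (const logEntry of updatedJob.log ?? []) {
  if (!job.log.some((entry) => entry.id === logEntry.id)) {
    job.log.push(logEntry)
  }
}
```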

There is no duplicate key error in Mongo because the log array lives inside the same document and duplicates are simply upserted. We cannot use transactions in Mongo, as they appear to lock the document in a way that prevents reliable parallel updates, leading to:

`MongoServerError: WriteConflict error: this operation conflicted with
another operation. Please retry your operation or multi-document
transaction`

Alessio Gravili authored 2025-03-31 13:06:05 -06:00, committed by GitHub
parent a083d47368, commit 9a1c3cf4cc
14 changed files with 177 additions and 54 deletions


@@ -13,6 +13,9 @@ export const updateJobs: UpdateJobs = async function updateMany(
this: MongooseAdapter,
{ id, data, limit, req, returning, where: whereArg },
) {
if (!(data?.log as object[])?.length) {
delete data.log
}
const where = id ? { id: { equals: id } } : (whereArg as Where)
const { collectionConfig, Model } = getCollection({


@@ -12,6 +12,9 @@ export const updateJobs: UpdateJobs = async function updateMany(
this: DrizzleAdapter,
{ id, data, limit: limitArg, req, returning, sort: sortArg, where: whereArg },
) {
if (!(data?.log as object[])?.length) {
delete data.log
}
const whereToUse: Where = id ? { id: { equals: id } } : whereArg
const limit = id ? 1 : limitArg
@@ -55,6 +58,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
req,
tableName,
})
results.push(result)
}


@@ -2,7 +2,6 @@ import type { BaseJob, DatabaseAdapter } from '../index.js'
import type { UpdateJobs } from './types.js'
import { jobsCollectionSlug } from '../queues/config/index.js'
import { sanitizeUpdateData } from '../queues/utilities/sanitizeUpdateData.js'
export const defaultUpdateJobs: UpdateJobs = async function updateMany(
this: DatabaseAdapter,
@@ -42,7 +41,7 @@ export const defaultUpdateJobs: UpdateJobs = async function updateMany(
const updatedJob = await this.updateOne({
id: job.id,
collection: jobsCollectionSlug,
data: sanitizeUpdateData({ data: updateData }),
data: updateData,
req,
returning,
})


@@ -18,7 +18,7 @@ export type JobLog = {
/**
* ID added by the array field when the log is saved in the database
*/
id?: string
id: string
input?: Record<string, any>
output?: Record<string, any>
/**


@@ -1,3 +1,5 @@
import ObjectIdImport from 'bson-objectid'
// @ts-strict-ignore
import type { PayloadRequest } from '../../../../types/index.js'
import type {
@@ -22,6 +24,9 @@ import type { UpdateJobFunction } from './getUpdateJobFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
import { importHandlerPath } from './importHandlerPath.js'
const ObjectId = (ObjectIdImport.default ||
ObjectIdImport) as unknown as typeof ObjectIdImport.default
// Helper object type to force being passed by reference
export type RunTaskFunctionState = {
reachedMaxRetries: boolean
@@ -96,6 +101,7 @@ export async function handleTaskFailed({
}
job.log.push({
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
error: errorJSON,
executedAt: executedAt.toISOString(),
@@ -252,6 +258,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
log: [
...job.log,
{
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
error: errorMessage,
executedAt: executedAt.toISOString(),
@@ -350,6 +357,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
job.log = []
}
job.log.push({
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
executedAt: executedAt.toISOString(),
input,


@@ -18,8 +18,21 @@ export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJ
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
for (const key in updatedJob) {
if (key === 'log') {
if (!job.log) {
job.log = []
}
// Add all new log entries to the original job.log object. Do not delete any existing log entries.
// Do not update existing log entries, as existing log entries should be immutable.
for (const logEntry of updatedJob.log) {
if (!job.log.some((entry) => entry.id === logEntry.id)) {
job.log.push(logEntry)
}
}
} else {
job[key] = updatedJob[key]
}
}
if ((updatedJob.error as Record<string, unknown>)?.cancelled) {
const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error


@@ -70,6 +70,7 @@ export const runJob = async ({
await updateJob({
error: errorJSON,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
log: job.log,
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
@@ -82,6 +83,7 @@ export const runJob = async ({
// Workflow has completed
await updateJob({
completedAt: new Date().toISOString(),
log: job.log,
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})


@@ -1,28 +0,0 @@
import ObjectIdImport from 'bson-objectid'
import type { BaseJob } from '../config/types/workflowTypes.js'
const ObjectId = (ObjectIdImport.default ||
ObjectIdImport) as unknown as typeof ObjectIdImport.default
/**
* Our payload operations sanitize the input data to, for example, add missing IDs to array rows.
* This function is used to manually sanitize the data for direct db adapter operations
*/
export function sanitizeUpdateData({ data }: { data: Partial<BaseJob> }): Partial<BaseJob> {
if (data.log) {
const sanitizedData = { ...data }
sanitizedData.log = sanitizedData?.log?.map((log) => {
if (log.id) {
return log
}
return {
...log,
id: new ObjectId().toHexString(),
}
})
return sanitizedData
}
return data
}


@@ -4,7 +4,6 @@ import type { PayloadRequest, Sort, Where } from '../../types/index.js'
import type { BaseJob } from '../config/types/workflowTypes.js'
import { jobAfterRead, jobsCollectionSlug } from '../config/index.js'
import { sanitizeUpdateData } from './sanitizeUpdateData.js'
type BaseArgs = {
data: Partial<BaseJob>
@@ -72,17 +71,24 @@ export async function updateJobs({
return result.docs as BaseJob[]
}
const jobReq = {
transactionID:
req.payload.db.name !== 'mongoose'
? ((await req.payload.db.beginTransaction()) as string)
: undefined,
}
const args: UpdateJobsArgs = id
? {
id,
data: sanitizeUpdateData({ data }),
req: disableTransaction === true ? undefined : req,
data,
req: jobReq,
returning,
}
: {
data: sanitizeUpdateData({ data }),
data,
limit,
req: disableTransaction === true ? undefined : req,
req: jobReq,
returning,
sort,
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
@@ -91,6 +97,10 @@ export async function updateJobs({
const updatedJobs: BaseJob[] | null = await req.payload.db.updateJobs(args)
if (req.payload.db.name !== 'mongoose' && jobReq.transactionID) {
await req.payload.db.commitTransaction(jobReq.transactionID)
}
if (returning === false || !updatedJobs?.length) {
return null
}


@@ -86,7 +86,7 @@ export interface Config {
'payload-query-presets': PayloadQueryPresetsSelect<false> | PayloadQueryPresetsSelect<true>;
};
db: {
defaultIDType: string;
defaultIDType: number;
};
globals: {};
globalsSelect: {};
@@ -122,7 +122,7 @@ export interface UserAuthOperations {
* via the `definition` "pages".
*/
export interface Page {
id: string;
id: number;
text?: string | null;
updatedAt: string;
createdAt: string;
@@ -133,7 +133,7 @@ export interface Page {
* via the `definition` "users".
*/
export interface User {
id: string;
id: number;
name?: string | null;
roles?: ('admin' | 'user' | 'anonymous')[] | null;
updatedAt: string;
@@ -152,7 +152,7 @@ export interface User {
* via the `definition` "posts".
*/
export interface Post {
id: string;
id: number;
text?: string | null;
updatedAt: string;
createdAt: string;
@@ -163,24 +163,24 @@ export interface Post {
* via the `definition` "payload-locked-documents".
*/
export interface PayloadLockedDocument {
id: string;
id: number;
document?:
| ({
relationTo: 'pages';
value: string | Page;
value: number | Page;
} | null)
| ({
relationTo: 'users';
value: string | User;
value: number | User;
} | null)
| ({
relationTo: 'posts';
value: string | Post;
value: number | Post;
} | null);
globalSlug?: string | null;
user: {
relationTo: 'users';
value: string | User;
value: number | User;
};
updatedAt: string;
createdAt: string;
@@ -190,10 +190,10 @@ export interface PayloadLockedDocument {
* via the `definition` "payload-preferences".
*/
export interface PayloadPreference {
id: string;
id: number;
user: {
relationTo: 'users';
value: string | User;
value: number | User;
};
key?: string | null;
value?:
@@ -213,7 +213,7 @@ export interface PayloadPreference {
* via the `definition` "payload-migrations".
*/
export interface PayloadMigration {
id: string;
id: number;
name?: string | null;
batch?: number | null;
updatedAt: string;
@@ -224,23 +224,23 @@ export interface PayloadMigration {
* via the `definition` "payload-query-presets".
*/
export interface PayloadQueryPreset {
id: string;
id: number;
title: string;
isShared?: boolean | null;
access?: {
read?: {
constraint?: ('everyone' | 'onlyMe' | 'specificUsers' | 'specificRoles') | null;
users?: (string | User)[] | null;
users?: (number | User)[] | null;
roles?: ('admin' | 'user' | 'anonymous')[] | null;
};
update?: {
constraint?: ('everyone' | 'onlyMe' | 'specificUsers' | 'specificRoles') | null;
users?: (string | User)[] | null;
users?: (number | User)[] | null;
roles?: ('admin' | 'user' | 'anonymous')[] | null;
};
delete?: {
constraint?: ('everyone' | 'onlyMe' | 'specificUsers') | null;
users?: (string | User)[] | null;
users?: (number | User)[] | null;
};
};
where?:


@@ -24,6 +24,7 @@ import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js'
import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js'
import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js'
import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js'
import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
@@ -380,6 +381,7 @@ export default buildConfigWithDefaults({
subTaskWorkflow,
subTaskFailsWorkflow,
longRunningWorkflow,
parallelTaskWorkflow,
],
},
editor: lexicalEditor(),


@@ -1265,4 +1265,40 @@ describe('Queues', () => {
    expect(jobAfterRun.log[0].error.message).toBe('custom error message')
    expect(jobAfterRun.log[0].state).toBe('failed')
  })

  it('can reliably run workflows with parallel tasks', async () => {
    const amount = 500
    payload.config.jobs.deleteJobOnComplete = false
    const job = await payload.jobs.queue({
      workflow: 'parallelTask',
      input: {},
    })
    await payload.jobs.run()
    const jobAfterRun = await payload.findByID({
      collection: 'payload-jobs',
      id: job.id,
    })
    expect(jobAfterRun.hasError).toBe(false)
    expect(jobAfterRun.log?.length).toBe(amount)
    const simpleDocs = await payload.find({
      collection: 'simple',
      limit: amount,
      depth: 0,
    })
    expect(simpleDocs.docs.length).toBe(amount)
    // Ensure all docs are created (= all tasks are run once)
    for (let i = 1; i <= simpleDocs.docs.length; i++) {
      const simpleDoc = simpleDocs.docs.find((doc) => doc.title === `parallel task ${i}`)
      const logEntry = jobAfterRun?.log?.find((log) => log.taskID === `parallel task ${i}`)
      expect(simpleDoc).toBeDefined()
      expect(logEntry).toBeDefined()
      expect((logEntry?.output as any)?.simpleID).toBe(simpleDoc?.id)
    }
  })
})


@@ -54,6 +54,7 @@ export type SupportedTimezones =
| 'Asia/Singapore'
| 'Asia/Tokyo'
| 'Asia/Seoul'
| 'Australia/Brisbane'
| 'Australia/Sydney'
| 'Pacific/Guam'
| 'Pacific/Noumea'
@@ -102,6 +103,9 @@ export interface Config {
CreateSimpleRetries0: TaskCreateSimpleRetries0;
CreateSimpleWithDuplicateMessage: TaskCreateSimpleWithDuplicateMessage;
ExternalTask: TaskExternalTask;
ThrowError: TaskThrowError;
ReturnError: TaskReturnError;
ReturnCustomError: TaskReturnCustomError;
inline: {
input: unknown;
output: unknown;
@@ -124,6 +128,7 @@ export interface Config {
subTask: WorkflowSubTask;
subTaskFails: WorkflowSubTaskFails;
longRunning: WorkflowLongRunning;
parallelTask: WorkflowParallelTask;
};
};
}
@@ -259,7 +264,10 @@ export interface PayloadJob {
| 'CreateSimpleRetriesUndefined'
| 'CreateSimpleRetries0'
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask';
| 'ExternalTask'
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError';
taskID: string;
input?:
| {
@@ -310,6 +318,7 @@ export interface PayloadJob {
| 'subTask'
| 'subTaskFails'
| 'longRunning'
| 'parallelTask'
)
| null;
taskSlug?:
@@ -322,6 +331,9 @@ export interface PayloadJob {
| 'CreateSimpleRetries0'
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask'
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError'
)
| null;
queue?: string | null;
@@ -583,6 +595,32 @@ export interface TaskExternalTask {
simpleID: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskThrowError".
*/
export interface TaskThrowError {
input?: unknown;
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskReturnError".
*/
export interface TaskReturnError {
input?: unknown;
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskReturnCustomError".
*/
export interface TaskReturnCustomError {
input: {
errorMessage: string;
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "MyUpdatePostWorkflowType".
@@ -727,6 +765,13 @@ export interface WorkflowSubTaskFails {
export interface WorkflowLongRunning {
input?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowParallelTask".
*/
export interface WorkflowParallelTask {
input?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "auth".


@@ -0,0 +1,29 @@
import type { WorkflowConfig } from 'payload'

export const parallelTaskWorkflow: WorkflowConfig<'parallelTask'> = {
  slug: 'parallelTask',
  inputSchema: [],
  handler: async ({ job, inlineTask }) => {
    const taskIDs = Array.from({ length: 500 }, (_, i) => i + 1).map((i) => i.toString())

    await Promise.all(
      taskIDs.map(async (taskID) => {
        return await inlineTask(`parallel task ${taskID}`, {
          task: async ({ req }) => {
            const newSimple = await req.payload.db.create({
              collection: 'simple',
              data: {
                title: 'parallel task ' + taskID,
              },
            })
            return {
              output: {
                simpleID: newSimple.id,
              },
            }
          },
        })
      }),
    )
  },
}