perf: 23% faster job queue system on postgres/sqlite (#13187)

Previously, a single run of the simplest job queue workflow (1 single
task, no db calls by user code in the task - we're just testing db
system overhead) would result in **22 db roundtrips** on drizzle. This
PR reduces it to **17 db roundtrips** by doing the following:

- Modifies db.updateJobs to use the new optimized upsertRow function if
the update is simple
- Stops passing the job log to the final job update when the workflow
completes, since it is unnecessary there. This allows using the optimized
upsertRow function, as only the main table is involved

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210888186878606
This commit is contained in:
Alessio Gravili
2025-07-30 06:23:43 -07:00
committed by GitHub
parent 227a20e94b
commit 3114b89d4c
7 changed files with 164 additions and 76 deletions

View File

@@ -6,6 +6,7 @@ import type { DrizzleAdapter } from './types.js'
import { findMany } from './find/findMany.js'
import { upsertRow } from './upsertRow/index.js'
import { shouldUseOptimizedUpsertRow } from './upsertRow/shouldUseOptimizedUpsertRow.js'
import { getTransaction } from './utilities/getTransaction.js'
export const updateJobs: UpdateJobs = async function updateMany(
@@ -23,6 +24,27 @@ export const updateJobs: UpdateJobs = async function updateMany(
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))
const sort = sortArg !== undefined && sortArg !== null ? sortArg : collection.defaultSort
const useOptimizedUpsertRow = shouldUseOptimizedUpsertRow({
data,
fields: collection.flattenedFields,
})
if (useOptimizedUpsertRow && id) {
const result = await upsertRow({
id,
adapter: this,
data,
db,
fields: collection.flattenedFields,
ignoreResult: returning === false,
operation: 'update',
req,
tableName,
})
return returning === false ? null : [result]
}
const jobs = await findMany({
adapter: this,
collectionSlug: 'payload-jobs',
@@ -42,7 +64,9 @@ export const updateJobs: UpdateJobs = async function updateMany(
// TODO: We need to batch this to reduce the amount of db calls. This can get very slow if we are updating a lot of rows.
for (const job of jobs.docs) {
const updateData = {
const updateData = useOptimizedUpsertRow
? data
: {
...job,
...data,
}

View File

@@ -89,9 +89,11 @@ export const runJob = async ({
}
// Workflow has completed successfully
// Do not update the job log here, as that would result in unnecessary db calls when using postgres.
// Solely updating simple fields here will result in optimized db calls.
// Job log modifications are already updated at the end of the runTask function.
await updateJob({
completedAt: getCurrentDate().toISOString(),
log: job.log,
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})

View File

@@ -0,0 +1,19 @@
/* eslint-disable no-restricted-exports */
import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js'
import { getConfig } from './getConfig.js'
const config = getConfig()
import { postgresAdapter } from '@payloadcms/db-postgres'
export const databaseAdapter = postgresAdapter({
pool: {
connectionString: process.env.POSTGRES_URL || 'postgres://127.0.0.1:5432/payloadtests',
},
logger: true,
})
export default buildConfigWithDefaults({
...config,
db: databaseAdapter,
})

View File

@@ -10,6 +10,7 @@ import { CreateSimpleRetries0Task } from './tasks/CreateSimpleRetries0Task.js'
import { CreateSimpleRetriesUndefinedTask } from './tasks/CreateSimpleRetriesUndefinedTask.js'
import { CreateSimpleTask } from './tasks/CreateSimpleTask.js'
import { CreateSimpleWithDuplicateMessageTask } from './tasks/CreateSimpleWithDuplicateMessageTask.js'
import { DoNothingTask } from './tasks/DoNothingTask.js'
import { ExternalTask } from './tasks/ExternalTask.js'
import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js'
import { ReturnErrorTask } from './tasks/ReturnErrorTask.js'
@@ -141,6 +142,7 @@ export const getConfig: () => Partial<Config> = () => ({
ThrowErrorTask,
ReturnErrorTask,
ReturnCustomErrorTask,
DoNothingTask,
],
workflows: [
updatePostWorkflow,

View File

@@ -88,20 +88,14 @@ export interface Config {
db: {
defaultIDType: string;
};
globals: {
'payload-jobs-stats': PayloadJobsStat;
};
globalsSelect: {
'payload-jobs-stats': PayloadJobsStatsSelect<false> | PayloadJobsStatsSelect<true>;
};
globals: {};
globalsSelect: {};
locale: null;
user: User & {
collection: 'users';
};
jobs: {
tasks: {
EverySecond: TaskEverySecond;
EverySecondMax2: TaskEverySecondMax2;
UpdatePost: MyUpdatePostType;
UpdatePostStep2: TaskUpdatePostStep2;
CreateSimple: TaskCreateSimple;
@@ -112,6 +106,7 @@ export interface Config {
ThrowError: TaskThrowError;
ReturnError: TaskReturnError;
ReturnCustomError: TaskReturnCustomError;
DoNothingTask: TaskDoNothingTask;
inline: {
input: unknown;
output: unknown;
@@ -210,6 +205,13 @@ export interface User {
hash?: string | null;
loginAttempts?: number | null;
lockUntil?: string | null;
sessions?:
| {
id: string;
createdAt?: string | null;
expiresAt: string;
}[]
| null;
password?: string | null;
}
/**
@@ -266,8 +268,6 @@ export interface PayloadJob {
completedAt: string;
taskSlug:
| 'inline'
| 'EverySecond'
| 'EverySecondMax2'
| 'UpdatePost'
| 'UpdatePostStep2'
| 'CreateSimple'
@@ -277,7 +277,8 @@ export interface PayloadJob {
| 'ExternalTask'
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError';
| 'ReturnCustomError'
| 'DoNothingTask';
taskID: string;
input?:
| {
@@ -336,8 +337,6 @@ export interface PayloadJob {
taskSlug?:
| (
| 'inline'
| 'EverySecond'
| 'EverySecondMax2'
| 'UpdatePost'
| 'UpdatePostStep2'
| 'CreateSimple'
@@ -348,20 +347,12 @@ export interface PayloadJob {
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError'
| 'DoNothingTask'
)
| null;
queue?: string | null;
waitUntil?: string | null;
processing?: boolean | null;
meta?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
updatedAt: string;
createdAt: string;
}
@@ -465,6 +456,13 @@ export interface UsersSelect<T extends boolean = true> {
hash?: T;
loginAttempts?: T;
lockUntil?: T;
sessions?:
| T
| {
id?: T;
createdAt?: T;
expiresAt?: T;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
@@ -495,7 +493,6 @@ export interface PayloadJobsSelect<T extends boolean = true> {
queue?: T;
waitUntil?: T;
processing?: T;
meta?: T;
updatedAt?: T;
createdAt?: T;
}
@@ -531,54 +528,6 @@ export interface PayloadMigrationsSelect<T extends boolean = true> {
updatedAt?: T;
createdAt?: T;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-jobs-stats".
*/
export interface PayloadJobsStat {
id: string;
stats?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
updatedAt?: string | null;
createdAt?: string | null;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-jobs-stats_select".
*/
export interface PayloadJobsStatsSelect<T extends boolean = true> {
stats?: T;
updatedAt?: T;
createdAt?: T;
globalType?: T;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskEverySecond".
*/
export interface TaskEverySecond {
input: {
message: string;
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskEverySecondMax2".
*/
export interface TaskEverySecondMax2 {
input: {
message: string;
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "MyUpdatePostType".
@@ -693,6 +642,16 @@ export interface TaskReturnCustomError {
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskDoNothingTask".
*/
export interface TaskDoNothingTask {
input: {
message: string;
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "MyUpdatePostWorkflowType".

View File

@@ -0,0 +1,59 @@
import type { Payload } from 'payload'
/* eslint-disable jest/require-top-level-describe */
import assert from 'assert'
import path from 'path'
import { fileURLToPath } from 'url'
import { initPayloadInt } from '../helpers/initPayloadInt.js'
import { withoutAutoRun } from './utilities.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
const describePostgres = process.env.PAYLOAD_DATABASE?.startsWith('postgres')
? describe
: describe.skip
let payload: Payload
/**
 * Regression test for job-queue db overhead on Postgres.
 *
 * Boots Payload with `config.postgreslogs.ts`, then counts `console.log`
 * calls while a single trivial job runs. Per the inline comment below, each
 * console log is treated as one db call.
 */
describePostgres('queues - postgres logs', () => {
  beforeAll(async () => {
    const initialized = await initPayloadInt(
      dirname,
      undefined,
      undefined,
      'config.postgreslogs.ts',
    )
    assert(initialized.payload)
    assert(initialized.restClient)
    ;({ payload } = initialized)
  })

  afterAll(async () => {
    await payload.destroy()
  })

  it('ensure running jobs uses minimal db calls', async () => {
    await withoutAutoRun(async () => {
      await payload.jobs.queue({
        task: 'DoNothingTask',
        input: {
          message: 'test',
        },
      })

      // Count every console log (= db call)
      const consoleCount = jest.spyOn(console, 'log').mockImplementation(() => {})

      // Restore the spy in `finally`: if an expectation below throws, console.log
      // must not stay mocked (and silenced) for the rest of the test run.
      try {
        const res = await payload.jobs.run({})

        expect(res).toEqual({
          jobStatus: { '1': { status: 'success' } },
          remainingJobsFromQueried: 0,
        })
        expect(consoleCount).toHaveBeenCalledTimes(17) // Should be 17 sql calls if the optimizations are used. If not, this would be 22 calls
      } finally {
        consoleCount.mockRestore()
      }
    })
  })
})

View File

@@ -0,0 +1,23 @@
/* eslint-disable @typescript-eslint/require-await */
import type { TaskConfig } from 'payload'
/**
 * Minimal task for measuring job-queue overhead: the handler does no work of
 * its own and simply reports success, echoing `input.message` in its output.
 */
export const DoNothingTask: TaskConfig<'DoNothingTask'> = {
  slug: 'DoNothingTask',
  retries: 2,
  // Single required text input.
  inputSchema: [{ name: 'message', type: 'text', required: true }],
  outputSchema: [],
  handler: async ({ input }) => ({
    state: 'succeeded',
    output: { message: input.message },
  }),
}