perf: reduce job queue db calls (#11846)

Continuation of #11489. This adds a new, optional `updateJobs` db
adapter method that reduces the amount of database calls for the jobs
queue.

## MongoDB

### Previous: running a set of 50 queued jobs
- 1x db.find (= 1x `Model.paginate`)
- 50x db.updateOne (= 50x `Model.findOneAndUpdate`)

### Now: running a set of 50 queued jobs
- 1x db.updateJobs (= 1x `Model.find` and 1x `Model.updateMany`)

**=> 51 db round trips before, 2 db round trips after**


### Previous: upon task completion
- 1x db.find (= 1x `Model.paginate`)
- 1x db.updateOne (= 1x `Model.findOneAndUpdate`)

### Now: upon task completion
- 1x db.updateJobs (= 1x `Model.findOneAndUpdate`)


**=> 2 db round trips before, 1 db round trip after**


## Drizzle (e.g. Postgres)

### running a set of 50 queued jobs
 - 1x db.query[tablename].findMany
 - 50x db.select 
 - 50x upsertRow
 
This is unaffected by this PR and will be addressed in a future PR
This commit is contained in:
Alessio Gravili
2025-03-25 12:09:52 -06:00
committed by GitHub
parent 74f935bfb9
commit a5c3aa0e4f
12 changed files with 279 additions and 47 deletions

View File

@@ -17,7 +17,6 @@ import type {
TypeWithVersion,
UpdateGlobalArgs,
UpdateGlobalVersionArgs,
UpdateManyArgs,
UpdateOneArgs,
UpdateVersionArgs,
} from 'payload'
@@ -55,6 +54,7 @@ import { commitTransaction } from './transactions/commitTransaction.js'
import { rollbackTransaction } from './transactions/rollbackTransaction.js'
import { updateGlobal } from './updateGlobal.js'
import { updateGlobalVersion } from './updateGlobalVersion.js'
import { updateJobs } from './updateJobs.js'
import { updateMany } from './updateMany.js'
import { updateOne } from './updateOne.js'
import { updateVersion } from './updateVersion.js'
@@ -227,6 +227,7 @@ export function mongooseAdapter({
mongoMemoryServer,
sessions: {},
transactionOptions: transactionOptions === false ? undefined : transactionOptions,
updateJobs,
updateMany,
url,
versions: {},

View File

@@ -0,0 +1,83 @@
import type { MongooseUpdateQueryOptions } from 'mongoose'
import type { BaseJob, UpdateJobs, Where } from 'payload'
import type { MongooseAdapter } from './index.js'
import { buildQuery } from './queries/buildQuery.js'
import { getCollection } from './utilities/getEntity.js'
import { getSession } from './utilities/getSession.js'
import { handleError } from './utilities/handleError.js'
import { transform } from './utilities/transform.js'
export const updateJobs: UpdateJobs = async function updateMany(
this: MongooseAdapter,
{ id, data, limit, req, returning, where: whereArg },
) {
const where = id ? { id: { equals: id } } : (whereArg as Where)
const { collectionConfig, Model } = getCollection({
adapter: this,
collectionSlug: 'payload-jobs',
})
const options: MongooseUpdateQueryOptions = {
lean: true,
new: true,
session: await getSession(this, req),
}
let query = await buildQuery({
adapter: this,
collectionSlug: collectionConfig.slug,
fields: collectionConfig.flattenedFields,
where,
})
transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'write' })
let result: BaseJob[] = []
try {
if (id) {
if (returning === false) {
await Model.updateOne(query, data, options)
return null
} else {
const doc = await Model.findOneAndUpdate(query, data, options)
result = doc ? [doc] : []
}
} else {
if (typeof limit === 'number' && limit > 0) {
const documentsToUpdate = await Model.find(
query,
{},
{ ...options, limit, projection: { _id: 1 } },
)
if (documentsToUpdate.length === 0) {
return null
}
query = { _id: { $in: documentsToUpdate.map((doc) => doc._id) } }
}
await Model.updateMany(query, data, options)
if (returning === false) {
return null
}
result = await Model.find(query, {}, options)
}
} catch (error) {
handleError({ collection: collectionConfig.slug, error, req })
}
transform({
adapter: this,
data: result,
fields: collectionConfig.fields,
operation: 'read',
})
return result
}

View File

@@ -33,6 +33,7 @@ import {
rollbackTransaction,
updateGlobal,
updateGlobalVersion,
updateJobs,
updateMany,
updateOne,
updateVersion,
@@ -172,6 +173,7 @@ export function postgresAdapter(args: Args): DatabaseAdapterObj<PostgresAdapter>
find,
findGlobal,
findGlobalVersions,
updateJobs,
// @ts-expect-error - vestiges of when tsconfig was not strict. Feel free to improve
findOne,
findVersions,

View File

@@ -34,6 +34,7 @@ import {
rollbackTransaction,
updateGlobal,
updateGlobalVersion,
updateJobs,
updateMany,
updateOne,
updateVersion,
@@ -127,6 +128,7 @@ export function sqliteAdapter(args: Args): DatabaseAdapterObj<SQLiteAdapter> {
tables: {},
// @ts-expect-error - vestiges of when tsconfig was not strict. Feel free to improve
transactionOptions: args.transactionOptions || undefined,
updateJobs,
updateMany,
versionsSuffix: args.versionsSuffix || '_v',

View File

@@ -34,6 +34,7 @@ import {
rollbackTransaction,
updateGlobal,
updateGlobalVersion,
updateJobs,
updateMany,
updateOne,
updateVersion,
@@ -138,6 +139,7 @@ export function vercelPostgresAdapter(args: Args = {}): DatabaseAdapterObj<Verce
tables: {},
tablesFilter: args.tablesFilter,
transactionOptions: args.transactionOptions || undefined,
updateJobs,
versionsSuffix: args.versionsSuffix || '_v',
// DatabaseAdapter

View File

@@ -33,6 +33,7 @@ export { commitTransaction } from './transactions/commitTransaction.js'
export { rollbackTransaction } from './transactions/rollbackTransaction.js'
export { updateGlobal } from './updateGlobal.js'
export { updateGlobalVersion } from './updateGlobalVersion.js'
export { updateJobs } from './updateJobs.js'
export { updateMany } from './updateMany.js'
export { updateOne } from './updateOne.js'
export { updateVersion } from './updateVersion.js'

View File

@@ -0,0 +1,66 @@
import type { UpdateJobs, Where } from 'payload'
import toSnakeCase from 'to-snake-case'
import type { DrizzleAdapter } from './types.js'
import { findMany } from './find/findMany.js'
import { upsertRow } from './upsertRow/index.js'
import { getTransaction } from './utilities/getTransaction.js'
export const updateJobs: UpdateJobs = async function updateMany(
this: DrizzleAdapter,
{ id, data, limit: limitArg, req, returning, sort: sortArg, where: whereArg },
) {
const whereToUse: Where = id ? { id: { equals: id } } : whereArg
const limit = id ? 1 : limitArg
const db = await getTransaction(this, req)
const collection = this.payload.collections['payload-jobs'].config
const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))
const sort = sortArg !== undefined && sortArg !== null ? sortArg : collection.defaultSort
const jobs = await findMany({
adapter: this,
collectionSlug: 'payload-jobs',
fields: collection.flattenedFields,
limit: id ? 1 : limit,
pagination: false,
req,
sort,
tableName,
where: whereToUse,
})
if (!jobs.docs.length) {
return []
}
const results = []
// TODO: We need to batch this to reduce the amount of db calls. This can get very slow if we are updating a lot of rows.
for (const job of jobs.docs) {
const updateData = {
...job,
...data,
}
const result = await upsertRow({
id: job.id,
adapter: this,
data: updateData,
db,
fields: collection.flattenedFields,
ignoreResult: returning === false,
operation: 'update',
req,
tableName,
})
results.push(result)
}
if (returning === false) {
return null
}
return results
}

View File

@@ -8,6 +8,7 @@ import type {
RollbackTransaction,
} from './types.js'
import { defaultUpdateJobs } from './defaultUpdateJobs.js'
import { createMigration } from './migrations/createMigration.js'
import { migrate } from './migrations/migrate.js'
import { migrateDown } from './migrations/migrateDown.js'
@@ -31,6 +32,7 @@ export function createDatabaseAdapter<T extends BaseDatabaseAdapter>(
| 'migrateReset'
| 'migrateStatus'
| 'migrationDir'
| 'updateJobs'
>,
): T {
return {
@@ -45,6 +47,7 @@ export function createDatabaseAdapter<T extends BaseDatabaseAdapter>(
migrateReset,
migrateStatus,
rollbackTransaction,
updateJobs: defaultUpdateJobs,
...args,
// Ensure migrationDir is set

View File

@@ -0,0 +1,53 @@
import type { BaseJob, DatabaseAdapter } from '../index.js'
import type { UpdateJobs } from './types.js'
import { jobsCollectionSlug } from '../queues/config/index.js'
import { sanitizeUpdateData } from '../queues/utilities/sanitizeUpdateData.js'
export const defaultUpdateJobs: UpdateJobs = async function updateMany(
this: DatabaseAdapter,
{ id, data, limit, req, returning, where },
) {
const updatedJobs: BaseJob[] | null = []
const jobsToUpdate: BaseJob[] = (
id
? [
await this.findOne({
collection: jobsCollectionSlug,
req,
where: { id: { equals: id } },
}),
]
: (
await this.find({
collection: jobsCollectionSlug,
limit,
pagination: false,
req,
where,
})
).docs
).filter(Boolean) as BaseJob[]
if (!jobsToUpdate) {
return null
}
for (const job of jobsToUpdate) {
const updateData = {
...job,
...data,
}
const updatedJob = await this.updateOne({
id: job.id,
collection: jobsCollectionSlug,
data: sanitizeUpdateData({ data: updateData }),
req,
returning,
})
updatedJobs.push(updatedJob)
}
return updatedJobs
}

View File

@@ -1,5 +1,5 @@
import type { TypeWithID } from '../collections/config/types.js'
import type { CollectionSlug, GlobalSlug } from '../index.js'
import type { BaseJob, CollectionSlug, GlobalSlug } from '../index.js'
import type {
Document,
JoinQuery,
@@ -147,6 +147,8 @@ export interface BaseDatabaseAdapter {
updateGlobalVersion: UpdateGlobalVersion
updateJobs: UpdateJobs
updateMany: UpdateMany
updateOne: UpdateOne
@@ -540,6 +542,32 @@ export type UpdateManyArgs = {
export type UpdateMany = (args: UpdateManyArgs) => Promise<Document[] | null>
export type UpdateJobsArgs = {
data: Record<string, unknown>
req?: Partial<PayloadRequest>
/**
* If true, returns the updated documents
*
* @default true
*/
returning?: boolean
} & (
| {
id: number | string
limit?: never
sort?: never
where?: never
}
| {
id?: never
limit?: number
sort?: Sort
where: Where
}
)
export type UpdateJobs = (args: UpdateJobsArgs) => Promise<BaseJob[] | null>
export type UpsertArgs = {
collection: CollectionSlug
data: Record<string, unknown>

View File

@@ -1160,6 +1160,8 @@ export type {
UpdateGlobalArgs,
UpdateGlobalVersion,
UpdateGlobalVersionArgs,
UpdateJobs,
UpdateJobsArgs,
UpdateMany,
UpdateManyArgs,
UpdateOne,

View File

@@ -1,5 +1,6 @@
import type { ManyOptions } from '../../collections/operations/local/update.js'
import type { PayloadRequest, Where } from '../../types/index.js'
import type { UpdateJobsArgs } from '../../database/types.js'
import type { PayloadRequest, Sort, Where } from '../../types/index.js'
import type { BaseJob } from '../config/types/workflowTypes.js'
import { jobAfterRead, jobsCollectionSlug } from '../config/index.js'
@@ -17,32 +18,29 @@ type BaseArgs = {
type ArgsByID = {
id: number | string
limit?: never
sort?: never
where?: never
}
type ArgsWhere = {
id?: never
limit?: number
sort?: Sort
where: Where
}
type RunJobsArgs = (ArgsByID | ArgsWhere) & BaseArgs
/**
* Convenience method for updateJobs by id
*/
export async function updateJob(args: ArgsByID & BaseArgs) {
const result = await updateJobs({
...args,
id: undefined,
limit: 1,
where: { id: { equals: args.id } },
})
const result = await updateJobs(args)
if (result) {
return result[0]
}
}
type ArgsWhere = {
id?: never | undefined
limit?: number
where: Where
}
type RunJobsArgs = (ArgsByID | ArgsWhere) & BaseArgs
export async function updateJobs({
id,
data,
@@ -51,9 +49,12 @@ export async function updateJobs({
limit: limitArg,
req,
returning,
where,
sort,
where: whereArg,
}: RunJobsArgs): Promise<BaseJob[] | null> {
const limit = id ? 1 : limitArg
const where = id ? { id: { equals: id } } : whereArg
if (depth || req.payload.config?.jobs?.runHooks) {
const result = await req.payload.update({
id,
@@ -71,36 +72,24 @@ export async function updateJobs({
return result.docs as BaseJob[]
}
const updatedJobs = []
const args: UpdateJobsArgs = id
? {
id,
data: sanitizeUpdateData({ data }),
req: disableTransaction === true ? undefined : req,
returning,
}
: {
data: sanitizeUpdateData({ data }),
limit,
req: disableTransaction === true ? undefined : req,
returning,
sort,
// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
where: where as Where,
}
// TODO: this can be optimized in the future - partial updates are supported in mongodb. In postgres,
// we can support this by manually constructing the sql query. We should use req.payload.db.updateMany instead
// of req.payload.db.updateOne once this is supported
const jobsToUpdate = await req.payload.db.find({
collection: jobsCollectionSlug,
limit,
pagination: false,
req: disableTransaction === true ? undefined : req,
where,
})
if (!jobsToUpdate?.docs) {
return null
}
for (const job of jobsToUpdate.docs) {
const updateData = {
...job,
...data,
}
const updatedJob = await req.payload.db.updateOne({
id: job.id,
collection: jobsCollectionSlug,
data: sanitizeUpdateData({ data: updateData }),
req: disableTransaction === true ? undefined : req,
returning,
})
updatedJobs.push(updatedJob)
}
const updatedJobs: BaseJob[] | null = await req.payload.db.updateJobs(args)
if (returning === false || !updatedJobs?.length) {
return null