Compare commits

2 Commits

Author           SHA1         Message                                                       Date
Alessio Gravili  16170b84f1   remove diff                                                   2025-03-25 12:44:52 -06:00
Alessio Gravili  300cb0d7e7   perf(drizzle): 35x faster initial update when running jobs    2025-03-25 12:32:03 -06:00


@@ -1,10 +1,15 @@
-import type { UpdateJobs, Where } from 'payload'
+import type { LibSQLDatabase } from 'drizzle-orm/libsql'
+import type { BaseJob, UpdateJobs, Where } from 'payload'

+import { inArray } from 'drizzle-orm'
 import toSnakeCase from 'to-snake-case'

-import type { DrizzleAdapter } from './types.js'
+import type { ChainedMethods, DrizzleAdapter } from './types.js'

+import { chainMethods } from './find/chainMethods.js'
 import { findMany } from './find/findMany.js'
+import buildQuery from './queries/buildQuery.js'
+import { transform } from './transform/read/index.js'
 import { upsertRow } from './upsertRow/index.js'
 import { getTransaction } from './utilities/getTransaction.js'
@@ -20,6 +25,128 @@ export const updateJobs: UpdateJobs = async function updateMany(
   const tableName = this.tableNameMap.get(toSnakeCase(collection.slug))
   const sort = sortArg !== undefined && sortArg !== null ? sortArg : collection.defaultSort
+  const dataKeys = Object.keys(data)
+  // The initial update is when all jobs are being updated to processing and fetched
+  const isInitialUpdate = dataKeys.length === 1 && dataKeys[0] === 'processing'
+
+  if (isInitialUpdate) {
+    // Performance optimization for the initial update - this needs to happen as quickly as possible
+    const _db = db as LibSQLDatabase
+
+    const rowToInsert: {
+      id?: number | string
+      processing: boolean
+    } = data as { processing: boolean }
+
+    const { orderBy, where } = buildQuery({
+      adapter: this,
+      fields: collection.flattenedFields,
+      sort,
+      tableName,
+      where: whereToUse,
+    })
+
+    const table = this.tables[tableName]
+    const jobsLogTable = this.tables['payload_jobs_log']
+
+    let idsToUpdate: (number | string)[] = []
+    let docsToUpdate: BaseJob[] = []
+
+    // Fetch all jobs that should be updated. This can't be done in the update query, as
+    // 1) we need to join the logs table to get the logs for each job
+    // 2) postgres doesn't support limit on update queries
+    const jobsQuery = _db
+      .select({
+        id: table.id,
+      })
+      .from(table)
+      .where(where)
+
+    const chainedMethods: ChainedMethods = []
+
+    if (typeof limit === 'number' && limit > 0) {
+      chainedMethods.push({
+        args: [limit],
+        method: 'limit',
+      })
+    }
+
+    if (orderBy) {
+      chainedMethods.push({
+        args: [() => orderBy.map(({ column, order }) => order(column))],
+        method: 'orderBy',
+      })
+    }
+
+    docsToUpdate = (await chainMethods({
+      methods: chainedMethods,
+      query: jobsQuery,
+    })) as BaseJob[]
+
+    idsToUpdate = docsToUpdate?.map((job) => job.id)
+
+    // Now fetch all log entries for these jobs
+    if (idsToUpdate.length) {
+      const logsQuery = _db
+        .select({
+          id: jobsLogTable.id,
+          completedAt: jobsLogTable.completedAt,
+          error: jobsLogTable.error,
+          executedAt: jobsLogTable.executedAt,
+          input: jobsLogTable.input,
+          output: jobsLogTable.output,
+          parentID: jobsLogTable._parentID,
+          state: jobsLogTable.state,
+          taskID: jobsLogTable.taskID,
+          taskSlug: jobsLogTable.taskSlug,
+        })
+        .from(jobsLogTable)
+        .where(inArray(jobsLogTable._parentID, idsToUpdate))
+
+      const logs = await logsQuery
+
+      // Group logs by parent ID
+      const logsByParentId = logs.reduce(
+        (acc, log) => {
+          const parentId = log.parentID
+          if (!acc[parentId]) {
+            acc[parentId] = []
+          }
+          acc[parentId].push(log)
+          return acc
+        },
+        {} as Record<number | string, any[]>,
+      )
+
+      // Attach logs to their respective jobs
+      for (const job of docsToUpdate) {
+        job.log = logsByParentId[job.id] || []
+      }
+    }
+
+    // Perform the actual update
+    const query = _db
+      .update(table)
+      .set(rowToInsert)
+      .where(inArray(table.id, idsToUpdate))
+      .returning()
+
+    const updatedJobs = (await query) as BaseJob[]
+
+    return updatedJobs.map((row) => {
+      // Attach logs to the updated job
+      row.log = docsToUpdate.find((job) => job.id === row.id)?.log || []
+
+      return transform<BaseJob>({
+        adapter: this,
+        config: this.payload.config,
+        data: row,
+        fields: collection.flattenedFields,
+        joinQuery: false,
+      })
+    })
+  }
+
   const jobs = await findMany({
     adapter: this,
     collectionSlug: 'payload-jobs',
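
The comments in the new fast path explain why this is two round trips rather than a single statement: the log rows live in the separate payload_jobs_log table, and Postgres does not support LIMIT on UPDATE. As a standalone illustration of that select-then-update shape in plain drizzle (not the Payload source; the `jobs` table here is a hypothetical two-column stand-in for the much larger payload_jobs schema):

```ts
import { createClient } from '@libsql/client'
import { inArray } from 'drizzle-orm'
import { drizzle } from 'drizzle-orm/libsql'
import { integer, sqliteTable } from 'drizzle-orm/sqlite-core'

// Hypothetical minimal table for illustration only.
const jobs = sqliteTable('payload_jobs', {
  id: integer('id').primaryKey(),
  processing: integer('processing', { mode: 'boolean' }).notNull().default(false),
})

const db = drizzle(createClient({ url: 'file:local.db' }))

// Claim up to `limit` jobs. The ids are SELECTed first because
// UPDATE ... LIMIT is not portable (Postgres rejects it); the single
// UPDATE then returns the changed rows via RETURNING, avoiding a re-fetch.
async function claimJobs(limit: number) {
  const picked = await db.select({ id: jobs.id }).from(jobs).limit(limit)
  const ids = picked.map((row) => row.id)
  if (ids.length === 0) {
    return []
  }
  return db.update(jobs).set({ processing: true }).where(inArray(jobs.id, ids)).returning()
}
```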
@@ -35,7 +162,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
     return []
   }

-  const results = []
+  const results: BaseJob[] = []

   // TODO: We need to batch this to reduce the amount of db calls. This can get very slow if we are updating a lot of rows.
   for (const job of jobs.docs) {
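
Back in the fast path, the `chainedMethods` array fed to `chainMethods` is how the optional `limit` and `orderBy` clauses get applied only when needed, without branching the query expression itself. A minimal self-contained sketch of that pattern (the names `ChainedMethod`, `chain`, and the toy builder are illustrative stand-ins, not Payload's actual implementation in './types.js'):

```ts
// Each entry describes one builder call to apply in order.
type ChainedMethod = {
  args: unknown[]
  method: string
}

// Calls query[method](...args) for each entry, chaining on the returned builder.
function chain<T>(query: T, methods: ChainedMethod[]): T {
  return methods.reduce<any>((acc, { args, method }) => acc[method](...args), query)
}

// Usage with a toy builder: `.limit()` is only applied when a positive limit exists.
const builder = {
  calls: [] as string[],
  limit(n: number) {
    this.calls.push(`limit(${n})`)
    return this
  },
}

const methods: ChainedMethod[] = []
const limit = 10
if (typeof limit === 'number' && limit > 0) {
  methods.push({ args: [limit], method: 'limit' })
}
console.log(chain(builder, methods).calls) // ['limit(10)']
```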
@@ -44,7 +171,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
       ...data,
     }

-    const result = await upsertRow({
+    const result = await upsertRow<BaseJob>({
       id: job.id,
       adapter: this,
       data: updateData,
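
One last detail worth isolating: the `logsByParentId` reduce in the fast path is a single-pass group-by that lets logs be attached in memory instead of via a SQL join or an N+1 query. The same idea as a generic typed helper (a hypothetical sketch, not part of this diff):

```ts
// Bucket rows by a parent key in one pass, mirroring the logsByParentId reduce.
function groupBy<T, K extends number | string>(rows: T[], getKey: (row: T) => K): Record<K, T[]> {
  return rows.reduce(
    (acc, row) => {
      const key = getKey(row)
      ;(acc[key] ??= []).push(row)
      return acc
    },
    {} as Record<K, T[]>,
  )
}

// Usage: group child log rows under their parent job id.
type LogRow = { parentID: number; state: string }
const logs: LogRow[] = [
  { parentID: 1, state: 'succeeded' },
  { parentID: 1, state: 'failed' },
  { parentID: 2, state: 'succeeded' },
]
const byParent = groupBy(logs, (log) => log.parentID)
console.log(byParent[1].length) // 2
```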