accd95ec8a7fb08ad4246fc3403f3fef7d1ac34d
3 Commits
| Author | SHA1 | Message | Date | |
|---|---|---|---|---|
|
|
e0ffada80b |
feat: support parallel job queue tasks, speed up task running (#13614)
Currently, attempting to run tasks in parallel will result in DB errors. ## Solution The problem was caused due to inefficient db update calls. After each task completes, we need to update the log array in the payload-jobs collection. On postgres, that's a different table. Currently, the update works the following way: 1. Nuke the table 2. Re-insert every single row, including the new one This will throw db errors if multiple processes start doing that. Additionally, due to conflicts, new log rows may be lost. This PR makes use of the the [new db $push operation ](https://github.com/payloadcms/payload/pull/13453) we recently added to atomically push a new log row to the database in a single round-trip. This not only reduces the amount of db round trips (=> faster job queue system) but allows multiple tasks to perform this db operation in parallel, without conflicts. ## Problem **Example:** ```ts export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = { slug: 'fastParallelTask', handler: async ({nlineTask }) => { const taskFunctions = [] for (let i = 0; i < 20; i++) { const idx = i + 1 taskFunctions.push(async () => { return await inlineTask(`parallel task ${idx}`, { input: { test: idx, }, task: () => { return { output: { taskID: idx.toString(), }, } }, }) }) } await Promise.all(taskFunctions.map((f) => f())) }, } ``` On SQLite, this would throw the following error: ```bash Caught error Error: UNIQUE constraint failed: payload_jobs_log.id at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:335:20) at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:360:16) at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34) at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16) at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58 at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18) at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21) at LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27) at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26) at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40) at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) { rawCode: 1555, code: 'SQLITE_CONSTRAINT_PRIMARYKEY', libsqlError: true } ``` --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1211001438499053 |
||
|
|
3114b89d4c |
perf: 23% faster job queue system on postgres/sqlite (#13187)
Previously, a single run of the simplest job queue workflow (1 single task, no db calls by user code in the task - we're just testing db system overhead) would result in **22 db roundtrips** on drizzle. This PR reduces it to **17 db roundtrips** by doing the following: - Modifies db.updateJobs to use the new optimized upsertRow function if the update is simple - Do not unnecessarily pass the job log to the final job update when the workflow completes => allows using the optimized upsertRow function, as only the main table is involved --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1210888186878606 |
||
|
|
c08b2aea89 |
feat: scheduling jobs (#12863)
Adds a new `schedule` property to workflow and task configs that can be used to have Payload automatically _queue_ jobs following a certain _schedule_. Docs: https://payloadcms.com/docs/dynamic/jobs-queue/schedules?branch=feat/schedule-jobs ## API Example ```ts export default buildConfig({ // ... jobs: { // ... scheduler: 'manual', // Or `cron` if you're not using serverless. If `manual` is used, then user needs to set up running /api/payload-jobs/handleSchedules or payload.jobs.handleSchedules in regular intervals tasks: [ { schedule: [ { cron: '* * * * * *', queue: 'autorunSecond', // Hooks are optional hooks: { // Not an array, as providing and calling `defaultBeforeSchedule` would be more error-prone if this was an array beforeSchedule: async (args) => { // Handles verifying that there are no jobs already scheduled or processing. // You can override this behavior by not calling defaultBeforeSchedule, e.g. if you wanted // to allow a maximum of 3 scheduled jobs in the queue instead of 1, or add any additional conditions const result = await args.defaultBeforeSchedule(args) return { ...result, input: { message: 'This task runs every second', }, } }, afterSchedule: async (args) => { await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global args.req.payload.logger.info( 'EverySecond task scheduled: ' + (args.status === 'success' ? args.job.id : 'skipped or failed to schedule'), ) }, }, }, ], slug: 'EverySecond', inputSchema: [ { name: 'message', type: 'text', required: true, }, ], handler: ({ input, req }) => { req.payload.logger.info(input.message) return { output: {}, } }, } ] } }) ``` --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1210495300843759 |