Currently, attempting to run tasks in parallel will result in DB errors.
## Solution
The problem was caused by inefficient db update calls. After each
task completes, we need to update the log array in the payload-jobs
collection. On Postgres, that array lives in a separate table.
Currently, the update works the following way:
1. Nuke the table
2. Re-insert every single row, including the new one
This will throw db errors if multiple processes start doing that.
Additionally, due to conflicts, new log rows may be lost.
This PR makes use of the [new db $push operation](https://github.com/payloadcms/payload/pull/13453) we recently added to
atomically push a new log row to the database in a single round-trip.
This not only reduces the number of db round trips (=> faster job queue
system) but also allows multiple tasks to perform this db operation in
parallel, without conflicts.
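As a rough illustration only (the exact adapter API lives in the linked PR; the call shape below is an assumption for readability), the update conceptually changes from "delete everything + re-insert" to a single atomic append:

```ts
// Illustrative sketch - not the actual internal signature.
// Instead of rewriting the whole log array, the db adapter atomically
// appends the single new entry, so parallel tasks cannot overwrite
// each other's rows.
await payload.db.updateJobs({
  id: job.id,
  data: {
    log: {
      $push: newLogEntry, // assumed shape of the new $push operation
    },
  },
  req,
})
```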
## Problem
**Example:**
```ts
export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = {
  slug: 'fastParallelTask',
  handler: async ({ inlineTask }) => {
    const taskFunctions = []
    for (let i = 0; i < 20; i++) {
      const idx = i + 1
      taskFunctions.push(async () => {
        return await inlineTask(`parallel task ${idx}`, {
          input: {
            test: idx,
          },
          task: () => {
            return {
              output: {
                taskID: idx.toString(),
              },
            }
          },
        })
      })
    }
    await Promise.all(taskFunctions.map((f) => f()))
  },
}
```
On SQLite, this would throw the following error:
```bash
Caught error Error: UNIQUE constraint failed: payload_jobs_log.id
at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:335:20)
at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:360:16)
at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34)
at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16)
at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58
at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18)
at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21)
at LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27)
at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26)
at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40)
at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) {
rawCode: 1555,
code: 'SQLITE_CONSTRAINT_PRIMARYKEY',
libsqlError: true
}
```
---
Previously, a single run of the simplest job queue workflow (a single
task, with no db calls made by user code in the task - we're just testing db
system overhead) would result in **22 db roundtrips** on Drizzle. This
PR reduces it to **17 db roundtrips** by doing the following:
- Modifies db.updateJobs to use the new optimized upsertRow function if
the update is simple
- Stops unnecessarily passing the job log to the final job update when the
workflow completes => this allows using the optimized upsertRow function, as
only the main table is involved
---
Adds a new `schedule` property to workflow and task configs that can be
used to have Payload automatically _queue_ jobs following a certain
_schedule_.
Docs:
https://payloadcms.com/docs/dynamic/jobs-queue/schedules?branch=feat/schedule-jobs
## API Example
```ts
export default buildConfig({
  // ...
  jobs: {
    // ...
    // Use 'cron' if you're not running on serverless. With 'manual', you are
    // responsible for calling /api/payload-jobs/handleSchedules or
    // payload.jobs.handleSchedules at regular intervals.
    scheduler: 'manual',
    tasks: [
      {
        schedule: [
          {
            cron: '* * * * * *',
            queue: 'autorunSecond',
            // Hooks are optional
            hooks: {
              // Not an array, as providing and calling `defaultBeforeSchedule` would be more error-prone if this were an array
              beforeSchedule: async (args) => {
                // Handles verifying that there are no jobs already scheduled or processing.
                // You can override this behavior by not calling defaultBeforeSchedule, e.g. if you wanted
                // to allow a maximum of 3 scheduled jobs in the queue instead of 1, or add any additional conditions
                const result = await args.defaultBeforeSchedule(args)
                return {
                  ...result,
                  input: {
                    message: 'This task runs every second',
                  },
                }
              },
              afterSchedule: async (args) => {
                await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global
                args.req.payload.logger.info(
                  'EverySecond task scheduled: ' +
                    (args.status === 'success' ? args.job.id : 'skipped or failed to schedule'),
                )
              },
            },
          },
        ],
        slug: 'EverySecond',
        inputSchema: [
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        handler: ({ input, req }) => {
          req.payload.logger.info(input.message)
          return {
            output: {},
          }
        },
      },
    ],
  },
})
```
---
This simplifies workflow / task error handling, as well as cancelling
jobs. Previously, we handled errors where they occurred and passed error
state through a `state` object - errors were then handled in
multiple areas of the code.
This PR adds new, clean `TaskError`, `WorkflowError` and
`JobCancelledError` errors that are thrown when they occur and are
handled **in one single place**, massively cleaning up complex functions
like
[payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts](https://github.com/payloadcms/payload/compare/refactor/jobs-errors?expand=1#diff-53dc7ccb7c8e023c9ba63fdd2e78c32ad0be606a2c64a3512abad87893f5fd21)
This change also improves performance -
previously, a task / workflow failure or cancellation would have
resulted in multiple, separate `updateJob` db calls, as data
modifications to the job object required for storing failure state were
done multiple times in multiple areas of the codebase. Most notably,
task error state was handled and updated separately from workflow error
state.
Now, it's just a single, clean `updateJob` call.
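Conceptually, the control flow now looks roughly like the sketch below (simplified; `runWorkflowHandler`, `buildFailureState`, and the standalone `updateJob` call stand in for the real internals):

```ts
// Simplified sketch of the new pattern: errors are thrown where they happen
// and handled in one single place, resulting in exactly one updateJob db call.
try {
  await runWorkflowHandler()
} catch (err) {
  if (
    err instanceof TaskError ||
    err instanceof WorkflowError ||
    err instanceof JobCancelledError
  ) {
    // One place translates the error into job state (task failure,
    // workflow failure, or cancellation) and persists it once.
    await updateJob(buildFailureState(err))
  } else {
    throw err // unknown errors keep bubbling up
  }
}
```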
This PR also does the following:
- adds a new test for `deleteJobOnComplete` behavior
- cleans up test suite
- ensures `deleteJobOnComplete` does not delete definitively failed jobs
---
This clarifies that jobs.autoRun only *runs* already-queued jobs. It does not queue the jobs for you.
Also adds an e2e test, as this functionality previously had no e2e coverage.
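For clarity, a minimal config sketch of the distinction (property values are illustrative; queueing still happens separately, e.g. via `payload.jobs.queue()`):

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    // autoRun only *processes* jobs that are already in the queue.
    // Jobs still need to be queued elsewhere, e.g. via payload.jobs.queue().
    autoRun: [
      {
        cron: '*/5 * * * *', // every 5 minutes
        queue: 'default',
        limit: 10, // max number of jobs to run per invocation
      },
    ],
  },
})
```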
Previously, jobs were executed in FIFO order on MongoDB, and LIFO on
Postgres, with no way to configure this behavior.
This PR makes FIFO the default on both MongoDB and Postgres and
introduces the following new options to configure the processing order
globally or on a queue-by-queue basis:
- a `processingOrder` property to the jobs config
- a `processingOrder` argument to `payload.jobs.run()` to override
what's set in the jobs config
It also adds a new `sequential` option to `payload.jobs.run()`, which
can be useful for debugging.
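A short sketch of how these options fit together (a sketch assuming the per-queue object form; queue names and sort values are illustrative):

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    // FIFO is now the default everywhere; override globally or per queue
    processingOrder: {
      default: 'createdAt', // oldest first (FIFO)
      queues: {
        priority: '-createdAt', // newest first (LIFO) for this queue only
      },
    },
  },
})

// Override per run, and optionally process jobs one at a time for debugging
await payload.jobs.run({
  queue: 'priority',
  processingOrder: 'createdAt',
  sequential: true,
})
```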
This adds support for running multiple job queue tasks in parallel
within the same workflow while preventing conflicts. Previously, this
would have caused the following issues:
- Job log entries get lost - the final job log is incomplete, despite
all tasks having been executed
- Write conflicts in postgres, leading to unique constraint violation
errors
The solution involves handling job log data updates in a way that avoids
overwriting, and ensuring the final update reflects the latest job log
data. Each job log entry now initializes its own ID, so a given job log
entry’s ID remains the same across multiple, parallel task executions.
## Postgres
In Postgres, we need to enable transactions for the
`payload.db.updateJobs` operation; otherwise, two tasks updating the
same job in parallel can conflict. This happens because Postgres handles
array rows by deleting them all, then re-inserting (rather than
upserting). The rows are stored in a separate table, and the following
scenario can occur:
- Op 1: deletes all job log rows
- Op 2: deletes all job log rows
- Op 1: inserts 200 job log rows
- Op 2: inserts the same 200 job log rows again => `error: duplicate key value violates unique constraint "payload_jobs_log_pkey"`
Because transactions were not used, the rows inserted by Op 1
immediately became visible to Op 2, causing the conflict. Enabling
transactions fixes this. In theory, it can still happen if Op 1 commits
before Op 2 starts inserting (due to the read committed isolation
level), but it should occur far less frequently.
Alongside this change, we should consider inserting the rows using an
upsert (update on conflict), which would get rid of this error
completely. That way, if the insertion from Op 1 is visible to Op 2, Op 2
will simply overwrite it rather than erroring. Individual job log entries
are immutable and cannot be deleted, so this shouldn't corrupt any data.
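As a rough sketch of that idea with Drizzle (the table object, column, and row values here are placeholders, not the actual internal schema):

```ts
import { sql } from 'drizzle-orm'

// Placeholder sketch: insert the log rows as an upsert so that rows already
// re-inserted by a concurrent operation are overwritten instead of raising
// a duplicate key error.
await db
  .insert(payload_jobs_log)
  .values(logRows)
  .onConflictDoUpdate({
    target: payload_jobs_log.id,
    // overwrite the existing row with the incoming row's values
    set: { state: sql`excluded.state` },
  })
```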
## Mongo
In Mongo, the issue is addressed by ensuring that log row deletions
caused by different log states in concurrent operations are not
merged back into the client job log, and by making sure the final update
includes all job logs.
There is no duplicate key error in Mongo, because the log array resides
in the same document and duplicates are simply upserted. We cannot use
transactions in Mongo, as they appear to lock the document in a way that
prevents reliable parallel updates, leading to:
`MongoServerError: WriteConflict error: this operation conflicted with
another operation. Please retry your operation or multi-document
transaction`
This adds new `payload.jobs.cancel` and `payload.jobs.cancelByID` methods that allow you to cancel already-running jobs, or prevent queued jobs from running.
While it's not possible to cancel a function mid-execution, this will stop job execution the next time the job makes a request to the db, which happens after every task.
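Usage sketch (the `where` query follows the usual local API query shape; the ID and queue name are illustrative):

```ts
// Cancel a single job by its ID
await payload.jobs.cancelByID({
  id: 42,
})

// Cancel every job that matches a query, e.g. everything still waiting in a queue
await payload.jobs.cancel({
  where: {
    queue: {
      equals: 'nightly',
    },
  },
})
```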
## Fix default retries
By default, if no `retries` property has been set, jobs / tasks should
not be retried. This was not the case previously: the `maxRetries`
variable was `undefined`, causing jobs to retry endlessly. This PR
defaults it to `0`.
Additionally, this fixes some undesirable behavior of the workflow
retries property. Workflow retries now act as **maximum**,
workflow-level retries. Only tasks that do not have a retry property set
will inherit the workflow-level retries.
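A small sketch of the inheritance rule (slugs and values are illustrative):

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        slug: 'sendEmail',
        retries: 3, // explicit value: always wins over the workflow-level setting
        handler: async () => ({ output: {} }),
      },
      {
        slug: 'generateReport',
        // no `retries` set: inherits the workflow-level value below,
        // or defaults to 0 when run outside a workflow
        handler: async () => ({ output: {} }),
      },
    ],
    workflows: [
      {
        slug: 'nightlyReport',
        retries: 2, // workflow-level maximum, inherited only by tasks without their own value
        handler: async ({ tasks }) => {
          await tasks.generateReport('1', { input: {} })
          await tasks.sendEmail('2', { input: {} })
        },
      },
    ],
  },
})
```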
## Fix error messages
Previously, you were able to encounter error messages with undefined
values like these:

The reason is that `job.workflowSlug` was always used for the error
messages. However, if you queue a task directly, without a workflow,
`job.workflowSlug` is undefined and `job.taskSlug` should be used
instead.
This PR also gets rid of the second undefined value by ensuring that
`maxRetries` is never undefined.
Fixes types for workflows / jobs `input` and `output` when using
`strict: true` or `strictNullChecks: true` by ensuring that all
properties in generated types are required.
Currently, Payload renders all custom components on initial compile of
the admin panel. This is problematic for two key reasons:
1. Custom components do not receive contextual data, i.e. fields do not
receive their field data, edit views do not receive their document data,
etc.
2. Components are unnecessarily rendered before they are used
This was initially required to support React Server Components within
the Payload Admin Panel for three key reasons:
1. Fields can be dynamically rendered within arrays, blocks, etc.
2. Documents can be recursively rendered within a "drawer" UI, i.e.
relationship fields
3. Payload supports server/client component composition
In order to achieve this, components need to be rendered on the server
and passed as "slots" to the client. Currently, the pattern for this is
to render custom server components in the "client config". Then, when a
view or field needs to be rendered, we first check the client config
for a "pre-rendered" component and otherwise render our client-side
fallback component.
But for the reasons listed above, this pattern doesn't exactly make
custom server components very useful within the Payload Admin Panel,
which is where this PR comes in. Now, instead of pre-rendering all
components on initial compile, we're able to render custom components
_on demand_, only as they are needed.
To achieve this, we've established [this
pattern](https://github.com/payloadcms/payload/pull/8481) of React
Server Functions in the Payload Admin Panel. With Server Functions, we
can iterate the Payload Config and return JSX through React's
`text/x-component` content-type. This means we're able to pass
contextual props to custom components, such as data for fields and
views.
## Breaking Changes
1. Add the following to your root layout file, typically located at
`(app)/(payload)/layout.tsx`:
```diff
/* THIS FILE WAS GENERATED AUTOMATICALLY BY PAYLOAD. */
/* DO NOT MODIFY IT BECAUSE IT COULD BE REWRITTEN AT ANY TIME. */
+ import type { ServerFunctionClient } from 'payload'
import config from '@payload-config'
import { RootLayout } from '@payloadcms/next/layouts'
import { handleServerFunctions } from '@payloadcms/next/utilities'
import React from 'react'
import { importMap } from './admin/importMap.js'
import './custom.scss'

type Args = {
  children: React.ReactNode
}

+ const serverFunctions: ServerFunctionClient = async function (args) {
+   'use server'
+   return handleServerFunctions({
+     ...args,
+     config,
+     importMap,
+   })
+ }

const Layout = ({ children }: Args) => (
  <RootLayout
    config={config}
    importMap={importMap}
+   serverFunctions={serverFunctions}
  >
    {children}
  </RootLayout>
)

export default Layout
```
2. If you were previously posting to the `/api/form-state` endpoint, it
no longer exists. Instead, you'll need to invoke the `form-state` Server
Function, which can be done through the _new_ `getFormState` utility:
```diff
- import { getFormState } from '@payloadcms/ui'
- const { state } = await getFormState({
-   apiRoute: '',
-   body: {
-     // ...
-   },
-   serverURL: ''
- })
+ const { getFormState } = useServerFunctions()
+
+ const { state } = await getFormState({
+   // ...
+ })
```
## Breaking Changes
```diff
- useFieldProps()
- useCellProps()
```
More details coming soon.
---------
Co-authored-by: Alessio Gravili <alessio@gravili.de>
Co-authored-by: Jarrod Flesch <jarrodmflesch@gmail.com>
Co-authored-by: James <james@trbl.design>
Adds a jobs queue to Payload.
- [x] Docs, w/ examples for Vercel Cron, additional services
- [x] Type the `job` using GeneratedTypes in `JobRunnerArgs`
(@AlessioGr)
- [x] Write the `runJobs` function
- [x] Allow for some type of `payload.runTask`
- [x] Open up a new bin script for running jobs
- [x] Determine strategy for runner endpoint to either await jobs
successfully or return early and stay open until job work completes
(serverless ramifications here)
- [x] Allow for job runner to accept how many jobs to run in one
invocation
- [x] Make a Payload local API method for creating a new job easily
(payload.createJob) or similar which is strongly typed (@AlessioGr)
- [x] Make `payload.runJobs` or similar (@AlessioGr)
- [x] Write tests for retrying up to max retries for a given step
- [x] Write tests for dynamic import of a runner
The shape of the config should permit the definition of steps separate
from the job workflows themselves.
```js
const config = {
  // Not sure if we need this property anymore
  queues: {},
  // A job is an instance of a workflow, stored in DB
  // and triggered by something at some point
  jobs: {
    // Be able to override the jobs collection
    collectionOverrides: () => {},
    // Workflows are groups of tasks that handle
    // the flow from task to task.
    // When defined on the config, they are considered as predefined workflows
    // BUT - in the future, we'll allow for UI-based workflow definition as well.
    workflows: [
      {
        slug: 'job-name',
        // Temporary name for this
        // should be able to pass function
        // or path to it for Node to dynamically import
        controlFlowInJS: '/my-runner.js',
        // Temporary name as well
        // should be able to eventually define workflows
        // in UI (meaning they need to be serialized in JSON)
        // Should not be able to define both control flows
        controlFlowInJSON: [
          {
            task: 'myTask',
            next: {
              // etc
            },
          },
        ],
        // Workflows take input
        // which are a group of fields
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
      },
    ],
    // Tasks are defined separately as isolated functions
    // that can be retried on fail
    tasks: [
      {
        slug: 'myTask',
        retries: 2,
        // Each task takes input
        // Used to auto-type the task func args
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        // Each task takes output
        // Used to auto-type the function signature
        output: [
          {
            name: 'success',
            type: 'checkbox',
          },
        ],
        onSuccess: () => {},
        onFail: () => {},
        run: myRunner,
      },
    ],
  },
}
```
### `payload.createJob`
This function should allow for the creation of jobs based on either a
workflow (group of tasks) or an individual task.
To create a job using a workflow:
```js
const job = await payload.createJob({
  // Accept the `name` of a workflow so we can match to either a
  // code-based workflow OR a workflow defined in the DB
  // Should auto-type the input
  workflowName: 'myWorkflow',
  input: {
    // typed to the args of the workflow by name
  },
})
```
To create a job using a task:
```js
const job = await payload.createJob({
  // Accept the `name` of a task
  task: 'myTask',
  input: {
    // typed to the args of the task by name
  },
})
```
---------
Co-authored-by: Alessio Gravili <alessio@gravili.de>
Co-authored-by: Dan Ribbens <dan.ribbens@gmail.com>