Commit Graph

15 Commits

Author SHA1 Message Date
Alessio Gravili
e0ffada80b feat: support parallel job queue tasks, speed up task running (#13614)
Currently, attempting to run tasks in parallel will result in DB errors.

## Solution

The problem was caused due to inefficient db update calls. After each
task completes, we need to update the log array in the payload-jobs
collection. On postgres, that's a different table.

Currently, the update works the following way:
1. Nuke the table
2. Re-insert every single row, including the new one

This will throw db errors if multiple processes start doing that.
Additionally, due to conflicts, new log rows may be lost.

This PR makes use of the the [new db $push operation
](https://github.com/payloadcms/payload/pull/13453) we recently added to
atomically push a new log row to the database in a single round-trip.
This not only reduces the amount of db round trips (=> faster job queue
system) but allows multiple tasks to perform this db operation in
parallel, without conflicts.

## Problem

**Example:**

```ts
export const fastParallelTaskWorkflow: WorkflowConfig<'fastParallelTask'> = {
  slug: 'fastParallelTask',
  handler: async ({nlineTask }) => {
    const taskFunctions = []
    for (let i = 0; i < 20; i++) {
      const idx = i + 1
      taskFunctions.push(async () => {
        return await inlineTask(`parallel task ${idx}`, {
          input: {
            test: idx,
          },
          task: () => {
            return {
              output: {
                taskID: idx.toString(),
              },
            }
          },
        })
      })
    }

    await Promise.all(taskFunctions.map((f) => f()))
  },
}
```

On SQLite, this would throw the following error:

```bash
Caught error Error: UNIQUE constraint failed: payload_jobs_log.id
    at Object.next (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:335:20)
    at Statement.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/libsql@0.4.7/node_modules/libsql/index.js:360:16)
    at executeStmt (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:285:34)
    at Sqlite3Client.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5/node_modules/@libsql/client/lib-cjs/sqlite3.js:101:16)
    at /Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:288:58
    at LibSQLPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/session.ts:79:18)
    at LibSQLPreparedQuery.values (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:286:21)
    at LibSQLPreparedQuery.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/libsql/session.ts:214:27)
    at QueryPromise.all (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:402:26)
    at QueryPromise.execute (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/sqlite-core/query-builders/insert.ts:414:40)
    at QueryPromise.then (/Users/alessio/Documents/GitHub/payload/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/query-promise.ts:31:15) {
  rawCode: 1555,
  code: 'SQLITE_CONSTRAINT_PRIMARYKEY',
  libsqlError: true
}
```

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1211001438499053
2025-08-27 20:32:42 +00:00
Alessio Gravili
3114b89d4c perf: 23% faster job queue system on postgres/sqlite (#13187)
Previously, a single run of the simplest job queue workflow (1 single
task, no db calls by user code in the task - we're just testing db
system overhead) would result in **22 db roundtrips** on drizzle. This
PR reduces it to **17 db roundtrips** by doing the following:

- Modifies db.updateJobs to use the new optimized upsertRow function if
the update is simple
- Do not unnecessarily pass the job log to the final job update when the
workflow completes => allows using the optimized upsertRow function, as
only the main table is involved

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210888186878606
2025-07-30 16:23:43 +03:00
Alessio Gravili
c08b2aea89 feat: scheduling jobs (#12863)
Adds a new `schedule` property to workflow and task configs that can be
used to have Payload automatically _queue_ jobs following a certain
_schedule_.

Docs:
https://payloadcms.com/docs/dynamic/jobs-queue/schedules?branch=feat/schedule-jobs

## API Example

```ts
export default buildConfig({
  // ...
  jobs: {
    // ...
    scheduler: 'manual', // Or `cron` if you're not using serverless. If `manual` is used, then user needs to set up running /api/payload-jobs/handleSchedules or payload.jobs.handleSchedules in regular intervals
    tasks: [
      {
        schedule: [
          {
            cron: '* * * * * *',
            queue: 'autorunSecond',
            // Hooks are optional
            hooks: {
              // Not an array, as providing and calling `defaultBeforeSchedule` would be more error-prone if this was an array
              beforeSchedule: async (args) => {
                // Handles verifying that there are no jobs already scheduled or processing.
                // You can override this behavior by not calling defaultBeforeSchedule, e.g. if you wanted
                // to allow a maximum of 3 scheduled jobs in the queue instead of 1, or add any additional conditions
                const result = await args.defaultBeforeSchedule(args)
                return {
                  ...result,
                  input: {
                    message: 'This task runs every second',
                  },
                }
              },
              afterSchedule: async (args) => {
                await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global
                args.req.payload.logger.info(
                  'EverySecond task scheduled: ' +
                  (args.status === 'success' ? args.job.id : 'skipped or failed to schedule'),
                )
              },
            },
          },
        ],
        slug: 'EverySecond',
        inputSchema: [
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        handler: ({ input, req }) => {
          req.payload.logger.info(input.message)
          return {
            output: {},
          }
        },
      }
    ]
  }
})
```

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210495300843759
2025-07-18 06:48:27 -04:00
Alessio Gravili
59f536c2c9 refactor: simplify job queue error handling (#12845)
This simplifies workflow / task error handling, as well as cancelling
jobs. Previously, we were handling errors when they occur and passing
through error state using a `state` object - errors were then handled in
multiple areas of the code.

This PR adds new, clean `TaskError`, `WorkflowError` and
`JobCancelledError` errors that are thrown when they occur and are
handled **in one single place**, massively cleaning up complex functions
like
[payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts](https://github.com/payloadcms/payload/compare/refactor/jobs-errors?expand=1#diff-53dc7ccb7c8e023c9ba63fdd2e78c32ad0be606a2c64a3512abad87893f5fd21)

Performance will also be positively improved by this change -
previously, as task / workflow failure or cancellation would have
resulted in multiple, separate `updateJob` db calls, as data
modifications to the job object required for storing failure state were
done multiple times in multiple areas of the codebase. Most notably,
task error state was handled and updated separately from workflow error
state.
Now, it's just a clean, single `updateJob` call

This PR also does the following:
- adds a new test for `deleteJobOnComplete` behavior
- cleans up test suite
- ensures `deleteJobOnComplete` does not delete definitively failed jobs

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210553277813320
2025-06-17 22:24:53 +00:00
Alessio Gravili
7c05c775cb docs: improve jobs autorun docs, adds e2e test (#12196)
This clarifies that jobs.autoRun only *runs* already-queued jobs. It does not queue the jobs for you.

Also adds an e2e test as this functionality had no e2e coverage
2025-06-05 09:19:19 -07:00
Alessio Gravili
c844b4c848 feat: configurable job queue processing order (LIFO/FIFO), allow sequential execution of jobs (#11897)
Previously, jobs were executed in FIFO order on MongoDB, and LIFO on
Postgres, with no way to configure this behavior.

This PR makes FIFO the default on both MongoDB and Postgres and
introduces the following new options to configure the processing order
globally or on a queue-by-queue basis:
- a `processingOrder` property to the jobs config
- a `processingOrder` argument to `payload.jobs.run()` to override
what's set in the jobs config

It also adds a new `sequential` option to `payload.jobs.run()`, which
can be useful for debugging.
2025-03-31 15:00:36 -06:00
Alessio Gravili
9a1c3cf4cc fix: support parallel job queue tasks (#11917)
This adds support for running multiple job queue tasks in parallel
within the same workflow while preventing conflicts. Previously, this
would have caused the following issues:
- Job log entries get lost - the final job log is incomplete, despite
all tasks having been executed
- Write conflicts in postgres, leading to unique constraint violation
errors

The solution involves handling job log data updates in a way that avoids
overwriting, and ensuring the final update reflects the latest job log
data. Each job log entry now initializes its own ID, so a given job log
entry’s ID remains the same across multiple, parallel task executions.

## Postgres

In Postgres, we need to enable transactions for the
`payload.db.updateJobs` operation; otherwise, two tasks updating the
same job in parallel can conflict. This happens because Postgres handles
array rows by deleting them all, then re-inserting (rather than
upserting). The rows are stored in a separate table, and the following
scenario can occur:

Op 1: deletes all job log rows
Op 2: deletes all job log rows
Op 1: inserts 200 job log rows
Op 2: insert the same 200 job log rows again => `error: “duplicate key
value violates unique constraint "payload_jobs_log_pkey”`

Because transactions were not used, the rows inserted by Op 1
immediately became visible to Op 2, causing the conflict. Enabling
transactions fixes this. In theory, it can still happen if Op 1 commits
before Op 2 starts inserting (due to the read committed isolation
level), but it should occur far less frequently.

Alongside this change, we should consider inserting the rows using an
upsert (update on conflict), which will get rid of this error
completely. That way, if the insertion of Op 1 is visible to Op 2, Op 2
will simply overwrite it, rather than erroring. Individual job entries
are immutable and job entries cannot be deleted, thus this shouldn't
corrupt any data.

## Mongo

In Mongo, the issue is addressed by ensuring that log row deletions
caused due to different log states in concurrent operations are not
merged back to the client job log, and by making sure the final update
includes all job logs.

There is no duplicate key error in Mongo because the array log resides
in the same document and duplicates are simply upserted. We cannot use
transactions in Mongo, as it appears to lock the document in a way that
prevents reliable parallel updates, leading to:

`MongoServerError: WriteConflict error: this operation conflicted with
another operation. Please retry your operation or multi-document
transaction`
2025-03-31 13:06:05 -06:00
Alessio Gravili
38131ed2c3 feat: ability to cancel jobs (#11409)
This adds new `payload.jobs.cancel` and `payload.jobs.cancelByID` methods that allow you to cancel already-running jobs, or prevent queued jobs from running.

While it's not possible to cancel a function mid-execution, this will stop job execution the next time the job makes a request to the db, which happens after every task.
2025-02-28 17:58:43 +00:00
Sasha
117949b8d9 test: regenerate payload-types.ts for all test suites (#11238)
Regenerates `payload-types.ts` for all test suites.
2025-02-18 00:45:59 +02:00
Alessio Gravili
08fb159943 feat: allow running sub-tasks from tasks (#10373)
Task handlers now receive `inlineTask` as an arg, which can be used to
run inline sub-tasks. In the task log, those inline tasks will have a
`parent` property that points to the parent task.

Example:

```ts
{
        slug: 'subTask',
        inputSchema: [
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, inlineTask }) => {
          await inlineTask('create two docs', {
            task: async ({ input, inlineTask }) => {
            
              const { newSimple } = await inlineTask('create doc 1', {
                task: async ({ req }) => {
                  const newSimple = await req.payload.create({
                    collection: 'simple',
                    req,
                    data: {
                      title: input.message,
                    },
                  })
                  return {
                    output: {
                      newSimple,
                    },
                  }
                },
              })

              const { newSimple2 } = await inlineTask('create doc 2', {
                task: async ({ req }) => {
                  const newSimple2 = await req.payload.create({
                    collection: 'simple',
                    req,
                    data: {
                      title: input.message,
                    },
                  })
                  return {
                    output: {
                      newSimple2,
                    },
                  }
                },
              })
              return {
                output: {
                  simpleID1: newSimple.id,
                  simpleID2: newSimple2.id,
                },
              }
            },
            input: {
              message: job.input.message,
            },
          })
        },
      } as WorkflowConfig<'subTask'>
```

Job log example:

```ts
[
  {
    executedAt: '2025-01-06T03:55:44.682Z',
    completedAt: '2025-01-06T03:55:44.684Z',
    taskSlug: 'inline',
    taskID: 'create doc 1',
    output: { newSimple: [Object] },
    parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1b'
  },
  {
    executedAt: '2025-01-06T03:55:44.690Z',
    completedAt: '2025-01-06T03:55:44.692Z',
    taskSlug: 'inline',
    taskID: 'create doc 2',
    output: { newSimple2: [Object] },
    parent: { taskSlug: 'inline', taskID: 'create two docs' }, // <= New
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1c'
  },
  {
    executedAt: '2025-01-06T03:55:44.681Z',
    completedAt: '2025-01-06T03:55:44.697Z',
    taskSlug: 'inline',
    taskID: 'create two docs',
    input: { message: 'hello!' },
    output: {
      simpleID1: '677b54401e34772cc63c8693',
      simpleID2: '677b54401e34772cc63c8697'
    },
    parent: {},
    state: 'succeeded',
    id: '677b5440ba35d345d1214d1d'
  }
]
```
2025-01-07 17:24:00 +00:00
Alessio Gravili
b3308736c4 feat: jsdocs for generated types, by using admin.description (#9917)
This makes use of admin.description to generate JSDocs for field,
collection and global generated types.


![image](https://github.com/user-attachments/assets/980d825f-49a2-426d-933a-2ff3d205ea24)


![image](https://github.com/user-attachments/assets/d0b1f288-1ea1-4d80-8c05-003d59a4e41a)

For the future, we should add a dedicated property to override these
JSDocs.

You can view the effect of this PR on our test suite generated types
here:
05f552bbbc
2024-12-19 22:22:43 -05:00
Alessio Gravili
a89d54454a fix: ensure jobs do not retry indefinitely by default, fix undefined values in error messages (#9605)
## Fix default retries

By default, if no `retries` property has been set, jobs / tasks should
not be retried. This was not the case previously, as the `maxRetries`
variable was `undefined`, causing jobs to retry endlessly. This PR sets
them to `0` by default.

Additionally, this fixes some undesirable behavior of the workflow
retries property. Workflow retries now act as **maximum**,
workflow-level retries. Only tasks that do not have a retry property set
will inherit the workflow-level retries.

## Fix error messages

Previously, you were able to encounter error messages with undefined
values like these:

![CleanShot 2024-11-28 at 15 23
37@2x](https://github.com/user-attachments/assets/81617ca8-11de-4d35-b9bf-cc6c5bc515be)

Reason is that it was always using `job.workflowSlug` for the error
messages. However, if you queue a task directly, without a workflow,
`job.workflowSlug` is undefined and `job.taskSlug` should be used
instead.

This PR then gets rid of the second undefined value by ensuring that
`maxRetries´ is never undefined
2024-12-02 22:05:48 +00:00
Sasha
e40141b559 fix: queues types with strict: true (#9281)
Fixes types for workflows / jobs `input` and `output` when using
`strict: true` or `strictNullChecks: true` by ensuring that all
properties in generates types are requried
2024-11-18 21:49:08 +02:00
Jacob Fletcher
c96fa613bc feat!: on demand rsc (#8364)
Currently, Payload renders all custom components on initial compile of
the admin panel. This is problematic for two key reasons:
1. Custom components do not receive contextual data, i.e. fields do not
receive their field data, edit views do not receive their document data,
etc.
2. Components are unnecessarily rendered before they are used

This was initially required to support React Server Components within
the Payload Admin Panel for two key reasons:
1. Fields can be dynamically rendered within arrays, blocks, etc.
2. Documents can be recursively rendered within a "drawer" UI, i.e.
relationship fields
3. Payload supports server/client component composition 

In order to achieve this, components need to be rendered on the server
and passed as "slots" to the client. Currently, the pattern for this is
to render custom server components in the "client config". Then when a
view or field is needed to be rendered, we first check the client config
for a "pre-rendered" component, otherwise render our client-side
fallback component.

But for the reasons listed above, this pattern doesn't exactly make
custom server components very useful within the Payload Admin Panel,
which is where this PR comes in. Now, instead of pre-rendering all
components on initial compile, we're able to render custom components
_on demand_, only as they are needed.

To achieve this, we've established [this
pattern](https://github.com/payloadcms/payload/pull/8481) of React
Server Functions in the Payload Admin Panel. With Server Functions, we
can iterate the Payload Config and return JSX through React's
`text/x-component` content-type. This means we're able to pass
contextual props to custom components, such as data for fields and
views.

## Breaking Changes

1. Add the following to your root layout file, typically located at
`(app)/(payload)/layout.tsx`:

    ```diff
    /* THIS FILE WAS GENERATED AUTOMATICALLY BY PAYLOAD. */
    /* DO NOT MODIFY IT BECAUSE IT COULD BE REWRITTEN AT ANY TIME. */
    + import type { ServerFunctionClient } from 'payload'

    import config from '@payload-config'
    import { RootLayout } from '@payloadcms/next/layouts'
    import { handleServerFunctions } from '@payloadcms/next/utilities'
    import React from 'react'

    import { importMap } from './admin/importMap.js'
    import './custom.scss'

    type Args = {
      children: React.ReactNode
    }

+ const serverFunctions: ServerFunctionClient = async function (args) {
    +  'use server'
    +  return handleServerFunctions({
    +    ...args,
    +    config,
    +    importMap,
    +  })
    + }

    const Layout = ({ children }: Args) => (
      <RootLayout
        config={config}
        importMap={importMap}
    +  serverFunctions={serverFunctions}
      >
        {children}
      </RootLayout>
    )

    export default Layout
    ```

2. If you were previously posting to the `/api/form-state` endpoint, it
no longer exists. Instead, you'll need to invoke the `form-state` Server
Function, which can be done through the _new_ `getFormState` utility:

    ```diff
    - import { getFormState } from '@payloadcms/ui'
    - const { state } = await getFormState({
    -   apiRoute: '',
    -   body: {
    -     // ...
    -   },
    -   serverURL: ''
    - })

    + const { getFormState } = useServerFunctions()
    +
    + const { state } = await getFormState({
    +   // ...
    + })
    ```

## Breaking Changes

```diff
- useFieldProps()
- useCellProps()
```

More details coming soon.

---------

Co-authored-by: Alessio Gravili <alessio@gravili.de>
Co-authored-by: Jarrod Flesch <jarrodmflesch@gmail.com>
Co-authored-by: James <james@trbl.design>
2024-11-11 13:59:05 -05:00
James Mikrut
8970c6b3a6 feat: adds jobs queue (#8228)
Adds a jobs queue to Payload.

- [x] Docs, w/ examples for Vercel Cron, additional services
- [x] Type the `job` using GeneratedTypes in `JobRunnerArgs`
(@AlessioGr)
- [x] Write the `runJobs` function 
- [x] Allow for some type of `payload.runTask` 
- [x] Open up a new bin script for running jobs
- [x] Determine strategy for runner endpoint to either await jobs
successfully or return early and stay open until job work completes
(serverless ramifications here)
- [x] Allow for job runner to accept how many jobs to run in one
invocation
- [x] Make a Payload local API method for creating a new job easily
(payload.createJob) or similar which is strongly typed (@AlessioGr)
- [x] Make `payload.runJobs` or similar  (@AlessioGr)
- [x] Write tests for retrying up to max retries for a given step
- [x] Write tests for dynamic import of a runner

The shape of the config should permit the definition of steps separate
from the job workflows themselves.

```js
const config = {
  // Not sure if we need this property anymore
  queues: {
  },
  // A job is an instance of a workflow, stored in DB
  // and triggered by something at some point
  jobs: {
    // Be able to override the jobs collection
    collectionOverrides: () => {},

    // Workflows are groups of tasks that handle
    // the flow from task to task.
    // When defined on the config, they are considered as predefined workflows
    // BUT - in the future, we'll allow for UI-based workflow definition as well.
    workflows: [
      {
        slug: 'job-name',
        // Temporary name for this
        // should be able to pass function 
        // or path to it for Node to dynamically import
        controlFlowInJS: '/my-runner.js',

        // Temporary name as well
        // should be able to eventually define workflows
        // in UI (meaning they need to be serialized in JSON)
        // Should not be able to define both control flows
        controlFlowInJSON: [
          {
            task: 'myTask',
            next: {
              // etc
            }
          }
        ],

        // Workflows take input
        // which are a group of fields
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
      },
    ],

    // Tasks are defined separately as isolated functions
    // that can be retried on fail
    tasks: [
      {
        slug: 'myTask',
        retries: 2,
        // Each task takes input
        // Used to auto-type the task func args
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        // Each task takes output
        // Used to auto-type the function signature
        output: [
          {
            name: 'success',
            type: 'checkbox',
          }
        ],
        onSuccess: () => {},
        onFail: () => {},
        run: myRunner,
      },
    ]
  }
}
```

### `payload.createJob`

This function should allow for the creation of jobs based on either a
workflow (group of tasks) or an individual task.

To create a job using a workflow:

```js
const job = await payload.createJob({
  // Accept the `name` of a workflow so we can match to either a 
  // code-based workflow OR a workflow defined in the DB
  // Should auto-type the input
  workflowName: 'myWorkflow',
  input: {
    // typed to the args of the workflow by name
  }
})
```

To create a job using a task:

```js
const job = await payload.createJob({
  // Accept the `name` of a task
  task: 'myTask',
  input: {
    // typed to the args of the task by name
  }
})
```

---------

Co-authored-by: Alessio Gravili <alessio@gravili.de>
Co-authored-by: Dan Ribbens <dan.ribbens@gmail.com>
2024-10-30 17:56:50 +00:00