### What? Fixes a formatting issue that prevents payloadcms.com/docs/beta/jobs-queue/overview from displaying properly. There were a couple `<strong>` tags in the `jobs-queue/overview` docs that did not have proper closing `</strong>` tags.
---
title: Jobs Queue
label: Overview
order: 10
desc: Payload provides all you need to run job queues, which are helpful to offload long-running processes into separate workers.
keywords: jobs queue, application framework, typescript, node, react, nextjs
---

Payload's Jobs Queue gives you a simple, yet powerful way to offload large or future tasks to separate compute resources.

For example, when building applications with Payload, you might run into a case where you need to perform some complex logic in a Payload [Hook](/docs/hooks/overview) but you don't want that hook to "block" or slow down the response returned from the Payload API.

Instead of running long or expensive logic in a Hook, you can create a Job and add it to a Queue. It can then be picked up by a separate worker which periodically checks the queue for new jobs, and then executes each job accordingly. This way, your Payload API responses can remain as fast as possible, and you can still perform logic as necessary without blocking or affecting your users' experience.

Jobs are also handy for delegating certain actions to take place in the future, such as scheduling a post to be published at a later date. In this example, you could create a Job that will automatically publish a post at a certain time.

#### How it works

There are a few concepts that you should become familiar with before using Payload's Jobs Queue: [Tasks](#tasks), [Workflows](#workflows), [Jobs](#jobs), and finally [Queues](#queues).

## Tasks

<Banner type="default">
  A <strong>"Task"</strong> is a function definition that performs business logic and whose input and output are both strongly typed.
</Banner>

You can register Tasks on the Payload config, and then create Jobs or Workflows that use them. Think of Tasks like tidy, isolated "functions that do one specific thing".

Payload Tasks can be configured to be automatically retried if they fail, which makes them valuable for "durable" workflows like AI applications where LLMs can return non-deterministic results and might need to be retried.

Tasks can either be defined within the `jobs.tasks` array in your Payload config, or they can be defined inline within a workflow.

### Defining tasks in the config

Simply add a task to the `jobs.tasks` array in your Payload config. A task consists of the following fields:

| Option                      | Description                                                                      |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug`                      | Define a slug-based name for this task. This slug needs to be unique among both tasks and workflows. |
| `handler`                   | The function that should be responsible for running the job. You can either pass a string-based path to the job function file, or the job function itself. If you are using large dependencies within your job, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema`               | Define the input field schema. Payload will generate a type for this schema. |
| `interfaceName`             | You can use `interfaceName` to change the name of the interface that is generated for this task. By default, this is "Task" + the capitalized task slug. |
| `outputSchema`              | Define the output field schema. Payload will generate a type for this schema. |
| `label`                     | Define a human-friendly label for this task. |
| `onFail`                    | Function to be executed if the task fails. |
| `onSuccess`                 | Function to be executed if the task succeeds. |
| `retries`                   | Specify the number of times that this task should be retried if it fails. |

The logic for the Task is defined in the `handler`, which can be defined as a function or as a path to a function. The `handler` will run once a worker picks up a Job that includes this task.

It should return an object with an `output` key, which should contain the output of the task as defined in its `outputSchema`.

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        // Configure this task to automatically retry
        // up to two times
        retries: 2,

        // This is a unique identifier for the task
        slug: 'createPost',

        // These are the arguments that your Task will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // These are the properties that the function should output
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],

        // This is the function that is run when the task is invoked
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ],
  },
})
```

In addition to defining handlers as functions directly provided to your Payload config, you can also pass an _absolute path_ to where the handler is defined. If your task has large dependencies, and you are planning on executing your jobs in a separate process that has access to the filesystem, this could be a handy way to make sure that your Payload + Next.js app remains quick to compile and has minimal dependencies.

In general, this is an advanced use case. Here's how this would look:

`payload.config.ts:`

```ts
import { fileURLToPath } from 'node:url'
import path from 'path'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  jobs: {
    tasks: [
      {
        // ...
        // The #createPostHandler is a named export within the `createPost.ts` file
        handler: path.resolve(dirname, 'src/tasks/createPost.ts') + '#createPostHandler',
      },
    ],
  },
})
```

Then, the `createPost` file itself:

`src/tasks/createPost.ts:`

```ts
import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job, req }) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}
```

## Workflows

<Banner type="default">
  A <strong>"Workflow"</strong> is an optional way to <em>combine multiple tasks together</em> in a way that can be gracefully retried from the point of failure.
</Banner>

They're most helpful when you have multiple tasks in a row, and you want each task to be retried if it fails.

If a task within a workflow fails, the Workflow will automatically "pick back up" on the task where it failed and **not re-execute any prior tasks that have already been executed**.

#### Defining a workflow

The most important aspect of a Workflow is the `handler`, where you can declare when and how the tasks should run by simply calling the `runTask` function. If any task within the workflow fails, the entire `handler` function will re-run.

However, importantly, tasks that have successfully been completed will simply re-return the cached and saved output without running again. The Workflow will pick back up where it failed and only tasks from the failure point onward will be re-executed.
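
The caching behavior described above can be illustrated with a small, self-contained sketch. This is not Payload's implementation: `createRunner`, `runWorkflow`, and the in-memory `Map` are hypothetical stand-ins for the job record that Payload persists. The sketch only shows why re-running the whole handler does not re-execute tasks that already succeeded.

```typescript
// Conceptual sketch only; the names below are hypothetical, not Payload APIs.
type TaskFn<T> = () => Promise<T>

// Completed outputs keyed by task invocation ID. A Map stands in for the
// data that would be saved on the job.
function createRunner(completed: Map<string, unknown>) {
  return async function runTask<T>(id: string, task: TaskFn<T>): Promise<T> {
    if (completed.has(id)) {
      // Already succeeded in an earlier attempt: return the saved output
      return completed.get(id) as T
    }
    const output = await task() // throws on failure, so nothing is cached
    completed.set(id, output)
    return output
  }
}

// Re-runs the entire handler after a failure, reusing the same cache.
async function runWorkflow<T>(
  handler: (runTask: ReturnType<typeof createRunner>) => Promise<T>,
  maxAttempts: number,
): Promise<T> {
  const completed = new Map<string, unknown>()
  let lastError: unknown
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      return await handler(createRunner(completed))
    } catch (err) {
      lastError = err
    }
  }
  throw lastError
}

// Usage: the second task fails once, so the handler runs twice,
// but the first task's function body only executes once.
const calls = { create: 0, update: 0 }

const result = runWorkflow(async (runTask) => {
  const postID = await runTask('1', async () => {
    calls.create++
    return 'post-1'
  })
  await runTask('2', async () => {
    calls.update++
    if (calls.update === 1) throw new Error('temporary failure')
    return true
  })
  return postID
}, 2)
```

This is also why each task invocation needs a stable ID: the ID is the key under which the saved output is looked up on the next attempt.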

To define a JS-based workflow, simply add a workflow to the `jobs.workflows` array in your Payload config. A workflow consists of the following fields:

| Option                      | Description                                                                      |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug`                      | Define a slug-based name for this workflow. This slug needs to be unique among both tasks and workflows. |
| `handler`                   | The function that should be responsible for running the workflow. You can either pass a string-based path to the workflow function file, or the workflow function itself. If you are using large dependencies within your workflow, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema`               | Define the input field schema. Payload will generate a type for this schema. |
| `interfaceName`             | You can use `interfaceName` to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. |
| `label`                     | Define a human-friendly label for this workflow. |
| `queue`                     | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',

        // The arguments that the workflow will accept
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],

        // The handler that defines the "control flow" of the workflow
        // Notice how it calls `runTask` to execute tasks
        handler: async ({ job, runTask }) => {
          // This workflow first runs a task called `createPost`
          const output = await runTask({
            task: 'createPost',

            // You need to define a unique ID for this task invocation
            // that will always be the same if this workflow fails
            // and is re-executed in the future
            id: '1',
            input: {
              title: job.input.title,
            },
          })

          // Once the prior task completes, it will run a task
          // called `updatePost`
          await runTask({
            task: 'updatePost',
            id: '2',
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // or output.postID
              title: job.input.title + '2',
            },
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ],
  },
})
```

#### Running tasks inline

In the above example, our workflow was executing tasks that we already had defined in our Payload config. But you can also run tasks without predefining them.

To do this, you can use the `runTaskInline` function.

The drawbacks of this approach are that tasks cannot be re-used across workflows as easily, and the **task data stored in the job** will not be typed. In the following example, the inline task data will be stored on the job under `job.taskStatus.inline['2']` but will be completely untyped, as types for dynamic tasks like these cannot be generated beforehand.

Example:

```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        // `runTaskInline` must be destructured here to be used below
        handler: async ({ job, runTask, runTaskInline }) => {
          // Here, we run a predefined task.
          // The `createPost` handler arguments and return type
          // are both strongly typed
          const output = await runTask({
            task: 'createPost',
            id: '1',
            input: {
              title: job.input.title,
            },
          })

          // Here, this task is not defined in the Payload config
          // and is "inline". Its output will be stored on the Job in the database
          // however its arguments will be untyped.
          const { newPost } = await runTaskInline({
            task: async ({ req }) => {
              const newPost = await req.payload.update({
                collection: 'post',
                id: '2',
                req,
                retries: 3,
                data: {
                  title: 'updated!',
                },
              })
              return {
                output: {
                  newPost,
                },
              }
            },
            id: '2',
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ],
  },
})
```

## Jobs

Now that we have covered Tasks and Workflows, we can tie them together with a concept called a Job.

<Banner type="default">
  Whereas you define Workflows and Tasks, which control your business logic, a <strong>Job</strong> is an individual instance of either a Task or a Workflow which contains many tasks.
</Banner>

For example, let's say we have a Workflow or Task that describes the logic to sync information from Payload to a third-party system. This is how you'd declare how to sync that info, but it wouldn't do anything on its own. In order to run that task or workflow, you'd create a Job that references the corresponding Task or Workflow.

Jobs are stored in the Payload database in the `payload-jobs` collection, and you can decide to keep a running list of all jobs, or configure Payload to delete the job when it has been successfully executed.

#### Queuing a new job

In order to queue a job, you can use the `payload.jobs.queue` function.

Here's how you'd queue a new Job, which will run a `createPostAndUpdate` workflow:

```ts
const createdJob = await payload.jobs.queue({
  // Pass the name of the workflow
  workflow: 'createPostAndUpdate',
  // The input type will be automatically typed
  // according to the input you've defined for this workflow
  input: {
    title: 'my title',
  },
})
```

In addition to being able to queue new Jobs based on Workflows, you can also queue a job for a single Task:

```ts
const createdJob = await payload.jobs.queue({
  task: 'createPost',
  input: {
    title: 'my title',
  },
})
```

## Queues

Now let's talk about how to _run these jobs_. Right now, all we've covered is how to queue up jobs to run, but so far, we aren't actually running any jobs. This is the final piece of the puzzle.

<Banner type="default">
  A <strong>Queue</strong> is a list of jobs that should be executed in order of when they were added.
</Banner>

When you go to run jobs, Payload will query for any jobs that are added to the queue and then run them. By default, all queued jobs are added to the `default` queue.

**But, imagine if you wanted to have some jobs that run nightly, and other jobs which should run every five minutes.**

By specifying the `queue` name when you queue a new job using `payload.jobs.queue()`, you can queue certain jobs with `queue: 'nightly'`, and other jobs can be left as the default queue.

Then, you could configure two different runner strategies:

1. A `cron` that runs nightly, querying for jobs added to the `nightly` queue
2. Another that runs any jobs that were added to the `default` queue every ~5 minutes or so
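
As a rough sketch of the setup above, the snippet below queues one job on a `nightly` queue and one on the default queue, then runs each queue separately. The `JobsApi` type is a simplified, hypothetical stand-in for `payload.jobs` so that the example is self-contained, and the `syncInventory` task slug is made up. The `queue` and `limit` arguments mirror the ones described in this document.

```typescript
// Simplified, hypothetical typing of the parts of `payload.jobs` used here.
type JobsApi = {
  queue: (args: {
    task: string
    input: Record<string, unknown>
    queue?: string
  }) => Promise<unknown>
  run: (args?: { queue?: string; limit?: number }) => Promise<unknown>
}

// Queue one job per queue. Omitting `queue` leaves the job on `default`.
async function queueExampleJobs(jobs: JobsApi): Promise<void> {
  await jobs.queue({
    task: 'syncInventory', // hypothetical task slug
    input: { full: true },
    queue: 'nightly',
  })
  await jobs.queue({
    task: 'createPost',
    input: { title: 'my title' },
  })
}

// Intended to be invoked by a nightly cron
async function runNightly(jobs: JobsApi): Promise<void> {
  await jobs.run({ queue: 'nightly', limit: 100 })
}

// Intended to be invoked every ~5 minutes
async function runDefault(jobs: JobsApi): Promise<void> {
  await jobs.run({ queue: 'default' })
}
```

In a real project you would call `payload.jobs.queue` and `payload.jobs.run` directly rather than going through a wrapper type like this.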

## Executing jobs

As mentioned above, you can queue jobs, but the jobs won't run unless a worker picks up your jobs and runs them. This can be done in three ways:

#### Endpoint

You can execute jobs by making a fetch request to the `/api/payload-jobs/run` endpoint:

```ts
// Here, we're saying we want to run only 100 jobs for this invocation
// and we want to pull jobs from the `nightly` queue:
await fetch('/api/payload-jobs/run?limit=100&queue=nightly', {
  method: 'GET',
  headers: {
    'Authorization': `Bearer ${token}`,
  },
})
```

This endpoint is automatically mounted for you and is helpful in conjunction with serverless platforms like Vercel, where you might want to use Vercel Cron to invoke a serverless function that executes your jobs.
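
When the request is made from server-side code or an external scheduler rather than from the browser, `fetch` generally needs an absolute URL. A minimal sketch of building one, assuming a hypothetical `buildRunUrl` helper and a placeholder origin (neither is part of Payload):

```typescript
// Hypothetical helper: builds the absolute URL of the jobs `run` endpoint.
function buildRunUrl(
  baseUrl: string,
  options: { limit?: number; queue?: string } = {},
): string {
  const url = new URL('/api/payload-jobs/run', baseUrl)
  if (options.limit !== undefined) {
    url.searchParams.set('limit', String(options.limit))
  }
  if (options.queue !== undefined) {
    url.searchParams.set('queue', options.queue)
  }
  return url.toString()
}

// 'https://example.com' is a placeholder for your deployment's origin
const runUrl = buildRunUrl('https://example.com', { limit: 100, queue: 'nightly' })
```

The resulting string can be passed to `fetch` together with the same `Authorization` header shown above.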

**Vercel Cron Example**

If you're deploying on Vercel, you can add a `vercel.json` file in the root of your project that configures Vercel Cron to invoke the `run` endpoint on a cron schedule.

Here's an example of what this file will look like:

```json
{
  "crons": [
    {
      "path": "/api/payload-jobs/run",
      "schedule": "*/5 * * * *"
    }
  ]
}
```

The configuration above schedules the endpoint `/api/payload-jobs/run` to be invoked every 5 minutes.

The last step will be to secure your `run` endpoint so that only the proper users can invoke the runner.

To do this, you can set an environment variable on your Vercel project called `CRON_SECRET`, which should be a random string, ideally 16 characters or longer.

Then, you can modify the `access` function for running jobs so that only logged-in users and Vercel Cron can invoke your runner.

```ts
export default buildConfig({
  // Other configurations...
  jobs: {
    access: {
      run: ({ req }: { req: PayloadRequest }): boolean => {
        // Allow logged in users to execute this endpoint (default)
        if (req.user) return true

        // If there is no logged in user, then check
        // for the Vercel Cron secret to be present as an
        // Authorization header:
        const authHeader = req.headers.get('authorization')
        return authHeader === `Bearer ${process.env.CRON_SECRET}`
      },
    },
    // Other job configurations...
  },
})
```

This works because Vercel automatically sends the value of the `CRON_SECRET` environment variable as a Bearer token in the `Authorization` header when Vercel Cron invokes the endpoint, ensuring that the jobs can be run securely.
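
One edge case worth guarding against: if `CRON_SECRET` is not set, the template string in the comparison above evaluates to `Bearer undefined`, which a caller could send deliberately. A minimal sketch of a stricter check, written as a standalone function (`isCronRequestAuthorized` is a hypothetical name, not a Payload API):

```typescript
// Hypothetical helper: returns true only when a secret is configured
// and the request carries it as a Bearer token.
function isCronRequestAuthorized(headers: Headers, secret: string | undefined): boolean {
  // Refuse everything when no secret is configured
  if (!secret) return false
  return headers.get('authorization') === `Bearer ${secret}`
}
```

Inside the `run` access function, this could be called as `isCronRequestAuthorized(req.headers, process.env.CRON_SECRET)` in place of the direct comparison.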

After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint on the specified schedule, running the queued jobs in the background.

#### Local API

If you want to process jobs programmatically from your server-side code, you can use the Local API:

```ts
const results = await payload.jobs.run()

// You can customize the queue name and limit by passing them as arguments:
await payload.jobs.run({ queue: 'nightly', limit: 100 })
```

#### Bin script

Finally, you can process jobs via the bin script that comes with Payload out of the box.

```sh
npx payload jobs:run --queue default --limit 10
```

In addition, the bin script allows you to pass a `--cron` flag to the `jobs:run` command to run the jobs on a scheduled, cron basis:

```sh
npx payload jobs:run --cron "*/5 * * * *"
```