From 50f3ca93ee2038f962441c3e532ab1f335b29533 Mon Sep 17 00:00:00 2001 From: James Mikrut Date: Tue, 5 Nov 2024 14:25:14 -0500 Subject: [PATCH] docs: improves jobs queue (#9038) improves docs for jobs queue --- docs/jobs-queue/overview.mdx | 249 +++++++++++++++++++++++------------ 1 file changed, 165 insertions(+), 84 deletions(-) diff --git a/docs/jobs-queue/overview.mdx b/docs/jobs-queue/overview.mdx index 4e386be70..78d920253 100644 --- a/docs/jobs-queue/overview.mdx +++ b/docs/jobs-queue/overview.mdx @@ -1,16 +1,34 @@ --- title: Jobs Queue -label: Jobs Queue +label: Overview order: 10 desc: Payload provides all you need to run job queues, which are helpful to offload long-running processes into separate workers. keywords: jobs queue, application framework, typescript, node, react, nextjs --- -## Defining tasks +Payload's Jobs Queue gives you a simple, yet powerful way to offload large or future tasks to separate compute resources. -A task is a simple function that can be executed directly or within a workflow. The difference between tasks and functions is that tasks can be run in the background, and can be retried if they fail. +For example, when building applications with Payload, you might run into a case where you need to perform some complex logic in a Payload [Hook](/docs/hooks/overview) but you don't want that hook to "block" or slow down the response returned from the Payload API. -Tasks can either be defined within the `jobs.tasks` array in your payload config, or they can be run inline within a workflow. +Instead of running long or expensive logic in a Hook, you can instead create a Job and add it to a Queue. It can then be picked up by a separate worker which periodically checks the queue for new jobs, and then executes each job accordingly. This way, your Payload API responses can remain as fast as possible, and you can still perform logic as necessary without blocking or affecting your users' experience. 
+ +Jobs are also handy for delegating certain actions to take place in the future, such as scheduling a post to be published at a later date. In this example, you could create a Job that will automatically publish a post at a certain time. + +#### How it works + +There are a few concepts that you should become familiar with before using Payload's Jobs Queue - [Tasks](#tasks), [Workflows](#workflows), [Jobs](#jobs), and finally [Queues](#queues). + +## Tasks + + + A "Task" is a function definition that performs business logic and whose input and output are both strongly typed. + + +You can register Tasks on the Payload config, and then create Jobs or Workflows that use them. Think of Tasks like tidy, isolated "functions that do one specific thing". + +Payload Tasks can be configured to be automatically retried if they fail, which makes them valuable for "durable" workflows like AI applications where LLMs can return non-deterministic results, and might need to be retried. + +Tasks can either be defined within the `jobs.tasks` array in your payload config, or they can be defined inline within a workflow. ### Defining tasks in the config @@ -28,7 +46,9 @@ Simply add a task to the `jobs.tasks` array in your Payload config. A task consi | `onSuccess` | Function to be executed if the task fails. | | `retries` | Specify the number of times that this step should be retried if it fails. | -The handler is the function, or a path to the function, that will run once the job picks up this task. The handler function should return an object with an `output` key, which should contain the output of the task. +The logic for the Task is defined in the `handler` - which can be defined as a function, or a path to a function. The `handler` will run once a worker picks up a Job that includes this task. + +It should return an object with an `output` key, which should contain the output of the task as you've defined. 
Example: @@ -38,8 +58,15 @@ export default buildConfig({ jobs: { tasks: [ { + // Configure this task to automatically retry + // up to two times retries: 2, + + // This is a unique identifier for the task + slug: 'createPost', + + // These are the arguments that your Task will accept inputSchema: [ { name: 'title', @@ -47,6 +74,8 @@ export default buildConfig({ required: true, }, ], + + // These are the properties that the function should output outputSchema: [ { name: 'postID', @@ -54,6 +83,8 @@ export default buildConfig({ required: true, }, ], + + // This is the function that is run when the task is invoked handler: async ({ input, job, req }) => { const newPost = await req.payload.create({ collection: 'post', @@ -74,9 +105,11 @@ export default buildConfig({ }) ``` -### Example: defining external tasks +In addition to defining handlers as functions directly provided to your Payload config, you can also pass an _absolute path_ to where the handler is defined. If your task has large dependencies, and you are planning on executing your jobs in a separate process that has access to the filesystem, this could be a handy way to make sure that your Payload + Next.js app remains quick to compile and has minimal dependencies. -payload.config.ts: +In general, this is an advanced use case. Here's how this would look: + +`payload.config.ts:` ```ts import { fileURLToPath } from 'node:url' @@ -86,26 +119,11 @@ const filename = fileURLToPath(import.meta.url) const dirname = path.dirname(filename) export default buildConfig({ - // ... jobs: { tasks: [ { - retries: 2, - slug: 'createPost', - inputSchema: [ - { - name: 'title', - type: 'text', - required: true, - }, - ], - outputSchema: [ - { - name: 'postID', - type: 'text', - required: true, - }, - ], + // ... 
+ // The #createPostHandler is a named export within the `createPost.ts` file handler: path.resolve(dirname, 'src/tasks/createPost.ts') + '#createPostHandler', } ] @@ -113,7 +131,9 @@ export default buildConfig({ }) ``` -src/tasks/createPost.ts: +Then, the `createPost` file itself: + +`src/tasks/createPost.ts:` ```ts import type { TaskHandler } from 'payload' @@ -134,18 +154,23 @@ export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job, } ``` +## Workflows -## Defining workflows + + A "Workflow" is an optional way to combine multiple tasks together in a way that can be gracefully retried from the point of failure. + -There are two types of workflows - JS-based workflows and JSON-based workflows. +They're most helpful when you have multiple tasks in a row, and you want to configure each task to be able to be retried if they fail. -### Defining JS-based workflows +If a task within a workflow fails, the Workflow will automatically "pick back up" on the task where it failed and **not re-execute any prior tasks that have already been executed**. -A JS-based function is a function in which you decide yourself when the tasks should run, by simply calling the `runTask` function. If the job, or any task within the job, fails, the entire function will re-run. +#### Defining a workflow -Tasks that have successfully been completed will simply re-return the cached output without running again, and failed tasks will be re-run. +The most important aspect of a Workflow is the `handler`, where you can declare when and how the tasks should run by simply calling the `runTask` function. If any task within the workflow, fails, the entire `handler` function will re-run. -Simply add a workflow to the `jobs.wokflows` array in your Payload config. A wokflow consists of the following fields: +However, importantly, tasks that have successfully been completed will simply re-return the cached and saved output without running again. 
The Workflow will pick back up where it failed and only tasks from the failure point onward will be re-executed. + +To define a JS-based workflow, simply add a workflow to the `jobs.workflows` array in your Payload config. A workflow consists of the following fields: | Option | Description | | --------------------------- | -------------------------------------------------------------------------------- | @@ -168,6 +193,8 @@ export default buildConfig({ workflows: [ { slug: 'createPostAndUpdate', + + // The arguments that the workflow will accept inputSchema: [ { name: 'title', type: 'text', required: true, }, ], + + // The handler that defines the "control flow" of the workflow + // Notice how it calls `runTask` to execute tasks handler: async ({ job, runTask }) => { + + // This workflow first runs a task called `createPost` const output = await runTask({ task: 'createPost', + + // You need to define a unique ID for this task invocation + // that will always be the same if this workflow fails + // and is re-executed in the future id: '1', input: { title: job.input.title, }, }) + // Once the prior task completes, it will run a task + // called `updatePost` await runTask({ task: 'updatePost', id: '2', @@ -201,9 +239,11 @@ export default buildConfig({ #### Running tasks inline -In order to run tasks inline without predefining them, you can use the `runTaskInline` function. +In the above example, our workflow was executing tasks that we already had defined in our Payload config. But, you can also run tasks without predefining them. + +To do this, you can use the `runTaskInline` function. 
+ +The drawbacks of this approach are that tasks cannot be re-used across workflows as easily, and the **task data stored in the job** will not be typed. In the following example, the inline task data will be stored on the job under `job.taskStatus.inline['2']` but completely untyped, as types for dynamic tasks like these cannot be generated beforehand. Example: @@ -225,6 +265,9 @@ export default buildConfig({ }, ], handler: async ({ job, runTask }) => { + // Here, we run a predefined task. + // The `createPost` handler arguments and return type + // are both strongly typed const output = await runTask({ task: 'createPost', id: '1', @@ -233,11 +276,15 @@ export default buildConfig({ }, }) + + // Here, this task is not defined in the Payload config + // and is "inline". Its output will be stored on the Job in the database + // however its arguments will be untyped. const { newPost } = await runTaskInline({ task: async ({ req }) => { const newPost = await req.payload.update({ collection: 'post', - id: output.postID, + id: '2', req, retries: 3, data: { @@ -259,28 +306,37 @@ export default buildConfig({ }) ``` -### Defining JSON-based workflows +## Jobs -JSON-based workflows are a way to define the tasks the workflow should run in an array. The relationships between the tasks, their run order and their conditions are defined in the JSON object, which allows payload to statically analyze the workflow and will generate more helpful graphs. +Now that we have covered Tasks and Workflows, we can tie them together with a concept called a Job. -This functionality is not available yet, but it will be available in the future. + + Whereas you define Workflows and Tasks, which control your business logic, a Job is an individual instance of either a Task or a Workflow which contains many tasks. + -## Queueing workflows and tasks +For example, let's say we have a Workflow or Task that describes the logic to sync information from Payload to a third-party system. 
This is how you'd declare how to sync that info, but it wouldn't do anything on its own. In order to run that task or workflow, you'd create a Job that references the corresponding Task or Workflow. -In order to queue a workflow or a task (= create them and add them to the queue), you can use the `payload.jobs.queue` function. +Jobs are stored in the Payload database in the `payload-jobs` collection, and you can decide to keep a running list of all jobs, or configure Payload to delete the job when it has been successfully executed. -Example: queueing workflows: +#### Queuing a new job + +In order to queue a job, you can use the `payload.jobs.queue` function. + +Here's how you'd queue a new Job, which will run a `createPostAndUpdate` workflow: ```ts const createdJob = await payload.jobs.queue({ - workflows: 'createPostAndUpdate', + // Pass the name of the workflow + workflow: 'createPostAndUpdate', + // The input type will be automatically typed + // according to the input you've defined for this workflow input: { title: 'my title', }, }) ``` -Example: queueing tasks: +In addition to being able to queue new Jobs based on Workflows, you can also queue a job for a single Task: ```ts const createdJob = await payload.jobs.queue({ @@ -291,55 +347,51 @@ const createdJob = await payload.jobs.queue({ }) ``` -## Running workflows and tasks +## Queues -Workflows and tasks added to the queue will not run unless a worker picks it up and runs it. This can be done in two ways: +Now let's talk about how to _run these jobs_. Right now, all we've covered is how to queue up jobs to run, but so far, we aren't actually running any jobs. This is the final piece of the puzzle. -### Endpoint + + A Queue is a list of jobs that should be executed in order of when they were added. + -Make a fetch request to the `api/payload-jobs/run` endpoint: +When you go to run jobs, Payload will query for any jobs that are added to the queue and then run them. 
By default, all queued jobs are added to the `default` queue. + +**But, imagine if you wanted to have some jobs that run nightly, and other jobs which should run every five minutes.** + +By specifying the `queue` name when you queue a new job using `payload.jobs.queue()`, you can queue certain jobs with `queue: 'nightly'`, and other jobs can be left as the default queue. + +Then, you could configure two different runner strategies: + +1. A `cron` that runs nightly, querying for jobs added to the `nightly` queue +2. Another that runs any jobs that were added to the `default` queue every ~5 minutes or so + +## Executing jobs + +As mentioned above, you can queue jobs, but the jobs won't run unless a worker picks up your jobs and runs them. This can be done in two ways: + +#### Endpoint + +You can execute jobs by making a fetch request to the `/api/payload-jobs/run` endpoint: ```ts -await fetch('/api/payload-jobs/run', { +// Here, we're saying we want to run only 100 jobs for this invocation +// and we want to pull jobs from the `nightly` queue: +await fetch('/api/payload-jobs/run?limit=100&queue=nightly', { method: 'GET', headers: { - 'Authorization': `JWT ${token}`, + 'Authorization': `Bearer ${token}`, }, }); ``` -### Local API +This endpoint is automatically mounted for you and is helpful in conjunction with serverless platforms like Vercel, where you might want to use Vercel Cron to invoke a serverless function that executes your jobs. -Run the payload.jobs.run function: +**Vercel Cron Example** -```ts -const results = await payload.jobs.run() +If you're deploying on Vercel, you can add a `vercel.json` file in the root of your project that configures Vercel Cron to invoke the `run` endpoint on a cron schedule. 
-// You can customize the queue name by passing it as an argument -await payload.jobs.run({ queue: 'posts' }) -``` - -### Script - -You can run the jobs:run script from the command line: - -```sh -npx payload jobs:run --queue default --limit 10 -``` - -#### Triggering jobs as cronjob - -You can pass the --cron flag to the jobs:run script to run the jobs in a cronjob: - -```sh -npx payload jobs:run --cron "*/5 * * * *" -``` - -### Vercel Cron - -Vercel Cron allows scheduled tasks to be executed automatically by triggering specific endpoints. Below is a step-by-step guide to configuring Vercel Cron for running queued jobs on apps hosted on Vercel: - -1. Add Vercel Cron Configuration: Place a vercel.json file at the root of your project with the following content: +Here's an example of what this file will look like: ```json { @@ -352,13 +404,13 @@ Vercel Cron allows scheduled tasks to be executed automatically by triggering sp } ``` -This configuration schedules the endpoint `/api/payload-jobs/run` to be triggered every 5 minutes. This endpoint is added automatically by payload and is responsible for running the queued jobs. +The configuration above schedules the endpoint `/api/payload-jobs/run` to be invoked every 5 minutes. -2. Environment Variable Setup: By default, the endpoint may require a JWT token for authorization. However, Vercel Cron jobs cannot pass JWT tokens. Instead, you can use an environment variable to secure the endpoint: +The last step will be to secure your `run` endpoint so that only the proper users can invoke the runner. -Add a new environment variable named `CRON_SECRET` to your Vercel project settings. This should be a random string, ideally 16 characters or longer. +To do this, you can set an environment variable on your Vercel project called `CRON_SECRET`, which should be a random string—ideally 16 characters or longer. -3. 
Modify Authentication for Job Running: Adjust the job running authorization logic in your project to accept the `CRON_SECRET` as a valid token. Modify your `payload.config.ts` file as follows: +Then, you can modify the `access` function for running jobs by ensuring that only Vercel can invoke your runner. ```ts export default buildConfig({ @@ -366,6 +418,12 @@ export default buildConfig({ jobs: { access: { run: ({ req }: { req: PayloadRequest }): boolean => { + // Allow logged in users to execute this endpoint (default) + if (req.user) return true + + // If there is no logged in user, then check + // for the Vercel Cron secret to be present as an + // Authorization header: const authHeader = req.headers.get('authorization'); return authHeader === `Bearer ${process.env.CRON_SECRET}`; }, @@ -375,8 +433,31 @@ export default buildConfig({ }) ``` -This code snippet ensures that the jobs can only be triggered if the correct `CRON_SECRET` is provided in the authorization header. - -Vercel will automatically make the `CRON_SECRET` environment variable available to the endpoint when triggered by the Vercel Cron, ensuring that the jobs can be run securely. +This works because Vercel automatically makes the `CRON_SECRET` environment variable available to the endpoint as the `Authorization` header when triggered by the Vercel Cron, ensuring that the jobs can be run securely. After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint in the specified schedule, running the queued jobs in the background. 
+ +#### Local API + +If you want to process jobs programmatically from your server-side code, you can use the Local API: + +```ts +const results = await payload.jobs.run() + +// You can customize the queue name and limit by passing them as arguments: +await payload.jobs.run({ queue: 'nightly', limit: 100 }) +``` + +#### Bin script + +Finally, you can process jobs via the bin script that comes with Payload out of the box. + +```sh +npx payload jobs:run --queue default --limit 10 +``` + +In addition, the bin script allows you to pass a `--cron` flag to the `jobs:run` command to run the jobs on a scheduled, cron basis: + +```sh +npx payload jobs:run --cron "*/5 * * * *" +```