feat: adds jobs queue (#8228)

Adds a jobs queue to Payload.

- [x] Docs, w/ examples for Vercel Cron, additional services
- [x] Type the `job` using GeneratedTypes in `JobRunnerArgs`
(@AlessioGr)
- [x] Write the `runJobs` function 
- [x] Allow for some type of `payload.runTask` 
- [x] Open up a new bin script for running jobs
- [x] Determine strategy for runner endpoint to either await jobs
successfully or return early and stay open until job work completes
(serverless ramifications here)
- [x] Allow for job runner to accept how many jobs to run in one
invocation
- [x] Make a Payload local API method for creating a new job easily
(payload.createJob) or similar which is strongly typed (@AlessioGr)
- [x] Make `payload.runJobs` or similar  (@AlessioGr)
- [x] Write tests for retrying up to max retries for a given step
- [x] Write tests for dynamic import of a runner

The shape of the config should permit the definition of steps separate
from the job workflows themselves.

```js
const config = {
  // Not sure if we need this property anymore
  queues: {
  },
  // A job is an instance of a workflow, stored in DB
  // and triggered by something at some point
  jobs: {
    // Be able to override the jobs collection
    collectionOverrides: () => {},

    // Workflows are groups of tasks that handle
    // the flow from task to task.
    // When defined on the config, they are considered as predefined workflows
    // BUT - in the future, we'll allow for UI-based workflow definition as well.
    workflows: [
      {
        slug: 'job-name',
        // Temporary name for this
        // should be able to pass function 
        // or path to it for Node to dynamically import
        controlFlowInJS: '/my-runner.js',

        // Temporary name as well
        // should be able to eventually define workflows
        // in UI (meaning they need to be serialized in JSON)
        // Should not be able to define both control flows
        controlFlowInJSON: [
          {
            task: 'myTask',
            next: {
              // etc
            }
          }
        ],

        // Workflows take input
        // which are a group of fields
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
      },
    ],

    // Tasks are defined separately as isolated functions
    // that can be retried on fail
    tasks: [
      {
        slug: 'myTask',
        retries: 2,
        // Each task takes input
        // Used to auto-type the task func args
        input: [
          {
            name: 'post',
            type: 'relationship',
            relationTo: 'posts',
            maxDepth: 0,
            required: true,
          },
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        // Each task takes output
        // Used to auto-type the function signature
        output: [
          {
            name: 'success',
            type: 'checkbox',
          }
        ],
        onSuccess: () => {},
        onFail: () => {},
        run: myRunner,
      },
    ]
  }
}
```

### `payload.createJob`

This function should allow for the creation of jobs based on either a
workflow (group of tasks) or an individual task.

To create a job using a workflow:

```js
const job = await payload.createJob({
  // Accept the `name` of a workflow so we can match to either a 
  // code-based workflow OR a workflow defined in the DB
  // Should auto-type the input
  workflowName: 'myWorkflow',
  input: {
    // typed to the args of the workflow by name
  }
})
```

To create a job using a task:

```js
const job = await payload.createJob({
  // Accept the `name` of a task
  task: 'myTask',
  input: {
    // typed to the args of the task by name
  }
})
```
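The strong typing called for above can be sketched independently of Payload. In this hypothetical example, a `JobDefinitions` map stands in for the generated types, and a synchronous stub `createJob` (not the real API) shows how the `input` argument is constrained by the workflow name:

```ts
// Hypothetical sketch only: `JobDefinitions` stands in for Payload's
// generated types; `createJob` is a synchronous stub, not the real API.
type JobDefinitions = {
  myWorkflow: { input: { post: string; message: string } }
}

// `input` is typed by the workflow named in `workflowName`.
function createJob<T extends keyof JobDefinitions>(args: {
  workflowName: T
  input: JobDefinitions[T]['input']
}): { workflowName: T; input: JobDefinitions[T]['input'] } {
  // A real implementation would insert a document into the jobs collection;
  // here we just echo the job back.
  return args
}

const job = createJob({
  workflowName: 'myWorkflow',
  input: { post: 'abc123', message: 'hello' },
})
```

Passing an `input` that does not match `myWorkflow`'s schema would fail to compile, which is the behavior the checklist items above call for.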

---------

Co-authored-by: Alessio Gravili <alessio@gravili.de>
Co-authored-by: Dan Ribbens <dan.ribbens@gmail.com>
This commit is contained in: James Mikrut (committed by GitHub), 2024-10-30 13:56:50 -04:00
parent 0574155e59
commit 8970c6b3a6
49 changed files with 6357 additions and 125 deletions


@@ -126,6 +126,6 @@ await payload.update({
where: {
slug: { equals: 'my-slug' }
},
req: { disableTransaction: true },
disableTransaction: true,
})
```


@@ -0,0 +1,382 @@
---
title: Jobs Queue
label: Jobs Queue
order: 10
desc: Payload provides all you need to run job queues, which are helpful to offload long-running processes into separate workers.
keywords: jobs queue, application framework, typescript, node, react, nextjs
---
## Defining tasks
A task is a simple function that can be executed directly or within a workflow. Unlike a plain function, a task can run in the background and can be retried if it fails.
Tasks can either be defined within the `jobs.tasks` array in your Payload config, or run inline within a workflow.
### Defining tasks in the config
Simply add a task to the `jobs.tasks` array in your Payload config. A task consists of the following fields:
| Option | Description |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug` | Define a slug-based name for this task. This slug needs to be unique among both tasks and workflows.|
| `handler` | The function that should be responsible for running the task. You can either pass a string-based path to the task function file, or the task function itself. If you are using large dependencies within your task, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema` | Define the input field schema - Payload will generate a type for this schema. |
| `interfaceName` | You can use interfaceName to change the name of the interface that is generated for this task. By default, this is "Task" + the capitalized task slug. |
| `outputSchema` | Define the output field schema - Payload will generate a type for this schema. |
| `label` | Define a human-friendly label for this task. |
| `onFail` | Function to be executed if the task fails. |
| `onSuccess` | Function to be executed if the task succeeds. |
| `retries` | Specify the number of times that this step should be retried if it fails. |
The handler is the function, or a path to the function, that will run once the job picks up this task. The handler function should return an object with an `output` key, which should contain the output of the task.
Example:
```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        retries: 2,
        slug: 'createPost',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ input, job, req }) => {
          const newPost = await req.payload.create({
            collection: 'post',
            req,
            data: {
              title: input.title,
            },
          })
          return {
            output: {
              postID: newPost.id,
            },
          }
        },
      } as TaskConfig<'createPost'>,
    ],
  },
})
```
### Example: defining external tasks
payload.config.ts:
```ts
import { fileURLToPath } from 'node:url'
import path from 'path'

const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

export default buildConfig({
  // ...
  jobs: {
    tasks: [
      {
        retries: 2,
        slug: 'createPost',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        outputSchema: [
          {
            name: 'postID',
            type: 'text',
            required: true,
          },
        ],
        handler: path.resolve(dirname, 'src/tasks/createPost.ts') + '#createPostHandler',
      },
    ],
  },
})
```
src/tasks/createPost.ts:
```ts
import type { TaskHandler } from 'payload'

export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job, req }) => {
  const newPost = await req.payload.create({
    collection: 'post',
    req,
    data: {
      title: input.title,
    },
  })
  return {
    output: {
      postID: newPost.id,
    },
  }
}
```
## Defining workflows
There are two types of workflows - JS-based workflows and JSON-based workflows.
### Defining JS-based workflows
A JS-based workflow is a function in which you decide yourself when the tasks should run, by calling the `runTask` function. If the job, or any task within the job, fails, the entire function will re-run.
Tasks that have already completed successfully will not run again; `runTask` simply re-returns their cached output, while failed tasks are re-run.
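The re-run semantics above can be illustrated outside of Payload with a minimal memoizing task runner (all names here are illustrative, not Payload's API, and the sketch is synchronous for brevity):

```ts
// Illustrative only: completed tasks return cached output instead of
// executing again when the surrounding workflow re-runs.
type TaskFn = () => Record<string, unknown>

const completed = new Map<string, Record<string, unknown>>() // output keyed by task id

function runTask(id: string, fn: TaskFn): Record<string, unknown> {
  const cached = completed.get(id)
  if (cached) return cached // task already succeeded: re-return cached output
  const output = fn()
  completed.set(id, output)
  return output
}

let executions = 0
function workflow() {
  runTask('1', () => {
    executions++
    return { postID: 'abc' }
  })
}

workflow() // first run executes the task
workflow() // simulated re-run of the workflow: cached output is reused
```

After both runs the task body has executed only once; in the real system a failed task would not be cached, so it would be retried on the next run.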
Simply add a workflow to the `jobs.workflows` array in your Payload config. A workflow consists of the following fields:
| Option | Description |
| --------------------------- | -------------------------------------------------------------------------------- |
| `slug` | Define a slug-based name for this workflow. This slug needs to be unique among both tasks and workflows.|
| `handler` | The function that should be responsible for running the workflow. You can either pass a string-based path to the workflow function file, or the workflow function itself. If you are using large dependencies within your workflow, you might prefer to pass the string path because that will avoid bundling large dependencies in your Next.js app. |
| `inputSchema` | Define the input field schema - Payload will generate a type for this schema. |
| `interfaceName` | You can use interfaceName to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. |
| `label` | Define a human-friendly label for this workflow. |
| `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". |
Example:
```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, runTask }) => {
          const output = await runTask({
            task: 'createPost',
            id: '1',
            input: {
              title: job.input.title,
            },
          })
          await runTask({
            task: 'updatePost',
            id: '2',
            input: {
              post: job.taskStatus.createPost['1'].output.postID, // or output.postID
              title: job.input.title + '2',
            },
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ],
  },
})
```
#### Running tasks inline
In order to run tasks inline without predefining them, you can use the `runTaskInline` function.
The drawbacks of this approach are that tasks cannot be re-used as easily, and the **task data stored in the job** will not be typed. In the following example, the inline task data will be stored on the job under `job.taskStatus.inline['2']` but completely untyped, as types for dynamic tasks like these cannot be generated beforehand.
Example:
```ts
export default buildConfig({
  // ...
  jobs: {
    tasks: [
      // ...
    ],
    workflows: [
      {
        slug: 'createPostAndUpdate',
        inputSchema: [
          {
            name: 'title',
            type: 'text',
            required: true,
          },
        ],
        handler: async ({ job, runTask, runTaskInline }) => {
          const output = await runTask({
            task: 'createPost',
            id: '1',
            input: {
              title: job.input.title,
            },
          })

          const { newPost } = await runTaskInline({
            task: async ({ req }) => {
              const newPost = await req.payload.update({
                collection: 'post',
                id: output.postID,
                req,
                data: {
                  title: 'updated!',
                },
              })
              return {
                output: {
                  newPost,
                },
              }
            },
            id: '2',
            retries: 3,
          })
        },
      } as WorkflowConfig<'createPostAndUpdate'>,
    ],
  },
})
```
### Defining JSON-based workflows
JSON-based workflows define the tasks a workflow should run in an array. The relationships between the tasks, their run order, and their conditions are defined in the JSON object, which allows Payload to statically analyze the workflow and generate more helpful graphs.
This functionality is not available yet, but is planned for a future release.
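The PR description above already sketches a possible serialized shape (`controlFlowInJSON`). Purely as an illustration of the direction, and not a shipped API, such a definition might look like:

```ts
// Purely illustrative: a possible serialized workflow, mirroring the
// `controlFlowInJSON` sketch from the PR description. Not a shipped API.
const jsonWorkflow = [
  {
    task: 'createPost',
    id: '1',
    next: {
      task: 'updatePost',
      id: '2',
    },
  },
]
```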
## Queueing workflows and tasks
In order to queue a workflow or a task (that is, create it and add it to the queue), you can use the `payload.jobs.queue` function.
Example: queueing workflows:
```ts
const createdJob = await payload.jobs.queue({
  workflow: 'createPostAndUpdate',
  input: {
    title: 'my title',
  },
})
```
Example: queueing tasks:
```ts
const createdJob = await payload.jobs.queue({
  task: 'createPost',
  input: {
    title: 'my title',
  },
})
```
## Running workflows and tasks
Workflows and tasks added to the queue will not run unless a worker picks them up and runs them. This can be done in two ways:
### Endpoint
Make a fetch request to the `/api/payload-jobs/run` endpoint:
```ts
await fetch('/api/payload-jobs/run', {
  method: 'GET',
  headers: {
    'Authorization': `JWT ${token}`,
  },
});
```
### Local API
Run the `payload.jobs.run` function:
```ts
const results = await payload.jobs.run()
// You can customize the queue name by passing it as an argument
await payload.jobs.run({ queue: 'posts' })
```
### Script
You can run the `jobs:run` script from the command line:
```sh
npx payload jobs:run --queue default --limit 10
```
#### Triggering jobs as a cronjob
You can pass the `--cron` flag to the `jobs:run` script to run the jobs in a cronjob:
```sh
npx payload jobs:run --cron "*/5 * * * *"
```
### Vercel Cron
Vercel Cron allows scheduled tasks to be executed automatically by triggering specific endpoints. Below is a step-by-step guide to configuring Vercel Cron for running queued jobs on apps hosted on Vercel:
1. Add Vercel Cron Configuration: Place a `vercel.json` file at the root of your project with the following content:
```json
{
  "crons": [
    {
      "path": "/api/payload-jobs/run",
      "schedule": "*/5 * * * *"
    }
  ]
}
```
This configuration schedules the endpoint `/api/payload-jobs/run` to be triggered every 5 minutes. This endpoint is added automatically by Payload and is responsible for running the queued jobs.
2. Environment Variable Setup: By default, the endpoint may require a JWT token for authorization. However, Vercel Cron jobs cannot pass JWT tokens. Instead, you can use an environment variable to secure the endpoint:
Add a new environment variable named `CRON_SECRET` to your Vercel project settings. This should be a random string, ideally 16 characters or longer.
3. Modify Authentication for Job Running: Adjust the job running authorization logic in your project to accept the `CRON_SECRET` as a valid token. Modify your `payload.config.ts` file as follows:
```ts
export default buildConfig({
  // Other configurations...
  jobs: {
    access: {
      run: ({ req }: { req: PayloadRequest }): boolean => {
        const authHeader = req.headers.get('authorization');
        return authHeader === `Bearer ${process.env.CRON_SECRET}`;
      },
    },
    // Other job configurations...
  },
})
```
This code snippet ensures that the jobs can only be triggered if the correct `CRON_SECRET` is provided in the authorization header.
Vercel will automatically make the `CRON_SECRET` environment variable available to the endpoint when triggered by the Vercel Cron, ensuring that the jobs can be run securely.
After the project is deployed to Vercel, the Vercel Cron job will automatically trigger the `/api/payload-jobs/run` endpoint in the specified schedule, running the queued jobs in the background.
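The `*/5 * * * *` schedule above fires every five minutes. As a rough illustration of that semantics only (not the parser Vercel or `croner` actually uses), a matcher for minute-step expressions could look like:

```ts
// Illustrative only: check whether a date matches a `*/n * * * *`
// cron schedule (every n minutes). Real cron parsers (e.g. croner,
// which powers `payload jobs:run --cron`) handle the full syntax.
function matchesMinuteStep(schedule: string, date: Date): boolean {
  const match = /^\*\/(\d+) \* \* \* \*$/.exec(schedule)
  if (!match) throw new Error(`Unsupported schedule: ${schedule}`)
  const step = parseInt(match[1], 10)
  return date.getMinutes() % step === 0
}

matchesMinuteStep('*/5 * * * *', new Date('2024-10-30T13:55:00')) // true: 55 % 5 === 0
matchesMinuteStep('*/5 * * * *', new Date('2024-10-30T13:56:00')) // false
```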


@@ -92,6 +92,7 @@
"bson-objectid": "2.0.4",
"ci-info": "^4.0.0",
"console-table-printer": "2.11.2",
"croner": "8.1.2",
"dataloader": "2.2.2",
"deepmerge": "4.3.1",
"file-type": "19.3.0",


@@ -1,3 +1,4 @@
import { Cron } from 'croner'
import minimist from 'minimist'
import { pathToFileURL } from 'node:url'
import path from 'path'
@@ -5,6 +6,7 @@ import path from 'path'
import type { BinScript } from '../config/types.js'
import { findConfig } from '../config/find.js'
import { getPayload } from '../index.js'
import { generateImportMap } from './generateImportMap/index.js'
import { generateTypes } from './generateTypes.js'
import { info } from './info.js'
@@ -83,6 +85,30 @@ export const bin = async () => {
return generateImportMap(config)
}
if (script === 'jobs:run') {
const payload = await getPayload({ config })
const limit = args.limit ? parseInt(args.limit, 10) : undefined
const queue = args.queue ? args.queue : undefined
if (args.cron) {
Cron(args.cron, async () => {
await payload.jobs.run({
limit,
queue,
})
})
process.stdin.resume() // Keep the process alive
return
} else {
return await payload.jobs.run({
limit,
queue,
})
}
}
console.error(`Unknown script: "${script}".`)
process.exit(1)
}


@@ -19,6 +19,7 @@ export type ServerOnlyRootProperties = keyof Pick<
| 'endpoints'
| 'graphQL'
| 'hooks'
| 'jobs'
| 'logger'
| 'onInit'
| 'plugins'
@@ -64,6 +65,7 @@ export const serverOnlyConfigProperties: readonly Partial<ServerOnlyRootProperti
'email',
'custom',
'graphQL',
'jobs',
'logger',
// `admin`, `onInit`, `localization`, `collections`, and `globals` are all handled separately
]


@@ -1,5 +1,8 @@
import type { JobsConfig } from '../queues/config/types/index.js'
import type { Config } from './types.js'
import defaultAccess from '../auth/defaultAccess.js'
export const defaults: Omit<Config, 'db' | 'editor' | 'secret'> = {
admin: {
avatar: 'gravatar',
@@ -44,6 +47,13 @@ export const defaults: Omit<Config, 'db' | 'editor' | 'secret'> = {
},
hooks: {},
i18n: {},
jobs: {
access: {
run: defaultAccess,
},
deleteJobOnComplete: true,
depth: 0,
} as JobsConfig,
localization: false,
maxDepth: 10,
routes: {


@@ -17,6 +17,7 @@ import { InvalidConfiguration } from '../errors/index.js'
import { sanitizeGlobals } from '../globals/config/sanitize.js'
import { getLockedDocumentsCollection } from '../lockedDocuments/lockedDocumentsCollection.js'
import getPreferencesCollection from '../preferences/preferencesCollection.js'
import { getDefaultJobsCollection } from '../queues/config/jobsCollection.js'
import checkDuplicateCollections from '../utilities/checkDuplicateCollections.js'
import { defaults } from './defaults.js'
@@ -66,6 +67,16 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC
...defaults.graphQL,
...incomingConfig?.graphQL,
},
jobs: {
...defaults.jobs,
...incomingConfig?.jobs,
access: {
...defaults.jobs.access,
...incomingConfig?.jobs?.access,
},
tasks: incomingConfig?.jobs?.tasks || [],
workflows: incomingConfig?.jobs?.workflows || [],
},
routes: {
...defaults.routes,
...incomingConfig?.routes,
@@ -147,6 +158,19 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC
config.i18n = i18nConfig
// Need to add default jobs collection before locked documents collections
if (Array.isArray(configWithDefaults.jobs?.tasks) && configWithDefaults.jobs.tasks.length > 0) {
let defaultJobsCollection = getDefaultJobsCollection(config as unknown as Config)
if (typeof configWithDefaults.jobs.jobsCollectionOverrides === 'function') {
defaultJobsCollection = configWithDefaults.jobs.jobsCollectionOverrides({
defaultJobsCollection,
})
}
configWithDefaults.collections.push(defaultJobsCollection)
}
configWithDefaults.collections.push(getLockedDocumentsCollection(config as unknown as Config))
configWithDefaults.collections.push(getPreferencesCollection(config as unknown as Config))
configWithDefaults.collections.push(migrationsCollection)


@@ -31,7 +31,7 @@ import type {
import type { DatabaseAdapterResult } from '../database/types.js'
import type { EmailAdapter, SendEmailOptions } from '../email/types.js'
import type { GlobalConfig, Globals, SanitizedGlobalConfig } from '../globals/config/types.js'
import type { Payload, RequestContext, TypedUser } from '../index.js'
import type { JobsConfig, Payload, RequestContext, TypedUser } from '../index.js'
import type { PayloadRequest, Where } from '../types/index.js'
import type { PayloadLogger } from '../utilities/logger.js'
@@ -935,6 +935,10 @@ export type Config = {
i18n?: I18nOptions<{} | DefaultTranslationsObject> // loosen the type here to allow for custom translations
/** Automatically index all sortable top-level fields in the database to improve sort performance and add database compatibility for Azure Cosmos and similar. */
indexSortableFields?: boolean
/**
* @experimental There may be frequent breaking changes to this API
*/
jobs?: JobsConfig
/**
* Translate your content to different languages/locales.
*
@@ -1058,6 +1062,7 @@ export type SanitizedConfig = {
endpoints: Endpoint[]
globals: SanitizedGlobalConfig[]
i18n: Required<I18nOptions>
jobs: JobsConfig // Redefine here, as the DeepRequired<Config> can break its type
localization: false | SanitizedLocalizationConfig
paths: {
config: string


@@ -73,6 +73,7 @@ import localOperations from './collections/operations/local/index.js'
import { consoleEmailAdapter } from './email/consoleEmailAdapter.js'
import { fieldAffectsData } from './fields/config/types.js'
import localGlobalOperations from './globals/operations/local/index.js'
import { getJobsLocalAPI } from './queues/localAPI.js'
import { getLogger } from './utilities/logger.js'
import { serverInit as serverInitTelemetry } from './utilities/telemetry/events/serverInit.js'
import { traverseFields } from './utilities/traverseFields.js'
@@ -113,6 +114,19 @@ export interface GeneratedTypes {
globalsUntyped: {
[slug: string]: JsonObject
}
jobsUntyped: {
tasks: {
[slug: string]: {
input?: JsonObject
output?: JsonObject
}
}
workflows: {
[slug: string]: {
input: JsonObject
}
}
}
localeUntyped: null | string
userUntyped: User
}
@@ -146,7 +160,7 @@ export type TypedGlobal = ResolveGlobalType<GeneratedTypes>
export type TypedGlobalSelect = ResolveGlobalSelectType<GeneratedTypes>
// Extract string keys from the type
type StringKeyOf<T> = Extract<keyof T, string>
export type StringKeyOf<T> = Extract<keyof T, string>
// Define the types for slugs using the appropriate collections and globals
export type CollectionSlug = StringKeyOf<TypedCollection>
@@ -173,6 +187,10 @@ export type TypedUser = ResolveUserType<GeneratedTypes>
type ResolveAuthOperationsType<T> = 'auth' extends keyof T ? T['auth'] : T['authUntyped']
export type TypedAuthOperations = ResolveAuthOperationsType<GeneratedTypes>
// @ts-expect-error
type ResolveJobOperationsType<T> = 'jobs' extends keyof T ? T['jobs'] : T['jobsUntyped']
export type TypedJobs = ResolveJobOperationsType<GeneratedTypes>
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
@@ -344,6 +362,8 @@ export class BasePayload {
importMap: ImportMap
jobs = getJobsLocalAPI(this)
logger: Logger
login = async <TSlug extends CollectionSlug>(
@@ -1052,6 +1072,27 @@ export type {
PreferenceUpdateRequest,
TabsPreferences,
} from './preferences/types.js'
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'
export type {
RunTaskFunction,
TaskConfig,
TaskHandler,
TaskHandlerArgs,
TaskHandlerResult,
TaskHandlerResults,
TaskInput,
TaskOutput,
TaskType,
} from './queues/config/types/taskTypes.js'
export type {
BaseJob,
JobTaskStatus,
RunningJob,
SingleTaskStatus,
WorkflowConfig,
WorkflowHandler,
WorkflowTypes,
} from './queues/config/types/workflowTypes.js'
export { getLocalI18n } from './translations/getLocalI18n.js'
export * from './types/index.js'
export { getFileByPath } from './uploads/getFileByPath.js'


@@ -0,0 +1,173 @@
import type { JSONSchema4 } from 'json-schema'
import type { SanitizedConfig } from '../../config/types.js'
import type { JobsConfig } from './types/index.js'
import { fieldsToJSONSchema } from '../../utilities/configToJSONSchema.js'
export function generateJobsJSONSchemas(
config: SanitizedConfig,
jobsConfig: JobsConfig,
interfaceNameDefinitions: Map<string, JSONSchema4>,
/**
* Used for relationship fields, to determine whether to use a string or number type for the ID.
* While there is a default ID field type set by the db adapter, they can differ on a collection-level
* if they have custom ID fields.
*/
collectionIDFieldTypes: { [key: string]: 'number' | 'string' },
): {
definitions?: Map<string, JSONSchema4>
properties?: { tasks: JSONSchema4; workflows: JSONSchema4 }
} {
const properties: { tasks: JSONSchema4; workflows: JSONSchema4 } = {
tasks: {},
workflows: {},
}
const definitions: Map<string, JSONSchema4> = new Map()
if (jobsConfig?.tasks?.length) {
for (const task of jobsConfig.tasks) {
const fullTaskJsonSchema: JSONSchema4 = {
type: 'object',
additionalProperties: false,
properties: {
input: {},
output: {},
},
required: [],
}
if (task?.inputSchema?.length) {
const inputJsonSchema = fieldsToJSONSchema(
collectionIDFieldTypes,
task.inputSchema,
interfaceNameDefinitions,
config,
)
const fullInputJsonSchema: JSONSchema4 = {
type: 'object',
additionalProperties: false,
properties: inputJsonSchema.properties,
required: inputJsonSchema.required,
}
fullTaskJsonSchema.properties.input = fullInputJsonSchema
;(fullTaskJsonSchema.required as string[]).push('input')
}
if (task?.outputSchema?.length) {
const outputJsonSchema = fieldsToJSONSchema(
collectionIDFieldTypes,
task.outputSchema,
interfaceNameDefinitions,
config,
)
const fullOutputJsonSchema: JSONSchema4 = {
type: 'object',
additionalProperties: false,
properties: outputJsonSchema.properties,
required: outputJsonSchema.required,
}
fullTaskJsonSchema.properties.output = fullOutputJsonSchema
;(fullTaskJsonSchema.required as string[]).push('output')
}
const normalizedTaskSlug = task.slug[0].toUpperCase() + task.slug.slice(1)
definitions.set(task.interfaceName ?? `Task${normalizedTaskSlug}`, fullTaskJsonSchema)
}
// Now add properties.tasks definition that references the types in definitions keyed by task slug:
properties.tasks = {
type: 'object',
additionalProperties: false,
properties: {
...Object.fromEntries(
jobsConfig.tasks.map((task) => {
const normalizedTaskSlug = task.slug[0].toUpperCase() + task.slug.slice(1)
const toReturn: JSONSchema4 = {
$ref: task.interfaceName
? `#/definitions/${task.interfaceName}`
: `#/definitions/Task${normalizedTaskSlug}`,
}
return [task.slug, toReturn]
}),
),
inline: {
type: 'object',
additionalProperties: false,
properties: {
input: {},
output: {},
},
required: ['input', 'output'],
},
},
required: jobsConfig.tasks.map((task) => task.slug),
}
}
if (jobsConfig?.workflows?.length) {
for (const workflow of jobsConfig.workflows) {
const fullWorkflowJsonSchema: JSONSchema4 = {
type: 'object',
additionalProperties: false,
properties: {
input: {},
},
required: [],
}
if (workflow?.inputSchema?.length) {
const inputJsonSchema = fieldsToJSONSchema(
collectionIDFieldTypes,
workflow.inputSchema,
interfaceNameDefinitions,
config,
)
const fullInputJsonSchema: JSONSchema4 = {
type: 'object',
additionalProperties: false,
properties: inputJsonSchema.properties,
required: inputJsonSchema.required,
}
fullWorkflowJsonSchema.properties.input = fullInputJsonSchema
;(fullWorkflowJsonSchema.required as string[]).push('input')
}
const normalizedWorkflowSlug = workflow.slug[0].toUpperCase() + workflow.slug.slice(1)
definitions.set(
workflow.interfaceName ?? `Workflow${normalizedWorkflowSlug}`,
fullWorkflowJsonSchema,
)
properties.workflows = {
type: 'object',
additionalProperties: false,
properties: Object.fromEntries(
jobsConfig.workflows.map((workflow) => {
const normalizedWorkflowSlug = workflow.slug[0].toUpperCase() + workflow.slug.slice(1)
const toReturn: JSONSchema4 = {
$ref: workflow.interfaceName
? `#/definitions/${workflow.interfaceName}`
: `#/definitions/Workflow${normalizedWorkflowSlug}`,
}
return [workflow.slug, toReturn]
}),
),
required: jobsConfig.workflows.map((workflow) => workflow.slug),
}
}
}
return {
definitions,
properties,
}
}


@@ -0,0 +1,206 @@
import type { CollectionConfig } from '../../collections/config/types.js'
import type { Config } from '../../config/types.js'
import { runJobsEndpoint } from '../restEndpointRun.js'
import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js'
export const getDefaultJobsCollection: (config: Config) => CollectionConfig | null = (config) => {
if (!Array.isArray(config?.jobs?.workflows)) {
return null
}
const workflowSlugs: Set<string> = new Set()
const taskSlugs: Set<string> = new Set(['inline'])
const queueNames: Set<string> = new Set(['default'])
config.jobs.workflows.forEach((workflow) => {
workflowSlugs.add(workflow.slug)
if (workflow.queue) {
queueNames.add(workflow.queue)
}
})
config.jobs.tasks.forEach((task) => {
if (workflowSlugs.has(task.slug)) {
throw new Error(
`Task slug "${task.slug}" is already used by a workflow. No tasks are allowed to have the same slug as a workflow.`,
)
}
taskSlugs.add(task.slug)
})
const jobsCollection: CollectionConfig = {
slug: 'payload-jobs',
admin: {
group: 'System',
hidden: true,
},
endpoints: [runJobsEndpoint],
fields: [
{
name: 'input',
type: 'json',
admin: {
description: 'Input data provided to the job',
},
},
{
name: 'taskStatus',
type: 'json',
virtual: true,
},
{
type: 'tabs',
tabs: [
{
fields: [
{
name: 'completedAt',
type: 'date',
index: true,
},
{
name: 'totalTried',
type: 'number',
defaultValue: 0,
index: true,
},
{
name: 'hasError',
type: 'checkbox',
admin: {
description: 'If hasError is true this job will not be retried',
},
defaultValue: false,
index: true,
},
{
name: 'error',
type: 'json',
admin: {
condition: (data) => data.hasError,
description: 'If hasError is true, this is the error that caused it',
},
},
{
name: 'log',
type: 'array',
admin: {
description: 'Task execution log',
},
fields: [
{
name: 'executedAt',
type: 'date',
required: true,
},
{
name: 'completedAt',
type: 'date',
required: true,
},
{
name: 'taskSlug',
type: 'select',
options: [...taskSlugs],
required: true,
},
{
name: 'taskID',
type: 'text',
required: true,
},
{
name: 'input',
type: 'json',
},
{
name: 'output',
type: 'json',
},
{
name: 'state',
type: 'radio',
options: ['failed', 'succeeded'],
required: true,
},
{
name: 'error',
type: 'json',
admin: {
condition: (_, data) => data.state === 'failed',
},
required: true,
},
],
},
],
label: 'Status',
},
],
},
{
name: 'workflowSlug',
type: 'select',
admin: {
position: 'sidebar',
},
index: true,
options: [...workflowSlugs],
required: false,
},
{
name: 'taskSlug',
type: 'select',
admin: {
position: 'sidebar',
},
index: true,
options: [...taskSlugs],
required: false,
},
{
name: 'queue',
type: 'select',
admin: {
position: 'sidebar',
},
defaultValue: 'default',
index: true,
options: [...queueNames],
},
{
name: 'waitUntil',
type: 'date',
index: true,
},
{
name: 'processing',
type: 'checkbox',
admin: {
position: 'sidebar',
},
defaultValue: false,
index: true,
},
],
hooks: {
afterRead: [
({ doc, req }) => {
// This hook is used to add the virtual `tasks` field to the document, that is computed from the `log` field
doc.taskStatus = getJobTaskStatus({
jobLog: doc.log,
tasksConfig: req.payload.config.jobs.tasks,
})
return doc
},
],
},
lockDocuments: false,
}
return jobsCollection
}


@@ -0,0 +1,45 @@
import type { CollectionConfig } from '../../../index.js'
import type { PayloadRequest } from '../../../types/index.js'
import type { TaskConfig } from './taskTypes.js'
import type { WorkflowConfig } from './workflowTypes.js'
export type RunJobAccessArgs = {
req: PayloadRequest
}
export type RunJobAccess = (args: RunJobAccessArgs) => boolean | Promise<boolean>
export type JobsConfig = {
/**
* Specify access control to determine who can interact with jobs.
*/
access?: {
/**
* By default, all logged-in users can trigger jobs.
*/
run?: RunJobAccess
}
/**
* Determine whether or not to delete a job after it has successfully completed.
*/
deleteJobOnComplete?: boolean
/**
* Specify depth for retrieving jobs from the queue.
* This should be as low as possible in order for job retrieval
* to be as efficient as possible. Defaults to 0.
*/
depth?: number
/**
* Override any settings on the default Jobs collection. Accepts the default collection and allows you to return
* a new collection.
*/
jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig
/**
* Define all possible tasks here
*/
tasks: TaskConfig<any>[]
/**
* Define all the workflows here. Workflows orchestrate the flow of multiple tasks.
*/
workflows: WorkflowConfig<any>[]
}


@@ -0,0 +1,171 @@
import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
import type { RunningJob, RunningJobSimple } from './workflowTypes.js'
export type TaskInputOutput = {
input: object
output: object
}
export type TaskHandlerResult<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
> = {
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['output']
: never
state?: 'failed' | 'succeeded'
}
export type TaskHandlerArgs<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
TWorkflowSlug extends keyof TypedJobs['workflows'] = string,
> = {
input: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['input']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['input']
: never
job: RunningJob<TWorkflowSlug>
req: PayloadRequest
}
/**
* Inline tasks in JSON workflows have no input, as they can just get the input from job.taskStatus
*/
export type TaskHandlerArgsNoInput<TWorkflowInput extends object> = {
job: RunningJobSimple<TWorkflowInput>
req: PayloadRequest
}
export type TaskHandler<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
TWorkflowSlug extends keyof TypedJobs['workflows'] = string,
> = (
args: TaskHandlerArgs<TTaskSlugOrInputOutput, TWorkflowSlug>,
) => Promise<TaskHandlerResult<TTaskSlugOrInputOutput>> | TaskHandlerResult<TTaskSlugOrInputOutput>
export type TaskType = StringKeyOf<TypedJobs['tasks']>
// Extracts the type of `input` corresponding to each task
export type TaskInput<T extends keyof TypedJobs['tasks']> = TypedJobs['tasks'][T]['input']
export type TaskOutput<T extends keyof TypedJobs['tasks']> = TypedJobs['tasks'][T]['output']
export type TaskHandlerResults = {
[TTaskSlug in keyof TypedJobs['tasks']]: {
[id: string]: TaskHandlerResult<TTaskSlug>
}
}
// Helper type to create correct argument type for the function corresponding to each task.
export type RunTaskFunctionArgs<TTaskSlug extends keyof TypedJobs['tasks']> = {
input?: TaskInput<TTaskSlug>
retries?: number | RetryConfig
}
export type RunTaskFunction<TTaskSlug extends keyof TypedJobs['tasks']> = (
taskID: string,
taskArgs?: RunTaskFunctionArgs<TTaskSlug>,
) => Promise<TaskOutput<TTaskSlug>>
export type RunTaskFunctions = {
[TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug>
}
export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>(
taskID: string,
taskArgs: {
input?: TTaskInput
retries?: number | RetryConfig
// This is the same as TaskHandler, but typed out explicitly in order to improve type inference
task: (args: { input: TTaskInput; job: RunningJob<any>; req: PayloadRequest }) =>
| {
output: TTaskOutput
state?: 'failed' | 'succeeded'
}
| Promise<{ output: TTaskOutput; state?: 'failed' | 'succeeded' }>
},
) => Promise<TTaskOutput>
export type RetryConfig = {
attempts: number
/**
* The backoff strategy to use when retrying the task. This determines how long to wait before retrying the task.
*
* If this is set on a single task, the longest backoff time of a task will determine the time until the entire workflow is retried.
*/
backoff?: {
/**
* Base delay between running jobs in ms
*/
delay?: number
/**
* @default fixed
*
* The backoff strategy to use when retrying the task. This determines how long to wait before retrying the task.
* If fixed (default) is used, the delay will be the same between each retry.
*
* If exponential is used, the delay will increase exponentially with each retry.
*
* @example
* delay = 1000
* attempts = 3
* type = 'fixed'
*
* The task will be retried 3 times with a delay of 1000ms between each retry.
*
* @example
* delay = 1000
* attempts = 3
* type = 'exponential'
*
* The task will be retried 3 times with a delay of 1000ms, 2000ms, and 4000ms between each retry.
*/
type: 'exponential' | 'fixed'
}
}
export type TaskConfig<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput = TaskType,
> = {
/**
* The function that should be responsible for running the job.
* You can either pass a string-based path to the job function file, or the job function itself.
*
* If you are using large dependencies within your job, you might prefer to pass the string path
* because that will avoid bundling large dependencies in your Next.js app.
*/
handler: string | TaskHandler<TTaskSlugOrInputOutput>
/**
* Define the input field schema - payload will generate a type for this schema.
*/
inputSchema?: Field[]
/**
* You can use interfaceName to change the name of the interface that is generated for this task. By default, this is "Task" + the capitalized task slug.
*/
interfaceName?: string
/**
* Define a human-friendly label for this task.
*/
label?: string
/**
* Function to be executed if the task fails.
*/
onFail?: () => Promise<void> | void
/**
* Function to be executed if the task succeeds.
*/
onSuccess?: () => Promise<void> | void
/**
* Define the output field schema - payload will generate a type for this schema.
*/
outputSchema?: Field[]
/**
* Specify the number of times that this step should be retried if it fails.
*/
retries?: number | RetryConfig
/**
* Define a slug-based name for this task. This slug needs to be unique among both tasks and workflows.
*/
slug: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] ? TTaskSlugOrInputOutput : string
}
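
To make the `TaskConfig` shape above concrete, here is a hypothetical task definition. The `createPost` slug, the field schemas, and the handler body are invented for illustration; only the overall shape comes from the types above.

```ts
// Hypothetical task definition; slug, schemas, and handler body are examples only.
const createPostTask = {
  slug: 'createPost',
  label: 'Create Post',
  // Retried up to 3 times, waiting 1000ms, 2000ms, then 4000ms between attempts
  retries: {
    attempts: 3,
    backoff: { delay: 1000, type: 'exponential' },
  },
  inputSchema: [{ name: 'title', type: 'text', required: true }],
  outputSchema: [{ name: 'postID', type: 'text' }],
  handler: async ({ input, req }) => {
    const post = await req.payload.create({
      collection: 'posts',
      data: { title: input.title },
    })
    return { output: { postID: String(post.id) } }
  },
}
```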

@@ -0,0 +1,36 @@
import type { RunningJob, TaskHandlerResult, TypedJobs } from '../../../index.js'
import type { RetryConfig, TaskHandlerArgsNoInput } from './taskTypes.js'
export type WorkflowStep<
TTaskSlug extends keyof TypedJobs['tasks'],
TWorkflowSlug extends keyof TypedJobs['workflows'],
> = {
/**
* If this step is completed, the workflow will be marked as completed
*/
completesJob?: boolean
condition?: (args: { job: RunningJob<TWorkflowSlug> }) => boolean
/**
* Each task needs to have a unique ID to track its status
*/
id: string
retries?: number | RetryConfig
} & (
| {
inlineTask?: (
args: TaskHandlerArgsNoInput<TypedJobs['workflows'][TWorkflowSlug]['input']>,
) => Promise<TaskHandlerResult<TTaskSlug>> | TaskHandlerResult<TTaskSlug>
}
| {
input: (args: { job: RunningJob<TWorkflowSlug> }) => TypedJobs['tasks'][TTaskSlug]['input']
task: TTaskSlug
}
)
type AllWorkflowSteps<TWorkflowSlug extends keyof TypedJobs['workflows']> = {
[TTaskSlug in keyof TypedJobs['tasks']]: WorkflowStep<TTaskSlug, TWorkflowSlug>
}[keyof TypedJobs['tasks']]
export type WorkflowJSON<TWorkflowSlug extends keyof TypedJobs['workflows']> = Array<
AllWorkflowSteps<TWorkflowSlug>
>

@@ -0,0 +1,127 @@
import type { Field } from '../../../fields/config/types.js'
import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js'
import type {
RetryConfig,
RunInlineTaskFunction,
RunTaskFunctions,
TaskInput,
TaskOutput,
TaskType,
} from './taskTypes.js'
import type { WorkflowJSON } from './workflowJSONTypes.js'
export type JobLog = {
completedAt: string
error?: unknown
executedAt: string
input?: any
output?: any
state: 'failed' | 'succeeded'
taskID: string
taskSlug: string
}
export type BaseJob = {
completedAt?: string
error?: unknown
hasError?: boolean
id: number | string
input?: any
log: JobLog[]
processing?: boolean
queue: string
taskSlug?: string
taskStatus?: JobTaskStatus
totalTried: number
waitUntil?: string
workflowSlug?: string
}
export type WorkflowTypes = StringKeyOf<TypedJobs['workflows']>
// TODO: Type job.taskStatus once available - for JSON-defined workflows
export type RunningJob<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> = {
input: TWorkflowSlugOrInput extends keyof TypedJobs['workflows']
? TypedJobs['workflows'][TWorkflowSlugOrInput]['input']
: TWorkflowSlugOrInput
taskStatus: JobTaskStatus
} & Omit<TypedCollection['payload-jobs'], 'input' | 'taskStatus'>
export type RunningJobSimple<TWorkflowInput extends object> = {
input: TWorkflowInput
} & TypedCollection['payload-jobs']
// Simplified version of RunningJob that doesn't break TypeScript (TypeScript seems to stop evaluating RunningJob when it's too complex)
export type RunningJobFromTask<TTaskSlug extends keyof TypedJobs['tasks']> = {
input: TypedJobs['tasks'][TTaskSlug]['input']
} & TypedCollection['payload-jobs']
export type WorkflowHandler<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> =
(args: {
inlineTask: RunInlineTaskFunction
job: RunningJob<TWorkflowSlugOrInput>
req: PayloadRequest
tasks: RunTaskFunctions
}) => Promise<void>
export type SingleTaskStatus<T extends keyof TypedJobs['tasks']> = {
complete: boolean
input: TaskInput<T>
output: TaskOutput<T>
taskSlug: TaskType
totalTried: number
}
/**
* Task IDs mapped to their status
*/
export type JobTaskStatus = {
// Wrap in taskSlug to improve typing
[taskSlug in TaskType]: {
[taskID: string]: SingleTaskStatus<taskSlug>
}
}
export type WorkflowConfig<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> = {
/**
* You can either pass a string-based path to the workflow function file, or the workflow function itself.
*
* If you are using large dependencies within your workflow control flow, you might prefer to pass the string path
* because that will avoid bundling large dependencies in your Next.js app.
*
*
*/
handler:
| string
| WorkflowHandler<TWorkflowSlugOrInput>
| WorkflowJSON<TWorkflowSlugOrInput extends object ? string : TWorkflowSlugOrInput>
/**
* Define the input field schema - payload will generate a type for this schema.
*/
inputSchema?: Field[]
/**
* You can use interfaceName to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug.
*/
interfaceName?: string
/**
* Define a human-friendly label for this workflow.
*/
label?: string
/**
* Optionally, define the queue name that this workflow should be tied to.
* Defaults to "default".
*/
queue?: string
/**
* Specify the number of times that this workflow should be retried if it fails for any reason.
*/
retries?: number | RetryConfig
/**
* Define a slug-based name for this workflow.
*/
slug: TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] ? TWorkflowSlugOrInput : string
}
type AllWorkflowConfigs = {
[TWorkflowSlug in keyof TypedJobs['workflows']]: WorkflowConfig<TWorkflowSlug>
}[keyof TypedJobs['workflows']]
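
As a sketch of the `WorkflowConfig` shape with a JSON-defined `handler` (the `WorkflowJSON` branch), assuming hypothetical `createPost` and `sendNotification` task slugs:

```ts
// Hypothetical JSON-defined workflow; task slugs and inputs are examples only.
const publishWorkflow = {
  slug: 'publishPost',
  queue: 'default',
  retries: 2,
  handler: [
    {
      id: '1',
      task: 'createPost',
      input: ({ job }) => ({ title: job.input.title }),
    },
    {
      id: '2',
      task: 'sendNotification',
      // Only runs once the first step has completed
      condition: ({ job }) => Boolean(job.taskStatus?.createPost?.['1']?.complete),
      input: ({ job }) => ({ postID: job.taskStatus.createPost['1'].output.postID }),
      // Completing this step marks the whole job as complete
      completesJob: true,
    },
  ],
}
```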

@@ -0,0 +1,66 @@
import type { RunningJobFromTask } from './config/types/workflowTypes.js'
import {
createLocalReq,
type Payload,
type PayloadRequest,
type RunningJob,
type TypedJobs,
} from '../index.js'
import { runJobs } from './operations/runJobs/index.js'
export const getJobsLocalAPI = (payload: Payload) => ({
queue: async <
// eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents
TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'],
>(
args:
| {
input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input']
req?: PayloadRequest
// TTaskOrWorkflowSlug with keyof TypedJobs['workflows'] removed:
task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never
workflow?: never
}
| {
input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input']
req?: PayloadRequest
task?: never
workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? TTaskOrWorkflowSlug
: never
},
): Promise<
TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>
> => {
return (await payload.create({
collection: 'payload-jobs',
data: {
input: args.input,
taskSlug: 'task' in args ? args.task : undefined,
workflowSlug: 'workflow' in args ? args.workflow : undefined,
},
req: args.req,
})) as TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug> // Type assertion is still needed here
},
run: async (args?: {
limit?: number
overrideAccess?: boolean
queue?: string
req?: PayloadRequest
}): Promise<ReturnType<typeof runJobs>> => {
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
const result = await runJobs({
limit: args?.limit,
overrideAccess: args?.overrideAccess !== false,
queue: args?.queue,
req: newReq,
})
return result
},
})
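
Assuming this object is mounted on the Payload instance as `payload.jobs` (the mounting is not shown in this file), usage might look like the following; the `createPost` task and its input are hypothetical:

```ts
// Queue a job for a hypothetical 'createPost' task
const job = await payload.jobs.queue({
  task: 'createPost',
  input: { title: 'Hello' },
})

// Later - e.g. from a cron endpoint - run up to 10 queued jobs
const { remainingJobsFromQueried } = await payload.jobs.run({ limit: 10 })
```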

@@ -0,0 +1,228 @@
import type { PaginatedDocs } from '../../../database/types.js'
import type { PayloadRequest, Where } from '../../../types/index.js'
import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js'
import type {
BaseJob,
WorkflowConfig,
WorkflowHandler,
WorkflowTypes,
} from '../../config/types/workflowTypes.js'
import type { RunJobResult } from './runJob/index.js'
import { Forbidden } from '../../../errors/Forbidden.js'
import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js'
import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js'
import { importHandlerPath } from './runJob/importHandlerPath.js'
import { runJob } from './runJob/index.js'
import { runJSONJob } from './runJSONJob/index.js'
export type RunJobsArgs = {
limit?: number
overrideAccess?: boolean
queue?: string
req: PayloadRequest
}
export type RunJobsResult = {
jobStatus?: Record<string, RunJobResult>
/**
* If this is true, there are no jobs remaining in the queue, regardless of the limit
*/
noJobsRemaining?: boolean
/**
* Of the jobs that were queried & processed (within the set limit), how many remain and can be retried?
*/
remainingJobsFromQueried: number
}
export const runJobs = async ({
limit = 10,
overrideAccess,
queue,
req,
}: RunJobsArgs): Promise<RunJobsResult> => {
if (!overrideAccess) {
const hasAccess = await req.payload.config.jobs.access.run({ req })
if (!hasAccess) {
throw new Forbidden(req.t)
}
}
const where: Where = {
and: [
{
completedAt: {
exists: false,
},
},
{
hasError: {
not_equals: true,
},
},
{
processing: {
equals: false,
},
},
{
or: [
{
waitUntil: {
exists: false,
},
},
{
waitUntil: {
less_than: new Date().toISOString(),
},
},
],
},
],
}
if (queue) {
where.and.push({
queue: {
equals: queue,
},
})
}
// Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of
// the same job being picked up by another worker
const jobsQuery = (await req.payload.update({
collection: 'payload-jobs',
data: {
processing: true,
seenByWorker: true,
},
depth: req.payload.config.jobs.depth,
disableTransaction: true,
limit,
showHiddenFields: true,
where,
})) as unknown as PaginatedDocs<BaseJob>
/**
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
*/
const { newJobs } = jobsQuery.docs.reduce(
(acc, job) => {
if (job.totalTried > 0) {
acc.existingJobs.push(job)
} else {
acc.newJobs.push(job)
}
return acc
},
{ existingJobs: [] as BaseJob[], newJobs: [] as BaseJob[] },
)
if (!jobsQuery.docs.length) {
return {
noJobsRemaining: true,
remainingJobsFromQueried: 0,
}
}
req.payload.logger.info(`Running ${jobsQuery.docs.length} jobs (${newJobs.length} new).`)
const jobPromises = jobsQuery.docs.map(async (job) => {
if (!job.workflowSlug && !job.taskSlug) {
throw new Error('Job must have either a workflowSlug or a taskSlug')
}
const jobReq = isolateObjectProperty(req, 'transactionID')
const workflowConfig: WorkflowConfig<WorkflowTypes> = job.workflowSlug
? req.payload.config.jobs.workflows.find(({ slug }) => slug === job.workflowSlug)
: {
slug: 'singleTask',
handler: async ({ job, tasks }) => {
await tasks[job.taskSlug as string]('1', {
input: job.input,
})
},
}
if (!workflowConfig) {
return null // Skip jobs with no workflow configuration
}
const updateJob = getUpdateJobFunction(job, jobReq)
// The handler will either be passed directly in the config,
// OR it will be a path that we need to import via eval to avoid
// Next.js compiler dynamic import expression errors
let workflowHandler: WorkflowHandler<WorkflowTypes> | WorkflowJSON<WorkflowTypes>
if (typeof workflowConfig.handler === 'function' || Array.isArray(workflowConfig.handler)) {
workflowHandler = workflowConfig.handler
} else {
workflowHandler = await importHandlerPath<typeof workflowHandler>(workflowConfig.handler)
if (!workflowHandler) {
const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${job.workflowSlug}.`
req.payload.logger.error(errorMessage)
await updateJob({
error: {
error: errorMessage,
},
hasError: true,
processing: false,
})
return
}
}
if (typeof workflowHandler === 'function') {
const result = await runJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
return { id: job.id, result }
} else {
const result = await runJSONJob({
job,
req: jobReq,
updateJob,
workflowConfig,
workflowHandler,
})
return { id: job.id, result }
}
})
const resultsArray = await Promise.all(jobPromises)
const resultsObject: RunJobsResult['jobStatus'] = resultsArray.reduce((acc, cur) => {
if (cur !== null) {
// Check if there's a valid result to include
acc[cur.id] = cur.result
}
return acc
}, {})
let remainingJobsFromQueried = 0
for (const jobID in resultsObject) {
const jobResult = resultsObject[jobID]
if (jobResult.status === 'error') {
remainingJobsFromQueried++ // Can be retried
}
}
return {
jobStatus: resultsObject,
remainingJobsFromQueried,
}
}
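
The `where` clause above expresses the job-eligibility rules in Payload's query syntax. The same rules can be restated as a plain predicate for illustration; `JobRow` and `isRunnable` are local stand-ins, not part of this PR:

```ts
// Local stand-in for the fields the eligibility query inspects
type JobRow = {
  completedAt?: string
  hasError?: boolean
  processing: boolean
  queue: string
  waitUntil?: string
}

function isRunnable(job: JobRow, now: Date, queue?: string): boolean {
  if (job.completedAt) return false // completedAt: { exists: false }
  if (job.hasError) return false // hasError: { not_equals: true }
  if (job.processing) return false // processing: { equals: false }
  if (queue && job.queue !== queue) return false // optional queue filter
  // waitUntil either doesn't exist, or is already in the past
  return !job.waitUntil || new Date(job.waitUntil) < now
}

const now = new Date('2024-01-01T12:00:00Z')
const ready = isRunnable({ processing: false, queue: 'default' }, now)
const deferred = isRunnable(
  { processing: false, queue: 'default', waitUntil: '2024-01-02T00:00:00Z' },
  now,
)
```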

@@ -0,0 +1,159 @@
import type { PayloadRequest } from '../../../../types/index.js'
import type { WorkflowJSON, WorkflowStep } from '../../../config/types/workflowJSONTypes.js'
import type {
BaseJob,
RunningJob,
WorkflowConfig,
WorkflowTypes,
} from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js'
import type { JobRunStatus } from '../runJob/index.js'
import { getRunTaskFunction, type RunTaskFunctionState } from '../runJob/getRunTaskFunction.js'
import { handleWorkflowError } from '../runJob/handleWorkflowError.js'
type Args = {
job: BaseJob
req: PayloadRequest
updateJob: UpdateJobFunction
workflowConfig: WorkflowConfig<WorkflowTypes>
workflowHandler: WorkflowJSON<WorkflowTypes>
}
export type RunJSONJobResult = {
status: JobRunStatus
}
export const runJSONJob = async ({
job,
req,
updateJob,
workflowConfig,
workflowHandler,
}: Args): Promise<RunJSONJobResult> => {
// Object so that we can pass contents by reference, not value.
// We want any mutations to be reflected in here.
const state: RunTaskFunctionState = {
reachedMaxRetries: false,
}
const stepsToRun: WorkflowStep<string, string>[] = []
for (const step of workflowHandler) {
if ('task' in step) {
if (job?.taskStatus?.[step.task]?.[step.id]?.complete) {
continue
}
} else {
if (job?.taskStatus?.['inline']?.[step.id]?.complete) {
continue
}
}
if (step.condition && !step.condition({ job: job as RunningJob<any> })) {
// TODO: Improve RunningJob type see todo below
continue
}
stepsToRun.push(step)
}
const tasks = getRunTaskFunction(state, job, workflowConfig, req, false, updateJob)
const inlineTask = getRunTaskFunction(state, job, workflowConfig, req, true, updateJob)
// Run the job
let hasFinalError = false
let error: Error | undefined
try {
await Promise.all(
stepsToRun.map(async (step) => {
if ('task' in step) {
await tasks[step.task](step.id, {
input: step.input ? step.input({ job: job as RunningJob<any> }) : {}, // TODO: Type better. We should use RunningJob anywhere and make TypedCollection['payload-jobs'] be BaseJob if type not generated
retries: step.retries,
})
} else {
await inlineTask(step.id, {
retries: step.retries,
task: step.inlineTask as any, // TODO: Fix type
})
}
}),
)
} catch (err) {
const errorResult = handleWorkflowError({
error: err,
job,
req,
state,
workflowConfig,
})
error = err
hasFinalError = errorResult.hasFinalError
}
// Check if workflow has completed
let workflowCompleted = false
for (const [slug, map] of Object.entries(job.taskStatus)) {
for (const [id, taskStatus] of Object.entries(map)) {
if (taskStatus.complete) {
const step = workflowHandler.find((step) => {
if ('task' in step) {
return step.task === slug && step.id === id
} else {
return step.id === id && slug === 'inline'
}
})
if (step?.completesJob) {
workflowCompleted = true
break
}
}
}
}
if (workflowCompleted) {
if (error) {
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
completedAt: new Date().toISOString(),
error: hasFinalError ? error : undefined,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
} else {
await updateJob({
completedAt: new Date().toISOString(),
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
}
return {
status: 'success',
}
} else {
if (error) {
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
error: hasFinalError ? error : undefined,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
return {
status: hasFinalError ? 'error-reached-max-retries' : 'error',
}
} else {
// Retry the job - no need to bump processing or totalTried as this does not count as a retry. A condition of a different task might have just opened up!
return await runJSONJob({
job,
req,
updateJob,
workflowConfig,
workflowHandler,
})
}
}
}

@@ -0,0 +1,36 @@
import type { RetryConfig } from '../../../config/types/taskTypes.js'
export function calculateBackoffWaitUntil({
retriesConfig,
totalTried,
}: {
retriesConfig: number | RetryConfig
totalTried: number
}): Date {
let waitUntil: Date = new Date()
if (typeof retriesConfig === 'object') {
if (retriesConfig.backoff) {
if (retriesConfig.backoff.type === 'fixed') {
waitUntil = retriesConfig.backoff.delay
? new Date(new Date().getTime() + retriesConfig.backoff.delay)
: new Date()
} else if (retriesConfig.backoff.type === 'exponential') {
// Exponential backoff: delay * 2^totalTried. The current attempt is not yet counted in totalTried, so no `- 1` is needed here.
const delay = retriesConfig.backoff.delay ? retriesConfig.backoff.delay : 0
waitUntil = new Date(new Date().getTime() + Math.pow(2, totalTried) * delay)
}
}
}
return waitUntil
}
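
The backoff arithmetic above can be illustrated in isolation. This is a restatement of the documented math under local names (`BackoffSketch` and `backoffDelayMs` are hypothetical, not exported by this PR):

```ts
// Mirrors RetryConfig['backoff'] with only the fields the math needs
type BackoffSketch = { delay: number; type: 'exponential' | 'fixed' }

// Returns the wait in ms before the next attempt, given how many attempts
// have already been tried (the current attempt is not counted in totalTried)
function backoffDelayMs(backoff: BackoffSketch, totalTried: number): number {
  if (backoff.type === 'fixed') {
    return backoff.delay
  }
  // Matches Math.pow(2, totalTried) * delay in calculateBackoffWaitUntil
  return Math.pow(2, totalTried) * backoff.delay
}

// With delay = 1000 and type = 'exponential', retries wait 1000ms, 2000ms, 4000ms,
// as described in the RetryConfig docs
const waits = [0, 1, 2].map((tried) => backoffDelayMs({ delay: 1000, type: 'exponential' }, tried))
```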

@@ -0,0 +1,309 @@
import type { PayloadRequest } from '../../../../types/index.js'
import type {
RetryConfig,
RunInlineTaskFunction,
RunTaskFunction,
RunTaskFunctions,
TaskConfig,
TaskHandler,
TaskHandlerResult,
TaskType,
} from '../../../config/types/taskTypes.js'
import type {
BaseJob,
RunningJob,
SingleTaskStatus,
WorkflowConfig,
WorkflowTypes,
} from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
import { importHandlerPath } from './importHandlerPath.js'
// Helper object type to force being passed by reference
export type RunTaskFunctionState = {
reachedMaxRetries: boolean
}
async function getTaskHandlerFromConfig(taskConfig: TaskConfig<string>) {
let handler: TaskHandler<TaskType>
if (typeof taskConfig.handler === 'function') {
handler = taskConfig.handler
} else {
handler = await importHandlerPath<TaskHandler<TaskType>>(taskConfig.handler)
}
return handler
}
export async function handleTaskFailed({
error,
executedAt,
input,
job,
maxRetries,
output,
req,
retriesConfig,
runnerOutput,
state,
taskConfig,
taskID,
taskSlug,
taskStatus,
updateJob,
}: {
error?: Error
executedAt: Date
input: object
job: BaseJob
maxRetries: number
output: object
req: PayloadRequest
retriesConfig: number | RetryConfig
runnerOutput?: TaskHandlerResult<string>
state: RunTaskFunctionState
taskConfig?: TaskConfig<string>
taskID: string
taskSlug: string
taskStatus: null | SingleTaskStatus<string>
updateJob: UpdateJobFunction
}): Promise<never> {
req.payload.logger.error({ err: error, job, msg: 'Error running task', taskSlug })
if (taskConfig?.onFail) {
await taskConfig.onFail()
}
if (!job.log) {
job.log = []
}
job.log.push({
completedAt: new Date().toISOString(),
error: error ?? runnerOutput?.state,
executedAt: executedAt.toISOString(),
input,
output,
state: 'failed',
taskID,
taskSlug,
})
if (job.waitUntil) {
// Check if waitUntil is in the past
const waitUntil = new Date(job.waitUntil)
if (waitUntil < new Date()) {
// Outdated waitUntil, remove it
delete job.waitUntil
}
}
if (taskStatus && !taskStatus.complete && taskStatus.totalTried >= maxRetries) {
state.reachedMaxRetries = true
await updateJob({
error,
hasError: true,
log: job.log,
processing: false,
waitUntil: job.waitUntil,
})
throw new Error(
`Task ${taskSlug} has failed more than the allowed retries in workflow ${job.workflowSlug}${error ? `. Error: ${String(error)}` : ''}`,
)
} else {
// Job will retry. Let's determine when!
const waitUntil: Date = calculateBackoffWaitUntil({
retriesConfig,
totalTried: taskStatus?.totalTried ?? 0,
})
// Update job's waitUntil only if this waitUntil is later than the current one
if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) {
job.waitUntil = waitUntil.toISOString()
}
await updateJob({
log: job.log,
processing: false,
waitUntil: job.waitUntil,
})
throw error ?? new Error('Task failed')
}
}
export const getRunTaskFunction = <TIsInline extends boolean>(
state: RunTaskFunctionState,
job: BaseJob,
workflowConfig: WorkflowConfig<string>,
req: PayloadRequest,
isInline: TIsInline,
updateJob: UpdateJobFunction,
): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => {
const runTask: <TTaskSlug extends string>(
taskSlug: TTaskSlug,
) => TTaskSlug extends 'inline' ? RunInlineTaskFunction : RunTaskFunction<TTaskSlug> = (
taskSlug,
) =>
(async (
taskID: Parameters<RunInlineTaskFunction>[0],
{
input,
retries,
task,
}: Parameters<RunInlineTaskFunction>[1] & Parameters<RunTaskFunction<string>>[1],
) => {
const executedAt = new Date()
let inlineRunner: TaskHandler<TaskType> = null
if (isInline) {
inlineRunner = task
}
let retriesConfig: number | RetryConfig = retries
let taskConfig: TaskConfig<string>
if (!isInline) {
taskConfig = req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug)
if (!taskConfig) {
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
}
if (!retriesConfig) {
retriesConfig = taskConfig.retries
}
}
const maxRetries: number =
typeof retriesConfig === 'object' ? retriesConfig?.attempts : retriesConfig
const taskStatus: null | SingleTaskStatus<string> = job?.taskStatus?.[taskSlug]
? job.taskStatus[taskSlug][taskID]
: null
if (taskStatus && taskStatus.complete === true) {
return taskStatus.output
}
let runner: TaskHandler<TaskType>
if (isInline) {
runner = inlineRunner
} else {
if (!taskConfig) {
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
}
runner = await getTaskHandlerFromConfig(taskConfig)
}
if (!runner || typeof runner !== 'function') {
const errorMessage = isInline
? `Can't find runner for inline task with ID ${taskID}`
: `Can't find runner while importing with the path ${typeof workflowConfig.handler === 'string' ? workflowConfig.handler : 'unknown - no string path'} in job type ${job.workflowSlug} for task ${taskSlug}.`
req.payload.logger.error(errorMessage)
await updateJob({
error: {
error: errorMessage,
},
hasError: true,
log: [
...job.log,
{
completedAt: new Date().toISOString(),
error: errorMessage,
executedAt: executedAt.toISOString(),
state: 'failed',
taskID,
taskSlug,
},
],
processing: false,
})
return
}
let output: object
try {
const runnerOutput = await runner({
input,
job: job as unknown as RunningJob<WorkflowTypes>, // TODO: Type this better
req,
})
if (runnerOutput.state === 'failed') {
await handleTaskFailed({
executedAt,
input,
job,
maxRetries,
output,
req,
retriesConfig,
runnerOutput,
state,
taskConfig,
taskID,
taskSlug,
taskStatus,
updateJob,
})
throw new Error('Task failed')
} else {
output = runnerOutput.output
}
} catch (err) {
await handleTaskFailed({
error: err,
executedAt,
input,
job,
maxRetries,
output,
req,
retriesConfig,
state,
taskConfig,
taskID,
taskSlug,
taskStatus,
updateJob,
})
throw new Error('Task failed')
}
if (taskConfig?.onSuccess) {
await taskConfig.onSuccess()
}
if (!job.log) {
job.log = []
}
job.log.push({
completedAt: new Date().toISOString(),
executedAt: executedAt.toISOString(),
input,
output,
state: 'succeeded',
taskID,
taskSlug,
})
await updateJob({
log: job.log,
})
return output
}) as any
if (isInline) {
return runTask('inline') as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions
} else {
const tasks: RunTaskFunctions = {}
for (const task of req?.payload?.config?.jobs?.tasks ?? []) {
tasks[task.slug] = runTask(task.slug)
}
return tasks as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions
}
}

@@ -0,0 +1,23 @@
import type { PayloadRequest } from '../../../../types/index.js'
import type { BaseJob } from '../../../config/types/workflowTypes.js'
export type UpdateJobFunction = (jobData: Partial<BaseJob>) => Promise<BaseJob>
export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction {
return async (jobData) => {
const updatedJob = (await req.payload.update({
id: job.id,
collection: 'payload-jobs',
data: jobData,
depth: 0,
disableTransaction: true,
})) as BaseJob
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
for (const key in updatedJob) {
job[key] = updatedJob[key]
}
return updatedJob
}
}

@@ -0,0 +1,66 @@
import type { PayloadRequest } from '../../../../types/index.js'
import type { BaseJob, WorkflowConfig, WorkflowTypes } from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
/**
* This is called if a workflow catches an error. It determines if it's a final error
* or not and handles logging.
*/
export function handleWorkflowError({
error,
job,
req,
state,
workflowConfig,
}: {
error: Error
job: BaseJob
req: PayloadRequest
state: RunTaskFunctionState
workflowConfig: WorkflowConfig<WorkflowTypes>
}): {
hasFinalError: boolean
} {
let hasFinalError = state.reachedMaxRetries // If any TASK reached max retries, the job has an error
const maxRetries =
typeof workflowConfig.retries === 'object'
? workflowConfig.retries.attempts
: workflowConfig.retries
// Now let's handle workflow retries
if (!hasFinalError && workflowConfig.retries) {
if (job.waitUntil) {
// Check if waitUntil is in the past
const waitUntil = new Date(job.waitUntil)
if (waitUntil < new Date()) {
// Outdated waitUntil, remove it
delete job.waitUntil
}
}
if (job.totalTried >= maxRetries) {
state.reachedMaxRetries = true
hasFinalError = true
} else {
// Job will retry. Let's determine when!
const waitUntil: Date = calculateBackoffWaitUntil({
retriesConfig: workflowConfig.retries,
totalTried: job.totalTried ?? 0,
})
// Update job's waitUntil only if this waitUntil is later than the current one
if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) {
job.waitUntil = waitUntil.toISOString()
}
}
}
req.payload.logger.error({
err: error,
msg: `Error running job ${job.workflowSlug} ${job.taskSlug} id: ${job.id} attempt ${job.totalTried}/${maxRetries}`,
})
return {
hasFinalError,
}
}

@@ -0,0 +1,28 @@
import { pathToFileURL } from 'url'
export async function importHandlerPath<T>(path: string): Promise<T> {
let runner: T | undefined
const [runnerPath, runnerImportName] = path.split('#')
const runnerModule =
typeof require === 'function'
? await eval(`require('${runnerPath.replaceAll('\\', '/')}')`)
: await eval(`import('${pathToFileURL(runnerPath).href}')`)
// If the path has indicated an #exportName, try to get it
if (runnerImportName && runnerModule[runnerImportName]) {
runner = runnerModule[runnerImportName]
}
// If there is a default export, use it
if (!runner && runnerModule.default) {
runner = runnerModule.default
}
// Finally, use whatever was imported
if (!runner) {
runner = runnerModule
}
return runner as T
}
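The `path#exportName` convention the importer splits on can be shown in isolation (hypothetical helper, for illustration only):

```typescript
// Everything after '#' selects a named export; with no '#', the importer
// falls back to the default export, then to the module itself.
function parseHandlerPath(path: string): { exportName?: string; modulePath: string } {
  const [modulePath, exportName] = path.split('#')
  return { exportName, modulePath }
}

// e.g. a task config in this PR points at
// 'runners/externalTask.ts#externalTaskHandler'
```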


@@ -0,0 +1,83 @@
import type { PayloadRequest } from '../../../../types/index.js'
import type {
BaseJob,
RunningJob,
WorkflowConfig,
WorkflowHandler,
WorkflowTypes,
} from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
import { getRunTaskFunction } from './getRunTaskFunction.js'
import { handleWorkflowError } from './handleWorkflowError.js'
type Args = {
job: BaseJob
req: PayloadRequest
updateJob: UpdateJobFunction
workflowConfig: WorkflowConfig<WorkflowTypes>
workflowHandler: WorkflowHandler<WorkflowTypes>
}
export type JobRunStatus = 'error' | 'error-reached-max-retries' | 'success'
export type RunJobResult = {
status: JobRunStatus
}
export const runJob = async ({
job,
req,
updateJob,
workflowConfig,
workflowHandler,
}: Args): Promise<RunJobResult> => {
// Wrap state in an object so it is passed by reference, not by value:
// any mutations made inside tasks remain visible here.
const state: RunTaskFunctionState = {
reachedMaxRetries: false,
}
// Run the job
try {
await workflowHandler({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob),
job: job as unknown as RunningJob<WorkflowTypes>, //TODO: Type this better
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob),
})
} catch (err) {
const { hasFinalError } = handleWorkflowError({
error: err,
job,
req,
state,
workflowConfig,
})
// Tasks update the job if they error - but in case there is an unhandled error (e.g. in the workflow itself, not in a task)
// we need to ensure the job is updated to reflect the error
await updateJob({
error: hasFinalError ? err : undefined,
hasError: hasFinalError, // If reached max retries => final error. If hasError is true this job will not be retried
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
return {
status: hasFinalError ? 'error-reached-max-retries' : 'error',
}
}
// Workflow has completed
await updateJob({
completedAt: new Date().toISOString(),
processing: false,
totalTried: (job.totalTried ?? 0) + 1,
})
return {
status: 'success',
}
}


@@ -0,0 +1,81 @@
import type { Endpoint } from '../config/types.js'
import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js'
export const runJobsEndpoint: Endpoint = {
handler: async (req) => {
if (
!Array.isArray(req.payload.config.jobs.workflows) ||
!(req.payload.config.jobs?.workflows?.length > 0)
) {
return Response.json(
{
message: 'No jobs to run.',
},
{ status: 200 },
)
}
const hasAccess = await req.payload.config.jobs.access.run({ req })
if (!hasAccess) {
return Response.json(
{
message: req.i18n.t('error:unauthorized'),
},
{ status: 401 },
)
}
const { limit, queue } = req.query
const runJobsArgs: RunJobsArgs = {
queue: 'default',
req,
// We are checking access above, so we can override it here
overrideAccess: true,
}
if (typeof queue === 'string') {
runJobsArgs.queue = queue
}
if (typeof limit !== 'undefined') {
runJobsArgs.limit = Number(limit)
}
let noJobsRemaining = false
let remainingJobsFromQueried = 0
try {
const result = await runJobs(runJobsArgs)
noJobsRemaining = result.noJobsRemaining
remainingJobsFromQueried = result.remainingJobsFromQueried
} catch (err) {
req.payload.logger.error({
err,
msg: 'There was an error running jobs:',
queue: runJobsArgs.queue,
})
return Response.json(
{
message: req.i18n.t('error:unknown'),
noJobsRemaining: true,
remainingJobsFromQueried,
},
{ status: 500 },
)
}
return Response.json(
{
message: req.i18n.t('general:success'),
noJobsRemaining,
remainingJobsFromQueried,
},
{ status: 200 },
)
},
method: 'get',
path: '/run',
}
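A scheduler such as Vercel Cron would hit this handler over HTTP. A sketch of building the request URL; the `/api/payload-jobs/run` mount point is assumed from typical Payload REST routing and may differ in your setup:

```typescript
// `limit` and `queue` map to the query params the handler above reads.
function buildRunJobsURL(base: string, opts: { limit?: number; queue?: string } = {}): string {
  const url = new URL('/api/payload-jobs/run', base)
  if (opts.queue) url.searchParams.set('queue', opts.queue)
  if (typeof opts.limit === 'number') url.searchParams.set('limit', String(opts.limit))
  return url.toString()
}

// A cron job could then call (auth setup depends on your access.run config):
// await fetch(buildRunJobsURL('https://example.com', { limit: 10 }), {
//   headers: { Authorization: `Bearer ${process.env.CRON_SECRET}` },
// })
```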


@@ -0,0 +1,38 @@
import type { TaskConfig, TaskType } from '../config/types/taskTypes.js'
import type { BaseJob, JobTaskStatus } from '../config/types/workflowTypes.js'
type Args = {
jobLog: BaseJob['log']
tasksConfig: TaskConfig<TaskType>[]
}
export const getJobTaskStatus = ({ jobLog }: Args): JobTaskStatus => {
const taskStatus: JobTaskStatus = {}
// Aggregate the job's log entries into a status map
// keyed by task slug and task ID
for (const loggedJob of jobLog) {
if (!taskStatus[loggedJob.taskSlug]) {
taskStatus[loggedJob.taskSlug] = {}
}
if (!taskStatus[loggedJob.taskSlug][loggedJob.taskID]) {
taskStatus[loggedJob.taskSlug][loggedJob.taskID] = {
complete: loggedJob.state === 'succeeded',
input: loggedJob.input,
output: loggedJob.output,
taskSlug: loggedJob.taskSlug,
totalTried: 1,
}
} else {
const newTaskStatus = taskStatus[loggedJob.taskSlug][loggedJob.taskID]
newTaskStatus.totalTried += 1
if (loggedJob.state === 'succeeded') {
newTaskStatus.complete = true
}
taskStatus[loggedJob.taskSlug][loggedJob.taskID] = newTaskStatus
}
}
return taskStatus
}
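The aggregation above can be reduced to a self-contained sketch (simplified types, omitting the input/output fields):

```typescript
// Repeated log entries for the same taskSlug/taskID increment totalTried,
// and any 'succeeded' entry marks that task complete.
type LogEntry = { state: 'failed' | 'succeeded'; taskID: string; taskSlug: string }
type Status = Record<string, Record<string, { complete: boolean; totalTried: number }>>

function aggregate(log: LogEntry[]): Status {
  const status: Status = {}
  for (const entry of log) {
    const byID = (status[entry.taskSlug] ??= {})
    const existing = byID[entry.taskID]
    if (!existing) {
      byID[entry.taskID] = { complete: entry.state === 'succeeded', totalTried: 1 }
    } else {
      existing.totalTried += 1
      if (entry.state === 'succeeded') existing.complete = true
    }
  }
  return status
}
```

A failed attempt followed by a successful retry therefore yields `totalTried: 2` with `complete: true`.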


@@ -11,6 +11,7 @@ import type { SanitizedGlobalConfig } from '../globals/config/types.js'
import { MissingEditorProp } from '../errors/MissingEditorProp.js'
import { fieldAffectsData, tabHasName } from '../fields/config/types.js'
+ import { generateJobsJSONSchemas } from '../queues/config/generateJobsJSONSchemas.js'
import { deepCopyObject } from './deepCopyObject.js'
import { toWords } from './formatLabels.js'
import { getCollectionIDFieldTypes } from './getCollectionIDFieldTypes.js'
@@ -288,13 +289,17 @@ export function fieldsToJSONSchema(
type: withNullableJSONSchemaType('array', isRequired),
items: {
type: 'string',
- enum: optionEnums,
},
}
+ if (optionEnums?.length) {
+ ;(fieldSchema.items as JSONSchema4).enum = optionEnums
+ }
} else {
fieldSchema = {
type: withNullableJSONSchemaType('string', isRequired),
- enum: optionEnums,
}
+ if (optionEnums?.length) {
+ fieldSchema.enum = optionEnums
+ }
}
@@ -604,7 +609,11 @@ export function entityToJSONSchema(
incomingEntity: SanitizedCollectionConfig | SanitizedGlobalConfig,
interfaceNameDefinitions: Map<string, JSONSchema4>,
defaultIDType: 'number' | 'text',
+ collectionIDFieldTypes?: { [key: string]: 'number' | 'string' },
): JSONSchema4 {
+ if (!collectionIDFieldTypes) {
+ collectionIDFieldTypes = getCollectionIDFieldTypes({ config, defaultIDType })
+ }
const entity: SanitizedCollectionConfig | SanitizedGlobalConfig = deepCopyObject(incomingEntity)
const title = entity.typescript?.interface
? entity.typescript.interface
@@ -641,9 +650,6 @@ export function entityToJSONSchema(
})
}
- // Used for relationship fields, to determine whether to use a string or number type for the ID.
- const collectionIDFieldTypes = getCollectionIDFieldTypes({ config, defaultIDType })
return {
type: 'object',
additionalProperties: false,
@@ -912,6 +918,9 @@ export function configToJSONSchema(
// a mutable Map to store custom top-level `interfaceName` types. Fields with an `interfaceName` property will be moved to the top-level definitions here
const interfaceNameDefinitions: Map<string, JSONSchema4> = new Map()
+ // Used for relationship fields, to determine whether to use a string or number type for the ID.
+ const collectionIDFieldTypes = getCollectionIDFieldTypes({ config, defaultIDType })
// Collections and Globals have to be moved to the top-level definitions as well. Reason: The top-level type will be the `Config` type - we don't want all collection and global
// types to be inlined inside the `Config` type
@@ -928,7 +937,13 @@ export function configToJSONSchema(
const entityDefinitions: { [k: string]: JSONSchema4 } = entities.reduce(
(acc, { type, entity }) => {
- acc[entity.slug] = entityToJSONSchema(config, entity, interfaceNameDefinitions, defaultIDType)
+ acc[entity.slug] = entityToJSONSchema(
+ config,
+ entity,
+ interfaceNameDefinitions,
+ defaultIDType,
+ collectionIDFieldTypes,
+ )
const select = fieldsToSelectJSONSchema({ fields: entity.fields })
if (type === 'global') {
@@ -958,6 +973,10 @@ export function configToJSONSchema(
{ auth: {} },
)
+ const jobsSchemas = config.jobs
+ ? generateJobsJSONSchemas(config, config.jobs, interfaceNameDefinitions, collectionIDFieldTypes)
+ : {}
let jsonSchema: JSONSchema4 = {
additionalProperties: false,
definitions: {
@@ -980,6 +999,19 @@ export function configToJSONSchema(
required: ['user', 'locale', 'collections', 'globals', 'auth', 'db'],
title: 'Config',
}
+ if (jobsSchemas.definitions?.size) {
+ for (const [key, value] of jobsSchemas.definitions) {
+ jsonSchema.definitions[key] = value
+ }
+ }
+ if (jobsSchemas.properties) {
+ jsonSchema.properties.jobs = {
+ type: 'object',
+ additionalProperties: false,
+ properties: jobsSchemas.properties,
+ required: ['tasks'],
+ }
+ }
if (config?.typescript?.schema?.length) {
for (const schema of config.typescript.schema) {


@@ -155,7 +155,7 @@ function FixedToolbar({
}): React.ReactNode {
const currentToolbarRef = React.useRef<HTMLDivElement>(null)
- const { y } = useScrollInfo!()
+ const { y } = useScrollInfo()
// Memoize the parent toolbar element
const parentToolbarElem = useMemo(() => {


@@ -148,7 +148,7 @@ function vercelBlobStorageInternal(
prefix,
token,
}),
- staticHandler: getStaticHandler({ baseUrl, token, cacheControlMaxAge }, collection),
+ staticHandler: getStaticHandler({ baseUrl, cacheControlMaxAge, token }, collection),
}
}
}


@@ -7,12 +7,12 @@ import path from 'path'
type StaticHandlerArgs = {
baseUrl: string
- token: string
cacheControlMaxAge?: number
+ token: string
}
export const getStaticHandler = (
- { baseUrl, token, cacheControlMaxAge = 0 }: StaticHandlerArgs,
+ { baseUrl, cacheControlMaxAge = 0, token }: StaticHandlerArgs,
collection: CollectionConfig,
): StaticHandler => {
return async (req, { params: { filename } }) => {
@@ -38,10 +38,10 @@ export const getStaticHandler = (
return new Response(bodyBuffer, {
headers: new Headers({
+ 'Cache-Control': `public, max-age=${cacheControlMaxAge}`,
'Content-Disposition': contentDisposition,
'Content-Length': String(size),
'Content-Type': contentType,
- 'Cache-Control': `public, max-age=${cacheControlMaxAge}`
}),
status: 200,
})

pnpm-lock.yaml (generated)

@@ -827,6 +827,9 @@ importers:
console-table-printer:
specifier: 2.11.2
version: 2.11.2
croner:
specifier: 8.1.2
version: 8.1.2
dataloader:
specifier: 2.2.2
version: 2.2.2
@@ -5602,6 +5605,10 @@ packages:
engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0}
hasBin: true
croner@8.1.2:
resolution: {integrity: sha512-ypfPFcAXHuAZRCzo3vJL6ltENzniTjwe/qsLleH1V2/7SRDjgvRQyrLmumFTLmjFax4IuSxfGXEn79fozXcJog==}
engines: {node: '>=18.0'}
cross-env@7.0.3:
resolution: {integrity: sha512-+/HKd6EgcQCJGh2PSjZuUitQBQynKor4wrFbRg4DtAgS1aWO+gU52xpH7M9ScGgXSYmAVS9bIJ8EzuaGw0oNAw==}
engines: {node: '>=10.14', npm: '>=6', yarn: '>=1'}
@@ -15151,6 +15158,8 @@ snapshots:
- supports-color
- ts-node
croner@8.1.2: {}
cross-env@7.0.3:
dependencies:
cross-spawn: 7.0.3


@@ -21,25 +21,26 @@ import { default as default_19 } from '@/components/BeforeDashboard'
import { default as default_20 } from '@/components/BeforeLogin'
export const importMap = {
"@payloadcms/richtext-lexical/client#RichTextCell": RichTextCell_0,
"@payloadcms/richtext-lexical/client#RichTextField": RichTextField_1,
"@payloadcms/richtext-lexical/generateComponentMap#getGenerateComponentMap": getGenerateComponentMap_2,
"@payloadcms/richtext-lexical/client#InlineToolbarFeatureClient": InlineToolbarFeatureClient_3,
"@payloadcms/richtext-lexical/client#FixedToolbarFeatureClient": FixedToolbarFeatureClient_4,
"@payloadcms/richtext-lexical/client#HeadingFeatureClient": HeadingFeatureClient_5,
"@payloadcms/richtext-lexical/client#UnderlineFeatureClient": UnderlineFeatureClient_6,
"@payloadcms/richtext-lexical/client#BoldFeatureClient": BoldFeatureClient_7,
"@payloadcms/richtext-lexical/client#ItalicFeatureClient": ItalicFeatureClient_8,
"@payloadcms/richtext-lexical/client#LinkFeatureClient": LinkFeatureClient_9,
"@payloadcms/plugin-seo/client#OverviewComponent": OverviewComponent_10,
"@payloadcms/plugin-seo/client#MetaTitleComponent": MetaTitleComponent_11,
"@payloadcms/plugin-seo/client#MetaImageComponent": MetaImageComponent_12,
"@payloadcms/plugin-seo/client#MetaDescriptionComponent": MetaDescriptionComponent_13,
"@payloadcms/plugin-seo/client#PreviewComponent": PreviewComponent_14,
"@/fields/slug/SlugComponent#SlugComponent": SlugComponent_15,
"@payloadcms/richtext-lexical/client#HorizontalRuleFeatureClient": HorizontalRuleFeatureClient_16,
"@payloadcms/richtext-lexical/client#BlocksFeatureClient": BlocksFeatureClient_17,
"@payloadcms/plugin-search/client#LinkToDoc": LinkToDoc_18,
"@/components/BeforeDashboard#default": default_19,
"@/components/BeforeLogin#default": default_20
'@payloadcms/richtext-lexical/client#RichTextCell': RichTextCell_0,
'@payloadcms/richtext-lexical/client#RichTextField': RichTextField_1,
'@payloadcms/richtext-lexical/generateComponentMap#getGenerateComponentMap':
getGenerateComponentMap_2,
'@payloadcms/richtext-lexical/client#InlineToolbarFeatureClient': InlineToolbarFeatureClient_3,
'@payloadcms/richtext-lexical/client#FixedToolbarFeatureClient': FixedToolbarFeatureClient_4,
'@payloadcms/richtext-lexical/client#HeadingFeatureClient': HeadingFeatureClient_5,
'@payloadcms/richtext-lexical/client#UnderlineFeatureClient': UnderlineFeatureClient_6,
'@payloadcms/richtext-lexical/client#BoldFeatureClient': BoldFeatureClient_7,
'@payloadcms/richtext-lexical/client#ItalicFeatureClient': ItalicFeatureClient_8,
'@payloadcms/richtext-lexical/client#LinkFeatureClient': LinkFeatureClient_9,
'@payloadcms/plugin-seo/client#OverviewComponent': OverviewComponent_10,
'@payloadcms/plugin-seo/client#MetaTitleComponent': MetaTitleComponent_11,
'@payloadcms/plugin-seo/client#MetaImageComponent': MetaImageComponent_12,
'@payloadcms/plugin-seo/client#MetaDescriptionComponent': MetaDescriptionComponent_13,
'@payloadcms/plugin-seo/client#PreviewComponent': PreviewComponent_14,
'@/fields/slug/SlugComponent#SlugComponent': SlugComponent_15,
'@payloadcms/richtext-lexical/client#HorizontalRuleFeatureClient': HorizontalRuleFeatureClient_16,
'@payloadcms/richtext-lexical/client#BlocksFeatureClient': BlocksFeatureClient_17,
'@payloadcms/plugin-search/client#LinkToDoc': LinkToDoc_18,
'@/components/BeforeDashboard#default': default_19,
'@/components/BeforeLogin#default': default_20,
}


@@ -30,6 +30,10 @@ export interface Config {
user: User & {
collection: 'users';
};
+ jobs?: {
+ tasks: unknown;
+ workflows?: unknown;
+ };
}
export interface UserAuthOperations {
forgotPassword: {


@@ -1,3 +1,5 @@
/* tslint:disable */
/* eslint-disable */
/**
@@ -77,6 +79,10 @@ export interface Config {
user: User & {
collection: 'users';
};
+ jobs?: {
+ tasks: unknown;
+ workflows?: unknown;
+ };
}
export interface UserAuthOperations {
forgotPassword: {


@@ -40,7 +40,9 @@ export const reorderColumns = async (
})
.boundingBox()
- if (!fromBoundingBox || !toBoundingBox) {return}
+ if (!fromBoundingBox || !toBoundingBox) {
+ return
+ }
// drag the "from" column to the left of the "to" column
await page.mouse.move(fromBoundingBox.x + 2, fromBoundingBox.y + 2, { steps: 10 })


@@ -44,8 +44,12 @@ import {
AdminViewComponent,
AdminViewConfig,
AdminViewProps,
baseBlockFields,
baseIDField,
BaseLocalizationConfig,
buildConfig,
Config,
defaults,
EditView,
EditViewConfig,
EmailOptions,
@@ -56,6 +60,8 @@ import {
FieldTypes,
GeneratePreviewURL,
GraphQLExtension,
hasTransport,
hasTransportOptions,
InitOptions,
LivePreviewConfig,
Locale,
@@ -64,31 +70,28 @@ import {
LocalizationConfigWithNoLabels,
PayloadHandler,
Plugin,
sanitizeConfig,
SanitizedConfig,
SanitizedLocalizationConfig,
baseBlockFields,
baseIDField,
buildConfig,
defaults,
hasTransport,
hasTransportOptions,
sanitizeConfig,
sanitizeFields,
} from 'payload/config'
import {
BaseDatabaseAdapter,
BeginTransaction,
combineQueries,
CommitTransaction,
Connect,
Count,
CountArgs,
Create,
CreateArgs,
createDatabaseAdapter,
CreateGlobal,
CreateGlobalArgs,
CreateGlobalVersion,
CreateGlobalVersionArgs,
CreateMigration,
createMigration,
CreateVersion,
CreateVersionArgs,
DBIdentifierName,
@@ -110,13 +113,24 @@ import {
FindOneArgs,
FindVersions,
FindVersionsArgs,
flattenWhereToOperators,
getLocalizedPaths,
getMigrations,
Init,
migrate,
migrateDown,
migrateRefresh,
migrateReset,
migrateStatus,
Migration,
MigrationData,
migrationsCollection,
migrationTemplate,
PaginatedDocs,
PathToQuery,
QueryDrafts,
QueryDraftsArgs,
readMigrationFiles,
RollbackTransaction,
Transaction,
TypeWithVersion,
@@ -128,20 +142,6 @@ import {
UpdateOneArgs,
UpdateVersion,
UpdateVersionArgs,
combineQueries,
createDatabaseAdapter,
createMigration,
flattenWhereToOperators,
getLocalizedPaths,
getMigrations,
migrate,
migrateDown,
migrateRefresh,
migrateReset,
migrateStatus,
migrationTemplate,
migrationsCollection,
readMigrationFiles,
validateQueryPaths,
validateSearchParam,
} from 'payload/database'
@@ -165,7 +165,7 @@ import {
QueryError,
ValidationError,
} from 'payload/errors'
- import { GraphQL, buildPaginatedListType } from 'payload/graphql'
+ import { buildPaginatedListType, GraphQL } from 'payload/graphql'
import {
AccessArgs as AccessArgsType,
Access as AccessType,
@@ -201,21 +201,31 @@ import {
CustomSaveDraftButtonProps,
Data,
DateField,
docHasTimestamps,
Document,
EmailField,
Field,
FieldAccess,
FieldAffectingData,
fieldAffectsData,
FieldBase,
fieldHasMaxDepth,
fieldHasSubFields,
FieldHook,
FieldHookArgs,
fieldIsArrayType,
fieldIsBlockType,
fieldIsGroupType,
fieldIsLocalized,
fieldIsPresentationalOnly,
FieldPresentationalOnly,
Fields,
fieldSupportsMany,
FieldWithMany,
FieldWithMaxDepth,
FieldWithPath,
FieldWithRichTextRequiredEditor,
FieldWithSubFields,
Fields,
FileData,
FilterOptions,
FilterOptionsProps,
@@ -239,7 +249,10 @@ import {
Operation,
Operator,
Option,
optionIsObject,
optionIsValue,
OptionObject,
optionsAreObjects,
PayloadRequest,
PointField,
PolymorphicRelationshipField,
@@ -259,36 +272,23 @@ import {
SingleRelationshipField,
Tab,
TabAsField,
tabHasName,
TabsAdmin,
TabsField,
TextField,
TextareaField,
TextField,
TypeWithID,
UIField,
UnnamedTab,
UploadField,
Validate,
ValidateOptions,
validOperators,
valueIsValueWithRelation,
ValueWithRelation,
VersionOperations,
Where,
WhereField,
docHasTimestamps,
fieldAffectsData,
fieldHasMaxDepth,
fieldHasSubFields,
fieldIsArrayType,
fieldIsBlockType,
fieldIsGroupType,
fieldIsLocalized,
fieldIsPresentationalOnly,
fieldSupportsMany,
optionIsObject,
optionIsValue,
optionsAreObjects,
tabHasName,
validOperators,
valueIsValueWithRelation,
} from 'payload/types'
import {
afterReadPromise,
@@ -351,17 +351,18 @@ import {
CountryField,
Email,
FieldConfig,
FieldValues,
FieldsConfig,
FieldValues,
Form,
FormattedEmail,
CheckboxField as FormBuilderCheckboxField,
EmailField as FormBuilderEmailField,
SelectField as FormBuilderSelectField,
TextField as FormBuilderTextField,
FormFieldBlock,
FormSubmission,
FormattedEmail,
HandlePayment,
isValidBlockConfig,
MessageField,
PaymentField,
PaymentFieldConfig,
@@ -372,7 +373,6 @@ import {
StateField,
SubmissionValue,
TextAreaField,
isValidBlockConfig,
} from '@payloadcms/plugin-form-builder/types'
import nestedDocs from '@payloadcms/plugin-nested-docs'
import { createBreadcrumbsField, createParentField } from '@payloadcms/plugin-nested-docs/fields'
@@ -400,8 +400,8 @@ import {
GenerateImage,
GenerateTitle,
Meta,
PluginConfig as SeoPluginConfig,
GenerateURL as seoGenerateURL,
PluginConfig as SeoPluginConfig,
} from '@payloadcms/plugin-seo/types'
import stripePlugin from '@payloadcms/plugin-stripe'
import {
@@ -425,6 +425,10 @@ import {
$isRelationshipNode,
$isUploadNode,
AdapterProps,
addSwipeDownListener,
addSwipeLeftListener,
addSwipeRightListener,
addSwipeUpListener,
AlignFeature,
AutoLinkNode,
BlockFields,
@@ -435,30 +439,51 @@ import {
BoldTextFeature,
CAN_USE_DOM,
CheckListFeature,
cloneDeep,
consolidateHTMLConverters,
convertLexicalNodesToHTML,
convertLexicalToHTML,
convertSlateNodesToLexical,
convertSlateToLexical,
createBlockNode,
defaultEditorConfig,
defaultEditorFeatures,
defaultHTMLConverters,
defaultRichTextValue,
defaultSanitizedEditorConfig,
defaultSlateConverters,
DETAIL_TYPE_TO_DETAIL,
DOUBLE_LINE_BREAK,
EditorConfig,
EditorConfigProvider,
ELEMENT_FORMAT_TO_TYPE,
ELEMENT_TYPE_TO_FORMAT,
ENABLE_SLASH_MENU_COMMAND,
EditorConfig,
EditorConfigProvider,
Feature,
FeatureProvider,
FeatureProviderMap,
FloatingToolbarSection,
FloatingToolbarSectionEntry,
FormatSectionWithEntries,
getDOMRangeRect,
getEnabledNodes,
getSelectedNode,
HeadingFeature,
HTMLConverter,
HTMLConverterFeature,
HTMLConverterFeatureProps,
HeadingFeature,
IS_ALL_FORMATTING,
IndentFeature,
InlineCodeTextFeature,
invariant,
IS_ALL_FORMATTING,
isHTMLElement,
isPoint,
ItalicTextFeature,
LTR_REGEX,
joinClasses,
LexicalBlock,
lexicalEditor,
LexicalEditorProps,
lexicalHTML,
LexicalPluginToLexicalFeature,
LexicalRichTextAdapter,
LinebreakHTMLConverter,
@@ -466,15 +491,16 @@ import {
LinkFeatureProps,
LinkFields,
LinkNode,
NON_BREAKING_SPACE,
loadFeatures,
LTR_REGEX,
NodeFormat,
NodeValidation,
NON_BREAKING_SPACE,
OrderedListFeature,
ParagraphFeature,
ParagraphHTMLConverter,
Point,
PopulationPromise,
RTL_REGEX,
RawUploadPayload,
Rect,
RelationshipData,
@@ -482,13 +508,19 @@ import {
RelationshipNode,
ResolvedFeature,
ResolvedFeatureMap,
RTL_REGEX,
SanitizedEditorConfig,
SanitizedFeatures,
sanitizeEditorConfig,
sanitizeFeatures,
sanitizeUrl,
SerializedAutoLinkNode,
SerializedBlockNode,
SerializedLinkNode,
SerializedRelationshipNode,
SerializedUploadNode,
setFloatingElemPosition,
setFloatingElemPositionForLinkEditor,
SlashMenuGroup,
SlashMenuOption,
SlateBlockquoteConverter,
@@ -504,16 +536,17 @@ import {
SlateUnknownConverter,
SlateUnorderedListConverter,
SlateUploadConverter,
sortFeaturesForOptimalLoading,
StrikethroughTextFeature,
SubscriptTextFeature,
SuperscriptTextFeature,
TestRecorderFeature,
TEXT_MODE_TO_TYPE,
TEXT_TYPE_TO_FORMAT,
TEXT_TYPE_TO_MODE,
TOGGLE_LINK_COMMAND,
TestRecorderFeature,
TextDropdownSectionWithEntries,
TextHTMLConverter,
TOGGLE_LINK_COMMAND,
TreeViewFeature,
UnderlineTextFeature,
UnorderedListFeature,
@@ -521,48 +554,15 @@ import {
UploadFeature,
UploadFeatureProps,
UploadNode,
addSwipeDownListener,
addSwipeLeftListener,
addSwipeRightListener,
addSwipeUpListener,
cloneDeep,
consolidateHTMLConverters,
convertLexicalNodesToHTML,
convertLexicalToHTML,
convertSlateNodesToLexical,
convertSlateToLexical,
createBlockNode,
defaultEditorConfig,
defaultEditorFeatures,
defaultHTMLConverters,
defaultRichTextValue,
defaultSanitizedEditorConfig,
defaultSlateConverters,
getDOMRangeRect,
getEnabledNodes,
getSelectedNode,
invariant,
isHTMLElement,
isPoint,
joinClasses,
lexicalEditor,
lexicalHTML,
loadFeatures,
sanitizeEditorConfig,
sanitizeFeatures,
sanitizeUrl,
setFloatingElemPosition,
setFloatingElemPositionForLinkEditor,
sortFeaturesForOptimalLoading,
useEditorConfigContext,
validateUrl,
} from '@payloadcms/richtext-lexical'
import {
defaultEditorLexicalConfig,
RichTextCell,
RichTextField,
ToolbarButton,
ToolbarDropdown,
defaultEditorLexicalConfig,
} from '@payloadcms/richtext-lexical/components'
import {
AdapterArguments,
@@ -570,12 +570,12 @@ import {
ElementNode,
FieldProps,
LeafButton,
nodeIsTextNode,
RichTextCustomElement,
RichTextCustomLeaf,
RichTextElement,
RichTextLeaf,
TextNode,
nodeIsTextNode,
slateEditor,
TextNode,
toggleElement,
} from '@payloadcms/richtext-slate'

test/queues/.gitignore (vendored)

@@ -0,0 +1,2 @@
/media
/media-gif

test/queues/config.ts

@@ -0,0 +1,580 @@
import type { TaskConfig, WorkflowConfig } from 'payload'
import { lexicalEditor } from '@payloadcms/richtext-lexical'
import { fileURLToPath } from 'node:url'
import path from 'path'
import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js'
import { devUser } from '../credentials.js'
import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
import { clearAndSeedEverything } from './seed.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
export default buildConfigWithDefaults({
collections: [
{
slug: 'posts',
admin: {
useAsTitle: 'title',
},
hooks: {
afterChange: [
async ({ req, doc, context }) => {
await req.payload.jobs.queue({
workflow: context.useJSONWorkflow ? 'updatePostJSONWorkflow' : 'updatePost',
input: {
post: doc.id,
message: 'hello',
},
req,
})
},
],
},
fields: [
{
name: 'title',
type: 'text',
required: true,
},
{
name: 'content',
type: 'richText',
},
{
name: 'jobStep1Ran',
type: 'text',
},
{
name: 'jobStep2Ran',
type: 'text',
},
],
},
{
slug: 'simple',
admin: {
useAsTitle: 'title',
},
fields: [
{
name: 'title',
type: 'text',
required: true,
},
],
},
],
admin: {
importMap: {
baseDir: path.resolve(dirname),
},
autoLogin: {
prefillOnly: true,
email: devUser.email,
password: devUser.password,
},
},
jobs: {
jobsCollectionOverrides: ({ defaultJobsCollection }) => {
return {
...defaultJobsCollection,
admin: {
...(defaultJobsCollection?.admin || {}),
hidden: false,
},
}
},
tasks: [
{
retries: 2,
slug: 'UpdatePost',
interfaceName: 'MyUpdatePostType',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'message',
type: 'text',
required: true,
},
],
outputSchema: [
{
name: 'messageTwice',
type: 'text',
required: true,
},
],
handler: updatePostStep1,
} as TaskConfig<'UpdatePost'>,
{
retries: 2,
slug: 'UpdatePostStep2',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'messageTwice',
type: 'text',
required: true,
},
],
handler: updatePostStep2,
} as TaskConfig<'UpdatePostStep2'>,
{
retries: 3,
slug: 'CreateSimple',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimple'>,
{
retries: 2,
slug: 'CreateSimpleWithDuplicateMessage',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
{
name: 'shouldFail',
type: 'checkbox',
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: async ({ input, req }) => {
if (input.shouldFail) {
throw new Error('Failed on purpose')
}
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message + input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
} as TaskConfig<'CreateSimpleWithDuplicateMessage'>,
{
retries: 2,
slug: 'ExternalTask',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
outputSchema: [
{
name: 'simpleID',
type: 'text',
required: true,
},
],
handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler',
} as TaskConfig<'ExternalTask'>,
],
workflows: [
{
slug: 'updatePost',
interfaceName: 'MyUpdatePostWorkflowType',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, tasks }) => {
await tasks.UpdatePost('1', {
input: {
post: job.input.post,
message: job.input.message,
},
})
await tasks.UpdatePostStep2('2', {
input: {
post: job.taskStatus.UpdatePost['1'].input.post,
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
},
})
},
} as WorkflowConfig<'updatePost'>,
{
slug: 'updatePostJSONWorkflow',
inputSchema: [
{
name: 'post',
type: 'relationship',
relationTo: 'posts',
maxDepth: 0,
required: true,
},
{
name: 'message',
type: 'text',
required: true,
},
],
handler: [
{
task: 'UpdatePost',
id: '1',
input: ({ job }) => ({
post: job.input.post,
message: job.input.message,
}),
},
{
task: 'UpdatePostStep2',
id: '2',
input: ({ job }) => ({
post: job.taskStatus.UpdatePost['1'].input.post,
messageTwice: job.taskStatus.UpdatePost['1'].output.messageTwice,
}),
condition({ job }) {
return job?.taskStatus?.UpdatePost?.['1']?.complete
},
completesJob: true,
},
],
} as WorkflowConfig<'updatePostJSONWorkflow'>,
{
slug: 'retriesTest',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, tasks, req }) => {
await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountRetried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
},
},
id: job.id,
})
await tasks.CreateSimple('1', {
input: {
message: job.input.message,
},
})
// At this point there should always be one post created.
// job.input.amountRetried increments on every retry of the failing CreateSimple task below
await tasks.CreateSimple('2', {
input: {
message: job.input.message,
shouldFail: true,
},
})
// This will never be reached
},
} as WorkflowConfig<'retriesTest'>,
{
slug: 'retriesRollbackTest',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, inlineTask, req }) => {
await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountRetried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
},
},
id: job.id,
})
await inlineTask('1', {
task: async ({ req }) => {
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: job.input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
})
await inlineTask('2', {
task: async ({ req }) => {
await req.payload.create({
collection: 'simple',
req,
data: {
title: 'should not exist',
},
})
// Fail afterwards, so that we can also test that transactions work (i.e. the job is rolled back)
throw new Error('Failed on purpose')
},
retries: {
attempts: 4,
},
})
},
} as WorkflowConfig<'retriesRollbackTest'>,
{
slug: 'retriesWorkflowLevelTest',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once
handler: async ({ job, tasks, req }) => {
await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountRetried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
},
},
id: job.id,
})
await tasks.CreateSimple('1', {
input: {
message: job.input.message,
},
})
// At this point there should always be one post created.
// job.input.amountRetried increments on each retry of the failing CreateSimple task below,
// capped by the workflow-level retries above
await tasks.CreateSimple('2', {
input: {
message: job.input.message,
shouldFail: true,
},
})
// This will never be reached
},
} as WorkflowConfig<'retriesWorkflowLevelTest'>,
{
slug: 'inlineTaskTest',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, inlineTask }) => {
await inlineTask('1', {
task: async ({ input, req }) => {
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
},
input: {
message: job.input.message,
},
})
},
} as WorkflowConfig<'inlineTaskTest'>,
{
slug: 'externalWorkflow',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: path.resolve(dirname, 'runners/externalWorkflow.ts') + '#externalWorkflowHandler',
} as WorkflowConfig<'externalWorkflow'>,
{
slug: 'retriesBackoffTest',
inputSchema: [
{
name: 'message',
type: 'text',
required: true,
},
],
handler: async ({ job, inlineTask, req }) => {
const newJob = await req.payload.update({
collection: 'payload-jobs',
data: {
input: {
...job.input,
amountRetried:
// @ts-expect-error amountRetried is new arbitrary data and not in the type
job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0,
},
},
id: job.id,
})
job.input = newJob.input as any
await inlineTask('1', {
task: async ({ req }) => {
const totalTried = job?.taskStatus?.inline?.['1']?.totalTried || 0
const { id } = await req.payload.create({
collection: 'simple',
req,
data: {
title: 'should not exist',
},
})
// @ts-expect-error timeTried is new arbitrary data and not in the type
if (!job.input.timeTried) {
// @ts-expect-error timeTried is new arbitrary data and not in the type
job.input.timeTried = {}
}
// @ts-expect-error timeTried is new arbitrary data and not in the type
job.input.timeTried[totalTried] = new Date().toISOString()
await req.payload.update({
collection: 'payload-jobs',
data: {
input: job.input,
},
id: job.id,
})
if (totalTried < 4) {
// Cleanup the post
await req.payload.delete({
collection: 'simple',
id,
req,
})
// Fail on purpose until the last try (totalTried === 4), which succeeds
throw new Error('Failed on purpose')
}
return {
output: {},
}
},
retries: {
attempts: 4,
backoff: {
type: 'exponential',
// Should retry in 300ms, then 600, then 1200, then 2400, then succeed
delay: 300,
},
},
})
},
} as WorkflowConfig<'retriesBackoffTest'>,
],
},
editor: lexicalEditor(),
onInit: async (payload) => {
if (process.env.SEED_IN_CONFIG_ONINIT !== 'false') {
await clearAndSeedEverything(payload)
}
},
typescript: {
outputFile: path.resolve(dirname, 'payload-types.ts'),
},
})
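
The `retriesBackoffTest` workflow above asserts specific retry timings. As a rough sketch of that schedule (not Payload's actual scheduler — `backoffDelay` is a hypothetical helper), an exponential backoff with a 300ms base delay doubles the wait on each retry:

```typescript
// With `type: 'exponential'` and `delay: 300`, each retry waits twice as long
// as the previous one: 300ms, 600ms, 1200ms, 2400ms.
const backoffDelay = (baseDelayMs: number, retryNumber: number): number =>
  baseDelayMs * 2 ** retryNumber

const schedule = [0, 1, 2, 3].map((retry) => backoffDelay(300, retry))
// schedule: [300, 600, 1200, 2400], matching the delays asserted in int.spec.ts
```

This is why the int test's `durations` array is expected to exceed 300, 600, 1200, and 2400ms respectively.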

test/queues/e2e.spec.ts

@@ -0,0 +1,65 @@
import type { Page } from '@playwright/test'
import { expect, test } from '@playwright/test'
import * as path from 'path'
import { fileURLToPath } from 'url'
import type { PayloadTestSDK } from '../helpers/sdk/index.js'
import type { Config } from './payload-types.js'
import { ensureCompilationIsDone, initPageConsoleErrorCatch } from '../helpers.js'
import { AdminUrlUtil } from '../helpers/adminUrlUtil.js'
import { initPayloadE2ENoConfig } from '../helpers/initPayloadE2ENoConfig.js'
import { reInitializeDB } from '../helpers/reInitializeDB.js'
import { RESTClient } from '../helpers/rest.js'
import { TEST_TIMEOUT } from '../playwright.config.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
let serverURL: string
let payload: PayloadTestSDK<Config>
let client: RESTClient
test.describe('Queues', () => {
let page: Page
let url: AdminUrlUtil
test.beforeAll(async ({ browser }, testInfo) => {
testInfo.setTimeout(TEST_TIMEOUT)
process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit
;({ payload, serverURL } = await initPayloadE2ENoConfig({ dirname }))
url = new AdminUrlUtil(serverURL, 'payload-jobs')
const context = await browser.newContext()
page = await context.newPage()
initPageConsoleErrorCatch(page)
await reInitializeDB({
serverURL,
snapshotKey: 'queuesTest',
})
await ensureCompilationIsDone({ page, serverURL })
})
test.beforeEach(async () => {
await reInitializeDB({
serverURL,
snapshotKey: 'fieldsTest',
uploadsDir: path.resolve(dirname, './collections/Upload/uploads'),
})
if (client) {
await client.logout()
}
client = new RESTClient(null, { defaultSlug: 'users', serverURL })
await client.login()
await ensureCompilationIsDone({ page, serverURL })
})
test('example test', async () => {
await page.goto(url.list)
const textCell = page.locator('.row-1 .cell-text')
await expect(textCell).toHaveText('example post')
})
})


@@ -0,0 +1,20 @@
import { rootParserOptions } from '../../eslint.config.js'
import { testEslintConfig } from '../eslint.config.js'
/** @typedef {import('eslint').Linter.FlatConfig} */
let FlatConfig
/** @type {FlatConfig[]} */
export const index = [
...testEslintConfig,
{
languageOptions: {
parserOptions: {
...rootParserOptions,
tsconfigRootDir: import.meta.dirname,
},
},
},
]
export default index

test/queues/int.spec.ts

@@ -0,0 +1,668 @@
import type { JobTaskStatus, Payload } from 'payload'
import path from 'path'
import { fileURLToPath } from 'url'
import type { NextRESTClient } from '../helpers/NextRESTClient.js'
import { devUser } from '../credentials.js'
import { initPayloadInt } from '../helpers/initPayloadInt.js'
import { clearAndSeedEverything } from './seed.js'
let payload: Payload
let restClient: NextRESTClient
let token: string
const { email, password } = devUser
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
describe('Queues', () => {
beforeAll(async () => {
process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit
;({ payload, restClient } = await initPayloadInt(dirname))
})
afterAll(async () => {
if (typeof payload.db.destroy === 'function') {
await payload.db.destroy()
}
})
beforeEach(async () => {
await clearAndSeedEverything(payload)
const data = await restClient
.POST('/users/login', {
body: JSON.stringify({
email,
password,
}),
})
.then((res) => res.json())
if (data.token) {
token = data.token
}
})
it('will run access control on jobs runner', async () => {
const response = await restClient.GET('/payload-jobs/run', {
headers: {
// Authorization: `JWT ${token}`,
},
}) // Needs to be a rest call to test auth
expect(response.status).toBe(401)
})
it('will return 200 from jobs runner', async () => {
const response = await restClient.GET('/payload-jobs/run', {
headers: {
Authorization: `JWT ${token}`,
},
}) // Needs to be a rest call to test auth
expect(response.status).toBe(200)
})
// There used to be a bug in payload where updating the job threw the following error - only in
// postgres:
// QueryError: The following path cannot be queried: document.relationTo
// This test is to ensure that the bug is fixed
it('can create and update new jobs', async () => {
const job = await payload.create({
collection: 'payload-jobs',
data: {
input: {
message: '1',
},
},
})
// @ts-expect-error
expect(job.input.message).toBe('1')
const updatedJob = await payload.update({
collection: 'payload-jobs',
id: job.id,
data: {
input: {
message: '2',
},
},
})
// @ts-expect-error
expect(updatedJob.input.message).toBe('2')
})
it('can create new jobs', async () => {
const newPost = await payload.create({
collection: 'posts',
data: {
title: 'my post',
},
})
const retrievedPost = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(retrievedPost.jobStep1Ran).toBeFalsy()
expect(retrievedPost.jobStep2Ran).toBeFalsy()
await payload.jobs.run()
const postAfterJobs = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(postAfterJobs.jobStep1Ran).toBe('hello')
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
})
it('can create new JSON-workflow jobs', async () => {
const newPost = await payload.create({
collection: 'posts',
data: {
title: 'my post',
},
context: {
useJSONWorkflow: true,
},
})
const retrievedPost = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(retrievedPost.jobStep1Ran).toBeFalsy()
expect(retrievedPost.jobStep2Ran).toBeFalsy()
await payload.jobs.run()
const postAfterJobs = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(postAfterJobs.jobStep1Ran).toBe('hello')
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
})
it('ensure job retrying works', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run()
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(3)
})
it('ensure workflow-level retries are respected', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesWorkflowLevelTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run()
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(2)
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('ensure failed tasks are rolled back via transactions', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesRollbackTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run()
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1) // Failure happens after task creates a simple document, but still within the task => any document creation should be rolled back
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
})*/
it('ensure backoff strategy of task is respected', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesBackoffTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
let firstGotNoJobs = null
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
// Keep running until no jobs are found. Once the queue is empty, keep polling for 3 more seconds
// to check whether delayed retries appear (the backoff strategy schedules them via `waitUntil`)
while (
hasJobsRemaining ||
!firstGotNoJobs ||
new Date().getTime() - firstGotNoJobs.getTime() < 3000
) {
const response = await payload.jobs.run()
if (response.noJobsRemaining) {
if (hasJobsRemaining) {
firstGotNoJobs = new Date()
hasJobsRemaining = false
}
} else {
firstGotNoJobs = null
hasJobsRemaining = true
}
// Add a 100ms delay before the next iteration
await delay(100)
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.totalTried).toBe(5)
expect((jobAfterRun.taskStatus as JobTaskStatus).inline['1'].totalTried).toBe(5)
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
/*
Job.input.timeTried may look something like this:
timeTried: {
'0': '2024-10-07T16:05:49.300Z',
'1': '2024-10-07T16:05:49.469Z',
'2': '2024-10-07T16:05:49.779Z',
'3': '2024-10-07T16:05:50.388Z',
'4': '2024-10-07T16:05:51.597Z'
}
Convert this into an array, each item is the duration between the fails. So this should have 4 items
*/
const timeTried: {
[key: string]: string
// @ts-expect-error timeTried is new arbitrary data and not in the type
} = jobAfterRun.input.timeTried
const durations = Object.values(timeTried)
.map((time, index, arr) => {
if (index === arr.length - 1) {
return null
}
return new Date(arr[index + 1]).getTime() - new Date(time).getTime()
})
.filter((p) => p !== null)
expect(durations).toHaveLength(4)
expect(durations[0]).toBeGreaterThan(300)
expect(durations[1]).toBeGreaterThan(600)
expect(durations[2]).toBeGreaterThan(1200)
expect(durations[3]).toBeGreaterThan(2400)
})
it('can create new inline jobs', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTest',
input: {
message: 'hello!',
},
})
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('hello!')
})
it('can queue single tasks', async () => {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('from single task')
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('transaction test against payload-jobs collection', async () => {
// This roughly emulates what happens when multiple jobs are queued and then run in parallel.
const runWorkflowFN = async (i: number) => {
const { id } = await payload.create({
collection: 'payload-jobs',
data: {
input: {
message: 'Number ' + i,
},
taskSlug: 'CreateSimple',
},
})
const _req = await createLocalReq({}, payload)
const t1Req = isolateObjectProperty(_req, 'transactionID')
delete t1Req.transactionID
await initTransaction(t1Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 1',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
/**
* T1 start
*/
/*
const t2Req = isolateObjectProperty(t1Req, 'transactionID')
delete t2Req.transactionID
//
await initTransaction(t2Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 2',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await payload.create({
collection: 'simple',
req: t2Req,
data: {
title: 'from single task',
},
})
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 3',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t2Req)
/**
* T1 end
*/
/*
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 4',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t1Req)
}
await Promise.all(
new Array(30).fill(0).map(async (_, i) => {
await runWorkflowFN(i)
}),
)
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(30)
})*/
it('can queue single tasks 8 times', async () => {
for (let i = 0; i < 8; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(8)
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[7].title).toBe('from single task')
})
it('can queue single tasks 500 times', async () => {
for (let i = 0; i < 500; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({
limit: 1000,
})
const allSimples = await payload.find({
collection: 'simple',
limit: 1000,
})
expect(allSimples.totalDocs).toBe(500) // Run limit raised to 1000, so all 500 queued jobs are processed
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[490].title).toBe('from single task')
})
it('ensure default jobs run limit of 10 works', async () => {
for (let i = 0; i < 500; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 1000,
})
expect(allSimples.totalDocs).toBe(10) // Default limit: 10
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[9].title).toBe('from single task')
})
it('ensure jobs run limit can be customized', async () => {
for (let i = 0; i < 500; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({
limit: 42,
})
const allSimples = await payload.find({
collection: 'simple',
limit: 1000,
})
expect(allSimples.totalDocs).toBe(42) // Custom limit: 42
expect(allSimples.docs[0].title).toBe('from single task')
expect(allSimples.docs[30].title).toBe('from single task')
expect(allSimples.docs[41].title).toBe('from single task')
})
it('can queue different kinds of single tasks multiple times', async () => {
for (let i = 0; i < 3; i++) {
await payload.jobs.queue({
task: 'CreateSimpleWithDuplicateMessage',
input: {
message: 'hello',
},
})
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
await payload.jobs.queue({
task: 'CreateSimpleWithDuplicateMessage',
input: {
message: 'hello',
},
})
}
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(9)
let amountOfCreateSimple = 0
let amountOfCreateSimpleWithDuplicateMessage = 0
for (const simple of allSimples.docs) {
if (simple.title === 'from single task') {
amountOfCreateSimple++
} else if (simple.title === 'hellohello') {
amountOfCreateSimpleWithDuplicateMessage++
}
}
expect(amountOfCreateSimple).toBe(3)
expect(amountOfCreateSimpleWithDuplicateMessage).toBe(6)
})
it('can queue external tasks', async () => {
await payload.jobs.queue({
task: 'ExternalTask',
input: {
message: 'external',
},
})
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('external')
})
it('can queue external workflow that is running external task', async () => {
await payload.jobs.queue({
workflow: 'externalWorkflow',
input: {
message: 'externalWorkflow',
},
})
await payload.jobs.run()
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0].title).toBe('externalWorkflow')
})
})
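
Several of the tests above drain the queue by polling `payload.jobs.run()` until it reports `noJobsRemaining`. That pattern can be factored into a small helper; the following is a sketch against the assumed `{ noJobsRemaining }` return shape, with the runner injected so it can be exercised in isolation (`drainQueue` is a hypothetical helper, not part of the Payload API):

```typescript
type RunResult = { noJobsRemaining: boolean }

// Repeatedly invoke the job runner (e.g. `() => payload.jobs.run()`) until the
// queue reports empty, waiting `pollMs` between invocations.
// Returns how many times the runner was invoked.
async function drainQueue(runJobs: () => Promise<RunResult>, pollMs = 100): Promise<number> {
  let runs = 0
  for (;;) {
    runs++
    const { noJobsRemaining } = await runJobs()
    if (noJobsRemaining) {
      return runs
    }
    await new Promise((resolve) => setTimeout(resolve, pollMs))
  }
}
```

With this in place, the `while (hasJobsRemaining)` loops in the retry tests reduce to `await drainQueue(() => payload.jobs.run())`.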


@@ -0,0 +1,446 @@
/* tslint:disable */
/* eslint-disable */
/**
* This file was automatically generated by Payload.
* DO NOT MODIFY IT BY HAND. Instead, modify your source Payload config,
* and re-run `payload generate:types` to regenerate this file.
*/
export interface Config {
auth: {
users: UserAuthOperations;
};
collections: {
posts: Post;
simple: Simple;
users: User;
'payload-jobs': PayloadJob;
'payload-locked-documents': PayloadLockedDocument;
'payload-preferences': PayloadPreference;
'payload-migrations': PayloadMigration;
};
db: {
defaultIDType: string;
};
globals: {};
locale: null;
user: User & {
collection: 'users';
};
jobs?: {
tasks: {
UpdatePost: MyUpdatePostType;
UpdatePostStep2: TaskUpdatePostStep2;
CreateSimple: TaskCreateSimple;
CreateSimpleWithDuplicateMessage: TaskCreateSimpleWithDuplicateMessage;
ExternalTask: TaskExternalTask;
inline?: {
input: unknown;
output: unknown;
};
};
workflows?: {
updatePost?: MyUpdatePostWorkflowType;
updatePostJSONWorkflow?: WorkflowUpdatePostJSONWorkflow;
retriesTest?: WorkflowRetriesTest;
retriesRollbackTest?: WorkflowRetriesRollbackTest;
retriesWorkflowLevelTest?: WorkflowRetriesWorkflowLevelTest;
inlineTaskTest?: WorkflowInlineTaskTest;
externalWorkflow?: WorkflowExternalWorkflow;
retriesBackoffTest?: WorkflowRetriesBackoffTest;
};
};
}
export interface UserAuthOperations {
forgotPassword: {
email: string;
password: string;
};
login: {
email: string;
password: string;
};
registerFirstUser: {
email: string;
password: string;
};
unlock: {
email: string;
password: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "posts".
*/
export interface Post {
id: string;
title: string;
content?: {
root: {
type: string;
children: {
type: string;
version: number;
[k: string]: unknown;
}[];
direction: ('ltr' | 'rtl') | null;
format: 'left' | 'start' | 'center' | 'right' | 'end' | 'justify' | '';
indent: number;
version: number;
};
[k: string]: unknown;
} | null;
jobStep1Ran?: string | null;
jobStep2Ran?: string | null;
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "simple".
*/
export interface Simple {
id: string;
title: string;
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "users".
*/
export interface User {
id: string;
updatedAt: string;
createdAt: string;
email: string;
resetPasswordToken?: string | null;
resetPasswordExpiration?: string | null;
salt?: string | null;
hash?: string | null;
loginAttempts?: number | null;
lockUntil?: string | null;
password?: string | null;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-jobs".
*/
export interface PayloadJob {
id: string;
input?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
taskStatus?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
completedAt?: string | null;
totalTried?: number | null;
hasError?: boolean | null;
error?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
log?:
| {
executedAt: string;
completedAt: string;
taskSlug:
| 'inline'
| 'UpdatePost'
| 'UpdatePostStep2'
| 'CreateSimple'
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask';
taskID: string;
input?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
output?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
state: 'failed' | 'succeeded';
error?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
id?: string | null;
}[]
| null;
workflowSlug?:
| (
| 'updatePost'
| 'updatePostJSONWorkflow'
| 'retriesTest'
| 'retriesRollbackTest'
| 'retriesWorkflowLevelTest'
| 'inlineTaskTest'
| 'externalWorkflow'
| 'retriesBackoffTest'
)
| null;
taskSlug?:
| (
| 'inline'
| 'UpdatePost'
| 'UpdatePostStep2'
| 'CreateSimple'
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask'
)
| null;
queue?: 'default' | null;
waitUntil?: string | null;
processing?: boolean | null;
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-locked-documents".
*/
export interface PayloadLockedDocument {
id: string;
document?:
| ({
relationTo: 'posts';
value: string | Post;
} | null)
| ({
relationTo: 'simple';
value: string | Simple;
} | null)
| ({
relationTo: 'users';
value: string | User;
} | null)
| ({
relationTo: 'payload-jobs';
value: string | PayloadJob;
} | null);
globalSlug?: string | null;
user: {
relationTo: 'users';
value: string | User;
};
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-preferences".
*/
export interface PayloadPreference {
id: string;
user: {
relationTo: 'users';
value: string | User;
};
key?: string | null;
value?:
| {
[k: string]: unknown;
}
| unknown[]
| string
| number
| boolean
| null;
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "payload-migrations".
*/
export interface PayloadMigration {
id: string;
name?: string | null;
batch?: number | null;
updatedAt: string;
createdAt: string;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "MyUpdatePostType".
*/
export interface MyUpdatePostType {
input: {
post: string | Post;
message: string;
};
output: {
messageTwice: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskUpdatePostStep2".
*/
export interface TaskUpdatePostStep2 {
input: {
post: string | Post;
messageTwice: string;
};
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskCreateSimple".
*/
export interface TaskCreateSimple {
input: {
message: string;
shouldFail?: boolean | null;
};
output: {
simpleID: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskCreateSimpleWithDuplicateMessage".
*/
export interface TaskCreateSimpleWithDuplicateMessage {
input: {
message: string;
shouldFail?: boolean | null;
};
output: {
simpleID: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskExternalTask".
*/
export interface TaskExternalTask {
input: {
message: string;
};
output: {
simpleID: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "MyUpdatePostWorkflowType".
*/
export interface MyUpdatePostWorkflowType {
input: {
post: string | Post;
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowUpdatePostJSONWorkflow".
*/
export interface WorkflowUpdatePostJSONWorkflow {
input: {
post: string | Post;
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowRetriesTest".
*/
export interface WorkflowRetriesTest {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowRetriesRollbackTest".
*/
export interface WorkflowRetriesRollbackTest {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowRetriesWorkflowLevelTest".
*/
export interface WorkflowRetriesWorkflowLevelTest {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowInlineTaskTest".
*/
export interface WorkflowInlineTaskTest {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowExternalWorkflow".
*/
export interface WorkflowExternalWorkflow {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowRetriesBackoffTest".
*/
export interface WorkflowRetriesBackoffTest {
input: {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "auth".
*/
export interface Auth {
[k: string]: unknown;
}
declare module 'payload' {
// @ts-ignore
export interface GeneratedTypes extends Config {}
}


@@ -0,0 +1,16 @@
import type { TaskHandler } from 'payload'
export const externalTaskHandler: TaskHandler<'ExternalTask'> = async ({ input, req }) => {
const newSimple = await req.payload.create({
collection: 'simple',
req,
data: {
title: input.message,
},
})
return {
output: {
simpleID: newSimple.id,
},
}
}


@@ -0,0 +1,12 @@
import type { WorkflowHandler } from 'payload'
export const externalWorkflowHandler: WorkflowHandler<'externalWorkflow'> = async ({
job,
tasks,
}) => {
await tasks.ExternalTask('1', {
input: {
message: job.input.message,
},
})
}


@@ -0,0 +1,55 @@
import type { TaskHandler } from 'payload'
export const updatePostStep1: TaskHandler<'UpdatePost'> = async ({ req, input }) => {
const postID =
typeof input.post === 'string' || typeof input.post === 'number' ? input.post : input.post.id
if (!postID) {
return {
state: 'failed',
output: null,
}
}
await req.payload.update({
collection: 'posts',
id: postID,
req,
data: {
jobStep1Ran: input.message,
},
})
return {
state: 'succeeded',
output: {
messageTwice: input.message + input.message,
},
}
}
export const updatePostStep2: TaskHandler<'UpdatePostStep2'> = async ({ req, input, job }) => {
const postID =
typeof input.post === 'string' || typeof input.post === 'number' ? input.post : input.post.id
if (!postID) {
return {
state: 'failed',
output: null,
}
}
await req.payload.update({
collection: 'posts',
id: postID,
req,
data: {
jobStep2Ran: input.messageTwice + job.taskStatus.UpdatePost['1'].output.messageTwice,
},
})
return {
state: 'succeeded',
output: null,
}
}

test/queues/schema.graphql
File diff suppressed because it is too large

test/queues/seed.ts

@@ -0,0 +1,30 @@
import type { Payload } from 'payload'
import path from 'path'
import { fileURLToPath } from 'url'
import { devUser } from '../credentials.js'
import { seedDB } from '../helpers/seed.js'
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
export const seed = async (_payload: Payload) => {
await _payload.create({
collection: 'users',
data: {
email: devUser.email,
password: devUser.password,
},
})
}
export async function clearAndSeedEverything(_payload: Payload) {
return await seedDB({
_payload,
collectionSlugs: _payload.config.collections.map((collection) => collection.slug),
seedFunction: seed,
snapshotKey: 'fieldsTest',
uploadsDir: path.resolve(dirname, './collections/Upload/uploads'),
})
}


@@ -0,0 +1,13 @@
{
// extend your base config to share compilerOptions, etc
//"extends": "./tsconfig.json",
"compilerOptions": {
// ensure that nobody can accidentally use this config for a build
"noEmit": true
},
"include": [
// whatever paths you intend to lint
"./**/*.ts",
"./**/*.tsx"
]
}


@@ -0,0 +1,3 @@
{
"extends": "../tsconfig.json"
}