Files
payloadcms/test/queues/int.spec.ts
Alessio Gravili c08b2aea89 feat: scheduling jobs (#12863)
Adds a new `schedule` property to workflow and task configs that can be
used to have Payload automatically _queue_ jobs following a certain
_schedule_.

Docs:
https://payloadcms.com/docs/dynamic/jobs-queue/schedules?branch=feat/schedule-jobs

## API Example

```ts
export default buildConfig({
  // ...
  jobs: {
    // ...
    scheduler: 'manual', // Or `cron` if you're not running on serverless. With `manual`, you need to call /api/payload-jobs/handleSchedules or payload.jobs.handleSchedules at regular intervals yourself
    tasks: [
      {
        schedule: [
          {
            cron: '* * * * * *',
            queue: 'autorunSecond',
            // Hooks are optional
            hooks: {
              // Not an array, as providing and calling `defaultBeforeSchedule` would be more error-prone if this was an array
              beforeSchedule: async (args) => {
                // Handles verifying that there are no jobs already scheduled or processing.
                // You can override this behavior by not calling defaultBeforeSchedule, e.g. if you wanted
                // to allow a maximum of 3 scheduled jobs in the queue instead of 1, or add any additional conditions
                const result = await args.defaultBeforeSchedule(args)
                return {
                  ...result,
                  input: {
                    message: 'This task runs every second',
                  },
                }
              },
              afterSchedule: async (args) => {
                await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global
                args.req.payload.logger.info(
                  'EverySecond task scheduled: ' +
                  (args.status === 'success' ? args.job.id : 'skipped or failed to schedule'),
                )
              },
            },
          },
        ],
        slug: 'EverySecond',
        inputSchema: [
          {
            name: 'message',
            type: 'text',
            required: true,
          },
        ],
        handler: ({ input, req }) => {
          req.payload.logger.info(input.message)
          return {
            output: {},
          }
        },
      }
    ]
  }
})
```

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1210495300843759
2025-07-18 06:48:27 -04:00

1491 lines
39 KiB
TypeScript

import path from 'path'
import {
_internal_jobSystemGlobals,
_internal_resetJobSystemGlobals,
type JobTaskStatus,
type Payload,
type SanitizedConfig,
} from 'payload'
import { migrateCLI } from 'payload'
import { wait } from 'payload/shared'
import { fileURLToPath } from 'url'
import type { NextRESTClient } from '../helpers/NextRESTClient.js'
import { devUser } from '../credentials.js'
import { initPayloadInt } from '../helpers/initPayloadInt.js'
import { clearAndSeedEverything } from './seed.js'
import { waitUntilAutorunIsDone } from './utilities.js'
// Shared state for the whole spec. `payload` and `restClient` are assigned in beforeAll.
let payload: Payload
let restClient: NextRESTClient
// JWT taken from the login response in beforeEach; sent as the Authorization header by the REST tests.
let token: string
const { email, password } = devUser
// ESM replacement for __filename / __dirname; `dirname` is passed to initPayloadInt.
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
describe('Queues', () => {
beforeAll(async () => {
  // Disable the seed in the payload config's onInit. beforeEach seeds before every test,
  // so the initial test run would otherwise be seeded twice for no reason.
  process.env.SEED_IN_CONFIG_ONINIT = 'false'

  const initialized = await initPayloadInt(dirname)
  payload = initialized.payload
  restClient = initialized.restClient
})
// Global teardown: stop new cron work from starting, give in-flight cron runs time to finish,
// then destroy the Payload instance. The order of these steps matters.
afterAll(async () => {
// Ensure no new crons are scheduled
_internal_jobSystemGlobals.shouldAutoRun = false
_internal_jobSystemGlobals.shouldAutoSchedule = false
// Wait 3 seconds to ensure all currently-running crons are done. If we shut down the db while a function is running, it can cause issues
// Cron function runs may persist after a test has finished
await wait(3000)
// Now we can destroy the payload instance
await payload.destroy()
// Finally reset the job-system globals (the two flags above were overridden for the teardown)
_internal_resetJobSystemGlobals()
})
// Reset the job-system globals after every test. Presumably this restores their defaults so that
// per-test changes do not leak into the next test — confirm against payload's implementation.
afterEach(() => {
_internal_resetJobSystemGlobals()
})
beforeEach(async () => {
  // Switch autorun and auto-scheduling off while seeding, so that no crons get scheduled
  // which could affect the tests.
  _internal_jobSystemGlobals.shouldAutoRun = false
  _internal_jobSystemGlobals.shouldAutoSchedule = false
  await clearAndSeedEverything(payload)

  // Log in as the dev user through REST and remember the JWT for the authenticated tests.
  const credentials = JSON.stringify({ email, password })
  const loginResponse = await restClient.POST('/users/login', { body: credentials })
  const data = await loginResponse.json()
  if (data.token) {
    token = data.token
  }

  // Reset the flag that individual tests override, then re-enable autorun / auto-scheduling.
  payload.config.jobs.deleteJobOnComplete = true
  _internal_jobSystemGlobals.shouldAutoRun = true
  _internal_jobSystemGlobals.shouldAutoSchedule = true
})
it('will run access control on jobs runner', async () => {
  // Deliberately sent without an Authorization header.
  // Needs to be a REST call, because that is what exercises auth.
  const { status } = await restClient.GET('/payload-jobs/run?silent=true', {
    headers: {},
  })

  expect(status).toBe(401)
})
it('will return 200 from jobs runner', async () => {
  // Needs to be a REST call, because that is what exercises auth. This time the JWT is attached.
  const authHeaders = { Authorization: `JWT ${token}` }
  const { status } = await restClient.GET('/payload-jobs/run?silent=true', {
    headers: authHeaders,
  })

  expect(status).toBe(200)
})
// There used to be a bug in payload where updating the job threw the following error - only in
// postgres:
// QueryError: The following path cannot be queried: document.relationTo
// This test is to ensure that the bug is fixed
it('can create and update new jobs', async () => {
  // Create a job document directly through the local API ...
  const created = await payload.create({
    collection: 'payload-jobs',
    data: { input: { message: '1' } },
  })
  // @ts-expect-error
  expect(created.input.message).toBe('1')

  // ... then update that same document and check that the new input was stored.
  const updated = await payload.update({
    id: created.id,
    collection: 'payload-jobs',
    data: { input: { message: '2' } },
  })
  // @ts-expect-error
  expect(updated.input.message).toBe('2')
})
it('can create new jobs', async () => {
  const { id: postID } = await payload.create({
    collection: 'posts',
    data: { title: 'my post' },
  })
  const fetchPost = () => payload.findByID({ collection: 'posts', id: postID })

  // Before the runner is invoked, neither step marker is expected to be set on the post.
  const beforeRun = await fetchPost()
  expect(beforeRun.jobStep1Ran).toBeFalsy()
  expect(beforeRun.jobStep2Ran).toBeFalsy()

  await payload.jobs.run({ silent: true })

  // After the run both steps must have written their result to the post.
  const afterRun = await fetchPost()
  expect(afterRun.jobStep1Ran).toBe('hello')
  expect(afterRun.jobStep2Ran).toBe('hellohellohellohello')
})
it('can create new JSON-workflow jobs', async () => {
  // Same flow as the regular workflow test, but the post is created with the useJSONWorkflow context flag.
  const { id: postID } = await payload.create({
    collection: 'posts',
    data: { title: 'my post' },
    context: { useJSONWorkflow: true },
  })
  const fetchPost = () => payload.findByID({ collection: 'posts', id: postID })

  // Before the runner is invoked, neither step marker is expected to be set on the post.
  const beforeRun = await fetchPost()
  expect(beforeRun.jobStep1Ran).toBeFalsy()
  expect(beforeRun.jobStep2Ran).toBeFalsy()

  await payload.jobs.run({ silent: true })

  const afterRun = await fetchPost()
  expect(afterRun.jobStep1Ran).toBe('hello')
  expect(afterRun.jobStep2Ran).toBe('hellohellohellohello')
})
it('ensure job retrying works', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'retriesTest',
    queue: 'default',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(3)
})
it('ensure workflow-level retries are respected', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'retriesWorkflowLevelTest',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(2)
})
it('ensure workflows dont limit retries if no retries property is set', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'workflowNoRetriesSet',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(3)
})
it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'workflowRetries0',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(0)
})
it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'workflowAndTasksRetriesUndefined',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(0)
})
it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'workflowRetries2TasksRetriesUndefined',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(2)
})
it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => {
  // Keep the job document after the run so its input can be inspected below.
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'workflowRetries2TasksRetries0',
    input: { message: 'hello' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const { totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error amountRetried is new arbitrary data and not in the type
  expect(jobAfterRun.input.amountRetried).toBe(0)
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('ensure failed tasks are rolled back via transactions', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesRollbackTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({silent: true})
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1) // Failure happens after task creates a simple document, but still within the task => any document creation should be rolled back
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
})*/
// Queues the `retriesBackoffTest` workflow, polls the runner until the queue has stayed empty for
// a grace period, then checks the retry counters and the time gaps between the recorded attempts.
it('ensure backoff strategy of task is respected', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'retriesBackoffTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
// Time of the first run that reported no remaining jobs; reset to null whenever a later run finds jobs again
let firstGotNoJobs = null
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
// Keep running until no jobs are found. Once a run reports no jobs, keep polling for another 3 seconds
// (the 3000 ms in the loop condition) to see if any new jobs become runnable.
// (Specifically here we want to see if the backoff strategy is respected and thus need to wait for `waitUntil`)
while (
hasJobsRemaining ||
!firstGotNoJobs ||
new Date().getTime() - firstGotNoJobs.getTime() < 3000
) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
// Only record the timestamp on the transition from "had jobs" to "no jobs"
if (hasJobsRemaining) {
firstGotNoJobs = new Date()
hasJobsRemaining = false
}
} else {
// A job was picked up again: restart the grace period
firstGotNoJobs = null
hasJobsRemaining = true
}
// Add a 100ms delay before the next iteration
await delay(100)
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.totalTried).toBe(5)
expect((jobAfterRun.taskStatus as JobTaskStatus).inline?.['1']?.totalTried).toBe(5)
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
/*
Job.input.timeTried may look something like this:
timeTried: {
'0': '2024-10-07T16:05:49.300Z',
'1': '2024-10-07T16:05:49.469Z',
'2': '2024-10-07T16:05:49.779Z',
'3': '2024-10-07T16:05:50.388Z',
'4': '2024-10-07T16:05:51.597Z'
}
Convert this into an array, each item is the duration between the fails. So this should have 4 items
*/
const timeTried: {
[key: string]: string
// @ts-expect-error timeTried is new arbitrary data and not in the type
} = jobAfterRun.input.timeTried
// For every attempt except the last one, compute the milliseconds until the next attempt
const durations = Object.values(timeTried)
.map((time, index, arr) => {
if (index === arr.length - 1) {
return null
}
return new Date(arr[index + 1] as string).getTime() - new Date(time).getTime()
})
.filter((p) => p !== null)
expect(durations).toHaveLength(4)
// The lower bounds double with every retry (presumably mirroring an exponential backoff in the test config — confirm there)
expect(durations[0]).toBeGreaterThan(300)
expect(durations[1]).toBeGreaterThan(600)
expect(durations[2]).toBeGreaterThan(1200)
expect(durations[3]).toBeGreaterThan(2400)
})
it('ensure jobs run in FIFO order by default', async () => {
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    input: { message: 'task 1' },
  })
  // Queue the second job 100ms later (presumably so the two jobs get clearly distinct creation times)
  await pause(100)
  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    input: { message: 'task 2' },
  })

  await payload.jobs.run({ sequential: true, silent: true })

  // Documents sorted by creation time reveal the order in which the jobs were processed
  const { docs, totalDocs } = await payload.find({
    collection: 'simple',
    limit: 100,
    sort: 'createdAt',
  })
  expect(totalDocs).toBe(2)
  expect(docs?.[0]?.title).toBe('task 1')
  expect(docs?.[1]?.title).toBe('task 2')
})
it('ensure jobs can run LIFO if processingOrder is passed', async () => {
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    input: { message: 'task 1' },
  })
  // Queue the second job 100ms later (presumably so the two jobs get clearly distinct creation times)
  await pause(100)
  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    input: { message: 'task 2' },
  })

  // Descending createdAt: the job queued last is expected to be processed first
  await payload.jobs.run({
    processingOrder: '-createdAt',
    sequential: true,
    silent: true,
  })

  const { docs, totalDocs } = await payload.find({
    collection: 'simple',
    limit: 100,
    sort: 'createdAt',
  })
  expect(totalDocs).toBe(2)
  expect(docs?.[0]?.title).toBe('task 2')
  expect(docs?.[1]?.title).toBe('task 1')
})
it('ensure job config processingOrder using queues object is respected', async () => {
  const pause = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

  // Both jobs go into the 'lifo' queue; no processingOrder is passed to run() below
  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    queue: 'lifo',
    input: { message: 'task 1' },
  })
  await pause(100)
  await payload.jobs.queue({
    workflow: 'inlineTaskTestDelayed',
    queue: 'lifo',
    input: { message: 'task 2' },
  })

  await payload.jobs.run({
    queue: 'lifo',
    sequential: true,
    silent: true,
  })

  // The job queued last is expected to have produced the first document
  const { docs, totalDocs } = await payload.find({
    collection: 'simple',
    limit: 100,
    sort: 'createdAt',
  })
  expect(totalDocs).toBe(2)
  expect(docs?.[0]?.title).toBe('task 2')
  expect(docs?.[1]?.title).toBe('task 1')
})
it('can create new inline jobs', async () => {
  await payload.jobs.queue({
    workflow: 'inlineTaskTest',
    input: { message: 'hello!' },
  })
  await payload.jobs.run({ silent: true })

  // The workflow is expected to have created exactly one document titled with the input message
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('hello!')
})
it('should respect deleteJobOnComplete true default configuration', async () => {
  const { id } = await payload.jobs.queue({
    workflow: 'inlineTaskTest',
    input: { message: 'deleteJobOnComplete test' },
  })
  // disableErrors is passed so that a missing document can be asserted as null below
  const findJob = () => payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })

  const before = await findJob()
  expect(before?.id).toBe(id)

  await payload.jobs.run({ silent: true })

  // After the run the job document must be gone
  const after = await findJob()
  expect(after).toBeNull()
})
it('should not delete failed jobs if deleteJobOnComplete is true', async () => {
  const { id } = await payload.jobs.queue({
    workflow: 'failsImmediately',
    input: {},
  })
  const findJob = () => payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })

  const before = await findJob()
  expect(before?.id).toBe(id)

  await payload.jobs.run({ silent: true })

  // The job document must still exist after the run
  const after = await findJob()
  expect(after?.id).toBe(id)
})
it('should respect deleteJobOnComplete false configuration', async () => {
  payload.config.jobs.deleteJobOnComplete = false
  const { id } = await payload.jobs.queue({
    workflow: 'inlineTaskTest',
    input: { message: 'hello!' },
  })
  const findJob = () => payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })

  const before = await findJob()
  expect(before?.id).toBe(id)

  await payload.jobs.run({ silent: true })

  // With the flag off, the job document must still exist after the run
  const after = await findJob()
  expect(after?.id).toBe(id)
})
it('can queue single tasks', async () => {
  // Queue a task directly, without wrapping it in a workflow
  await payload.jobs.queue({
    task: 'CreateSimple',
    input: { message: 'from single task' },
  })
  await payload.jobs.run({ silent: true })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('from single task')
})
it('can queue and run via the endpoint single tasks without workflows', async () => {
  // Temporarily remove `workflows` from the live config, so that queueing and the REST run
  // endpoint are exercised with a config that only has tasks.
  const workflowsRef = payload.config.jobs.workflows
  delete payload.config.jobs.workflows

  try {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: {
        message: 'from single task',
      },
    })

    // Run the queue through the REST endpoint (authenticated) instead of payload.jobs.run
    await restClient.GET('/payload-jobs/run?silent=true', {
      headers: {
        Authorization: `JWT ${token}`,
      },
    })

    const allSimples = await payload.find({
      collection: 'simple',
      limit: 100,
    })

    expect(allSimples.totalDocs).toBe(1)
    expect(allSimples.docs[0]?.title).toBe('from single task')
  } finally {
    // Restore the shared config even if a step or assertion above throws. The payload instance
    // is reused by every test in this file, so a failure here must not leave later tests
    // without workflows.
    payload.config.jobs.workflows = workflowsRef
  }
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('transaction test against payload-jobs collection', async () => {
// This roughly emulates what happens when multiple jobs are queued and then run in parallel.
const runWorkflowFN = async (i: number) => {
const { id } = await payload.create({
collection: 'payload-jobs',
data: {
input: {
message: 'Number ' + i,
},
taskSlug: 'CreateSimple',
},
})
const _req = await createLocalReq({}, payload)
const t1Req = isolateObjectProperty(_req, 'transactionID')
delete t1Req.transactionID
await initTransaction(t1Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 1',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
/**
* T1 start
*/
/*
const t2Req = isolateObjectProperty(t1Req, 'transactionID')
delete t2Req.transactionID
//
await initTransaction(t2Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 2',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await payload.create({
collection: 'simple',
req: t2Req,
data: {
title: 'from single task',
},
})
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 3',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t2Req)
/**
* T1 end
*/
/*
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 4',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t1Req)
}
await Promise.all(
new Array(30).fill(0).map(async (_, i) => {
await runWorkflowFN(i)
}),
)
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(30)
})*/
it('can queue single tasks 8 times', async () => {
  // Queue the same task eight times, one after the other
  for (let queued = 0; queued < 8; queued++) {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
  }

  await payload.jobs.run({ silent: true })

  // All eight must have run; spot-check the first and the last document
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(8)
  expect(docs[0]?.title).toBe('from single task')
  expect(docs[7]?.title).toBe('from single task')
})
it('can queue single tasks hundreds of times', async () => {
  // TODO: Ramp up the limit from 150 to 500 or 1000, to test reliability of the database
  const numberOfTasks = 150
  payload.config.jobs.deleteJobOnComplete = false

  for (let queued = 0; queued < numberOfTasks; queued++) {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
  }

  // Raise the run limit to the number of queued tasks so one run can process all of them
  await payload.jobs.run({
    limit: numberOfTasks,
    silent: true,
  })

  const { docs, totalDocs } = await payload.find({
    collection: 'simple',
    limit: numberOfTasks,
  })
  // Every queued task must have run within that single run
  expect(totalDocs).toBe(numberOfTasks)
  expect(docs[0]?.title).toBe('from single task')
  expect(docs[numberOfTasks - 1]?.title).toBe('from single task')
})
it('ensure default jobs run limit of 10 works', async () => {
  // Queue far more jobs (65) than a single run is expected to process
  for (let queued = 0; queued < 65; queued++) {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
  }

  // No limit passed, so the default applies
  await payload.jobs.run({ silent: true })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 1000 })
  // Default limit: 10
  expect(totalDocs).toBe(10)
  expect(docs[0]?.title).toBe('from single task')
  expect(docs[9]?.title).toBe('from single task')
})
it('ensure jobs run limit can be customized', async () => {
  // Queue more jobs (110) than the custom limit used below
  for (let queued = 0; queued < 110; queued++) {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
  }

  await payload.jobs.run({ silent: true, limit: 42 })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 1000 })
  // Exactly the custom limit of 42 jobs must have run, not the default of 10
  expect(totalDocs).toBe(42)
  expect(docs[0]?.title).toBe('from single task')
  expect(docs[30]?.title).toBe('from single task')
  expect(docs[41]?.title).toBe('from single task')
})
it('can queue different kinds of single tasks multiple times', async () => {
  // Three rounds of: duplicate-message task, plain task, duplicate-message task
  for (let round = 0; round < 3; round++) {
    await payload.jobs.queue({
      task: 'CreateSimpleWithDuplicateMessage',
      input: { message: 'hello' },
    })
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
    await payload.jobs.queue({
      task: 'CreateSimpleWithDuplicateMessage',
      input: { message: 'hello' },
    })
  }

  await payload.jobs.run({ silent: true })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(9)

  // Count the resulting documents per task kind by their title
  const countByTitle = (title: string) => docs.filter((doc) => doc.title === title).length
  expect(countByTitle('from single task')).toBe(3)
  expect(countByTitle('hellohello')).toBe(6)
})
it('can queue external tasks', async () => {
  await payload.jobs.queue({
    task: 'ExternalTask',
    input: { message: 'external' },
  })
  await payload.jobs.run({ silent: true })

  // The task is expected to have created one document titled with the input message
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('external')
})
it('can queue external workflow that is running external task', async () => {
  await payload.jobs.queue({
    workflow: 'externalWorkflow',
    input: { message: 'externalWorkflow' },
  })
  await payload.jobs.run({ silent: true })

  // The workflow is expected to have created one document titled with the input message
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('externalWorkflow')
})
it('ensure payload.jobs.runByID works and only runs the specified job', async () => {
  payload.config.jobs.deleteJobOnComplete = false

  // Queue three identical jobs and remember their IDs; only the last one will be run
  const queuedIDs: string[] = []
  for (let n = 0; n < 3; n++) {
    const { id } = await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
    queuedIDs.push(id)
  }
  const lastJobID = queuedIDs[queuedIDs.length - 1]
  if (!lastJobID) {
    throw new Error('No job ID found after queuing jobs')
  }

  await payload.jobs.runByID({ id: lastJobID, silent: true })

  // Exactly one job ran, so exactly one document exists ...
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('from single task')

  // ... and the only job with a completedAt value is the one that was targeted
  const completed = await payload.find({
    collection: 'payload-jobs',
    limit: 100,
    where: { completedAt: { exists: true } },
  })
  expect(completed.totalDocs).toBe(1)
  expect(completed.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => {
  payload.config.jobs.deleteJobOnComplete = false

  // Queue three identical jobs and remember their IDs; only the last one will be selected
  const queuedIDs: string[] = []
  for (let n = 0; n < 3; n++) {
    const { id } = await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: 'from single task' },
    })
    queuedIDs.push(id)
  }
  const lastJobID = queuedIDs[queuedIDs.length - 1]
  if (!lastJobID) {
    throw new Error('No job ID found after queuing jobs')
  }

  // Restrict the run to that single job through a where query on its id
  await payload.jobs.run({
    silent: true,
    where: { id: { equals: lastJobID } },
  })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('from single task')

  // The only job with a completedAt value must be the one that was selected
  const completed = await payload.find({
    collection: 'payload-jobs',
    limit: 100,
    where: { completedAt: { exists: true } },
  })
  expect(completed.totalDocs).toBe(1)
  expect(completed.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => {
  payload.config.jobs.deleteJobOnComplete = false

  // Queue three jobs whose input messages differ only by their index (0, 1, 2)
  for (let index = 0; index < 3; index++) {
    await payload.jobs.queue({
      task: 'CreateSimple',
      input: { message: `from single task ${index}` },
    })
  }

  // Select the job to run by a field inside its input
  await payload.jobs.run({
    silent: true,
    where: { 'input.message': { equals: 'from single task 2' } },
  })

  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(1)
  expect(docs[0]?.title).toBe('from single task 2')

  // The only job with a completedAt value must be the one whose input matched
  const completed = await payload.find({
    collection: 'payload-jobs',
    limit: 100,
    where: { completedAt: { exists: true } },
  })
  expect(completed.totalDocs).toBe(1)
  expect((completed.docs[0]?.input as any).message).toBe('from single task 2')
})
it('can run sub-tasks', async () => {
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'subTask',
    input: { message: 'hello!' },
  })

  await payload.jobs.run({ silent: true })

  // Two documents are expected, both titled with the input message
  const { docs, totalDocs } = await payload.find({ collection: 'simple', limit: 100 })
  expect(totalDocs).toBe(2)
  expect(docs[0]?.title).toBe('hello!')
  expect(docs[1]?.title).toBe('hello!')

  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  const log = jobAfterRun?.log

  // The two inner tasks are logged first and must not carry a `parent` entry;
  // the enclosing 'create two docs' task is logged last.
  expect(log?.[0]?.taskID).toBe('create doc 1')
  expect(log?.[0]?.parent).toBeUndefined()
  expect(log?.[1]?.taskID).toBe('create doc 2')
  expect(log?.[1]?.parent).toBeUndefined()
  expect(log?.[2]?.taskID).toBe('create two docs')
})
it('ensure successful sub-tasks are not retried', async () => {
  payload.config.jobs.deleteJobOnComplete = false
  const queued = await payload.jobs.queue({
    workflow: 'subTaskFails',
    input: { message: 'hello!' },
  })

  // Invoke the runner repeatedly until it reports that no jobs remain.
  for (;;) {
    const { noJobsRemaining } = await payload.jobs.run({ silent: true })
    if (noJobsRemaining) {
      break
    }
  }

  const allSimples = await payload.find({ collection: 'simple', limit: 100 })
  expect(allSimples.totalDocs).toBe(1)
  expect(allSimples?.docs?.[0]?.title).toBe('hello!')

  // Task 2 must have been retried three times, while task 1 must not have been retried at all
  const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', id: queued.id })
  // @ts-expect-error
  expect(jobAfterRun.input.amountTask2Retried).toBe(3)
  // @ts-expect-error
  expect(jobAfterRun.input.amountTask1Retried).toBe(0)
})
// Starts the `longRunning` workflow in the background, cancels it by ID after one second and
// verifies that the job ends up errored/cancelled instead of completed. Timing-sensitive.
it('ensure jobs can be cancelled using payload.jobs.cancelByID', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
// Start the run without awaiting it, so the test can cancel the job while it is in flight; rejections are swallowed on purpose
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
// Give the runner one second to pick the job up
await new Promise((resolve) => setTimeout(resolve, 1000))
// Should be in processing - cancel job
await payload.jobs.cancelByID({
id: job.id,
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
// Same scenario as the cancelByID test, but cancels through payload.jobs.cancel with a where
// query that matches every job (id exists). Timing-sensitive.
it('ensure jobs can be cancelled using payload.jobs.cancel', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
// Start the run without awaiting it, so the test can cancel the job while it is in flight; rejections are swallowed on purpose
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
// Give the runner one second to pick the job up
await new Promise((resolve) => setTimeout(resolve, 1000))
// Cancel all jobs
await payload.jobs.cancel({
where: {
id: {
exists: true,
},
},
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
it('can tasks throw error', async () => {
  // Do not delete jobs on completion, so the job document can be inspected after the run.
  payload.config.jobs.deleteJobOnComplete = false

  const { id: queuedJobID } = await payload.jobs.queue({
    input: {},
    task: 'ThrowError',
  })

  await payload.jobs.run({ silent: true })

  const failedJob = await payload.findByID({
    id: queuedJobID,
    collection: 'payload-jobs',
  })

  // The job is marked as errored and carries exactly one log entry...
  expect(failedJob.hasError).toBe(true)
  expect(failedJob.log?.length).toBe(1)

  // ...and that entry records the failure together with the error message.
  const firstLogEntry = failedJob?.log?.[0]
  expect(firstLogEntry?.error?.message).toBe('failed')
  expect(firstLogEntry?.state).toBe('failed')
})
it('can tasks return error', async () => {
  // Do not delete jobs on completion, so the job document can be inspected after the run.
  payload.config.jobs.deleteJobOnComplete = false

  const { id: queuedJobID } = await payload.jobs.queue({
    input: {},
    task: 'ReturnError',
  })

  await payload.jobs.run({ silent: true })

  const failedJob = await payload.findByID({
    id: queuedJobID,
    collection: 'payload-jobs',
  })

  // The job is marked as errored and carries exactly one log entry...
  expect(failedJob.hasError).toBe(true)
  expect(failedJob.log?.length).toBe(1)

  // ...whose error message is the one expected when no custom message is supplied.
  const firstLogEntry = failedJob?.log?.[0]
  expect(firstLogEntry?.error?.message).toBe('Task handler returned a failed state')
  expect(firstLogEntry?.state).toBe('failed')
})
it('can tasks return error with custom error message', async () => {
  // The message handed to the task as input; the same text is expected in the job log.
  const customMessage = 'custom error message'

  // Do not delete jobs on completion, so the job document can be inspected after the run.
  payload.config.jobs.deleteJobOnComplete = false

  const { id: queuedJobID } = await payload.jobs.queue({
    input: { errorMessage: customMessage },
    task: 'ReturnCustomError',
  })

  await payload.jobs.run({ silent: true })

  const failedJob = await payload.findByID({
    id: queuedJobID,
    collection: 'payload-jobs',
  })

  // The job is marked as errored and carries exactly one log entry...
  expect(failedJob.hasError).toBe(true)
  expect(failedJob.log?.length).toBe(1)

  // ...and that entry surfaces the custom message that was passed in.
  const firstLogEntry = failedJob?.log?.[0]
  expect(firstLogEntry?.error?.message).toBe(customMessage)
  expect(firstLogEntry?.state).toBe('failed')
})
it('can reliably run workflows with parallel tasks', async () => {
  // eslint-disable-next-line jest/no-conditional-in-test
  if (process.env.PAYLOAD_DATABASE === 'supabase') {
    // TODO: This test is flaky on supabase in CI, so we skip it for now
    return
  }

  // Value passed to the workflow as `amount`; also the expected number of log
  // entries and created documents.
  const amount = 50

  // Do not delete jobs on completion, so the job log can be inspected after the run.
  payload.config.jobs.deleteJobOnComplete = false

  const { id: queuedJobID } = await payload.jobs.queue({
    input: { amount },
    workflow: 'parallelTask',
  })

  await payload.jobs.run({ silent: true })

  const finishedJob = await payload.findByID({
    id: queuedJobID,
    collection: 'payload-jobs',
  })
  expect(finishedJob.hasError).toBe(false)
  expect(finishedJob.log?.length).toBe(amount)

  const createdDocs = await payload.find({
    collection: 'simple',
    depth: 0,
    limit: amount,
  })
  expect(createdDocs.docs).toHaveLength(amount)

  // Ensure all docs are created (= all tasks are run once): for every expected
  // label there must be a matching document and a matching log entry, and the
  // log entry's output must reference that document's ID.
  const expectedLabels = Array.from(
    { length: createdDocs.docs.length },
    (_, index) => `parallel task ${index + 1}`,
  )
  for (const label of expectedLabels) {
    const matchingDoc = createdDocs.docs.find((doc) => doc.title === label)
    const matchingLog = finishedJob?.log?.find((log) => log.taskID === label)

    expect(matchingDoc).toBeDefined()
    expect(matchingLog).toBeDefined()
    expect((matchingLog?.output as any)?.simpleID).toBe(matchingDoc?.id)
  }
})
it('can create and autorun jobs', async () => {
  // The job is queued on, and then awaited on, this same queue.
  const autorunQueue = 'autorunSecond'

  await payload.jobs.queue({
    input: { message: 'hello!' },
    queue: autorunQueue,
    workflow: 'inlineTaskTest',
  })

  // Note: payload.jobs.run is never called here - the test only waits for the
  // queue's autorun to be done.
  await waitUntilAutorunIsDone({ payload, queue: autorunQueue })

  const simpleResults = await payload.find({
    collection: 'simple',
    limit: 100,
  })

  // Exactly one document is expected, titled with the queued message.
  expect(simpleResults.totalDocs).toBe(1)
  expect(simpleResults?.docs?.[0]?.title).toBe('hello!')
})
})
describe('Queues - CLI', () => {
  // Config returned by the integration bootstrap; handed to the migrate CLI below.
  let config: SanitizedConfig

  beforeAll(async () => {
    const initResult = await initPayloadInt(dirname, undefined, false)
    config = initResult.config
  })

  it('can run migrate CLI without jobs attempting to run', async () => {
    // Resolves after the given number of milliseconds.
    const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))

    await migrateCLI({
      config,
      parsedArgs: {
        _: ['migrate'],
      },
    })

    // Wait 3 seconds to let potential autorun crons trigger
    await sleep(3000)

    // Reaching this point without an exception is the actual check. Previously,
    // this would throw an "error: relation "payload_jobs" does not exist" error
    expect(true).toBe(true)
  })
})