Files
payloadcms/test/queues/int.spec.ts
Alessio Gravili 13c24afa63 feat: allow multiple, different payload instances using getPayload in same process (#13603)
Fixes https://github.com/payloadcms/payload/issues/13433. Testing
release: `3.54.0-internal.90cf7d5`

Previously, when calling `getPayload`, you would always use the same,
cached payload instance within a single process, regardless of the
arguments passed to the `getPayload` function. This resulted in the
following issues - both are fixed by this PR:

- If, in your frontend you're calling `getPayload` without `cron: true`,
and you're hosting the Payload Admin Panel in the same process, crons
will not be enabled even if you visit the admin panel which calls
`getPayload` with `cron: true`. This will break jobs autorun depending
on which page you visit first - admin panel or frontend
- Within the same process, you are unable to use `getPayload` twice for
different instances of payload with different Payload Configs.
On postgres, you can get around this by manually calling new
`BasePayload()` which skips the cache. This did not work on mongoose
though, as mongoose was caching the models on a global singleton (this
PR addresses this).
In order to bust the cache for different Payload Config, this PR
introduces a new, optional `key` property to `getPayload`.


## Mongoose - disable using global singleton

This PR refactors the Payload Mongoose adapter to stop relying on the
global mongoose singleton. Instead, each adapter instance now creates
and manages its own scoped Connection object.

### Motivation

Previously, calling `getPayload()` more than once in the same process
would throw `Cannot overwrite model` errors because models were compiled
into the global singleton. This prevented running multiple Payload
instances side-by-side, even when pointing at different databases.

### Changes
- Replace usage of `mongoose.connect()` / `mongoose.model()` with
instance-scoped `createConnection()` and `connection.model()`.
- Ensure models, globals, and versions are compiled per connection, not
globally.
- Added proper `close()` handling on `this.connection` instead of
`mongoose.disconnect()`.

---
- To see the specific tasks where the Asana app for GitHub is being
used, see below:
  - https://app.asana.com/0/0/1211114366468745
2025-08-27 10:24:37 -07:00

1468 lines
39 KiB
TypeScript

import path from 'path'
import {
_internal_jobSystemGlobals,
_internal_resetJobSystemGlobals,
type JobTaskStatus,
type Payload,
} from 'payload'
import { wait } from 'payload/shared'
import { fileURLToPath } from 'url'
import type { NextRESTClient } from '../helpers/NextRESTClient.js'
import { devUser } from '../credentials.js'
import { initPayloadInt } from '../helpers/initPayloadInt.js'
import { clearAndSeedEverything } from './seed.js'
import { waitUntilAutorunIsDone } from './utilities.js'
const { email, password } = devUser
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
describe('Queues - Payload', () => {
let payload: Payload
let restClient: NextRESTClient
let token: string
beforeAll(async () => {
process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit
;({ payload, restClient } = await initPayloadInt(dirname))
})
afterAll(async () => {
// Ensure no new crons are scheduled
_internal_jobSystemGlobals.shouldAutoRun = false
_internal_jobSystemGlobals.shouldAutoSchedule = false
// Wait 3 seconds to ensure all currently-running crons are done. If we shut down the db while a function is running, it can cause issues
// Cron function runs may persist after a test has finished
await wait(3000)
// Now we can destroy the payload instance
await payload.destroy()
_internal_resetJobSystemGlobals()
})
afterEach(() => {
_internal_resetJobSystemGlobals()
})
beforeEach(async () => {
// Set autorun to false during seed process to ensure no crons are scheduled, which may affect the tests
_internal_jobSystemGlobals.shouldAutoRun = false
_internal_jobSystemGlobals.shouldAutoSchedule = false
await clearAndSeedEverything(payload)
const data = await restClient
.POST('/users/login', {
body: JSON.stringify({
email,
password,
}),
})
.then((res) => res.json())
if (data.token) {
token = data.token
}
payload.config.jobs.deleteJobOnComplete = true
_internal_jobSystemGlobals.shouldAutoRun = true
_internal_jobSystemGlobals.shouldAutoSchedule = true
})
it('will run access control on jobs runner', async () => {
const response = await restClient.GET('/payload-jobs/run?silent=true', {
headers: {
// Authorization: `JWT ${token}`,
},
}) // Needs to be a rest call to test auth
expect(response.status).toBe(401)
})
it('will return 200 from jobs runner', async () => {
const response = await restClient.GET('/payload-jobs/run?silent=true', {
headers: {
Authorization: `JWT ${token}`,
},
}) // Needs to be a rest call to test auth
expect(response.status).toBe(200)
})
// There used to be a bug in payload where updating the job threw the following error - only in
// postgres:
// QueryError: The following path cannot be queried: document.relationTo
// This test is to ensure that the bug is fixed
it('can create and update new jobs', async () => {
const job = await payload.create({
collection: 'payload-jobs',
data: {
input: {
message: '1',
},
},
})
// @ts-expect-error
expect(job.input.message).toBe('1')
const updatedJob = await payload.update({
collection: 'payload-jobs',
id: job.id,
data: {
input: {
message: '2',
},
},
})
// @ts-expect-error
expect(updatedJob.input.message).toBe('2')
})
it('can create new jobs', async () => {
const newPost = await payload.create({
collection: 'posts',
data: {
title: 'my post',
},
})
const retrievedPost = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(retrievedPost.jobStep1Ran).toBeFalsy()
expect(retrievedPost.jobStep2Ran).toBeFalsy()
await payload.jobs.run({ silent: true })
const postAfterJobs = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(postAfterJobs.jobStep1Ran).toBe('hello')
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
})
it('can create new JSON-workflow jobs', async () => {
const newPost = await payload.create({
collection: 'posts',
data: {
title: 'my post',
},
context: {
useJSONWorkflow: true,
},
})
const retrievedPost = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(retrievedPost.jobStep1Ran).toBeFalsy()
expect(retrievedPost.jobStep2Ran).toBeFalsy()
await payload.jobs.run({ silent: true })
const postAfterJobs = await payload.findByID({
collection: 'posts',
id: newPost.id,
})
expect(postAfterJobs.jobStep1Ran).toBe('hello')
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
})
it('ensure job retrying works', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'retriesTest',
queue: 'default',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(3)
})
it('ensure workflow-level retries are respected', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'retriesWorkflowLevelTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(2)
})
it('ensure workflows dont limit retries if no retries property is set', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'workflowNoRetriesSet',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(3)
})
it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'workflowRetries0',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
})
it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'workflowAndTasksRetriesUndefined',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
})
it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'workflowRetries2TasksRetriesUndefined',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(2)
})
it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'workflowRetries2TasksRetries0',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(0)
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('ensure failed tasks are rolled back via transactions', async () => {
const job = await payload.jobs.queue({
workflow: 'retriesRollbackTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({silent: true})
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1) // Failure happens after task creates a simple document, but still within the task => any document creation should be rolled back
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
})*/
it('ensure backoff strategy of task is respected', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'retriesBackoffTest',
input: {
message: 'hello',
},
})
let hasJobsRemaining = true
let firstGotNoJobs = null
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
// Keep running until no jobs found. If no jobs found, wait for 1.6 seconds to see if any new jobs are added
// (Specifically here we want to see if the backoff strategy is respected and thus need to wait for `waitUntil`)
while (
hasJobsRemaining ||
!firstGotNoJobs ||
new Date().getTime() - firstGotNoJobs.getTime() < 3000
) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
if (hasJobsRemaining) {
firstGotNoJobs = new Date()
hasJobsRemaining = false
}
} else {
firstGotNoJobs = null
hasJobsRemaining = true
}
// Add a 100ms delay before the next iteration
await delay(100)
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.totalTried).toBe(5)
expect((jobAfterRun.taskStatus as JobTaskStatus).inline?.['1']?.totalTried).toBe(5)
// @ts-expect-error amountRetried is new arbitrary data and not in the type
expect(jobAfterRun.input.amountRetried).toBe(4)
/*
Job.input.timeTried may look something like this:
timeTried: {
'0': '2024-10-07T16:05:49.300Z',
'1': '2024-10-07T16:05:49.469Z',
'2': '2024-10-07T16:05:49.779Z',
'3': '2024-10-07T16:05:50.388Z',
'4': '2024-10-07T16:05:51.597Z'
}
Convert this into an array, each item is the duration between the fails. So this should have 4 items
*/
const timeTried: {
[key: string]: string
// @ts-expect-error timeTried is new arbitrary data and not in the type
} = jobAfterRun.input.timeTried
const durations = Object.values(timeTried)
.map((time, index, arr) => {
if (index === arr.length - 1) {
return null
}
return new Date(arr[index + 1] as string).getTime() - new Date(time).getTime()
})
.filter((p) => p !== null)
expect(durations).toHaveLength(4)
expect(durations[0]).toBeGreaterThan(300)
expect(durations[1]).toBeGreaterThan(600)
expect(durations[2]).toBeGreaterThan(1200)
expect(durations[3]).toBeGreaterThan(2400)
})
it('ensure jobs run in FIFO order by default', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
input: {
message: 'task 1',
},
})
await new Promise((resolve) => setTimeout(resolve, 100))
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
input: {
message: 'task 2',
},
})
await payload.jobs.run({
sequential: true,
silent: true,
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
sort: 'createdAt',
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs?.[0]?.title).toBe('task 1')
expect(allSimples.docs?.[1]?.title).toBe('task 2')
})
it('ensure jobs can run LIFO if processingOrder is passed', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
input: {
message: 'task 1',
},
})
await new Promise((resolve) => setTimeout(resolve, 100))
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
input: {
message: 'task 2',
},
})
await payload.jobs.run({
sequential: true,
silent: true,
processingOrder: '-createdAt',
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
sort: 'createdAt',
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs?.[0]?.title).toBe('task 2')
expect(allSimples.docs?.[1]?.title).toBe('task 1')
})
it('ensure job config processingOrder using queues object is respected', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
queue: 'lifo',
input: {
message: 'task 1',
},
})
await new Promise((resolve) => setTimeout(resolve, 100))
await payload.jobs.queue({
workflow: 'inlineTaskTestDelayed',
queue: 'lifo',
input: {
message: 'task 2',
},
})
await payload.jobs.run({
sequential: true,
silent: true,
queue: 'lifo',
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
sort: 'createdAt',
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs?.[0]?.title).toBe('task 2')
expect(allSimples.docs?.[1]?.title).toBe('task 1')
})
it('can create new inline jobs', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTest',
input: {
message: 'hello!',
},
})
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('hello!')
})
it('should respect deleteJobOnComplete true default configuration', async () => {
const { id } = await payload.jobs.queue({
workflow: 'inlineTaskTest',
input: {
message: 'deleteJobOnComplete test',
},
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before?.id).toBe(id)
await payload.jobs.run({ silent: true })
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(after).toBeNull()
})
it('should not delete failed jobs if deleteJobOnComplete is true', async () => {
const { id } = await payload.jobs.queue({
workflow: 'failsImmediately',
input: {},
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before?.id).toBe(id)
await payload.jobs.run({ silent: true })
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(after?.id).toBe(id)
})
it('should respect deleteJobOnComplete false configuration', async () => {
payload.config.jobs.deleteJobOnComplete = false
const { id } = await payload.jobs.queue({
workflow: 'inlineTaskTest',
input: {
message: 'hello!',
},
})
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(before?.id).toBe(id)
await payload.jobs.run({ silent: true })
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
expect(after?.id).toBe(id)
})
it('can queue single tasks', async () => {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('from single task')
})
it('can queue and run via the endpoint single tasks without workflows', async () => {
const workflowsRef = payload.config.jobs.workflows
delete payload.config.jobs.workflows
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
await restClient.GET('/payload-jobs/run?silent=true', {
headers: {
Authorization: `JWT ${token}`,
},
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('from single task')
payload.config.jobs.workflows = workflowsRef
})
/*
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
it('transaction test against payload-jobs collection', async () => {
// This kinds of emulates what happens when multiple jobs are queued and then run in parallel.
const runWorkflowFN = async (i: number) => {
const { id } = await payload.create({
collection: 'payload-jobs',
data: {
input: {
message: 'Number ' + i,
},
taskSlug: 'CreateSimple',
},
})
const _req = await createLocalReq({}, payload)
const t1Req = isolateObjectProperty(_req, 'transactionID')
delete t1Req.transactionID
await initTransaction(t1Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 1',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
/**
* T1 start
*/
/*
const t2Req = isolateObjectProperty(t1Req, 'transactionID')
delete t2Req.transactionID
//
await initTransaction(t2Req)
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 2',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await payload.create({
collection: 'simple',
req: t2Req,
data: {
title: 'from single task',
},
})
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 3',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t2Req)
/**
* T1 end
*/
/*
await payload.update({
collection: 'payload-jobs',
id,
req: t1Req,
data: {
input: {
message: 'Number ' + i + ' Update 4',
},
processing: true,
taskSlug: 'CreateSimple',
},
})
await commitTransaction(t1Req)
}
await Promise.all(
new Array(30).fill(0).map(async (_, i) => {
await runWorkflowFN(i)
}),
)
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(30)
})*/
it('can queue single tasks 8 times', async () => {
for (let i = 0; i < 8; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(8)
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[7]?.title).toBe('from single task')
})
it('can queue single tasks hundreds of times', async () => {
const numberOfTasks = 150
// TODO: Ramp up the limit from 150 to 500 or 1000, to test reliability of the database
payload.config.jobs.deleteJobOnComplete = false
for (let i = 0; i < numberOfTasks; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({
silent: true,
limit: numberOfTasks,
})
const allSimples = await payload.find({
collection: 'simple',
limit: numberOfTasks,
})
expect(allSimples.totalDocs).toBe(numberOfTasks) // Default limit: 10
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[numberOfTasks - 1]?.title).toBe('from single task')
})
it('ensure default jobs run limit of 10 works', async () => {
for (let i = 0; i < 65; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 1000,
})
expect(allSimples.totalDocs).toBe(10) // Default limit: 10
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[9]?.title).toBe('from single task')
})
it('ensure jobs run limit can be customized', async () => {
for (let i = 0; i < 110; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
}
await payload.jobs.run({
limit: 42,
silent: true,
})
const allSimples = await payload.find({
collection: 'simple',
limit: 1000,
})
expect(allSimples.totalDocs).toBe(42) // Default limit: 10
expect(allSimples.docs[0]?.title).toBe('from single task')
expect(allSimples.docs[30]?.title).toBe('from single task')
expect(allSimples.docs[41]?.title).toBe('from single task')
})
it('can queue different kinds of single tasks multiple times', async () => {
for (let i = 0; i < 3; i++) {
await payload.jobs.queue({
task: 'CreateSimpleWithDuplicateMessage',
input: {
message: 'hello',
},
})
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
await payload.jobs.queue({
task: 'CreateSimpleWithDuplicateMessage',
input: {
message: 'hello',
},
})
}
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(9)
let amountOfCreateSimple = 0
let amountOfCreateSimpleWithDuplicateMessage = 0
for (const simple of allSimples.docs) {
if (simple.title === 'from single task') {
amountOfCreateSimple++
} else if (simple.title === 'hellohello') {
amountOfCreateSimpleWithDuplicateMessage++
}
}
expect(amountOfCreateSimple).toBe(3)
expect(amountOfCreateSimpleWithDuplicateMessage).toBe(6)
})
it('can queue external tasks', async () => {
await payload.jobs.queue({
task: 'ExternalTask',
input: {
message: 'external',
},
})
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('external')
})
it('can queue external workflow that is running external task', async () => {
await payload.jobs.queue({
workflow: 'externalWorkflow',
input: {
message: 'externalWorkflow',
},
})
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('externalWorkflow')
})
it('ensure payload.jobs.runByID works and only runs the specified job', async () => {
payload.config.jobs.deleteJobOnComplete = false
let lastJobID: null | number | string = null
for (let i = 0; i < 3; i++) {
const job = await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
lastJobID = job.id
}
if (!lastJobID) {
throw new Error('No job ID found after queuing jobs')
}
await payload.jobs.runByID({
id: lastJobID,
silent: true,
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('from single task')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
limit: 100,
where: {
completedAt: {
exists: true,
},
},
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => {
payload.config.jobs.deleteJobOnComplete = false
let lastJobID: null | number | string = null
for (let i = 0; i < 3; i++) {
const job = await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: 'from single task',
},
})
lastJobID = job.id
}
if (!lastJobID) {
throw new Error('No job ID found after queuing jobs')
}
await payload.jobs.run({
silent: true,
where: {
id: {
equals: lastJobID,
},
},
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('from single task')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
limit: 100,
where: {
completedAt: {
exists: true,
},
},
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
})
it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => {
payload.config.jobs.deleteJobOnComplete = false
for (let i = 0; i < 3; i++) {
await payload.jobs.queue({
task: 'CreateSimple',
input: {
message: `from single task ${i}`,
},
})
}
await payload.jobs.run({
silent: true,
where: {
'input.message': {
equals: 'from single task 2',
},
},
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples.docs[0]?.title).toBe('from single task 2')
const allCompletedJobs = await payload.find({
collection: 'payload-jobs',
limit: 100,
where: {
completedAt: {
exists: true,
},
},
})
expect(allCompletedJobs.totalDocs).toBe(1)
expect((allCompletedJobs.docs[0]?.input as any).message).toBe('from single task 2')
})
it('can run sub-tasks', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'subTask',
input: {
message: 'hello!',
},
})
await payload.jobs.run({ silent: true })
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(2)
expect(allSimples.docs[0]?.title).toBe('hello!')
expect(allSimples.docs[1]?.title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun?.log?.[0]?.taskID).toBe('create doc 1')
//expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs')
// jobAfterRun.log[0].parent should not exist
expect(jobAfterRun?.log?.[0]?.parent).toBeUndefined()
expect(jobAfterRun?.log?.[1]?.taskID).toBe('create doc 2')
//expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs')
expect(jobAfterRun?.log?.[1]?.parent).toBeUndefined()
expect(jobAfterRun?.log?.[2]?.taskID).toBe('create two docs')
})
it('ensure successful sub-tasks are not retried', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'subTaskFails',
input: {
message: 'hello!',
},
})
let hasJobsRemaining = true
while (hasJobsRemaining) {
const response = await payload.jobs.run({ silent: true })
if (response.noJobsRemaining) {
hasJobsRemaining = false
}
}
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples?.docs?.[0]?.title).toBe('hello!')
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
// @ts-expect-error
expect(jobAfterRun.input.amountTask2Retried).toBe(3)
// @ts-expect-error
expect(jobAfterRun.input.amountTask1Retried).toBe(0)
})
it('ensure jobs can be cancelled using payload.jobs.cancelByID', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
await new Promise((resolve) => setTimeout(resolve, 1000))
// Should be in processing - cancel job
await payload.jobs.cancelByID({
id: job.id,
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
it('ensure jobs can be cancelled using payload.jobs.cancel', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
await new Promise((resolve) => setTimeout(resolve, 1000))
// Cancel all jobs
await payload.jobs.cancel({
where: {
id: {
exists: true,
},
},
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
it('can tasks throw error', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ThrowError',
input: {},
})
await payload.jobs.run({ silent: true })
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('failed')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can tasks return error', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ReturnError',
input: {},
})
await payload.jobs.run({ silent: true })
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Task handler returned a failed state')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can tasks return error with custom error message', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
task: 'ReturnCustomError',
input: {
errorMessage: 'custom error message',
},
})
await payload.jobs.run({ silent: true })
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('custom error message')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})
it('can reliably run workflows with parallel tasks', async () => {
// eslint-disable-next-line jest/no-conditional-in-test
if (process.env.PAYLOAD_DATABASE === 'supabase') {
// TODO: This test is flaky on supabase in CI, so we skip it for now
return
}
const amount = 50
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'parallelTask',
input: {
amount,
},
})
await payload.jobs.run({ silent: true })
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})
expect(jobAfterRun.hasError).toBe(false)
expect(jobAfterRun.log?.length).toBe(amount)
const simpleDocs = await payload.find({
collection: 'simple',
limit: amount,
depth: 0,
})
expect(simpleDocs.docs).toHaveLength(amount)
// Ensure all docs are created (= all tasks are run once)
for (let i = 1; i <= simpleDocs.docs.length; i++) {
const simpleDoc = simpleDocs.docs.find((doc) => doc.title === `parallel task ${i}`)
const logEntry = jobAfterRun?.log?.find((log) => log.taskID === `parallel task ${i}`)
expect(simpleDoc).toBeDefined()
expect(logEntry).toBeDefined()
expect((logEntry?.output as any)?.simpleID).toBe(simpleDoc?.id)
}
})
it('can create and autorun jobs', async () => {
await payload.jobs.queue({
workflow: 'inlineTaskTest',
queue: 'autorunSecond',
input: {
message: 'hello!',
},
})
await waitUntilAutorunIsDone({
payload,
queue: 'autorunSecond',
})
const allSimples = await payload.find({
collection: 'simple',
limit: 100,
})
expect(allSimples.totalDocs).toBe(1)
expect(allSimples?.docs?.[0]?.title).toBe('hello!')
})
})