Some checks failed
ci / changes (push) Has been cancelled
ci / lint (push) Has been cancelled
ci / build (push) Has been cancelled
ci / tests-unit (push) Has been cancelled
ci / tests-types (push) Has been cancelled
ci / int-cosmosdb (push) Has been cancelled
ci / int-documentdb (push) Has been cancelled
ci / int-firestore (push) Has been cancelled
ci / int-mongodb (push) Has been cancelled
ci / int-postgres (push) Has been cancelled
ci / int-postgres-custom-schema (push) Has been cancelled
ci / int-postgres-uuid (push) Has been cancelled
ci / int-sqlite (push) Has been cancelled
ci / int-sqlite-uuid (push) Has been cancelled
ci / int-supabase (push) Has been cancelled
ci / e2e-_community (push) Has been cancelled
ci / e2e-access-control (push) Has been cancelled
ci / e2e-admin-bar (push) Has been cancelled
ci / e2e-admin-root (push) Has been cancelled
ci / e2e-admin__e2e__document-view (push) Has been cancelled
ci / e2e-admin__e2e__general (push) Has been cancelled
ci / e2e-admin__e2e__list-view (push) Has been cancelled
ci / e2e-auth (push) Has been cancelled
ci / e2e-auth-basic (push) Has been cancelled
ci / e2e-bulk-edit (push) Has been cancelled
ci / e2e-field-error-states (push) Has been cancelled
ci / e2e-fields-relationship (push) Has been cancelled
ci / e2e-fields__collections__Array (push) Has been cancelled
ci / e2e-fields__collections__Blocks#config.blockreferences.ts (push) Has been cancelled
ci / e2e-fields__collections__Blocks (push) Has been cancelled
ci / e2e-fields__collections__Checkbox (push) Has been cancelled
ci / e2e-fields__collections__Collapsible (push) Has been cancelled
ci / e2e-fields__collections__ConditionalLogic (push) Has been cancelled
ci / e2e-fields__collections__CustomID (push) Has been cancelled
ci / e2e-fields__collections__Date (push) Has been cancelled
ci / e2e-fields__collections__Email (push) Has been cancelled
ci / e2e-fields__collections__Indexed (push) Has been cancelled
ci / e2e-fields__collections__JSON (push) Has been cancelled
ci / e2e-fields__collections__Number (push) Has been cancelled
ci / e2e-fields__collections__Point (push) Has been cancelled
ci / e2e-fields__collections__Radio (push) Has been cancelled
ci / e2e-fields__collections__Relationship (push) Has been cancelled
ci / e2e-fields__collections__Row (push) Has been cancelled
ci / e2e-fields__collections__Select (push) Has been cancelled
ci / e2e-fields__collections__Tabs (push) Has been cancelled
ci / e2e-fields__collections__Tabs2 (push) Has been cancelled
ci / e2e-fields__collections__Text (push) Has been cancelled
ci / e2e-fields__collections__UI (push) Has been cancelled
ci / e2e-fields__collections__Upload (push) Has been cancelled
ci / e2e-folders (push) Has been cancelled
ci / e2e-form-state (push) Has been cancelled
ci / e2e-group-by (push) Has been cancelled
ci / e2e-hooks (push) Has been cancelled
ci / e2e-i18n (push) Has been cancelled
ci / e2e-joins (push) Has been cancelled
ci / e2e-lexical__collections__LexicalHeadingFeature (push) Has been cancelled
ci / e2e-lexical__collections__LexicalJSXConverter (push) Has been cancelled
ci / e2e-lexical__collections__LexicalLinkFeature (push) Has been cancelled
ci / e2e-lexical__collections__Lexical__e2e__blocks#config.blockreferences.ts (push) Has been cancelled
ci / e2e-lexical__collections__Lexical__e2e__blocks (push) Has been cancelled
ci / e2e-lexical__collections__Lexical__e2e__main (push) Has been cancelled
ci / e2e-lexical__collections__OnDemandForm (push) Has been cancelled
ci / e2e-lexical__collections__RichText (push) Has been cancelled
ci / e2e-lexical__collections___LexicalFullyFeatured (push) Has been cancelled
ci / e2e-lexical__collections___LexicalFullyFeatured__db (push) Has been cancelled
ci / e2e-live-preview (push) Has been cancelled
ci / e2e-localization (push) Has been cancelled
ci / e2e-locked-documents (push) Has been cancelled
ci / e2e-plugin-cloud-storage (push) Has been cancelled
ci / e2e-plugin-form-builder (push) Has been cancelled
ci / e2e-plugin-import-export (push) Has been cancelled
ci / e2e-plugin-multi-tenant (push) Has been cancelled
ci / e2e-plugin-nested-docs (push) Has been cancelled
ci / e2e-plugin-seo (push) Has been cancelled
ci / e2e-query-presets (push) Has been cancelled
ci / e2e-sort (push) Has been cancelled
ci / e2e-trash (push) Has been cancelled
ci / e2e-uploads (push) Has been cancelled
ci / e2e-versions (push) Has been cancelled
ci / e2e-turbo-_community (push) Has been cancelled
ci / e2e-turbo-access-control (push) Has been cancelled
ci / e2e-turbo-admin-bar (push) Has been cancelled
ci / e2e-turbo-admin-root (push) Has been cancelled
ci / e2e-turbo-admin__e2e__document-view (push) Has been cancelled
ci / e2e-turbo-admin__e2e__general (push) Has been cancelled
ci / e2e-turbo-admin__e2e__list-view (push) Has been cancelled
ci / e2e-turbo-auth (push) Has been cancelled
ci / e2e-turbo-auth-basic (push) Has been cancelled
ci / e2e-turbo-bulk-edit (push) Has been cancelled
ci / e2e-turbo-field-error-states (push) Has been cancelled
ci / e2e-turbo-fields-relationship (push) Has been cancelled
ci / e2e-turbo-fields__collections__Array (push) Has been cancelled
ci / e2e-turbo-fields__collections__Blocks#config.blockreferences.ts (push) Has been cancelled
ci / e2e-turbo-fields__collections__Blocks (push) Has been cancelled
ci / e2e-turbo-fields__collections__Checkbox (push) Has been cancelled
ci / e2e-turbo-fields__collections__Collapsible (push) Has been cancelled
ci / e2e-turbo-fields__collections__ConditionalLogic (push) Has been cancelled
ci / e2e-turbo-fields__collections__CustomID (push) Has been cancelled
ci / e2e-turbo-fields__collections__Date (push) Has been cancelled
ci / e2e-turbo-fields__collections__Email (push) Has been cancelled
ci / e2e-turbo-fields__collections__Indexed (push) Has been cancelled
ci / e2e-turbo-fields__collections__JSON (push) Has been cancelled
ci / e2e-turbo-fields__collections__Number (push) Has been cancelled
ci / e2e-turbo-fields__collections__Point (push) Has been cancelled
ci / e2e-turbo-fields__collections__Radio (push) Has been cancelled
ci / e2e-turbo-fields__collections__Relationship (push) Has been cancelled
ci / e2e-turbo-fields__collections__Row (push) Has been cancelled
ci / e2e-turbo-fields__collections__Select (push) Has been cancelled
ci / e2e-turbo-fields__collections__Tabs (push) Has been cancelled
ci / e2e-turbo-fields__collections__Tabs2 (push) Has been cancelled
ci / e2e-turbo-fields__collections__Text (push) Has been cancelled
ci / e2e-turbo-fields__collections__UI (push) Has been cancelled
ci / e2e-turbo-fields__collections__Upload (push) Has been cancelled
ci / e2e-turbo-folders (push) Has been cancelled
ci / e2e-turbo-form-state (push) Has been cancelled
ci / e2e-turbo-group-by (push) Has been cancelled
ci / e2e-turbo-hooks (push) Has been cancelled
ci / e2e-turbo-i18n (push) Has been cancelled
ci / e2e-turbo-joins (push) Has been cancelled
ci / e2e-turbo-lexical__collections__LexicalHeadingFeature (push) Has been cancelled
ci / e2e-turbo-lexical__collections__LexicalJSXConverter (push) Has been cancelled
ci / e2e-turbo-lexical__collections__LexicalLinkFeature (push) Has been cancelled
ci / e2e-turbo-lexical__collections__Lexical__e2e__blocks#config.blockreferences.ts (push) Has been cancelled
ci / e2e-turbo-lexical__collections__Lexical__e2e__blocks (push) Has been cancelled
ci / e2e-turbo-lexical__collections__Lexical__e2e__main (push) Has been cancelled
ci / e2e-turbo-lexical__collections__OnDemandForm (push) Has been cancelled
ci / e2e-turbo-lexical__collections__RichText (push) Has been cancelled
ci / e2e-turbo-lexical__collections___LexicalFullyFeatured (push) Has been cancelled
ci / e2e-turbo-lexical__collections___LexicalFullyFeatured__db (push) Has been cancelled
ci / e2e-turbo-live-preview (push) Has been cancelled
ci / e2e-turbo-localization (push) Has been cancelled
ci / e2e-turbo-locked-documents (push) Has been cancelled
ci / e2e-turbo-plugin-cloud-storage (push) Has been cancelled
ci / e2e-turbo-plugin-form-builder (push) Has been cancelled
ci / e2e-turbo-plugin-import-export (push) Has been cancelled
ci / e2e-turbo-plugin-multi-tenant (push) Has been cancelled
ci / e2e-turbo-plugin-nested-docs (push) Has been cancelled
ci / e2e-turbo-plugin-seo (push) Has been cancelled
ci / e2e-turbo-query-presets (push) Has been cancelled
ci / e2e-turbo-sort (push) Has been cancelled
ci / e2e-turbo-trash (push) Has been cancelled
ci / e2e-turbo-uploads (push) Has been cancelled
ci / e2e-turbo-versions (push) Has been cancelled
ci / build-template-blank-mongodb (push) Has been cancelled
ci / build-template-website-mongodb (push) Has been cancelled
ci / build-template-with-payload-cloud-mongodb (push) Has been cancelled
ci / build-template-with-vercel-mongodb-mongodb (push) Has been cancelled
ci / build-template-plugin- (push) Has been cancelled
ci / build-template-with-postgres-postgres (push) Has been cancelled
ci / build-template-with-vercel-postgres-postgres (push) Has been cancelled
ci / tests-type-generation (push) Has been cancelled
ci / All Green (push) Has been cancelled
ci / Publish Canary (push) Has been cancelled
ci / analyze (push) Has been cancelled
publish-prerelease / publish-prerelease-${{ github.ref_name }}-${{ github.sha }} (push) Has been cancelled
lock-issues / lock_issues (push) Has been cancelled
stale / stale (push) Has been cancelled
audit-dependencies / audit (push) Has been cancelled
activity-notifications / run (push) Has been cancelled
1494 lines
40 KiB
TypeScript
1494 lines
40 KiB
TypeScript
import path from 'path'
|
|
import {
|
|
_internal_jobSystemGlobals,
|
|
_internal_resetJobSystemGlobals,
|
|
type JobTaskStatus,
|
|
type Payload,
|
|
} from 'tbsh-cms'
|
|
import { wait } from 'tbsh-cms/shared'
|
|
import { fileURLToPath } from 'url'
|
|
|
|
import type { NextRESTClient } from '../helpers/NextRESTClient.js'
|
|
|
|
import { devUser } from '../credentials.js'
|
|
import { initPayloadInt } from '../helpers/initPayloadInt.js'
|
|
import { clearAndSeedEverything } from './seed.js'
|
|
import { waitUntilAutorunIsDone } from './utilities.js'
|
|
|
|
const { email, password } = devUser
|
|
const filename = fileURLToPath(import.meta.url)
|
|
const dirname = path.dirname(filename)
|
|
|
|
describe('Queues - Payload', () => {
|
|
let payload: Payload
|
|
let restClient: NextRESTClient
|
|
let token: string
|
|
|
|
beforeAll(async () => {
|
|
process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit
|
|
;({ payload, restClient } = await initPayloadInt(dirname))
|
|
})
|
|
|
|
afterAll(async () => {
|
|
// Ensure no new crons are scheduled
|
|
_internal_jobSystemGlobals.shouldAutoRun = false
|
|
_internal_jobSystemGlobals.shouldAutoSchedule = false
|
|
// Wait 3 seconds to ensure all currently-running crons are done. If we shut down the db while a function is running, it can cause issues
|
|
// Cron function runs may persist after a test has finished
|
|
await wait(3000)
|
|
// Now we can destroy the payload instance
|
|
await payload.destroy()
|
|
_internal_resetJobSystemGlobals()
|
|
})
|
|
|
|
afterEach(() => {
|
|
_internal_resetJobSystemGlobals()
|
|
})
|
|
|
|
beforeEach(async () => {
|
|
// Set autorun to false during seed process to ensure no crons are scheduled, which may affect the tests
|
|
_internal_jobSystemGlobals.shouldAutoRun = false
|
|
_internal_jobSystemGlobals.shouldAutoSchedule = false
|
|
await clearAndSeedEverything(payload)
|
|
const data = await restClient
|
|
.POST('/users/login', {
|
|
body: JSON.stringify({
|
|
email,
|
|
password,
|
|
}),
|
|
})
|
|
.then((res) => res.json())
|
|
|
|
if (data.token) {
|
|
token = data.token
|
|
}
|
|
payload.config.jobs.deleteJobOnComplete = true
|
|
_internal_jobSystemGlobals.shouldAutoRun = true
|
|
_internal_jobSystemGlobals.shouldAutoSchedule = true
|
|
})
|
|
|
|
it('will run access control on jobs runner', async () => {
|
|
const response = await restClient.GET('/payload-jobs/run?silent=true', {
|
|
headers: {
|
|
// Authorization: `JWT ${token}`,
|
|
},
|
|
}) // Needs to be a rest call to test auth
|
|
expect(response.status).toBe(401)
|
|
})
|
|
|
|
it('will return 200 from jobs runner', async () => {
|
|
const response = await restClient.GET('/payload-jobs/run?silent=true', {
|
|
headers: {
|
|
Authorization: `JWT ${token}`,
|
|
},
|
|
}) // Needs to be a rest call to test auth
|
|
|
|
expect(response.status).toBe(200)
|
|
})
|
|
|
|
// There used to be a bug in payload where updating the job threw the following error - only in
|
|
// postgres:
|
|
// QueryError: The following path cannot be queried: document.relationTo
|
|
// This test is to ensure that the bug is fixed
|
|
it('can create and update new jobs', async () => {
|
|
const job = await payload.create({
|
|
collection: 'payload-jobs',
|
|
data: {
|
|
input: {
|
|
message: '1',
|
|
},
|
|
},
|
|
})
|
|
// @ts-expect-error
|
|
expect(job.input.message).toBe('1')
|
|
|
|
const updatedJob = await payload.update({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
data: {
|
|
input: {
|
|
message: '2',
|
|
},
|
|
},
|
|
})
|
|
// @ts-expect-error
|
|
expect(updatedJob.input.message).toBe('2')
|
|
})
|
|
|
|
it('can create new jobs', async () => {
|
|
const newPost = await payload.create({
|
|
collection: 'posts',
|
|
data: {
|
|
title: 'my post',
|
|
},
|
|
})
|
|
|
|
const retrievedPost = await payload.findByID({
|
|
collection: 'posts',
|
|
id: newPost.id,
|
|
})
|
|
|
|
expect(retrievedPost.jobStep1Ran).toBeFalsy()
|
|
expect(retrievedPost.jobStep2Ran).toBeFalsy()
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const postAfterJobs = await payload.findByID({
|
|
collection: 'posts',
|
|
id: newPost.id,
|
|
})
|
|
|
|
expect(postAfterJobs.jobStep1Ran).toBe('hello')
|
|
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
|
|
})
|
|
|
|
it('can create new JSON-workflow jobs', async () => {
|
|
const newPost = await payload.create({
|
|
collection: 'posts',
|
|
data: {
|
|
title: 'my post',
|
|
},
|
|
context: {
|
|
useJSONWorkflow: true,
|
|
},
|
|
})
|
|
|
|
const retrievedPost = await payload.findByID({
|
|
collection: 'posts',
|
|
id: newPost.id,
|
|
})
|
|
|
|
expect(retrievedPost.jobStep1Ran).toBeFalsy()
|
|
expect(retrievedPost.jobStep2Ran).toBeFalsy()
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const postAfterJobs = await payload.findByID({
|
|
collection: 'posts',
|
|
id: newPost.id,
|
|
})
|
|
|
|
expect(postAfterJobs.jobStep1Ran).toBe('hello')
|
|
expect(postAfterJobs.jobStep2Ran).toBe('hellohellohellohello')
|
|
})
|
|
|
|
it('ensure job retrying works', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'retriesTest',
|
|
queue: 'default',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(3)
|
|
})
|
|
|
|
it('ensure workflow-level retries are respected', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'retriesWorkflowLevelTest',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(2)
|
|
})
|
|
|
|
it('ensure workflows dont limit retries if no retries property is set', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'workflowNoRetriesSet',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(3)
|
|
})
|
|
|
|
it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'workflowRetries0',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(0)
|
|
})
|
|
|
|
it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'workflowAndTasksRetriesUndefined',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(0)
|
|
})
|
|
|
|
it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'workflowRetries2TasksRetriesUndefined',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(2)
|
|
})
|
|
|
|
it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'workflowRetries2TasksRetries0',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(0)
|
|
})
|
|
|
|
/*
|
|
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
|
|
it('ensure failed tasks are rolled back via transactions', async () => {
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'retriesRollbackTest',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({silent: true})
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1) // Failure happens after task creates a simple document, but still within the task => any document creation should be rolled back
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(4)
|
|
})*/
|
|
|
|
it('ensure backoff strategy of task is respected', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'retriesBackoffTest',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
let firstGotNoJobs = null
|
|
|
|
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms))
|
|
|
|
// Keep running until no jobs found. If no jobs found, wait for 1.6 seconds to see if any new jobs are added
|
|
// (Specifically here we want to see if the backoff strategy is respected and thus need to wait for `waitUntil`)
|
|
while (
|
|
hasJobsRemaining ||
|
|
!firstGotNoJobs ||
|
|
new Date().getTime() - firstGotNoJobs.getTime() < 3000
|
|
) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
if (hasJobsRemaining) {
|
|
firstGotNoJobs = new Date()
|
|
hasJobsRemaining = false
|
|
}
|
|
} else {
|
|
firstGotNoJobs = null
|
|
hasJobsRemaining = true
|
|
}
|
|
|
|
// Add a 100ms delay before the next iteration
|
|
await delay(100)
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
expect(jobAfterRun.totalTried).toBe(5)
|
|
expect((jobAfterRun.taskStatus as JobTaskStatus).inline?.['1']?.totalTried).toBe(5)
|
|
|
|
// @ts-expect-error amountRetried is new arbitrary data and not in the type
|
|
expect(jobAfterRun.input.amountRetried).toBe(4)
|
|
|
|
/*
|
|
Job.input.timeTried may look something like this:
|
|
timeTried: {
|
|
'0': '2024-10-07T16:05:49.300Z',
|
|
'1': '2024-10-07T16:05:49.469Z',
|
|
'2': '2024-10-07T16:05:49.779Z',
|
|
'3': '2024-10-07T16:05:50.388Z',
|
|
'4': '2024-10-07T16:05:51.597Z'
|
|
}
|
|
Convert this into an array, each item is the duration between the fails. So this should have 4 items
|
|
*/
|
|
const timeTried: {
|
|
[key: string]: string
|
|
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
|
} = jobAfterRun.input.timeTried
|
|
|
|
const durations = Object.values(timeTried)
|
|
.map((time, index, arr) => {
|
|
if (index === arr.length - 1) {
|
|
return null
|
|
}
|
|
return new Date(arr[index + 1] as string).getTime() - new Date(time).getTime()
|
|
})
|
|
.filter((p) => p !== null)
|
|
|
|
expect(durations).toHaveLength(4)
|
|
expect(durations[0]).toBeGreaterThan(300)
|
|
expect(durations[1]).toBeGreaterThan(600)
|
|
expect(durations[2]).toBeGreaterThan(1200)
|
|
expect(durations[3]).toBeGreaterThan(2400)
|
|
})
|
|
|
|
it('ensure jobs run in FIFO order by default', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
input: {
|
|
message: 'task 1',
|
|
},
|
|
})
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 100))
|
|
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
input: {
|
|
message: 'task 2',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({
|
|
sequential: true,
|
|
silent: true,
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
sort: 'createdAt',
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(2)
|
|
expect(allSimples.docs?.[0]?.title).toBe('task 1')
|
|
expect(allSimples.docs?.[1]?.title).toBe('task 2')
|
|
})
|
|
|
|
it('ensure jobs can run LIFO if processingOrder is passed', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
input: {
|
|
message: 'task 1',
|
|
},
|
|
})
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 100))
|
|
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
input: {
|
|
message: 'task 2',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({
|
|
sequential: true,
|
|
silent: true,
|
|
processingOrder: '-createdAt',
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
sort: 'createdAt',
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(2)
|
|
expect(allSimples.docs?.[0]?.title).toBe('task 2')
|
|
expect(allSimples.docs?.[1]?.title).toBe('task 1')
|
|
})
|
|
|
|
it('ensure job config processingOrder using queues object is respected', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
queue: 'lifo',
|
|
input: {
|
|
message: 'task 1',
|
|
},
|
|
})
|
|
|
|
await new Promise((resolve) => setTimeout(resolve, 100))
|
|
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTestDelayed',
|
|
queue: 'lifo',
|
|
input: {
|
|
message: 'task 2',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({
|
|
sequential: true,
|
|
silent: true,
|
|
queue: 'lifo',
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
sort: 'createdAt',
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(2)
|
|
expect(allSimples.docs?.[0]?.title).toBe('task 2')
|
|
expect(allSimples.docs?.[1]?.title).toBe('task 1')
|
|
})
|
|
|
|
it('can create new inline jobs', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTest',
|
|
input: {
|
|
message: 'hello!',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('hello!')
|
|
})
|
|
|
|
it('should respect deleteJobOnComplete true default configuration', async () => {
|
|
const { id } = await payload.jobs.queue({
|
|
workflow: 'inlineTaskTest',
|
|
input: {
|
|
message: 'deleteJobOnComplete test',
|
|
},
|
|
})
|
|
|
|
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(before?.id).toBe(id)
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(after).toBeNull()
|
|
})
|
|
|
|
it('should not delete failed jobs if deleteJobOnComplete is true', async () => {
|
|
const { id } = await payload.jobs.queue({
|
|
workflow: 'failsImmediately',
|
|
input: {},
|
|
})
|
|
|
|
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(before?.id).toBe(id)
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(after?.id).toBe(id)
|
|
})
|
|
|
|
it('should respect deleteJobOnComplete false configuration', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const { id } = await payload.jobs.queue({
|
|
workflow: 'inlineTaskTest',
|
|
input: {
|
|
message: 'hello!',
|
|
},
|
|
})
|
|
|
|
const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(before?.id).toBe(id)
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true })
|
|
expect(after?.id).toBe(id)
|
|
})
|
|
|
|
it('can queue single tasks', async () => {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
})
|
|
|
|
it('can queue and run via the endpoint single tasks without workflows', async () => {
|
|
const workflowsRef = payload.config.jobs.workflows
|
|
delete payload.config.jobs.workflows
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
|
|
await restClient.GET('/payload-jobs/run?silent=true', {
|
|
headers: {
|
|
Authorization: `JWT ${token}`,
|
|
},
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
payload.config.jobs.workflows = workflowsRef
|
|
})
|
|
|
|
/*
|
|
// Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues
|
|
it('transaction test against payload-jobs collection', async () => {
|
|
// This kinds of emulates what happens when multiple jobs are queued and then run in parallel.
|
|
const runWorkflowFN = async (i: number) => {
|
|
const { id } = await payload.create({
|
|
collection: 'payload-jobs',
|
|
data: {
|
|
input: {
|
|
message: 'Number ' + i,
|
|
},
|
|
taskSlug: 'CreateSimple',
|
|
},
|
|
})
|
|
|
|
const _req = await createLocalReq({}, payload)
|
|
const t1Req = isolateObjectProperty(_req, 'transactionID')
|
|
delete t1Req.transactionID
|
|
|
|
await initTransaction(t1Req)
|
|
|
|
await payload.update({
|
|
collection: 'payload-jobs',
|
|
id,
|
|
req: t1Req,
|
|
data: {
|
|
input: {
|
|
message: 'Number ' + i + ' Update 1',
|
|
},
|
|
processing: true,
|
|
taskSlug: 'CreateSimple',
|
|
},
|
|
})
|
|
|
|
/**
|
|
* T1 start
|
|
*/
|
|
/*
|
|
const t2Req = isolateObjectProperty(t1Req, 'transactionID')
|
|
delete t2Req.transactionID
|
|
//
|
|
await initTransaction(t2Req)
|
|
|
|
await payload.update({
|
|
collection: 'payload-jobs',
|
|
id,
|
|
req: t1Req,
|
|
data: {
|
|
input: {
|
|
message: 'Number ' + i + ' Update 2',
|
|
},
|
|
processing: true,
|
|
taskSlug: 'CreateSimple',
|
|
},
|
|
})
|
|
|
|
await payload.create({
|
|
collection: 'simple',
|
|
req: t2Req,
|
|
data: {
|
|
title: 'from single task',
|
|
},
|
|
})
|
|
|
|
await payload.update({
|
|
collection: 'payload-jobs',
|
|
id,
|
|
req: t1Req,
|
|
data: {
|
|
input: {
|
|
message: 'Number ' + i + ' Update 3',
|
|
},
|
|
processing: true,
|
|
taskSlug: 'CreateSimple',
|
|
},
|
|
})
|
|
|
|
await commitTransaction(t2Req)
|
|
|
|
/**
|
|
* T1 end
|
|
*/
|
|
/*
|
|
await payload.update({
|
|
collection: 'payload-jobs',
|
|
id,
|
|
req: t1Req,
|
|
data: {
|
|
input: {
|
|
message: 'Number ' + i + ' Update 4',
|
|
},
|
|
processing: true,
|
|
taskSlug: 'CreateSimple',
|
|
},
|
|
})
|
|
await commitTransaction(t1Req)
|
|
}
|
|
|
|
await Promise.all(
|
|
new Array(30).fill(0).map(async (_, i) => {
|
|
await runWorkflowFN(i)
|
|
}),
|
|
)
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(30)
|
|
})*/
|
|
|
|
it('can queue single tasks 8 times', async () => {
|
|
for (let i = 0; i < 8; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(8)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
expect(allSimples.docs[7]?.title).toBe('from single task')
|
|
})
|
|
|
|
it('can queue single tasks hundreds of times', async () => {
|
|
const numberOfTasks = 150
|
|
// TODO: Ramp up the limit from 150 to 500 or 1000, to test reliability of the database
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
for (let i = 0; i < numberOfTasks; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({
|
|
silent: true,
|
|
limit: numberOfTasks,
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: numberOfTasks,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(numberOfTasks) // Default limit: 10
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
expect(allSimples.docs[numberOfTasks - 1]?.title).toBe('from single task')
|
|
})
|
|
|
|
it('ensure default jobs run limit of 10 works', async () => {
|
|
for (let i = 0; i < 65; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 1000,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(10) // Default limit: 10
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
expect(allSimples.docs[9]?.title).toBe('from single task')
|
|
})
|
|
|
|
it('ensure jobs run limit can be customized', async () => {
|
|
for (let i = 0; i < 110; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({
|
|
limit: 42,
|
|
silent: true,
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 1000,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(42) // Default limit: 10
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
expect(allSimples.docs[30]?.title).toBe('from single task')
|
|
expect(allSimples.docs[41]?.title).toBe('from single task')
|
|
})
|
|
|
|
it('can queue different kinds of single tasks multiple times', async () => {
|
|
for (let i = 0; i < 3; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimpleWithDuplicateMessage',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimpleWithDuplicateMessage',
|
|
input: {
|
|
message: 'hello',
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(9)
|
|
|
|
let amountOfCreateSimple = 0
|
|
let amountOfCreateSimpleWithDuplicateMessage = 0
|
|
|
|
for (const simple of allSimples.docs) {
|
|
if (simple.title === 'from single task') {
|
|
amountOfCreateSimple++
|
|
} else if (simple.title === 'hellohello') {
|
|
amountOfCreateSimpleWithDuplicateMessage++
|
|
}
|
|
}
|
|
|
|
expect(amountOfCreateSimple).toBe(3)
|
|
expect(amountOfCreateSimpleWithDuplicateMessage).toBe(6)
|
|
})
|
|
|
|
it('can queue external tasks', async () => {
|
|
await payload.jobs.queue({
|
|
task: 'ExternalTask',
|
|
input: {
|
|
message: 'external',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('external')
|
|
})
|
|
|
|
it('can queue external workflow that is running external task', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'externalWorkflow',
|
|
input: {
|
|
message: 'externalWorkflow',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('externalWorkflow')
|
|
})
|
|
|
|
it('ensure payload.jobs.runByID works and only runs the specified job', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
let lastJobID: null | number | string = null
|
|
for (let i = 0; i < 3; i++) {
|
|
const job = await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
lastJobID = job.id
|
|
}
|
|
if (!lastJobID) {
|
|
throw new Error('No job ID found after queuing jobs')
|
|
}
|
|
|
|
await payload.jobs.runByID({
|
|
id: lastJobID,
|
|
silent: true,
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
|
|
const allCompletedJobs = await payload.find({
|
|
collection: 'payload-jobs',
|
|
limit: 100,
|
|
where: {
|
|
completedAt: {
|
|
exists: true,
|
|
},
|
|
},
|
|
})
|
|
|
|
expect(allCompletedJobs.totalDocs).toBe(1)
|
|
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
|
|
})
|
|
|
|
it('ensure where query for id in payload.jobs.run works and only runs the specified job', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
let lastJobID: null | number | string = null
|
|
for (let i = 0; i < 3; i++) {
|
|
const job = await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: 'from single task',
|
|
},
|
|
})
|
|
lastJobID = job.id
|
|
}
|
|
if (!lastJobID) {
|
|
throw new Error('No job ID found after queuing jobs')
|
|
}
|
|
|
|
await payload.jobs.run({
|
|
silent: true,
|
|
where: {
|
|
id: {
|
|
equals: lastJobID,
|
|
},
|
|
},
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task')
|
|
|
|
const allCompletedJobs = await payload.find({
|
|
collection: 'payload-jobs',
|
|
limit: 100,
|
|
where: {
|
|
completedAt: {
|
|
exists: true,
|
|
},
|
|
},
|
|
})
|
|
|
|
expect(allCompletedJobs.totalDocs).toBe(1)
|
|
expect(allCompletedJobs.docs[0]?.id).toBe(lastJobID)
|
|
})
|
|
|
|
it('ensure where query for input data in payload.jobs.run works and only runs the specified job', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
for (let i = 0; i < 3; i++) {
|
|
await payload.jobs.queue({
|
|
task: 'CreateSimple',
|
|
input: {
|
|
message: `from single task ${i}`,
|
|
},
|
|
})
|
|
}
|
|
|
|
await payload.jobs.run({
|
|
silent: true,
|
|
where: {
|
|
'input.message': {
|
|
equals: 'from single task 2',
|
|
},
|
|
},
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples.docs[0]?.title).toBe('from single task 2')
|
|
|
|
const allCompletedJobs = await payload.find({
|
|
collection: 'payload-jobs',
|
|
limit: 100,
|
|
where: {
|
|
completedAt: {
|
|
exists: true,
|
|
},
|
|
},
|
|
})
|
|
|
|
expect(allCompletedJobs.totalDocs).toBe(1)
|
|
expect((allCompletedJobs.docs[0]?.input as any).message).toBe('from single task 2')
|
|
})
|
|
|
|
it('can run sub-tasks', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'subTask',
|
|
input: {
|
|
message: 'hello!',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(2)
|
|
expect(allSimples.docs[0]?.title).toBe('hello!')
|
|
expect(allSimples.docs[1]?.title).toBe('hello!')
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
expect(jobAfterRun?.log?.[0]?.taskID).toBe('create doc 1')
|
|
//expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs')
|
|
// jobAfterRun.log[0].parent should not exist
|
|
expect(jobAfterRun?.log?.[0]?.parent).toBeUndefined()
|
|
|
|
expect(jobAfterRun?.log?.[1]?.taskID).toBe('create doc 2')
|
|
//expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs')
|
|
expect(jobAfterRun?.log?.[1]?.parent).toBeUndefined()
|
|
|
|
expect(jobAfterRun?.log?.[2]?.taskID).toBe('create two docs')
|
|
})
|
|
|
|
it('ensure successful sub-tasks are not retried', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'subTaskFails',
|
|
input: {
|
|
message: 'hello!',
|
|
},
|
|
})
|
|
|
|
let hasJobsRemaining = true
|
|
|
|
while (hasJobsRemaining) {
|
|
const response = await payload.jobs.run({ silent: true })
|
|
|
|
if (response.noJobsRemaining) {
|
|
hasJobsRemaining = false
|
|
}
|
|
}
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples?.docs?.[0]?.title).toBe('hello!')
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// @ts-expect-error
|
|
expect(jobAfterRun.input.amountTask2Retried).toBe(3)
|
|
// @ts-expect-error
|
|
expect(jobAfterRun.input.amountTask1Retried).toBe(0)
|
|
})
|
|
|
|
it('ensure jobs can be cancelled using payload.jobs.cancelByID', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'longRunning',
|
|
input: {},
|
|
})
|
|
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
|
|
await new Promise((resolve) => setTimeout(resolve, 1000))
|
|
|
|
// Should be in processing - cancel job
|
|
await payload.jobs.cancelByID({
|
|
id: job.id,
|
|
})
|
|
|
|
// Wait 4 seconds. This ensures that the job has enough time to finish
|
|
// if it hadn't been cancelled. That way we can be sure that the job was
|
|
// actually cancelled.
|
|
await new Promise((resolve) => setTimeout(resolve, 4000))
|
|
|
|
// Ensure job is not completed and cancelled
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
depth: 0,
|
|
})
|
|
|
|
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
|
|
expect(jobAfterRun.hasError).toBe(true)
|
|
// @ts-expect-error error is not typed
|
|
expect(jobAfterRun.error?.cancelled).toBe(true)
|
|
expect(jobAfterRun.processing).toBe(false)
|
|
})
|
|
|
|
it('ensure jobs can be cancelled using payload.jobs.cancel', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'longRunning',
|
|
input: {},
|
|
})
|
|
void payload.jobs.run({ silent: true }).catch((_ignored) => {})
|
|
await new Promise((resolve) => setTimeout(resolve, 1000))
|
|
|
|
// Cancel all jobs
|
|
await payload.jobs.cancel({
|
|
where: {
|
|
id: {
|
|
exists: true,
|
|
},
|
|
},
|
|
})
|
|
|
|
// Wait 4 seconds. This ensures that the job has enough time to finish
|
|
// if it hadn't been cancelled. That way we can be sure that the job was
|
|
// actually cancelled.
|
|
await new Promise((resolve) => setTimeout(resolve, 4000))
|
|
|
|
// Ensure job is not completed and cancelled
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
depth: 0,
|
|
})
|
|
|
|
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
|
|
expect(jobAfterRun.hasError).toBe(true)
|
|
// @ts-expect-error error is not typed
|
|
expect(jobAfterRun.error?.cancelled).toBe(true)
|
|
expect(jobAfterRun.processing).toBe(false)
|
|
})
|
|
|
|
it('can tasks throw error', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
task: 'ThrowError',
|
|
input: {},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
expect(jobAfterRun.hasError).toBe(true)
|
|
expect(jobAfterRun.log?.length).toBe(1)
|
|
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('failed')
|
|
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
|
|
})
|
|
|
|
it('can tasks return error', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
task: 'ReturnError',
|
|
input: {},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
expect(jobAfterRun.hasError).toBe(true)
|
|
expect(jobAfterRun.log?.length).toBe(1)
|
|
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Task handler returned a failed state')
|
|
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
|
|
})
|
|
|
|
it('can tasks return error with custom error message', async () => {
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
task: 'ReturnCustomError',
|
|
input: {
|
|
errorMessage: 'custom error message',
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
expect(jobAfterRun.hasError).toBe(true)
|
|
expect(jobAfterRun.log?.length).toBe(1)
|
|
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('custom error message')
|
|
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
|
|
})
|
|
|
|
it('can reliably run workflows with parallel tasks', async () => {
|
|
// eslint-disable-next-line jest/no-conditional-in-test
|
|
if (process.env.PAYLOAD_DATABASE === 'supabase') {
|
|
// TODO: This test is flaky on supabase in CI, so we skip it for now
|
|
return
|
|
}
|
|
const amount = 50
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'parallelTask',
|
|
input: {
|
|
amount,
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: true })
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore.
|
|
expect(Boolean(jobAfterRun.error)).toBe(false)
|
|
expect(jobAfterRun.hasError).toBe(false)
|
|
expect(jobAfterRun.log?.length).toBe(amount)
|
|
|
|
const simpleDocs = await payload.find({
|
|
collection: 'simple',
|
|
limit: amount,
|
|
depth: 0,
|
|
})
|
|
expect(simpleDocs.docs).toHaveLength(amount)
|
|
|
|
// Ensure all docs are created (= all tasks are run once)
|
|
for (let i = 1; i <= simpleDocs.docs.length; i++) {
|
|
const simpleDoc = simpleDocs.docs.find((doc) => doc.title === `parallel task ${i}`)
|
|
const logEntry = jobAfterRun?.log?.find((log) => log.taskID === `parallel task ${i}`)
|
|
expect(simpleDoc).toBeDefined()
|
|
expect(logEntry).toBeDefined()
|
|
expect((logEntry?.output as any)?.simpleID).toBe(simpleDoc?.id)
|
|
}
|
|
})
|
|
|
|
it('can reliably run workflows with parallel tasks that complete immediately', async () => {
|
|
const amount = 20
|
|
payload.config.jobs.deleteJobOnComplete = false
|
|
|
|
const job = await payload.jobs.queue({
|
|
workflow: 'fastParallelTask',
|
|
input: {
|
|
amount,
|
|
},
|
|
})
|
|
|
|
await payload.jobs.run({ silent: false })
|
|
|
|
const jobAfterRun = await payload.findByID({
|
|
collection: 'payload-jobs',
|
|
id: job.id,
|
|
})
|
|
|
|
// error can be defined while hasError is true, as hasError: true is only set if the job cannot retry anymore.
|
|
expect(Boolean(jobAfterRun.error)).toBe(false)
|
|
expect(jobAfterRun.hasError).toBe(false)
|
|
expect(jobAfterRun.log?.length).toBe(amount)
|
|
})
|
|
|
|
it('can create and autorun jobs', async () => {
|
|
await payload.jobs.queue({
|
|
workflow: 'inlineTaskTest',
|
|
queue: 'autorunSecond',
|
|
input: {
|
|
message: 'hello!',
|
|
},
|
|
})
|
|
|
|
await waitUntilAutorunIsDone({
|
|
payload,
|
|
queue: 'autorunSecond',
|
|
})
|
|
|
|
const allSimples = await payload.find({
|
|
collection: 'simple',
|
|
limit: 100,
|
|
})
|
|
|
|
expect(allSimples.totalDocs).toBe(1)
|
|
expect(allSimples?.docs?.[0]?.title).toBe('hello!')
|
|
})
|
|
})
|