feat: ability to cancel jobs (#11409)

This adds new `payload.jobs.cancel` and `payload.jobs.cancelByID` methods that allow you to cancel already-running jobs, or prevent queued jobs from running.

While it's not possible to cancel a function mid-execution, this will stop job execution the next time the job makes a request to the db, which happens after every task.
This commit is contained in:
Alessio Gravili
2025-02-28 10:58:43 -07:00
committed by GitHub
parent 96d1d90e78
commit 38131ed2c3
9 changed files with 256 additions and 16 deletions

View File

@@ -44,3 +44,31 @@ const createdJob = await payload.jobs.queue({
},
})
```
#### Cancelling Jobs
Payload allows you to cancel jobs that are either queued or currently running. When cancelling a running job, the current task will finish executing, but no subsequent tasks will run. This happens because the job checks its cancellation status between tasks.
##### Cancel a Single Job
To cancel a specific job, use the `payload.jobs.cancelByID` method with the job's ID:
```ts
await payload.jobs.cancelByID({
id: createdJob.id,
})
```
##### Cancel Multiple Jobs
To cancel multiple jobs at once, use the `payload.jobs.cancel` method with a `Where` query:
```ts
await payload.jobs.cancel({
where: {
workflowSlug: {
equals: 'createPost',
},
},
})
```

View File

@@ -221,6 +221,21 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
return doc
},
],
/**
* If another update comes in after a job as already been cancelled, we need to make sure that update doesn't
* change the state of the job.
*/
beforeChange: [
({ data, originalDoc }) => {
if (originalDoc?.error?.cancelled) {
data.processing = false
data.hasError = true
delete data.completedAt
delete data.waitUntil
}
return data
},
],
},
lockDocuments: false,
}

View File

@@ -1,4 +1,3 @@
// @ts-strict-ignore
import type { BaseJob, RunningJobFromTask } from './config/types/workflowTypes.js'
import {
@@ -41,7 +40,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
? RunningJob<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>
> => {
let queue: string
let queue: string | undefined = undefined
// If user specifies queue, use that
if (args.queue) {
@@ -55,15 +54,26 @@ export const getJobsLocalAPI = (payload: Payload) => ({
}
}
const data: Partial<BaseJob> = {
input: args.input,
}
if (queue) {
data.queue = queue
}
if (args.waitUntil) {
data.waitUntil = args.waitUntil?.toISOString()
}
if (args.workflow) {
data.workflowSlug = args.workflow as string
}
if (args.task) {
data.taskSlug = args.task as string
}
return (await payload.create({
collection: 'payload-jobs',
data: {
input: args.input,
queue,
taskSlug: 'task' in args ? args.task : undefined,
waitUntil: args.waitUntil?.toISOString() ?? undefined,
workflowSlug: 'workflow' in args ? args.workflow : undefined,
} as BaseJob,
data,
req: args.req,
})) as TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
@@ -78,14 +88,14 @@ export const getJobsLocalAPI = (payload: Payload) => ({
where?: Where
}): Promise<ReturnType<typeof runJobs>> => {
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
const result = await runJobs({
return await runJobs({
limit: args?.limit,
overrideAccess: args?.overrideAccess !== false,
queue: args?.queue,
req: newReq,
where: args?.where,
})
return result
},
runByID: async (args: {
@@ -93,12 +103,89 @@ export const getJobsLocalAPI = (payload: Payload) => ({
overrideAccess?: boolean
req?: PayloadRequest
}): Promise<ReturnType<typeof runJobs>> => {
const newReq: PayloadRequest = args?.req ?? (await createLocalReq({}, payload))
const result = await runJobs({
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
return await runJobs({
id: args.id,
overrideAccess: args?.overrideAccess !== false,
overrideAccess: args.overrideAccess !== false,
req: newReq,
})
},
cancel: async (args: {
overrideAccess?: boolean
queue?: string
req?: PayloadRequest
where: Where
}): Promise<void> => {
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
const and: Where[] = [
args.where,
{
completedAt: {
exists: false,
},
},
{
hasError: {
not_equals: true,
},
},
]
if (args.queue) {
and.push({
queue: {
equals: args.queue,
},
})
}
await payload.db.updateMany({
collection: 'payload-jobs',
data: {
completedAt: null,
error: {
cancelled: true,
},
hasError: true,
processing: false,
waitUntil: null,
} as Partial<
{
completedAt: null
waitUntil: null
} & BaseJob
>,
req: newReq,
where: { and },
})
},
cancelByID: async (args: {
id: number | string
overrideAccess?: boolean
req?: PayloadRequest
}): Promise<void> => {
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
await payload.db.updateOne({
id: args.id,
collection: 'payload-jobs',
data: {
completedAt: null,
error: {
cancelled: true,
},
hasError: true,
processing: false,
waitUntil: null,
} as {
completedAt: null
waitUntil: null
} & BaseJob,
req: newReq,
})
return result
},
})

View File

@@ -264,7 +264,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
processing: false,
})
return
throw new Error(errorMessage)
}
let maxRetries: number | undefined = finalRetriesConfig?.attempts

View File

@@ -19,6 +19,10 @@ export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJ
job[key] = updatedJob[key]
}
if ((updatedJob.error as any)?.cancelled) {
throw new Error('Job cancelled')
}
return updatedJob
}
}

View File

@@ -10,6 +10,7 @@ import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js'
import { clearAndSeedEverything } from './seed.js'
import { externalWorkflow } from './workflows/externalWorkflow.js'
import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js'
import { longRunningWorkflow } from './workflows/longRunning.js'
import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js'
import { retries0Workflow } from './workflows/retries0.js'
import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js'
@@ -378,6 +379,7 @@ export default buildConfigWithDefaults({
retriesBackoffTestWorkflow,
subTaskWorkflow,
subTaskFailsWorkflow,
longRunningWorkflow,
],
},
editor: lexicalEditor(),

View File

@@ -1129,6 +1129,79 @@ describe('Queues', () => {
expect(jobAfterRun.input.amountTask1Retried).toBe(0)
})
it('ensure jobs can be cancelled using payload.jobs.cancelByID', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
void payload.jobs.run().catch((_ignored) => {})
await new Promise((resolve) => setTimeout(resolve, 1000))
// Should be in processing - cancel job
await payload.jobs.cancelByID({
id: job.id,
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
it('ensure jobs can be cancelled using payload.jobs.cancel', async () => {
payload.config.jobs.deleteJobOnComplete = false
const job = await payload.jobs.queue({
workflow: 'longRunning',
input: {},
})
void payload.jobs.run().catch((_ignored) => {})
await new Promise((resolve) => setTimeout(resolve, 1000))
// Cancel all jobs
await payload.jobs.cancel({
where: {
id: {
exists: true,
},
},
})
// Wait 4 seconds. This ensures that the job has enough time to finish
// if it hadn't been cancelled. That way we can be sure that the job was
// actually cancelled.
await new Promise((resolve) => setTimeout(resolve, 4000))
// Ensure job is not completed and cancelled
const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
depth: 0,
})
expect(Boolean(jobAfterRun.completedAt)).toBe(false)
expect(jobAfterRun.hasError).toBe(true)
// @ts-expect-error error is not typed
expect(jobAfterRun.error?.cancelled).toBe(true)
expect(jobAfterRun.processing).toBe(false)
})
it('can tasks throw error', async () => {
payload.config.jobs.deleteJobOnComplete = false

View File

@@ -123,6 +123,7 @@ export interface Config {
retriesBackoffTest: WorkflowRetriesBackoffTest;
subTask: WorkflowSubTask;
subTaskFails: WorkflowSubTaskFails;
longRunning: WorkflowLongRunning;
};
};
}
@@ -308,6 +309,7 @@ export interface PayloadJob {
| 'retriesBackoffTest'
| 'subTask'
| 'subTaskFails'
| 'longRunning'
)
| null;
taskSlug?:
@@ -718,6 +720,13 @@ export interface WorkflowSubTaskFails {
message: string;
};
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "WorkflowLongRunning".
*/
export interface WorkflowLongRunning {
input?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "auth".

View File

@@ -0,0 +1,22 @@
import type { WorkflowConfig } from 'payload'
/**
* Should finish after 2 seconds
*/
export const longRunningWorkflow: WorkflowConfig<'longRunning'> = {
slug: 'longRunning',
inputSchema: [],
handler: async ({ inlineTask }) => {
for (let i = 0; i < 4; i += 1) {
await inlineTask(String(i), {
task: async () => {
// Wait 500ms
await new Promise((resolve) => setTimeout(resolve, 500))
return {
output: {},
}
},
})
}
},
}