perf: use direct db calls in job-queue system (#11489)
Previously, our job queue system relied on `payload.*` operations, which ran very frequently: - whenever job execution starts, as all jobs need to be set to `processing: true` - every single time a task completes or fails, as the job log needs to be updated - whenever job execution stops, to mark it as completed and to delete it (if `deleteJobOnComplete` is set) This PR replaces these with direct `payload.db.*` calls, which are significantly faster than payload operations. Given how often the job queue system communicates with the database, this should be a massive performance improvement. ## How it affects running hooks To generate the task status, we previously used an `afterRead` hook. Since direct db adapter calls no longer execute hooks, this PR introduces new `updateJob` and `updateJobs` helpers to handle task status generation outside the normal payload hook lifecycle. Additionally, a new `runHooks` property has been added to the global job configuration. While setting this to `true` can be useful if custom hooks were added to the `payload-jobs` collection config, this will revert the job system to use normal payload operations. This should be avoided as it degrades performance. In most cases, the `onSuccess` or `onFail` properties in the job config will be sufficient and much faster. Furthermore, if the `depth` property is set in the global job configuration, the job queue system will also fall back to the slower, normal payload operations. --------- Co-authored-by: Dan Ribbens <DanRibbens@users.noreply.github.com>
This commit is contained in:
@@ -286,6 +286,20 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise<SanitizedC
|
||||
defaultJobsCollection = configWithDefaults.jobs.jobsCollectionOverrides({
|
||||
defaultJobsCollection,
|
||||
})
|
||||
|
||||
const hooks = defaultJobsCollection?.hooks
|
||||
// @todo - delete this check in 4.0
|
||||
if (hooks && config?.jobs?.runHooks !== true) {
|
||||
for (const hook of Object.keys(hooks)) {
|
||||
const defaultAmount = hook === 'afterRead' || hook === 'beforeChange' ? 1 : 0
|
||||
if (hooks[hook]?.length > defaultAmount) {
|
||||
console.warn(
|
||||
`The jobsCollectionOverrides function is returning a collection with an additional ${hook} hook defined. These hooks will not run unless the jobs.runHooks option is set to true. Setting this option to true will negatively impact performance.`,
|
||||
)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
const sanitizedJobsCollection = await sanitizeCollection(
|
||||
config as unknown as Config,
|
||||
|
||||
@@ -1390,7 +1390,9 @@ export type {
|
||||
PreferenceUpdateRequest,
|
||||
TabsPreferences,
|
||||
} from './preferences/types.js'
|
||||
export { jobAfterRead } from './queues/config/index.js'
|
||||
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'
|
||||
|
||||
export type {
|
||||
RunInlineTaskFunction,
|
||||
RunTaskFunction,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { CollectionConfig } from '../../collections/config/types.js'
|
||||
import type { Config } from '../../config/types.js'
|
||||
import type { Config, SanitizedConfig } from '../../config/types.js'
|
||||
import type { Field } from '../../fields/config/types.js'
|
||||
import type { BaseJob } from './types/workflowTypes.js'
|
||||
|
||||
import { runJobsEndpoint } from '../restEndpointRun.js'
|
||||
import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js'
|
||||
@@ -211,12 +212,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
|
||||
({ doc, req }) => {
|
||||
// This hook is used to add the virtual `tasks` field to the document, that is computed from the `log` field
|
||||
|
||||
doc.taskStatus = getJobTaskStatus({
|
||||
jobLog: doc.log,
|
||||
tasksConfig: req.payload.config.jobs.tasks,
|
||||
})
|
||||
|
||||
return doc
|
||||
return jobAfterRead({ config: req.payload.config, doc })
|
||||
},
|
||||
],
|
||||
/**
|
||||
@@ -240,3 +236,11 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
|
||||
|
||||
return jobsCollection
|
||||
}
|
||||
|
||||
export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: BaseJob }): BaseJob {
|
||||
doc.taskStatus = getJobTaskStatus({
|
||||
jobLog: doc.log || [],
|
||||
tasksConfig: config.jobs.tasks,
|
||||
})
|
||||
return doc
|
||||
}
|
||||
|
||||
@@ -68,7 +68,11 @@ export type JobsConfig = {
|
||||
/**
|
||||
* Specify depth for retrieving jobs from the queue.
|
||||
* This should be as low as possible in order for job retrieval
|
||||
* to be as efficient as possible. Defaults to 0.
|
||||
* to be as efficient as possible. Setting it to anything higher than
|
||||
* 0 will drastically affect performance, as less efficient database
|
||||
* queries will be used.
|
||||
*
|
||||
* @default 0
|
||||
*/
|
||||
depth?: number
|
||||
/**
|
||||
@@ -76,6 +80,15 @@ export type JobsConfig = {
|
||||
* a new collection.
|
||||
*/
|
||||
jobsCollectionOverrides?: (args: { defaultJobsCollection: CollectionConfig }) => CollectionConfig
|
||||
/**
|
||||
* By default, the job system uses direct database calls for optimal performance.
|
||||
* If you added custom hooks to your jobs collection, you can set this to true to
|
||||
* use the standard Payload API for all job operations. This is discouraged, as it will
|
||||
* drastically affect performance.
|
||||
*
|
||||
* @default false
|
||||
*/
|
||||
runHooks?: boolean
|
||||
/**
|
||||
* A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function.
|
||||
* If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run.
|
||||
|
||||
@@ -8,8 +8,9 @@ import {
|
||||
type TypedJobs,
|
||||
type Where,
|
||||
} from '../index.js'
|
||||
import { jobsCollectionSlug } from './config/index.js'
|
||||
import { jobAfterRead, jobsCollectionSlug } from './config/index.js'
|
||||
import { runJobs } from './operations/runJobs/index.js'
|
||||
import { updateJob, updateJobs } from './utilities/updateJob.js'
|
||||
|
||||
export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
queue: async <
|
||||
@@ -72,13 +73,27 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
data.taskSlug = args.task as string
|
||||
}
|
||||
|
||||
return (await payload.create({
|
||||
collection: jobsCollectionSlug,
|
||||
data,
|
||||
req: args.req,
|
||||
})) as TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
|
||||
type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
|
||||
? RunningJob<TTaskOrWorkflowSlug>
|
||||
: RunningJobFromTask<TTaskOrWorkflowSlug> // Type assertion is still needed here
|
||||
|
||||
if (payload?.config?.jobs?.depth || payload?.config?.jobs?.runHooks) {
|
||||
return (await payload.create({
|
||||
collection: jobsCollectionSlug,
|
||||
data,
|
||||
depth: payload.config.jobs.depth ?? 0,
|
||||
req: args.req,
|
||||
})) as ReturnType
|
||||
} else {
|
||||
return jobAfterRead({
|
||||
config: payload.config,
|
||||
doc: await payload.db.create({
|
||||
collection: jobsCollectionSlug,
|
||||
data,
|
||||
req: args.req,
|
||||
}),
|
||||
}) as unknown as ReturnType
|
||||
}
|
||||
},
|
||||
|
||||
run: async (args?: {
|
||||
@@ -143,37 +158,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
})
|
||||
}
|
||||
|
||||
await payload.db.updateMany({
|
||||
collection: jobsCollectionSlug,
|
||||
data: {
|
||||
completedAt: null,
|
||||
error: {
|
||||
cancelled: true,
|
||||
},
|
||||
hasError: true,
|
||||
processing: false,
|
||||
waitUntil: null,
|
||||
} as Partial<
|
||||
{
|
||||
completedAt: null
|
||||
waitUntil: null
|
||||
} & BaseJob
|
||||
>,
|
||||
req: newReq,
|
||||
where: { and },
|
||||
})
|
||||
},
|
||||
|
||||
cancelByID: async (args: {
|
||||
id: number | string
|
||||
overrideAccess?: boolean
|
||||
req?: PayloadRequest
|
||||
}): Promise<void> => {
|
||||
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
|
||||
|
||||
await payload.db.updateOne({
|
||||
id: args.id,
|
||||
collection: jobsCollectionSlug,
|
||||
await updateJobs({
|
||||
data: {
|
||||
completedAt: null,
|
||||
error: {
|
||||
@@ -186,7 +171,39 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
completedAt: null
|
||||
waitUntil: null
|
||||
} & BaseJob,
|
||||
depth: 0, // No depth, since we're not returning
|
||||
disableTransaction: true,
|
||||
req: newReq,
|
||||
returning: false,
|
||||
where: { and },
|
||||
})
|
||||
},
|
||||
|
||||
cancelByID: async (args: {
|
||||
id: number | string
|
||||
overrideAccess?: boolean
|
||||
req?: PayloadRequest
|
||||
}): Promise<void> => {
|
||||
const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload))
|
||||
|
||||
await updateJob({
|
||||
id: args.id,
|
||||
data: {
|
||||
completedAt: null,
|
||||
error: {
|
||||
cancelled: true,
|
||||
},
|
||||
hasError: true,
|
||||
processing: false,
|
||||
waitUntil: null,
|
||||
} as {
|
||||
completedAt: null
|
||||
waitUntil: null
|
||||
} & BaseJob,
|
||||
depth: 0, // No depth, since we're not returning
|
||||
disableTransaction: true,
|
||||
req: newReq,
|
||||
returning: false,
|
||||
})
|
||||
},
|
||||
})
|
||||
|
||||
@@ -13,6 +13,7 @@ import type { RunJobResult } from './runJob/index.js'
|
||||
import { Forbidden } from '../../../errors/Forbidden.js'
|
||||
import isolateObjectProperty from '../../../utilities/isolateObjectProperty.js'
|
||||
import { jobsCollectionSlug } from '../../config/index.js'
|
||||
import { updateJob, updateJobs } from '../../utilities/updateJob.js'
|
||||
import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js'
|
||||
import { importHandlerPath } from './runJob/importHandlerPath.js'
|
||||
import { runJob } from './runJob/index.js'
|
||||
@@ -106,40 +107,45 @@ export const runJobs = async ({
|
||||
// the same job being picked up by another worker
|
||||
const jobsQuery: {
|
||||
docs: BaseJob[]
|
||||
} = id
|
||||
? {
|
||||
docs: [
|
||||
(await req.payload.update({
|
||||
id,
|
||||
collection: jobsCollectionSlug,
|
||||
data: {
|
||||
processing: true,
|
||||
seenByWorker: true,
|
||||
},
|
||||
depth: req.payload.config.jobs.depth,
|
||||
disableTransaction: true,
|
||||
showHiddenFields: true,
|
||||
})) as BaseJob,
|
||||
],
|
||||
}
|
||||
: ((await req.payload.update({
|
||||
collection: jobsCollectionSlug,
|
||||
} = { docs: [] }
|
||||
|
||||
if (id) {
|
||||
// Only one job to run
|
||||
jobsQuery.docs = [
|
||||
await updateJob({
|
||||
id,
|
||||
data: {
|
||||
processing: true,
|
||||
seenByWorker: true,
|
||||
},
|
||||
depth: req.payload.config.jobs.depth,
|
||||
disableTransaction: true,
|
||||
limit,
|
||||
showHiddenFields: true,
|
||||
where,
|
||||
})) as unknown as PaginatedDocs<BaseJob>)
|
||||
req,
|
||||
returning: true,
|
||||
}),
|
||||
]
|
||||
} else {
|
||||
const updatedDocs = await updateJobs({
|
||||
data: {
|
||||
processing: true,
|
||||
},
|
||||
depth: req.payload.config.jobs.depth,
|
||||
disableTransaction: true,
|
||||
limit,
|
||||
req,
|
||||
returning: true,
|
||||
where,
|
||||
})
|
||||
|
||||
if (updatedDocs) {
|
||||
jobsQuery.docs = updatedDocs
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
|
||||
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
|
||||
*/
|
||||
const { newJobs } = jobsQuery.docs.reduce(
|
||||
const { existingJobs, newJobs } = jobsQuery.docs.reduce(
|
||||
(acc, job) => {
|
||||
if (job.totalTried > 0) {
|
||||
acc.existingJobs.push(job)
|
||||
@@ -159,7 +165,11 @@ export const runJobs = async ({
|
||||
}
|
||||
|
||||
if (jobsQuery?.docs?.length) {
|
||||
req.payload.logger.info(`Running ${jobsQuery.docs.length} jobs.`)
|
||||
req.payload.logger.info({
|
||||
msg: `Running ${jobsQuery.docs.length} jobs.`,
|
||||
new: newJobs?.length,
|
||||
retrying: existingJobs?.length,
|
||||
})
|
||||
}
|
||||
const jobsToDelete: (number | string)[] | undefined = req.payload.config.jobs.deleteJobOnComplete
|
||||
? []
|
||||
@@ -253,11 +263,19 @@ export const runJobs = async ({
|
||||
|
||||
if (jobsToDelete && jobsToDelete.length > 0) {
|
||||
try {
|
||||
await req.payload.delete({
|
||||
collection: jobsCollectionSlug,
|
||||
req,
|
||||
where: { id: { in: jobsToDelete } },
|
||||
})
|
||||
if (req.payload.config.jobs.runHooks) {
|
||||
await req.payload.delete({
|
||||
collection: jobsCollectionSlug,
|
||||
depth: 0, // can be 0 since we're not returning anything
|
||||
disableTransaction: true,
|
||||
where: { id: { in: jobsToDelete } },
|
||||
})
|
||||
} else {
|
||||
await req.payload.db.deleteMany({
|
||||
collection: jobsCollectionSlug,
|
||||
where: { id: { in: jobsToDelete } },
|
||||
})
|
||||
}
|
||||
} catch (err) {
|
||||
req.payload.logger.error({
|
||||
err,
|
||||
|
||||
@@ -2,27 +2,29 @@
|
||||
import type { PayloadRequest } from '../../../../types/index.js'
|
||||
import type { BaseJob } from '../../../config/types/workflowTypes.js'
|
||||
|
||||
import { jobsCollectionSlug } from '../../../config/index.js'
|
||||
import { updateJob } from '../../../utilities/updateJob.js'
|
||||
|
||||
export type UpdateJobFunction = (jobData: Partial<BaseJob>) => Promise<BaseJob>
|
||||
|
||||
export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction {
|
||||
return async (jobData) => {
|
||||
const updatedJob = (await req.payload.update({
|
||||
const updatedJob = await updateJob({
|
||||
id: job.id,
|
||||
collection: jobsCollectionSlug,
|
||||
data: jobData,
|
||||
depth: 0,
|
||||
depth: req.payload.config.jobs.depth,
|
||||
disableTransaction: true,
|
||||
})) as BaseJob
|
||||
req,
|
||||
})
|
||||
|
||||
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
|
||||
for (const key in updatedJob) {
|
||||
job[key] = updatedJob[key]
|
||||
}
|
||||
|
||||
if ((updatedJob.error as any)?.cancelled) {
|
||||
throw new Error('Job cancelled')
|
||||
if ((updatedJob.error as Record<string, unknown>)?.cancelled) {
|
||||
const cancelledError = new Error('Job cancelled') as { cancelled: boolean } & Error
|
||||
cancelledError.cancelled = true
|
||||
throw cancelledError
|
||||
}
|
||||
|
||||
return updatedJob
|
||||
|
||||
@@ -26,7 +26,7 @@ export function handleWorkflowError({
|
||||
} {
|
||||
const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}`
|
||||
|
||||
let hasFinalError = state.reachedMaxRetries // If any TASK reached max retries, the job has an error
|
||||
let hasFinalError = state.reachedMaxRetries || !!('cancelled' in error && error.cancelled) // If any TASK reached max retries, the job has an error
|
||||
const maxWorkflowRetries: number =
|
||||
(typeof workflowConfig.retries === 'object'
|
||||
? workflowConfig.retries.attempts
|
||||
|
||||
@@ -60,6 +60,7 @@ export const runJob = async ({
|
||||
const errorJSON = hasFinalError
|
||||
? {
|
||||
name: err.name,
|
||||
cancelled: Boolean('cancelled' in err && err.cancelled),
|
||||
message: err.message,
|
||||
stack: err.stack,
|
||||
}
|
||||
|
||||
28
packages/payload/src/queues/utilities/sanitizeUpdateData.ts
Normal file
28
packages/payload/src/queues/utilities/sanitizeUpdateData.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import ObjectIdImport from 'bson-objectid'
|
||||
|
||||
import type { BaseJob } from '../config/types/workflowTypes.js'
|
||||
|
||||
const ObjectId = (ObjectIdImport.default ||
|
||||
ObjectIdImport) as unknown as typeof ObjectIdImport.default
|
||||
|
||||
/**
|
||||
* Our payload operations sanitize the input data to, for example, add missing IDs to array rows.
|
||||
* This function is used to manually sanitize the data for direct db adapter operations
|
||||
*/
|
||||
export function sanitizeUpdateData({ data }: { data: Partial<BaseJob> }): Partial<BaseJob> {
|
||||
if (data.log) {
|
||||
const sanitizedData = { ...data }
|
||||
sanitizedData.log = sanitizedData?.log?.map((log) => {
|
||||
if (log.id) {
|
||||
return log
|
||||
}
|
||||
return {
|
||||
...log,
|
||||
id: new ObjectId().toHexString(),
|
||||
}
|
||||
})
|
||||
return sanitizedData
|
||||
}
|
||||
|
||||
return data
|
||||
}
|
||||
115
packages/payload/src/queues/utilities/updateJob.ts
Normal file
115
packages/payload/src/queues/utilities/updateJob.ts
Normal file
@@ -0,0 +1,115 @@
|
||||
import type { ManyOptions } from '../../collections/operations/local/update.js'
|
||||
import type { PayloadRequest, Where } from '../../types/index.js'
|
||||
import type { BaseJob } from '../config/types/workflowTypes.js'
|
||||
|
||||
import { jobAfterRead, jobsCollectionSlug } from '../config/index.js'
|
||||
import { sanitizeUpdateData } from './sanitizeUpdateData.js'
|
||||
|
||||
type BaseArgs = {
|
||||
data: Partial<BaseJob>
|
||||
depth?: number
|
||||
disableTransaction?: boolean
|
||||
limit?: number
|
||||
req: PayloadRequest
|
||||
returning?: boolean
|
||||
}
|
||||
|
||||
type ArgsByID = {
|
||||
id: number | string
|
||||
limit?: never
|
||||
where?: never
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience method for updateJobs by id
|
||||
*/
|
||||
export async function updateJob(args: ArgsByID & BaseArgs) {
|
||||
const result = await updateJobs({
|
||||
...args,
|
||||
id: undefined,
|
||||
limit: 1,
|
||||
where: { id: { equals: args.id } },
|
||||
})
|
||||
if (result) {
|
||||
return result[0]
|
||||
}
|
||||
}
|
||||
|
||||
type ArgsWhere = {
|
||||
id?: never | undefined
|
||||
limit?: number
|
||||
where: Where
|
||||
}
|
||||
|
||||
type RunJobsArgs = (ArgsByID | ArgsWhere) & BaseArgs
|
||||
|
||||
export async function updateJobs({
|
||||
id,
|
||||
data,
|
||||
depth,
|
||||
disableTransaction,
|
||||
limit: limitArg,
|
||||
req,
|
||||
returning,
|
||||
where,
|
||||
}: RunJobsArgs): Promise<BaseJob[] | null> {
|
||||
const limit = id ? 1 : limitArg
|
||||
if (depth || req.payload.config?.jobs?.runHooks) {
|
||||
const result = await req.payload.update({
|
||||
id,
|
||||
collection: jobsCollectionSlug,
|
||||
data,
|
||||
depth,
|
||||
disableTransaction,
|
||||
limit,
|
||||
req,
|
||||
where,
|
||||
} as ManyOptions<any, any>)
|
||||
if (returning === false || !result) {
|
||||
return null
|
||||
}
|
||||
return result.docs as BaseJob[]
|
||||
}
|
||||
|
||||
const updatedJobs = []
|
||||
|
||||
// TODO: this can be optimized in the future - partial updates are supported in mongodb. In postgres,
|
||||
// we can support this by manually constructing the sql query. We should use req.payload.db.updateMany instead
|
||||
// of req.payload.db.updateOne once this is supported
|
||||
const jobsToUpdate = await req.payload.db.find({
|
||||
collection: jobsCollectionSlug,
|
||||
limit,
|
||||
pagination: false,
|
||||
req: disableTransaction === true ? undefined : req,
|
||||
where,
|
||||
})
|
||||
if (!jobsToUpdate?.docs) {
|
||||
return null
|
||||
}
|
||||
|
||||
for (const job of jobsToUpdate.docs) {
|
||||
const updateData = {
|
||||
...job,
|
||||
...data,
|
||||
}
|
||||
const updatedJob = await req.payload.db.updateOne({
|
||||
id: job.id,
|
||||
collection: jobsCollectionSlug,
|
||||
data: sanitizeUpdateData({ data: updateData }),
|
||||
req: disableTransaction === true ? undefined : req,
|
||||
returning,
|
||||
})
|
||||
updatedJobs.push(updatedJob)
|
||||
}
|
||||
|
||||
if (returning === false || !updatedJobs?.length) {
|
||||
return null
|
||||
}
|
||||
|
||||
return updatedJobs.map((updatedJob) => {
|
||||
return jobAfterRead({
|
||||
config: req.payload.config,
|
||||
doc: updatedJob,
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -10,7 +10,7 @@ export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = {
|
||||
},
|
||||
],
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -23,6 +23,8 @@ export const noRetriesSetWorkflow: WorkflowConfig<'workflowNoRetriesSet'> = {
|
||||
id: job.id,
|
||||
})
|
||||
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimple('1', {
|
||||
input: {
|
||||
message: job.input.message,
|
||||
|
||||
@@ -11,7 +11,7 @@ export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = {
|
||||
],
|
||||
retries: 0,
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -23,6 +23,7 @@ export const retries0Workflow: WorkflowConfig<'workflowRetries0'> = {
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimple('1', {
|
||||
input: {
|
||||
|
||||
@@ -45,13 +45,14 @@ export const retriesBackoffTestWorkflow: WorkflowConfig<'retriesBackoffTest'> =
|
||||
// @ts-expect-error timeTried is new arbitrary data and not in the type
|
||||
job.input.timeTried[totalTried] = new Date().toISOString()
|
||||
|
||||
await req.payload.update({
|
||||
const updated = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: job.input,
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updated.input as any
|
||||
|
||||
if (totalTried < 4) {
|
||||
// Cleanup the post
|
||||
|
||||
@@ -10,7 +10,7 @@ export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = {
|
||||
},
|
||||
],
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -22,6 +22,7 @@ export const retriesTestWorkflow: WorkflowConfig<'retriesTest'> = {
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimple('1', {
|
||||
input: {
|
||||
|
||||
@@ -11,7 +11,7 @@ export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLe
|
||||
],
|
||||
retries: 2, // Even though CreateSimple has 3 retries, this workflow only has 2. Thus, it will only retry once
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -23,6 +23,7 @@ export const retriesWorkflowLevelTestWorkflow: WorkflowConfig<'retriesWorkflowLe
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimple('1', {
|
||||
input: {
|
||||
|
||||
@@ -23,7 +23,7 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = {
|
||||
},
|
||||
})
|
||||
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -38,6 +38,8 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = {
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
return {
|
||||
output: {
|
||||
newSimple,
|
||||
@@ -48,7 +50,7 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = {
|
||||
|
||||
await inlineTask('create doc 2 - fails', {
|
||||
task: async ({ req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -63,6 +65,8 @@ export const subTaskFailsWorkflow: WorkflowConfig<'subTaskFails'> = {
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
throw new Error('Failed on purpose')
|
||||
},
|
||||
})
|
||||
|
||||
@@ -11,7 +11,7 @@ export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowA
|
||||
},
|
||||
],
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -23,6 +23,7 @@ export const workflowAndTasksRetriesUndefinedWorkflow: WorkflowConfig<'workflowA
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimpleRetriesUndefined('1', {
|
||||
input: {
|
||||
|
||||
@@ -12,7 +12,7 @@ export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetr
|
||||
},
|
||||
],
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -24,6 +24,7 @@ export const workflowRetries2TasksRetries0Workflow: WorkflowConfig<'workflowRetr
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimpleRetries0('1', {
|
||||
input: {
|
||||
|
||||
@@ -12,7 +12,7 @@ export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'work
|
||||
},
|
||||
],
|
||||
handler: async ({ job, tasks, req }) => {
|
||||
await req.payload.update({
|
||||
const updatedJob = await req.payload.update({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: {
|
||||
@@ -24,6 +24,7 @@ export const workflowRetries2TasksRetriesUndefinedWorkflow: WorkflowConfig<'work
|
||||
},
|
||||
id: job.id,
|
||||
})
|
||||
job.input = updatedJob.input as any
|
||||
|
||||
await tasks.CreateSimpleRetriesUndefined('1', {
|
||||
input: {
|
||||
|
||||
Reference in New Issue
Block a user