From 84cb2b5819d5be44d6f54c37603ce9cedc55924b Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Mon, 16 Jun 2025 13:15:56 -0700 Subject: [PATCH] refactor: simplify job type (#12816) Previously, there were multiple ways to type a running job: - `GeneratedTypes['payload-jobs']` - only works in an installed project - is `any` in monorepo - `BaseJob` - works everywhere, but does not incorporate generated types which may include type for custom fields added to the jobs collection - `RunningJob<>` - more accurate version of `BaseJob`, but same problem This PR deprecated all those types in favor of a new `Job` type. Benefits: - Works in both monorepo and installed projects. If no generated types exist, it will automatically fall back to `BaseJob` - Comes with an optional generic that can be used to narrow down `job.input` based on the task / workflow slug. No need to use a separate type helper like `RunningJob<>` With this new type, I was able to replace every usage of `GeneratedTypes['payload-jobs']`, `BaseJob` and `RunningJob<>` with the simple `Job` type. Additionally, this PR simplifies some of the logic used to run jobs --- packages/db-mongodb/src/updateJobs.ts | 4 +- .../payload/src/database/defaultUpdateJobs.ts | 8 +- packages/payload/src/database/types.ts | 4 +- packages/payload/src/index.ts | 22 +++ packages/payload/src/queues/config/index.ts | 9 +- .../src/queues/config/types/taskTypes.ts | 14 +- .../queues/config/types/workflowJSONTypes.ts | 19 ++- .../src/queues/config/types/workflowTypes.ts | 65 +++++--- packages/payload/src/queues/localAPI.ts | 20 +-- .../src/queues/operations/runJobs/index.ts | 155 ++++++++---------- .../operations/runJobs/runJSONJob/index.ts | 23 +-- .../runJobs/runJob/getRunTaskFunction.ts | 85 ++++------ .../runJobs/runJob/getUpdateJobFunction.ts | 17 +- .../runJobs/runJob/handleWorkflowError.ts | 7 +- .../queues/operations/runJobs/runJob/index.ts | 17 +- .../src/queues/utilities/getJobTaskStatus.ts | 9 +- .../payload/src/queues/utilities/updateJob.ts | 10 +- test/queues/config.ts | 1 + test/queues/runners/updatePost.ts | 2 +- 19 files changed, 254 insertions(+), 237 deletions(-) diff --git a/packages/db-mongodb/src/updateJobs.ts b/packages/db-mongodb/src/updateJobs.ts index cac9c0dc2..8ea4ecd57 100644 --- a/packages/db-mongodb/src/updateJobs.ts +++ b/packages/db-mongodb/src/updateJobs.ts @@ -1,5 +1,5 @@ import type { MongooseUpdateQueryOptions } from 'mongoose' -import type { BaseJob, UpdateJobs, Where } from 'payload' +import type { Job, UpdateJobs, Where } from 'payload' import type { MongooseAdapter } from './index.js' @@ -47,7 +47,7 @@ export const updateJobs: UpdateJobs = async function updateMany( transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'write' }) - let result: BaseJob[] = [] + let result: Job[] = [] try { if (id) { diff --git a/packages/payload/src/database/defaultUpdateJobs.ts b/packages/payload/src/database/defaultUpdateJobs.ts index 135185f80..260851ada 100644 --- a/packages/payload/src/database/defaultUpdateJobs.ts +++ b/packages/payload/src/database/defaultUpdateJobs.ts @@ -1,4 +1,4 @@ -import type { BaseJob, DatabaseAdapter } from '../index.js' +import type { DatabaseAdapter, Job } from '../index.js' import type { UpdateJobs } from './types.js' import { jobsCollectionSlug } from '../queues/config/index.js' @@ -7,9 +7,9 @@ export const defaultUpdateJobs: UpdateJobs = async function updateMany( this: DatabaseAdapter, { id, data, limit, req, returning, where }, ) { - const updatedJobs: BaseJob[] | null = [] + const updatedJobs: Job[] | null = [] - const jobsToUpdate: BaseJob[] = ( + const jobsToUpdate: Job[] = ( id ? [ await this.findOne({ @@ -27,7 +27,7 @@ export const defaultUpdateJobs: UpdateJobs = async function updateMany( where, }) ).docs - ).filter(Boolean) as BaseJob[] + ).filter(Boolean) as Job[] if (!jobsToUpdate) { return null diff --git a/packages/payload/src/database/types.ts b/packages/payload/src/database/types.ts index 869837384..ca94f7699 100644 --- a/packages/payload/src/database/types.ts +++ b/packages/payload/src/database/types.ts @@ -1,5 +1,5 @@ import type { TypeWithID } from '../collections/config/types.js' -import type { BaseJob, CollectionSlug, GlobalSlug } from '../index.js' +import type { CollectionSlug, GlobalSlug, Job } from '../index.js' import type { Document, JoinQuery, @@ -566,7 +566,7 @@ export type UpdateJobsArgs = { } ) -export type UpdateJobs = (args: UpdateJobsArgs) => Promise +export type UpdateJobs = (args: UpdateJobsArgs) => Promise export type UpsertArgs = { collection: CollectionSlug diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index 2ff0571ca..cb39c0409 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -71,6 +71,7 @@ import type { SupportedLanguages } from '@payloadcms/translations' import { Cron } from 'croner' import type { ClientConfig } from './config/client.js' +import type { BaseJob } from './queues/config/types/workflowTypes.js' import type { TypeWithVersion } from './versions/types.js' import { decrypt, encrypt } from './auth/crypto.js' @@ -247,6 +248,27 @@ export type TypedAuthOperations = ResolveAuthOperationsType type ResolveJobOperationsType = 'jobs' extends keyof T ? T['jobs'] : T['jobsUntyped'] export type TypedJobs = ResolveJobOperationsType +type HasPayloadJobsType = 'collections' extends keyof GeneratedTypes + ? 'payload-jobs' extends keyof TypedCollection + ? true + : false + : false + +/** + * Represents a job in the `payload-jobs` collection, referencing a queued workflow or task (= Job). + * If a generated type for the `payload-jobs` collection is not available, falls back to the BaseJob type. + * + * `input` and `taksStatus` are always present here, as the job afterRead hook will always populate them. + */ +export type Job< + TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false, +> = HasPayloadJobsType extends true + ? { + input: BaseJob['input'] + taskStatus: BaseJob['taskStatus'] + } & Omit + : BaseJob + const filename = fileURLToPath(import.meta.url) const dirname = path.dirname(filename) diff --git a/packages/payload/src/queues/config/index.ts b/packages/payload/src/queues/config/index.ts index 2c92c9655..9628a29bf 100644 --- a/packages/payload/src/queues/config/index.ts +++ b/packages/payload/src/queues/config/index.ts @@ -1,7 +1,7 @@ import type { CollectionConfig } from '../../collections/config/types.js' import type { Config, SanitizedConfig } from '../../config/types.js' import type { Field } from '../../fields/config/types.js' -import type { BaseJob } from './types/workflowTypes.js' +import type { Job } from '../../index.js' import { runJobsEndpoint } from '../restEndpointRun.js' import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' @@ -51,6 +51,9 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c type: 'text', required: true, }, + /** + * @todo make required in 4.0 + */ { name: 'input', type: 'json', @@ -238,9 +241,11 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c } // eslint-disable-next-line @typescript-eslint/no-unused-vars -export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: BaseJob }): BaseJob { +export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: Job }): Job { doc.taskStatus = getJobTaskStatus({ jobLog: doc.log || [], }) + doc.input = doc.input || {} + doc.taskStatus = doc.taskStatus || {} return doc } diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index 0a698905a..fe9107b8f 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -1,5 +1,5 @@ -import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' -import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js' +import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' +import type { SingleTaskStatus } from './workflowTypes.js' export type TaskInputOutput = { input: object @@ -34,7 +34,7 @@ export type TaskHandlerArgs< : TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type ? TTaskSlugOrInputOutput['input'] : never - job: RunningJob + job: Job req: PayloadRequest tasks: RunTaskFunctions } @@ -42,8 +42,8 @@ export type TaskHandlerArgs< /** * Inline tasks in JSON workflows have no input, as they can just get the input from job.taskStatus */ -export type TaskHandlerArgsNoInput = { - job: RunningJobSimple +export type TaskHandlerArgsNoInput = { + job: Job req: PayloadRequest } @@ -107,7 +107,7 @@ export type RunInlineTaskFunction = + job: Job req: PayloadRequest tasks: RunTaskFunctions }) => MaybePromise< @@ -128,7 +128,7 @@ export type ShouldRestoreFn = (args: { * Input data passed to the task */ input: object - job: BaseJob + job: Job req: PayloadRequest taskStatus: SingleTaskStatus }) => boolean | Promise diff --git a/packages/payload/src/queues/config/types/workflowJSONTypes.ts b/packages/payload/src/queues/config/types/workflowJSONTypes.ts index 7de9f03da..95b98e860 100644 --- a/packages/payload/src/queues/config/types/workflowJSONTypes.ts +++ b/packages/payload/src/queues/config/types/workflowJSONTypes.ts @@ -1,15 +1,15 @@ -import type { RunningJob, TaskHandlerResult, TypedJobs } from '../../../index.js' +import type { Job, TaskHandlerResult, TypedJobs } from '../../../index.js' import type { RetryConfig, TaskHandlerArgsNoInput } from './taskTypes.js' export type WorkflowStep< TTaskSlug extends keyof TypedJobs['tasks'], - TWorkflowSlug extends keyof TypedJobs['workflows'], + TWorkflowSlug extends false | keyof TypedJobs['workflows'] = false, > = { /** * If this step is completed, the workflow will be marked as completed */ completesJob?: boolean - condition?: (args: { job: RunningJob }) => boolean + condition?: (args: { job: Job }) => boolean /** * Each task needs to have a unique ID to track its status */ @@ -23,19 +23,20 @@ export type WorkflowStep< } & ( | { inlineTask?: ( - args: TaskHandlerArgsNoInput, + args: TWorkflowSlug extends keyof TypedJobs['workflows'] + ? TaskHandlerArgsNoInput + : TaskHandlerArgsNoInput, ) => Promise> | TaskHandlerResult } | { - input: (args: { job: RunningJob }) => TypedJobs['tasks'][TTaskSlug]['input'] + input: (args: { job: Job }) => TypedJobs['tasks'][TTaskSlug]['input'] task: TTaskSlug } ) -type AllWorkflowSteps = { +type AllWorkflowSteps = { [TTaskSlug in keyof TypedJobs['tasks']]: WorkflowStep }[keyof TypedJobs['tasks']] -export type WorkflowJSON = Array< - AllWorkflowSteps -> +export type WorkflowJSON = + Array> diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index 823dff112..6a4adc011 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -1,5 +1,11 @@ import type { Field } from '../../../fields/config/types.js' -import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js' +import type { + Job, + PayloadRequest, + StringKeyOf, + TypedCollection, + TypedJobs, +} from '../../../index.js' import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js' import type { RetryConfig, @@ -27,28 +33,41 @@ export type JobLog = { parent?: TaskParent state: 'failed' | 'succeeded' taskID: string - taskSlug: string + taskSlug: TaskType } -export type BaseJob = { - completedAt?: string +/** + * @deprecated - will be made private in 4.0. Please use the `Job` type instead. + */ +export type BaseJob< + TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false, +> = { + completedAt?: null | string + createdAt: string error?: unknown hasError?: boolean id: number | string - input?: any - log: JobLog[] + input: TWorkflowSlugOrInput extends false + ? object + : TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] + ? TypedJobs['workflows'][TWorkflowSlugOrInput]['input'] + : TWorkflowSlugOrInput + log?: JobLog[] processing?: boolean - queue: string - taskSlug?: string - taskStatus?: JobTaskStatus + queue?: string + taskSlug?: null | TaskType + taskStatus: JobTaskStatus totalTried: number - waitUntil?: string - workflowSlug?: string + updatedAt: string + waitUntil?: null | string + workflowSlug?: null | WorkflowTypes } export type WorkflowTypes = StringKeyOf -// TODO: Type job.taskStatus once available - for JSON-defined workflows +/** + * @deprecated - will be removed in 4.0. Use `Job` type instead. + */ export type RunningJob = { input: TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] ? TypedJobs['workflows'][TWorkflowSlugOrInput]['input'] @@ -56,6 +75,9 @@ export type RunningJob +/** + * @deprecated - will be removed in 4.0. Use `Job` type instead. + */ export type RunningJobSimple = { input: TWorkflowInput } & TypedCollection['payload-jobs'] @@ -65,13 +87,14 @@ export type RunningJobFromTask = { input: TypedJobs['tasks'][TTaskSlug]['input'] } & TypedCollection['payload-jobs'] -export type WorkflowHandler = - (args: { - inlineTask: RunInlineTaskFunction - job: RunningJob - req: PayloadRequest - tasks: RunTaskFunctions - }) => Promise +export type WorkflowHandler< + TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false, +> = (args: { + inlineTask: RunInlineTaskFunction + job: Job + req: PayloadRequest + tasks: RunTaskFunctions +}) => Promise export type SingleTaskStatus = { complete: boolean @@ -91,7 +114,9 @@ export type JobTaskStatus = { } } -export type WorkflowConfig = { +export type WorkflowConfig< + TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false, +> = { /** * You can either pass a string-based path to the workflow function file, or the workflow function itself. * diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index 1d6e4a0f5..f1449c566 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -1,10 +1,10 @@ -import type { BaseJob, RunningJobFromTask } from './config/types/workflowTypes.js' +import type { RunningJobFromTask } from './config/types/workflowTypes.js' import { createLocalReq, + type Job, type Payload, type PayloadRequest, - type RunningJob, type Sort, type TypedJobs, type Where, @@ -40,7 +40,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ }, ): Promise< TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] - ? RunningJob + ? Job : RunningJobFromTask > => { let queue: string | undefined = undefined @@ -57,7 +57,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ } } - const data: Partial = { + const data: Partial = { input: args.input, } @@ -75,7 +75,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ } type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] - ? RunningJob + ? Job : RunningJobFromTask // Type assertion is still needed here if (payload?.config?.jobs?.depth || payload?.config?.jobs?.runHooks) { @@ -199,10 +199,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ hasError: true, processing: false, waitUntil: null, - } as { - completedAt: null - waitUntil: null - } & BaseJob, + }, depth: 0, // No depth, since we're not returning disableTransaction: true, req: newReq, @@ -228,10 +225,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ hasError: true, processing: false, waitUntil: null, - } as { - completedAt: null - waitUntil: null - } & BaseJob, + }, depth: 0, // No depth, since we're not returning disableTransaction: true, req: newReq, diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index 74c581c41..b88b508bd 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -1,11 +1,7 @@ +import type { Job } from '../../../index.js' import type { PayloadRequest, Sort, Where } from '../../../types/index.js' import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js' -import type { - BaseJob, - WorkflowConfig, - WorkflowHandler, - WorkflowTypes, -} from '../../config/types/workflowTypes.js' +import type { WorkflowConfig, WorkflowHandler } from '../../config/types/workflowTypes.js' import type { RunJobResult } from './runJob/index.js' import { Forbidden } from '../../../errors/Forbidden.js' @@ -97,42 +93,40 @@ export const runJobs = async (args: RunJobsArgs): Promise => { throw new Forbidden(req.t) } } - const where: Where = { - and: [ - { - completedAt: { - exists: false, - }, + const and: Where[] = [ + { + completedAt: { + exists: false, }, - { - hasError: { - not_equals: true, - }, + }, + { + hasError: { + not_equals: true, }, - { - processing: { - equals: false, - }, + }, + { + processing: { + equals: false, }, - { - or: [ - { - waitUntil: { - exists: false, - }, + }, + { + or: [ + { + waitUntil: { + exists: false, }, - { - waitUntil: { - less_than: new Date().toISOString(), - }, + }, + { + waitUntil: { + less_than: new Date().toISOString(), }, - ], - }, - ], - } + }, + ], + }, + ] if (allQueues !== true) { - where.and?.push({ + and.push({ queue: { equals: queue ?? 'default', }, @@ -140,34 +134,33 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } if (whereFromProps) { - where.and?.push(whereFromProps) + and.push(whereFromProps) } // Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of // the same job being picked up by another worker - const jobsQuery: { - docs: BaseJob[] - } = { docs: [] } + let jobs: Job[] = [] if (id) { // Only one job to run - jobsQuery.docs = [ - (await updateJob({ - id, - data: { - processing: true, - }, - depth: jobsConfig.depth, - disableTransaction: true, - req, - returning: true, - }))!, - ] + const job = await updateJob({ + id, + data: { + processing: true, + }, + depth: jobsConfig.depth, + disableTransaction: true, + req, + returning: true, + }) + if (job) { + jobs = [job] + } } else { let defaultProcessingOrder: Sort = payload.collections[jobsCollectionSlug]?.config.defaultSort ?? 'createdAt' - const processingOrderConfig = jobsConfig?.processingOrder + const processingOrderConfig = jobsConfig.processingOrder if (typeof processingOrderConfig === 'function') { defaultProcessingOrder = await processingOrderConfig(args) } else if (typeof processingOrderConfig === 'object' && !Array.isArray(processingOrderConfig)) { @@ -194,11 +187,11 @@ export const runJobs = async (args: RunJobsArgs): Promise => { req, returning: true, sort: processingOrder ?? defaultProcessingOrder, - where, + where: { and }, }) if (updatedDocs) { - jobsQuery.docs = updatedDocs + jobs = updatedDocs } } @@ -206,7 +199,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { * Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried). * This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing. */ - const { existingJobs, newJobs } = jobsQuery.docs.reduce( + const { existingJobs, newJobs } = jobs.reduce( (acc, job) => { if (job.totalTried > 0) { acc.existingJobs.push(job) @@ -215,34 +208,31 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } return acc }, - { existingJobs: [] as BaseJob[], newJobs: [] as BaseJob[] }, + { existingJobs: [] as Job[], newJobs: [] as Job[] }, ) - if (!jobsQuery.docs.length) { + if (!jobs.length) { return { noJobsRemaining: true, remainingJobsFromQueried: 0, } } - if (jobsQuery?.docs?.length) { - payload.logger.info({ - msg: `Running ${jobsQuery.docs.length} jobs.`, - new: newJobs?.length, - retrying: existingJobs?.length, - }) - } - const jobsToDelete: (number | string)[] | undefined = jobsConfig.deleteJobOnComplete - ? [] - : undefined + payload.logger.info({ + msg: `Running ${jobs.length} jobs.`, + new: newJobs?.length, + retrying: existingJobs?.length, + }) - const runSingleJob = async (job: BaseJob) => { + const successfullyCompletedJobs: (number | string)[] = [] + + const runSingleJob = async (job: Job) => { if (!job.workflowSlug && !job.taskSlug) { throw new Error('Job must have either a workflowSlug or a taskSlug') } const jobReq = isolateObjectProperty(req, 'transactionID') - const workflowConfig: WorkflowConfig = + const workflowConfig: WorkflowConfig = job.workflowSlug && jobsConfig.workflows?.length ? jobsConfig.workflows.find(({ slug }) => slug === job.workflowSlug)! : { @@ -263,8 +253,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { // the runner will either be passed to the config // OR it will be a path, which we will need to import via eval to avoid // Next.js compiler dynamic import expression errors - let workflowHandler: WorkflowHandler | WorkflowJSON - + let workflowHandler: WorkflowHandler | WorkflowJSON if ( typeof workflowConfig.handler === 'function' || (typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler)) @@ -299,8 +288,8 @@ export const runJobs = async (args: RunJobsArgs): Promise => { workflowHandler, }) - if (result.status !== 'error' && jobsToDelete) { - jobsToDelete.push(job.id) + if (result.status !== 'error') { + successfullyCompletedJobs.push(job.id) } return { id: job.id, result } @@ -313,8 +302,8 @@ export const runJobs = async (args: RunJobsArgs): Promise => { workflowHandler, }) - if (result.status !== 'error' && jobsToDelete) { - jobsToDelete.push(job.id) + if (result.status !== 'error') { + successfullyCompletedJobs.push(job.id) } return { id: job.id, result } @@ -323,39 +312,39 @@ export const runJobs = async (args: RunJobsArgs): Promise => { let resultsArray: { id: number | string; result: RunJobResult }[] = [] if (sequential) { - for (const job of jobsQuery.docs) { + for (const job of jobs) { const result = await runSingleJob(job) - if (result !== null) { - resultsArray.push(result!) + if (result) { + resultsArray.push(result) } } } else { - const jobPromises = jobsQuery.docs.map(runSingleJob) + const jobPromises = jobs.map(runSingleJob) resultsArray = (await Promise.all(jobPromises)) as { id: number | string result: RunJobResult }[] } - if (jobsToDelete && jobsToDelete.length > 0) { + if (jobsConfig.deleteJobOnComplete && successfullyCompletedJobs.length) { try { if (jobsConfig.runHooks) { await payload.delete({ collection: jobsCollectionSlug, depth: 0, // can be 0 since we're not returning anything disableTransaction: true, - where: { id: { in: jobsToDelete } }, + where: { id: { in: successfullyCompletedJobs } }, }) } else { await payload.db.deleteMany({ collection: jobsCollectionSlug, - where: { id: { in: jobsToDelete } }, + where: { id: { in: successfullyCompletedJobs } }, }) } } catch (err) { payload.logger.error({ err, - msg: `failed to delete jobs ${jobsToDelete.join(', ')} on complete`, + msg: `Failed to delete jobs ${successfullyCompletedJobs.join(', ')} on complete`, }) } } diff --git a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts index 6b34670c8..28a117194 100644 --- a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts @@ -1,11 +1,7 @@ +import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' import type { WorkflowJSON, WorkflowStep } from '../../../config/types/workflowJSONTypes.js' -import type { - BaseJob, - RunningJob, - WorkflowConfig, - WorkflowTypes, -} from '../../../config/types/workflowTypes.js' +import type { WorkflowConfig } from '../../../config/types/workflowTypes.js' import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js' import type { JobRunStatus } from '../runJob/index.js' @@ -13,11 +9,11 @@ import { getRunTaskFunction, type RunTaskFunctionState } from '../runJob/getRunT import { handleWorkflowError } from '../runJob/handleWorkflowError.js' type Args = { - job: BaseJob + job: Job req: PayloadRequest updateJob: UpdateJobFunction - workflowConfig: WorkflowConfig - workflowHandler: WorkflowJSON + workflowConfig: WorkflowConfig + workflowHandler: WorkflowJSON } export type RunJSONJobResult = { @@ -37,7 +33,7 @@ export const runJSONJob = async ({ reachedMaxRetries: false, } - const stepsToRun: WorkflowStep[] = [] + const stepsToRun: WorkflowStep[] = [] for (const step of workflowHandler) { if ('task' in step) { @@ -49,8 +45,7 @@ export const runJSONJob = async ({ continue } } - if (step.condition && !step.condition({ job: job as RunningJob })) { - // TODO: Improve RunningJob type see todo below + if (step.condition && !step.condition({ job })) { continue } stepsToRun.push(step) @@ -67,7 +62,7 @@ export const runJSONJob = async ({ stepsToRun.map(async (step) => { if ('task' in step) { await tasks[step.task]!(step.id, { - input: step.input ? step.input({ job: job as RunningJob }) : {}, // TODO: Type better. We should use RunningJob anywhere and make TypedCollection['payload-jobs'] be BaseJob if type not generated + input: step.input ? step.input({ job }) : {}, retries: step.retries, }) } else { @@ -93,7 +88,7 @@ export const runJSONJob = async ({ // Check if workflow has completed let workflowCompleted = false - for (const [slug, map] of Object.entries(job.taskStatus!)) { + for (const [slug, map] of Object.entries(job.taskStatus)) { for (const [id, taskStatus] of Object.entries(map)) { if (taskStatus.complete) { const step = workflowHandler.find((step) => { diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index f8ffce40e..587c2ac92 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -1,6 +1,7 @@ import ObjectIdImport from 'bson-objectid' -import type { PayloadRequest } from '../../../../types/index.js' +import type { Job } from '../../../../index.js' +import type { JsonObject, PayloadRequest } from '../../../../types/index.js' import type { RetryConfig, RunInlineTaskFunction, @@ -12,8 +13,6 @@ import type { TaskType, } from '../../../config/types/taskTypes.js' import type { - BaseJob, - RunningJob, SingleTaskStatus, WorkflowConfig, WorkflowTypes, @@ -31,15 +30,15 @@ export type RunTaskFunctionState = { reachedMaxRetries: boolean } -async function getTaskHandlerFromConfig(taskConfig: TaskConfig) { - let handler: TaskHandler - - if (typeof taskConfig.handler === 'function') { - handler = taskConfig.handler - } else { - handler = await importHandlerPath>(taskConfig.handler) +async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) { + if (!taskConfig) { + throw new Error('Task config is required to get the task handler') + } + if (typeof taskConfig.handler === 'function') { + return taskConfig.handler + } else { + return await importHandlerPath>(taskConfig.handler) } - return handler } export async function handleTaskFailed({ @@ -63,7 +62,7 @@ export async function handleTaskFailed({ error?: Error executedAt: Date input: object - job: BaseJob + job: Job maxRetries: number output: object parent?: TaskParent @@ -83,9 +82,6 @@ export async function handleTaskFailed({ await taskConfig.onFail() } - if (!job.log) { - job.log = [] - } const errorJSON = error ? { name: error.name, @@ -99,14 +95,16 @@ export async function handleTaskFailed({ : 'failed', } - job.log.push({ + const currentDate = new Date() + + ;(job.log ??= []).push({ id: new ObjectId().toHexString(), - completedAt: new Date().toISOString(), + completedAt: currentDate.toISOString(), error: errorJSON, executedAt: executedAt.toISOString(), input, output, - parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, + parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined, state: 'failed', taskID, taskSlug, @@ -115,7 +113,7 @@ export async function handleTaskFailed({ if (job.waitUntil) { // Check if waitUntil is in the past const waitUntil = new Date(job.waitUntil) - if (waitUntil < new Date()) { + if (waitUntil < currentDate) { // Outdated waitUntil, remove it delete job.waitUntil } @@ -163,13 +161,15 @@ export type TaskParent = { export const getRunTaskFunction = ( state: RunTaskFunctionState, - job: BaseJob, - workflowConfig: WorkflowConfig, + job: Job, + workflowConfig: WorkflowConfig, req: PayloadRequest, isInline: TIsInline, updateJob: UpdateJobFunction, parent?: TaskParent, ): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => { + const jobConfig = req.payload.config.jobs + const runTask: ( taskSlug: TTaskSlug, ) => TTaskSlug extends 'inline' ? RunInlineTaskFunction : RunTaskFunction = ( @@ -180,20 +180,16 @@ export const getRunTaskFunction = ( { input, retries, + // Only available for inline tasks: task, }: Parameters[1] & Parameters>[1], ) => { const executedAt = new Date() - let inlineRunner: TaskHandler = null! - if (isInline) { - inlineRunner = task as TaskHandler - } - - let taskConfig!: TaskConfig + let taskConfig: TaskConfig | undefined if (!isInline) { - taskConfig = (req.payload.config.jobs.tasks?.length && - req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug)) as TaskConfig + taskConfig = (jobConfig.tasks?.length && + jobConfig.tasks.find((t) => t.slug === taskSlug)) as TaskConfig if (!taskConfig) { throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`) @@ -239,15 +235,9 @@ export const getRunTaskFunction = ( } } - let runner: TaskHandler - if (isInline) { - runner = inlineRunner - } else { - if (!taskConfig) { - throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`) - } - runner = await getTaskHandlerFromConfig(taskConfig) - } + const runner = isInline + ? (task as TaskHandler) + : await getTaskHandlerFromConfig(taskConfig) if (!runner || typeof runner !== 'function') { const errorMessage = isInline @@ -261,13 +251,13 @@ export const getRunTaskFunction = ( }, hasError: true, log: [ - ...job.log, + ...(job.log || []), { id: new ObjectId().toHexString(), completedAt: new Date().toISOString(), error: errorMessage, executedAt: executedAt.toISOString(), - parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, + parent: jobConfig.addParentToTaskLog ? parent : undefined, state: 'failed', taskID, taskSlug, @@ -294,7 +284,7 @@ export const getRunTaskFunction = ( } let taskHandlerResult: TaskHandlerResult - let output: object = {} + let output: JsonObject | undefined = {} try { taskHandlerResult = await runner({ @@ -303,7 +293,7 @@ export const getRunTaskFunction = ( taskSlug, }), input, - job: job as unknown as RunningJob, // TODO: Type this better + job: job as unknown as Job, req, tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, { taskID, @@ -351,23 +341,20 @@ export const getRunTaskFunction = ( }) throw new Error('Task failed') } else { - output = taskHandlerResult.output! + output = taskHandlerResult.output } if (taskConfig?.onSuccess) { await taskConfig.onSuccess() } - if (!job.log) { - job.log = [] - } - job.log.push({ + ;(job.log ??= []).push({ id: new ObjectId().toHexString(), completedAt: new Date().toISOString(), executedAt: executedAt.toISOString(), input, output, - parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, + parent: jobConfig.addParentToTaskLog ? parent : undefined, state: 'succeeded', taskID, taskSlug, @@ -384,7 +371,7 @@ export const getRunTaskFunction = ( return runTask('inline') as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions } else { const tasks: RunTaskFunctions = {} - for (const task of req?.payload?.config?.jobs?.tasks ?? []) { + for (const task of jobConfig.tasks ?? []) { tasks[task.slug] = runTask(task.slug) as RunTaskFunction } return tasks as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts index 13addfe5d..bfeba3bfb 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts @@ -1,11 +1,11 @@ +import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' -import type { BaseJob } from '../../../config/types/workflowTypes.js' import { updateJob } from '../../../utilities/updateJob.js' -export type UpdateJobFunction = (jobData: Partial) => Promise +export type UpdateJobFunction = (jobData: Partial) => Promise -export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction { +export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFunction { return async (jobData) => { const updatedJob = await updateJob({ id: job.id, @@ -18,18 +18,15 @@ export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJ // Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function for (const key in updatedJob) { if (key === 'log') { - if (!job.log) { - job.log = [] - } // Add all new log entries to the original job.log object. Do not delete any existing log entries. // Do not update existing log entries, as existing log entries should be immutable. - for (const logEntry of updatedJob.log) { - if (!job.log.some((entry) => entry.id === logEntry.id)) { - job.log.push(logEntry) + for (const logEntry of updatedJob?.log ?? []) { + if (!job.log || !job.log.some((entry) => entry.id === logEntry.id)) { + ;(job.log ??= []).push(logEntry) } } } else { - ;(job as any)[key] = updatedJob[key as keyof BaseJob] + ;(job as any)[key] = updatedJob[key as keyof Job] } } diff --git a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts index c6b2f2f73..2481db7b2 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts @@ -1,5 +1,6 @@ +import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' -import type { BaseJob, WorkflowConfig, WorkflowTypes } from '../../../config/types/workflowTypes.js' +import type { WorkflowConfig } from '../../../config/types/workflowTypes.js' import type { RunTaskFunctionState } from './getRunTaskFunction.js' import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' @@ -16,10 +17,10 @@ export function handleWorkflowError({ workflowConfig, }: { error: Error - job: BaseJob + job: Job req: PayloadRequest state: RunTaskFunctionState - workflowConfig: WorkflowConfig + workflowConfig: WorkflowConfig }): { hasFinalError: boolean } { diff --git a/packages/payload/src/queues/operations/runJobs/runJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJob/index.ts index 6e3f6be65..7b3b755d7 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/index.ts @@ -1,12 +1,7 @@ import type { APIError } from '../../../../errors/APIError.js' +import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' -import type { - BaseJob, - RunningJob, - WorkflowConfig, - WorkflowHandler, - WorkflowTypes, -} from '../../../config/types/workflowTypes.js' +import type { WorkflowConfig, WorkflowHandler } from '../../../config/types/workflowTypes.js' import type { RunTaskFunctionState } from './getRunTaskFunction.js' import type { UpdateJobFunction } from './getUpdateJobFunction.js' @@ -14,11 +9,11 @@ import { getRunTaskFunction } from './getRunTaskFunction.js' import { handleWorkflowError } from './handleWorkflowError.js' type Args = { - job: BaseJob + job: Job req: PayloadRequest updateJob: UpdateJobFunction - workflowConfig: WorkflowConfig - workflowHandler: WorkflowHandler + workflowConfig: WorkflowConfig + workflowHandler: WorkflowHandler } export type JobRunStatus = 'error' | 'error-reached-max-retries' | 'success' @@ -44,7 +39,7 @@ export const runJob = async ({ try { await workflowHandler({ inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob), - job: job as unknown as RunningJob, //TODO: Type this better + job, req, tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob), }) diff --git a/packages/payload/src/queues/utilities/getJobTaskStatus.ts b/packages/payload/src/queues/utilities/getJobTaskStatus.ts index 9338198de..8e00a8e15 100644 --- a/packages/payload/src/queues/utilities/getJobTaskStatus.ts +++ b/packages/payload/src/queues/utilities/getJobTaskStatus.ts @@ -1,12 +1,17 @@ -import type { BaseJob, JobTaskStatus } from '../config/types/workflowTypes.js' +import type { Job } from '../../index.js' +import type { JobTaskStatus } from '../config/types/workflowTypes.js' type Args = { - jobLog: BaseJob['log'] + jobLog: Job['log'] } export const getJobTaskStatus = ({ jobLog }: Args): JobTaskStatus => { const taskStatus: JobTaskStatus = {} + if (!jobLog || !Array.isArray(jobLog)) { + return taskStatus + } + // First, add (in order) the steps from the config to // our status map for (const loggedJob of jobLog) { diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts index 70f66d332..3517e19dd 100644 --- a/packages/payload/src/queues/utilities/updateJob.ts +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -1,12 +1,12 @@ import type { ManyOptions } from '../../collections/operations/local/update.js' import type { UpdateJobsArgs } from '../../database/types.js' +import type { Job } from '../../index.js' import type { PayloadRequest, Sort, Where } from '../../types/index.js' -import type { BaseJob } from '../config/types/workflowTypes.js' import { jobAfterRead, jobsCollectionSlug } from '../config/index.js' type BaseArgs = { - data: Partial + data: Partial depth?: number disableTransaction?: boolean limit?: number @@ -50,7 +50,7 @@ export async function updateJobs({ returning, sort, where: whereArg, -}: RunJobsArgs): Promise { +}: RunJobsArgs): Promise { const limit = id ? 1 : limitArg const where = id ? { id: { equals: id } } : whereArg @@ -68,7 +68,7 @@ export async function updateJobs({ if (returning === false || !result) { return null } - return result.docs as BaseJob[] + return result.docs as Job[] } const jobReq = { @@ -94,7 +94,7 @@ export async function updateJobs({ where: where as Where, } - const updatedJobs: BaseJob[] | null = await req.payload.db.updateJobs(args) + const updatedJobs: Job[] | null = await req.payload.db.updateJobs(args) if (req.payload.db.name !== 'mongoose' && jobReq.transactionID) { await req.payload.db.commitTransaction(jobReq.transactionID) diff --git a/test/queues/config.ts b/test/queues/config.ts index 7f0c66ad8..a5fb76a96 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -30,6 +30,7 @@ import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workf const filename = fileURLToPath(import.meta.url) const dirname = path.dirname(filename) +// eslint-disable-next-line no-restricted-exports export default buildConfigWithDefaults({ collections: [ { diff --git a/test/queues/runners/updatePost.ts b/test/queues/runners/updatePost.ts index c9dff56a0..b09caf395 100644 --- a/test/queues/runners/updatePost.ts +++ b/test/queues/runners/updatePost.ts @@ -44,7 +44,7 @@ export const updatePostStep2: TaskHandler<'UpdatePostStep2'> = async ({ req, inp id: postID, req, data: { - jobStep2Ran: input.messageTwice + job.taskStatus.UpdatePost['1'].output.messageTwice, + jobStep2Ran: input.messageTwice + job.taskStatus.UpdatePost?.['1']?.output?.messageTwice, }, })