refactor: simplify job type (#12816)

Previously, there were multiple ways to type a running job:
- `GeneratedTypes['payload-jobs']` - only works in an installed project
- is `any` in monorepo
- `BaseJob` - works everywhere, but does not incorporate generated types
which may include type for custom fields added to the jobs collection
- `RunningJob<>` - more accurate version of `BaseJob`, but same problem

This PR deprecated all those types in favor of a new `Job` type.
Benefits:
- Works in both monorepo and installed projects. If no generated types
exist, it will automatically fall back to `BaseJob`
- Comes with an optional generic that can be used to narrow down
`job.input` based on the task / workflow slug. No need to use a separate
type helper like `RunningJob<>`

With this new type, I was able to replace every usage of
`GeneratedTypes['payload-jobs']`, `BaseJob` and `RunningJob<>` with the
simple `Job` type.

Additionally, this PR simplifies some of the logic used to run jobs
This commit is contained in:
Alessio Gravili
2025-06-16 13:15:56 -07:00
committed by GitHub
parent 810869f3fa
commit 84cb2b5819
19 changed files with 254 additions and 237 deletions

View File

@@ -1,5 +1,5 @@
import type { MongooseUpdateQueryOptions } from 'mongoose'
import type { BaseJob, UpdateJobs, Where } from 'payload'
import type { Job, UpdateJobs, Where } from 'payload'
import type { MongooseAdapter } from './index.js'
@@ -47,7 +47,7 @@ export const updateJobs: UpdateJobs = async function updateMany(
transform({ adapter: this, data, fields: collectionConfig.fields, operation: 'write' })
let result: BaseJob[] = []
let result: Job[] = []
try {
if (id) {

View File

@@ -1,4 +1,4 @@
import type { BaseJob, DatabaseAdapter } from '../index.js'
import type { DatabaseAdapter, Job } from '../index.js'
import type { UpdateJobs } from './types.js'
import { jobsCollectionSlug } from '../queues/config/index.js'
@@ -7,9 +7,9 @@ export const defaultUpdateJobs: UpdateJobs = async function updateMany(
this: DatabaseAdapter,
{ id, data, limit, req, returning, where },
) {
const updatedJobs: BaseJob[] | null = []
const updatedJobs: Job[] | null = []
const jobsToUpdate: BaseJob[] = (
const jobsToUpdate: Job[] = (
id
? [
await this.findOne({
@@ -27,7 +27,7 @@ export const defaultUpdateJobs: UpdateJobs = async function updateMany(
where,
})
).docs
).filter(Boolean) as BaseJob[]
).filter(Boolean) as Job[]
if (!jobsToUpdate) {
return null

View File

@@ -1,5 +1,5 @@
import type { TypeWithID } from '../collections/config/types.js'
import type { BaseJob, CollectionSlug, GlobalSlug } from '../index.js'
import type { CollectionSlug, GlobalSlug, Job } from '../index.js'
import type {
Document,
JoinQuery,
@@ -566,7 +566,7 @@ export type UpdateJobsArgs = {
}
)
export type UpdateJobs = (args: UpdateJobsArgs) => Promise<BaseJob[] | null>
export type UpdateJobs = (args: UpdateJobsArgs) => Promise<Job[] | null>
export type UpsertArgs = {
collection: CollectionSlug

View File

@@ -71,6 +71,7 @@ import type { SupportedLanguages } from '@payloadcms/translations'
import { Cron } from 'croner'
import type { ClientConfig } from './config/client.js'
import type { BaseJob } from './queues/config/types/workflowTypes.js'
import type { TypeWithVersion } from './versions/types.js'
import { decrypt, encrypt } from './auth/crypto.js'
@@ -247,6 +248,27 @@ export type TypedAuthOperations = ResolveAuthOperationsType<GeneratedTypes>
type ResolveJobOperationsType<T> = 'jobs' extends keyof T ? T['jobs'] : T['jobsUntyped']
export type TypedJobs = ResolveJobOperationsType<GeneratedTypes>
type HasPayloadJobsType = 'collections' extends keyof GeneratedTypes
? 'payload-jobs' extends keyof TypedCollection
? true
: false
: false
/**
* Represents a job in the `payload-jobs` collection, referencing a queued workflow or task (= Job).
* If a generated type for the `payload-jobs` collection is not available, falls back to the BaseJob type.
*
* `input` and `taksStatus` are always present here, as the job afterRead hook will always populate them.
*/
export type Job<
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
> = HasPayloadJobsType extends true
? {
input: BaseJob<TWorkflowSlugOrInput>['input']
taskStatus: BaseJob<TWorkflowSlugOrInput>['taskStatus']
} & Omit<TypedCollection['payload-jobs'], 'input' | 'taskStatus'>
: BaseJob<TWorkflowSlugOrInput>
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)

View File

@@ -1,7 +1,7 @@
import type { CollectionConfig } from '../../collections/config/types.js'
import type { Config, SanitizedConfig } from '../../config/types.js'
import type { Field } from '../../fields/config/types.js'
import type { BaseJob } from './types/workflowTypes.js'
import type { Job } from '../../index.js'
import { runJobsEndpoint } from '../restEndpointRun.js'
import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js'
@@ -51,6 +51,9 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
type: 'text',
required: true,
},
/**
* @todo make required in 4.0
*/
{
name: 'input',
type: 'json',
@@ -238,9 +241,11 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c
}
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: BaseJob }): BaseJob {
export function jobAfterRead({ config, doc }: { config: SanitizedConfig; doc: Job }): Job {
doc.taskStatus = getJobTaskStatus({
jobLog: doc.log || [],
})
doc.input = doc.input || {}
doc.taskStatus = doc.taskStatus || {}
return doc
}

View File

@@ -1,5 +1,5 @@
import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js'
import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
import type { SingleTaskStatus } from './workflowTypes.js'
export type TaskInputOutput = {
input: object
@@ -34,7 +34,7 @@ export type TaskHandlerArgs<
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['input']
: never
job: RunningJob<TWorkflowSlug>
job: Job<TWorkflowSlug>
req: PayloadRequest
tasks: RunTaskFunctions
}
@@ -42,8 +42,8 @@ export type TaskHandlerArgs<
/**
* Inline tasks in JSON workflows have no input, as they can just get the input from job.taskStatus
*/
export type TaskHandlerArgsNoInput<TWorkflowInput extends object> = {
job: RunningJobSimple<TWorkflowInput>
export type TaskHandlerArgsNoInput<TWorkflowInput extends false | object = false> = {
job: Job<TWorkflowInput>
req: PayloadRequest
}
@@ -107,7 +107,7 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
task: (args: {
inlineTask: RunInlineTaskFunction
input: TTaskInput
job: RunningJob<any>
job: Job<any>
req: PayloadRequest
tasks: RunTaskFunctions
}) => MaybePromise<
@@ -128,7 +128,7 @@ export type ShouldRestoreFn = (args: {
* Input data passed to the task
*/
input: object
job: BaseJob
job: Job
req: PayloadRequest
taskStatus: SingleTaskStatus<string>
}) => boolean | Promise<boolean>

View File

@@ -1,15 +1,15 @@
import type { RunningJob, TaskHandlerResult, TypedJobs } from '../../../index.js'
import type { Job, TaskHandlerResult, TypedJobs } from '../../../index.js'
import type { RetryConfig, TaskHandlerArgsNoInput } from './taskTypes.js'
export type WorkflowStep<
TTaskSlug extends keyof TypedJobs['tasks'],
TWorkflowSlug extends keyof TypedJobs['workflows'],
TWorkflowSlug extends false | keyof TypedJobs['workflows'] = false,
> = {
/**
* If this step is completed, the workflow will be marked as completed
*/
completesJob?: boolean
condition?: (args: { job: RunningJob<TWorkflowSlug> }) => boolean
condition?: (args: { job: Job<TWorkflowSlug> }) => boolean
/**
* Each task needs to have a unique ID to track its status
*/
@@ -23,19 +23,20 @@ export type WorkflowStep<
} & (
| {
inlineTask?: (
args: TaskHandlerArgsNoInput<TypedJobs['workflows'][TWorkflowSlug]['input']>,
args: TWorkflowSlug extends keyof TypedJobs['workflows']
? TaskHandlerArgsNoInput<TypedJobs['workflows'][TWorkflowSlug]['input']>
: TaskHandlerArgsNoInput,
) => Promise<TaskHandlerResult<TTaskSlug>> | TaskHandlerResult<TTaskSlug>
}
| {
input: (args: { job: RunningJob<TWorkflowSlug> }) => TypedJobs['tasks'][TTaskSlug]['input']
input: (args: { job: Job<TWorkflowSlug> }) => TypedJobs['tasks'][TTaskSlug]['input']
task: TTaskSlug
}
)
type AllWorkflowSteps<TWorkflowSlug extends keyof TypedJobs['workflows']> = {
type AllWorkflowSteps<TWorkflowSlug extends false | keyof TypedJobs['workflows'] = false> = {
[TTaskSlug in keyof TypedJobs['tasks']]: WorkflowStep<TTaskSlug, TWorkflowSlug>
}[keyof TypedJobs['tasks']]
export type WorkflowJSON<TWorkflowSlug extends keyof TypedJobs['workflows']> = Array<
AllWorkflowSteps<TWorkflowSlug>
>
export type WorkflowJSON<TWorkflowSlug extends false | keyof TypedJobs['workflows'] = false> =
Array<AllWorkflowSteps<TWorkflowSlug>>

View File

@@ -1,5 +1,11 @@
import type { Field } from '../../../fields/config/types.js'
import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js'
import type {
Job,
PayloadRequest,
StringKeyOf,
TypedCollection,
TypedJobs,
} from '../../../index.js'
import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js'
import type {
RetryConfig,
@@ -27,28 +33,41 @@ export type JobLog = {
parent?: TaskParent
state: 'failed' | 'succeeded'
taskID: string
taskSlug: string
taskSlug: TaskType
}
export type BaseJob = {
completedAt?: string
/**
* @deprecated - will be made private in 4.0. Please use the `Job` type instead.
*/
export type BaseJob<
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
> = {
completedAt?: null | string
createdAt: string
error?: unknown
hasError?: boolean
id: number | string
input?: any
log: JobLog[]
input: TWorkflowSlugOrInput extends false
? object
: TWorkflowSlugOrInput extends keyof TypedJobs['workflows']
? TypedJobs['workflows'][TWorkflowSlugOrInput]['input']
: TWorkflowSlugOrInput
log?: JobLog[]
processing?: boolean
queue: string
taskSlug?: string
taskStatus?: JobTaskStatus
queue?: string
taskSlug?: null | TaskType
taskStatus: JobTaskStatus
totalTried: number
waitUntil?: string
workflowSlug?: string
updatedAt: string
waitUntil?: null | string
workflowSlug?: null | WorkflowTypes
}
export type WorkflowTypes = StringKeyOf<TypedJobs['workflows']>
// TODO: Type job.taskStatus once available - for JSON-defined workflows
/**
* @deprecated - will be removed in 4.0. Use `Job` type instead.
*/
export type RunningJob<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> = {
input: TWorkflowSlugOrInput extends keyof TypedJobs['workflows']
? TypedJobs['workflows'][TWorkflowSlugOrInput]['input']
@@ -56,6 +75,9 @@ export type RunningJob<TWorkflowSlugOrInput extends keyof TypedJobs['workflows']
taskStatus: JobTaskStatus
} & Omit<TypedCollection['payload-jobs'], 'input' | 'taskStatus'>
/**
* @deprecated - will be removed in 4.0. Use `Job` type instead.
*/
export type RunningJobSimple<TWorkflowInput extends object> = {
input: TWorkflowInput
} & TypedCollection['payload-jobs']
@@ -65,13 +87,14 @@ export type RunningJobFromTask<TTaskSlug extends keyof TypedJobs['tasks']> = {
input: TypedJobs['tasks'][TTaskSlug]['input']
} & TypedCollection['payload-jobs']
export type WorkflowHandler<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> =
(args: {
inlineTask: RunInlineTaskFunction
job: RunningJob<TWorkflowSlugOrInput>
req: PayloadRequest
tasks: RunTaskFunctions
}) => Promise<void>
export type WorkflowHandler<
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
> = (args: {
inlineTask: RunInlineTaskFunction
job: Job<TWorkflowSlugOrInput>
req: PayloadRequest
tasks: RunTaskFunctions
}) => Promise<void>
export type SingleTaskStatus<T extends keyof TypedJobs['tasks']> = {
complete: boolean
@@ -91,7 +114,9 @@ export type JobTaskStatus = {
}
}
export type WorkflowConfig<TWorkflowSlugOrInput extends keyof TypedJobs['workflows'] | object> = {
export type WorkflowConfig<
TWorkflowSlugOrInput extends false | keyof TypedJobs['workflows'] | object = false,
> = {
/**
* You can either pass a string-based path to the workflow function file, or the workflow function itself.
*

View File

@@ -1,10 +1,10 @@
import type { BaseJob, RunningJobFromTask } from './config/types/workflowTypes.js'
import type { RunningJobFromTask } from './config/types/workflowTypes.js'
import {
createLocalReq,
type Job,
type Payload,
type PayloadRequest,
type RunningJob,
type Sort,
type TypedJobs,
type Where,
@@ -40,7 +40,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
},
): Promise<
TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
? Job<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug>
> => {
let queue: string | undefined = undefined
@@ -57,7 +57,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
}
}
const data: Partial<BaseJob> = {
const data: Partial<Job> = {
input: args.input,
}
@@ -75,7 +75,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
}
type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
? RunningJob<TTaskOrWorkflowSlug>
? Job<TTaskOrWorkflowSlug>
: RunningJobFromTask<TTaskOrWorkflowSlug> // Type assertion is still needed here
if (payload?.config?.jobs?.depth || payload?.config?.jobs?.runHooks) {
@@ -199,10 +199,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
hasError: true,
processing: false,
waitUntil: null,
} as {
completedAt: null
waitUntil: null
} & BaseJob,
},
depth: 0, // No depth, since we're not returning
disableTransaction: true,
req: newReq,
@@ -228,10 +225,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
hasError: true,
processing: false,
waitUntil: null,
} as {
completedAt: null
waitUntil: null
} & BaseJob,
},
depth: 0, // No depth, since we're not returning
disableTransaction: true,
req: newReq,

View File

@@ -1,11 +1,7 @@
import type { Job } from '../../../index.js'
import type { PayloadRequest, Sort, Where } from '../../../types/index.js'
import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js'
import type {
BaseJob,
WorkflowConfig,
WorkflowHandler,
WorkflowTypes,
} from '../../config/types/workflowTypes.js'
import type { WorkflowConfig, WorkflowHandler } from '../../config/types/workflowTypes.js'
import type { RunJobResult } from './runJob/index.js'
import { Forbidden } from '../../../errors/Forbidden.js'
@@ -97,42 +93,40 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
throw new Forbidden(req.t)
}
}
const where: Where = {
and: [
{
completedAt: {
exists: false,
},
const and: Where[] = [
{
completedAt: {
exists: false,
},
{
hasError: {
not_equals: true,
},
},
{
hasError: {
not_equals: true,
},
{
processing: {
equals: false,
},
},
{
processing: {
equals: false,
},
{
or: [
{
waitUntil: {
exists: false,
},
},
{
or: [
{
waitUntil: {
exists: false,
},
{
waitUntil: {
less_than: new Date().toISOString(),
},
},
{
waitUntil: {
less_than: new Date().toISOString(),
},
],
},
],
}
},
],
},
]
if (allQueues !== true) {
where.and?.push({
and.push({
queue: {
equals: queue ?? 'default',
},
@@ -140,34 +134,33 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
if (whereFromProps) {
where.and?.push(whereFromProps)
and.push(whereFromProps)
}
// Find all jobs and ensure we set job to processing: true as early as possible to reduce the chance of
// the same job being picked up by another worker
const jobsQuery: {
docs: BaseJob[]
} = { docs: [] }
let jobs: Job[] = []
if (id) {
// Only one job to run
jobsQuery.docs = [
(await updateJob({
id,
data: {
processing: true,
},
depth: jobsConfig.depth,
disableTransaction: true,
req,
returning: true,
}))!,
]
const job = await updateJob({
id,
data: {
processing: true,
},
depth: jobsConfig.depth,
disableTransaction: true,
req,
returning: true,
})
if (job) {
jobs = [job]
}
} else {
let defaultProcessingOrder: Sort =
payload.collections[jobsCollectionSlug]?.config.defaultSort ?? 'createdAt'
const processingOrderConfig = jobsConfig?.processingOrder
const processingOrderConfig = jobsConfig.processingOrder
if (typeof processingOrderConfig === 'function') {
defaultProcessingOrder = await processingOrderConfig(args)
} else if (typeof processingOrderConfig === 'object' && !Array.isArray(processingOrderConfig)) {
@@ -194,11 +187,11 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
req,
returning: true,
sort: processingOrder ?? defaultProcessingOrder,
where,
where: { and },
})
if (updatedDocs) {
jobsQuery.docs = updatedDocs
jobs = updatedDocs
}
}
@@ -206,7 +199,7 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
*/
const { existingJobs, newJobs } = jobsQuery.docs.reduce(
const { existingJobs, newJobs } = jobs.reduce(
(acc, job) => {
if (job.totalTried > 0) {
acc.existingJobs.push(job)
@@ -215,34 +208,31 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
return acc
},
{ existingJobs: [] as BaseJob[], newJobs: [] as BaseJob[] },
{ existingJobs: [] as Job[], newJobs: [] as Job[] },
)
if (!jobsQuery.docs.length) {
if (!jobs.length) {
return {
noJobsRemaining: true,
remainingJobsFromQueried: 0,
}
}
if (jobsQuery?.docs?.length) {
payload.logger.info({
msg: `Running ${jobsQuery.docs.length} jobs.`,
new: newJobs?.length,
retrying: existingJobs?.length,
})
}
const jobsToDelete: (number | string)[] | undefined = jobsConfig.deleteJobOnComplete
? []
: undefined
payload.logger.info({
msg: `Running ${jobs.length} jobs.`,
new: newJobs?.length,
retrying: existingJobs?.length,
})
const runSingleJob = async (job: BaseJob) => {
const successfullyCompletedJobs: (number | string)[] = []
const runSingleJob = async (job: Job) => {
if (!job.workflowSlug && !job.taskSlug) {
throw new Error('Job must have either a workflowSlug or a taskSlug')
}
const jobReq = isolateObjectProperty(req, 'transactionID')
const workflowConfig: WorkflowConfig<WorkflowTypes> =
const workflowConfig: WorkflowConfig =
job.workflowSlug && jobsConfig.workflows?.length
? jobsConfig.workflows.find(({ slug }) => slug === job.workflowSlug)!
: {
@@ -263,8 +253,7 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
// the runner will either be passed to the config
// OR it will be a path, which we will need to import via eval to avoid
// Next.js compiler dynamic import expression errors
let workflowHandler: WorkflowHandler<WorkflowTypes> | WorkflowJSON<WorkflowTypes>
let workflowHandler: WorkflowHandler | WorkflowJSON
if (
typeof workflowConfig.handler === 'function' ||
(typeof workflowConfig.handler === 'object' && Array.isArray(workflowConfig.handler))
@@ -299,8 +288,8 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
workflowHandler,
})
if (result.status !== 'error' && jobsToDelete) {
jobsToDelete.push(job.id)
if (result.status !== 'error') {
successfullyCompletedJobs.push(job.id)
}
return { id: job.id, result }
@@ -313,8 +302,8 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
workflowHandler,
})
if (result.status !== 'error' && jobsToDelete) {
jobsToDelete.push(job.id)
if (result.status !== 'error') {
successfullyCompletedJobs.push(job.id)
}
return { id: job.id, result }
@@ -323,39 +312,39 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
let resultsArray: { id: number | string; result: RunJobResult }[] = []
if (sequential) {
for (const job of jobsQuery.docs) {
for (const job of jobs) {
const result = await runSingleJob(job)
if (result !== null) {
resultsArray.push(result!)
if (result) {
resultsArray.push(result)
}
}
} else {
const jobPromises = jobsQuery.docs.map(runSingleJob)
const jobPromises = jobs.map(runSingleJob)
resultsArray = (await Promise.all(jobPromises)) as {
id: number | string
result: RunJobResult
}[]
}
if (jobsToDelete && jobsToDelete.length > 0) {
if (jobsConfig.deleteJobOnComplete && successfullyCompletedJobs.length) {
try {
if (jobsConfig.runHooks) {
await payload.delete({
collection: jobsCollectionSlug,
depth: 0, // can be 0 since we're not returning anything
disableTransaction: true,
where: { id: { in: jobsToDelete } },
where: { id: { in: successfullyCompletedJobs } },
})
} else {
await payload.db.deleteMany({
collection: jobsCollectionSlug,
where: { id: { in: jobsToDelete } },
where: { id: { in: successfullyCompletedJobs } },
})
}
} catch (err) {
payload.logger.error({
err,
msg: `failed to delete jobs ${jobsToDelete.join(', ')} on complete`,
msg: `Failed to delete jobs ${successfullyCompletedJobs.join(', ')} on complete`,
})
}
}

View File

@@ -1,11 +1,7 @@
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type { WorkflowJSON, WorkflowStep } from '../../../config/types/workflowJSONTypes.js'
import type {
BaseJob,
RunningJob,
WorkflowConfig,
WorkflowTypes,
} from '../../../config/types/workflowTypes.js'
import type { WorkflowConfig } from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js'
import type { JobRunStatus } from '../runJob/index.js'
@@ -13,11 +9,11 @@ import { getRunTaskFunction, type RunTaskFunctionState } from '../runJob/getRunT
import { handleWorkflowError } from '../runJob/handleWorkflowError.js'
type Args = {
job: BaseJob
job: Job
req: PayloadRequest
updateJob: UpdateJobFunction
workflowConfig: WorkflowConfig<WorkflowTypes>
workflowHandler: WorkflowJSON<WorkflowTypes>
workflowConfig: WorkflowConfig
workflowHandler: WorkflowJSON
}
export type RunJSONJobResult = {
@@ -37,7 +33,7 @@ export const runJSONJob = async ({
reachedMaxRetries: false,
}
const stepsToRun: WorkflowStep<string, string>[] = []
const stepsToRun: WorkflowStep<string>[] = []
for (const step of workflowHandler) {
if ('task' in step) {
@@ -49,8 +45,7 @@ export const runJSONJob = async ({
continue
}
}
if (step.condition && !step.condition({ job: job as RunningJob<any> })) {
// TODO: Improve RunningJob type see todo below
if (step.condition && !step.condition({ job })) {
continue
}
stepsToRun.push(step)
@@ -67,7 +62,7 @@ export const runJSONJob = async ({
stepsToRun.map(async (step) => {
if ('task' in step) {
await tasks[step.task]!(step.id, {
input: step.input ? step.input({ job: job as RunningJob<any> }) : {}, // TODO: Type better. We should use RunningJob anywhere and make TypedCollection['payload-jobs'] be BaseJob if type not generated
input: step.input ? step.input({ job }) : {},
retries: step.retries,
})
} else {
@@ -93,7 +88,7 @@ export const runJSONJob = async ({
// Check if workflow has completed
let workflowCompleted = false
for (const [slug, map] of Object.entries(job.taskStatus!)) {
for (const [slug, map] of Object.entries(job.taskStatus)) {
for (const [id, taskStatus] of Object.entries(map)) {
if (taskStatus.complete) {
const step = workflowHandler.find((step) => {

View File

@@ -1,6 +1,7 @@
import ObjectIdImport from 'bson-objectid'
import type { PayloadRequest } from '../../../../types/index.js'
import type { Job } from '../../../../index.js'
import type { JsonObject, PayloadRequest } from '../../../../types/index.js'
import type {
RetryConfig,
RunInlineTaskFunction,
@@ -12,8 +13,6 @@ import type {
TaskType,
} from '../../../config/types/taskTypes.js'
import type {
BaseJob,
RunningJob,
SingleTaskStatus,
WorkflowConfig,
WorkflowTypes,
@@ -31,15 +30,15 @@ export type RunTaskFunctionState = {
reachedMaxRetries: boolean
}
async function getTaskHandlerFromConfig(taskConfig: TaskConfig<string>) {
let handler: TaskHandler<TaskType>
if (typeof taskConfig.handler === 'function') {
handler = taskConfig.handler
} else {
handler = await importHandlerPath<TaskHandler<TaskType>>(taskConfig.handler)
async function getTaskHandlerFromConfig(taskConfig?: TaskConfig) {
if (!taskConfig) {
throw new Error('Task config is required to get the task handler')
}
if (typeof taskConfig.handler === 'function') {
return taskConfig.handler
} else {
return await importHandlerPath<TaskHandler<TaskType>>(taskConfig.handler)
}
return handler
}
export async function handleTaskFailed({
@@ -63,7 +62,7 @@ export async function handleTaskFailed({
error?: Error
executedAt: Date
input: object
job: BaseJob
job: Job
maxRetries: number
output: object
parent?: TaskParent
@@ -83,9 +82,6 @@ export async function handleTaskFailed({
await taskConfig.onFail()
}
if (!job.log) {
job.log = []
}
const errorJSON = error
? {
name: error.name,
@@ -99,14 +95,16 @@ export async function handleTaskFailed({
: 'failed',
}
job.log.push({
const currentDate = new Date()
;(job.log ??= []).push({
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
completedAt: currentDate.toISOString(),
error: errorJSON,
executedAt: executedAt.toISOString(),
input,
output,
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
parent: req.payload.config.jobs.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
@@ -115,7 +113,7 @@ export async function handleTaskFailed({
if (job.waitUntil) {
// Check if waitUntil is in the past
const waitUntil = new Date(job.waitUntil)
if (waitUntil < new Date()) {
if (waitUntil < currentDate) {
// Outdated waitUntil, remove it
delete job.waitUntil
}
@@ -163,13 +161,15 @@ export type TaskParent = {
export const getRunTaskFunction = <TIsInline extends boolean>(
state: RunTaskFunctionState,
job: BaseJob,
workflowConfig: WorkflowConfig<string>,
job: Job,
workflowConfig: WorkflowConfig,
req: PayloadRequest,
isInline: TIsInline,
updateJob: UpdateJobFunction,
parent?: TaskParent,
): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => {
const jobConfig = req.payload.config.jobs
const runTask: <TTaskSlug extends string>(
taskSlug: TTaskSlug,
) => TTaskSlug extends 'inline' ? RunInlineTaskFunction : RunTaskFunction<TTaskSlug> = (
@@ -180,20 +180,16 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
{
input,
retries,
// Only available for inline tasks:
task,
}: Parameters<RunInlineTaskFunction>[1] & Parameters<RunTaskFunction<string>>[1],
) => {
const executedAt = new Date()
let inlineRunner: TaskHandler<TaskType> = null!
if (isInline) {
inlineRunner = task as TaskHandler<TaskType>
}
let taskConfig!: TaskConfig<string>
let taskConfig: TaskConfig | undefined
if (!isInline) {
taskConfig = (req.payload.config.jobs.tasks?.length &&
req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug)) as TaskConfig<string>
taskConfig = (jobConfig.tasks?.length &&
jobConfig.tasks.find((t) => t.slug === taskSlug)) as TaskConfig<string>
if (!taskConfig) {
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
@@ -239,15 +235,9 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
}
}
let runner: TaskHandler<TaskType>
if (isInline) {
runner = inlineRunner
} else {
if (!taskConfig) {
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
}
runner = await getTaskHandlerFromConfig(taskConfig)
}
const runner = isInline
? (task as TaskHandler<TaskType>)
: await getTaskHandlerFromConfig(taskConfig)
if (!runner || typeof runner !== 'function') {
const errorMessage = isInline
@@ -261,13 +251,13 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
},
hasError: true,
log: [
...job.log,
...(job.log || []),
{
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
error: errorMessage,
executedAt: executedAt.toISOString(),
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
parent: jobConfig.addParentToTaskLog ? parent : undefined,
state: 'failed',
taskID,
taskSlug,
@@ -294,7 +284,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
}
let taskHandlerResult: TaskHandlerResult<string>
let output: object = {}
let output: JsonObject | undefined = {}
try {
taskHandlerResult = await runner({
@@ -303,7 +293,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
taskSlug,
}),
input,
job: job as unknown as RunningJob<WorkflowTypes>, // TODO: Type this better
job: job as unknown as Job<WorkflowTypes>,
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, {
taskID,
@@ -351,23 +341,20 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
})
throw new Error('Task failed')
} else {
output = taskHandlerResult.output!
output = taskHandlerResult.output
}
if (taskConfig?.onSuccess) {
await taskConfig.onSuccess()
}
if (!job.log) {
job.log = []
}
job.log.push({
;(job.log ??= []).push({
id: new ObjectId().toHexString(),
completedAt: new Date().toISOString(),
executedAt: executedAt.toISOString(),
input,
output,
parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined,
parent: jobConfig.addParentToTaskLog ? parent : undefined,
state: 'succeeded',
taskID,
taskSlug,
@@ -384,7 +371,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
return runTask('inline') as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions
} else {
const tasks: RunTaskFunctions = {}
for (const task of req?.payload?.config?.jobs?.tasks ?? []) {
for (const task of jobConfig.tasks ?? []) {
tasks[task.slug] = runTask(task.slug) as RunTaskFunction<string>
}
return tasks as TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions

View File

@@ -1,11 +1,11 @@
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type { BaseJob } from '../../../config/types/workflowTypes.js'
import { updateJob } from '../../../utilities/updateJob.js'
export type UpdateJobFunction = (jobData: Partial<BaseJob>) => Promise<BaseJob>
export type UpdateJobFunction = (jobData: Partial<Job>) => Promise<Job>
export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJobFunction {
export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFunction {
return async (jobData) => {
const updatedJob = await updateJob({
id: job.id,
@@ -18,18 +18,15 @@ export function getUpdateJobFunction(job: BaseJob, req: PayloadRequest): UpdateJ
// Update job object like this to modify the original object - that way, incoming changes (e.g. taskStatus field that will be re-generated through the hook) will be reflected in the calling function
for (const key in updatedJob) {
if (key === 'log') {
if (!job.log) {
job.log = []
}
// Add all new log entries to the original job.log object. Do not delete any existing log entries.
// Do not update existing log entries, as existing log entries should be immutable.
for (const logEntry of updatedJob.log) {
if (!job.log.some((entry) => entry.id === logEntry.id)) {
job.log.push(logEntry)
for (const logEntry of updatedJob?.log ?? []) {
if (!job.log || !job.log.some((entry) => entry.id === logEntry.id)) {
;(job.log ??= []).push(logEntry)
}
}
} else {
;(job as any)[key] = updatedJob[key as keyof BaseJob]
;(job as any)[key] = updatedJob[key as keyof Job]
}
}

View File

@@ -1,5 +1,6 @@
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type { BaseJob, WorkflowConfig, WorkflowTypes } from '../../../config/types/workflowTypes.js'
import type { WorkflowConfig } from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js'
@@ -16,10 +17,10 @@ export function handleWorkflowError({
workflowConfig,
}: {
error: Error
job: BaseJob
job: Job
req: PayloadRequest
state: RunTaskFunctionState
workflowConfig: WorkflowConfig<WorkflowTypes>
workflowConfig: WorkflowConfig
}): {
hasFinalError: boolean
} {

View File

@@ -1,12 +1,7 @@
import type { APIError } from '../../../../errors/APIError.js'
import type { Job } from '../../../../index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type {
BaseJob,
RunningJob,
WorkflowConfig,
WorkflowHandler,
WorkflowTypes,
} from '../../../config/types/workflowTypes.js'
import type { WorkflowConfig, WorkflowHandler } from '../../../config/types/workflowTypes.js'
import type { RunTaskFunctionState } from './getRunTaskFunction.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
@@ -14,11 +9,11 @@ import { getRunTaskFunction } from './getRunTaskFunction.js'
import { handleWorkflowError } from './handleWorkflowError.js'
type Args = {
job: BaseJob
job: Job
req: PayloadRequest
updateJob: UpdateJobFunction
workflowConfig: WorkflowConfig<WorkflowTypes>
workflowHandler: WorkflowHandler<WorkflowTypes>
workflowConfig: WorkflowConfig
workflowHandler: WorkflowHandler
}
export type JobRunStatus = 'error' | 'error-reached-max-retries' | 'success'
@@ -44,7 +39,7 @@ export const runJob = async ({
try {
await workflowHandler({
inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob),
job: job as unknown as RunningJob<WorkflowTypes>, //TODO: Type this better
job,
req,
tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob),
})

View File

@@ -1,12 +1,17 @@
import type { BaseJob, JobTaskStatus } from '../config/types/workflowTypes.js'
import type { Job } from '../../index.js'
import type { JobTaskStatus } from '../config/types/workflowTypes.js'
type Args = {
jobLog: BaseJob['log']
jobLog: Job['log']
}
export const getJobTaskStatus = ({ jobLog }: Args): JobTaskStatus => {
const taskStatus: JobTaskStatus = {}
if (!jobLog || !Array.isArray(jobLog)) {
return taskStatus
}
// First, add (in order) the steps from the config to
// our status map
for (const loggedJob of jobLog) {

View File

@@ -1,12 +1,12 @@
import type { ManyOptions } from '../../collections/operations/local/update.js'
import type { UpdateJobsArgs } from '../../database/types.js'
import type { Job } from '../../index.js'
import type { PayloadRequest, Sort, Where } from '../../types/index.js'
import type { BaseJob } from '../config/types/workflowTypes.js'
import { jobAfterRead, jobsCollectionSlug } from '../config/index.js'
type BaseArgs = {
data: Partial<BaseJob>
data: Partial<Job>
depth?: number
disableTransaction?: boolean
limit?: number
@@ -50,7 +50,7 @@ export async function updateJobs({
returning,
sort,
where: whereArg,
}: RunJobsArgs): Promise<BaseJob[] | null> {
}: RunJobsArgs): Promise<Job[] | null> {
const limit = id ? 1 : limitArg
const where = id ? { id: { equals: id } } : whereArg
@@ -68,7 +68,7 @@ export async function updateJobs({
if (returning === false || !result) {
return null
}
return result.docs as BaseJob[]
return result.docs as Job[]
}
const jobReq = {
@@ -94,7 +94,7 @@ export async function updateJobs({
where: where as Where,
}
const updatedJobs: BaseJob[] | null = await req.payload.db.updateJobs(args)
const updatedJobs: Job[] | null = await req.payload.db.updateJobs(args)
if (req.payload.db.name !== 'mongoose' && jobReq.transactionID) {
await req.payload.db.commitTransaction(jobReq.transactionID)

View File

@@ -30,6 +30,7 @@ import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workf
const filename = fileURLToPath(import.meta.url)
const dirname = path.dirname(filename)
// eslint-disable-next-line no-restricted-exports
export default buildConfigWithDefaults({
collections: [
{

View File

@@ -44,7 +44,7 @@ export const updatePostStep2: TaskHandler<'UpdatePostStep2'> = async ({ req, inp
id: postID,
req,
data: {
jobStep2Ran: input.messageTwice + job.taskStatus.UpdatePost['1'].output.messageTwice,
jobStep2Ran: input.messageTwice + job.taskStatus.UpdatePost?.['1']?.output?.messageTwice,
},
})