From a89d54454a9c48ea81aba40d04c63c4d4ed912b4 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Mon, 2 Dec 2024 15:05:48 -0700 Subject: [PATCH] fix: ensure jobs do not retry indefinitely by default, fix undefined values in error messages (#9605) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Fix default retries By default, if no `retries` property has been set, jobs / tasks should not be retried. This was not the case previously, as the `maxRetries` variable was `undefined`, causing jobs to retry endlessly. This PR sets them to `0` by default. Additionally, this fixes some undesirable behavior of the workflow retries property. Workflow retries now act as **maximum**, workflow-level retries. Only tasks that do not have a retry property set will inherit the workflow-level retries. ## Fix error messages Previously, you were able to encounter error messages with undefined values like these: ![CleanShot 2024-11-28 at 15 23 37@2x](https://github.com/user-attachments/assets/81617ca8-11de-4d35-b9bf-cc6c5bc515be) Reason is that it was always using `job.workflowSlug` for the error messages. However, if you queue a task directly, without a workflow, `job.workflowSlug` is undefined and `job.taskSlug` should be used instead. This PR then gets rid of the second undefined value by ensuring that `maxRetries´ is never undefined --- docs/jobs-queue/tasks.mdx | 2 +- docs/jobs-queue/workflows.mdx | 3 +- .../src/queues/config/types/taskTypes.ts | 24 +- .../queues/config/types/workflowJSONTypes.ts | 5 + .../src/queues/config/types/workflowTypes.ts | 9 +- .../src/queues/operations/runJobs/index.ts | 3 +- .../runJobs/runJob/getRunTaskFunction.ts | 16 +- .../runJobs/runJob/handleWorkflowError.ts | 44 +-- test/live-preview/payload-types.ts | 170 +++++------ test/queues/config.ts | 280 ++++++++++++++++++ test/queues/int.spec.ts | 198 ++++++++++++- test/queues/payload-types.ts | 87 ++++++ 12 files changed, 722 insertions(+), 119 deletions(-) diff --git a/docs/jobs-queue/tasks.mdx b/docs/jobs-queue/tasks.mdx index ca525f830..8864e4a49 100644 --- a/docs/jobs-queue/tasks.mdx +++ b/docs/jobs-queue/tasks.mdx @@ -30,7 +30,7 @@ Simply add a task to the `jobs.tasks` array in your Payload config. A task consi | `label` | Define a human-friendly label for this task. | | `onFail` | Function to be executed if the task fails. | | `onSuccess` | Function to be executed if the task succeeds. | -| `retries` | Specify the number of times that this step should be retried if it fails. | +| `retries` | Specify the number of times that this step should be retried if it fails. If this is undefined, the task will either inherit the retries from the workflow or have no retries. If this is 0, the task will not be retried. By default, this is undefined. | The logic for the Task is defined in the `handler` - which can be defined as a function, or a path to a function. The `handler` will run once a worker picks picks up a Job that includes this task. diff --git a/docs/jobs-queue/workflows.mdx b/docs/jobs-queue/workflows.mdx index f261deb9d..3a3a136b5 100644 --- a/docs/jobs-queue/workflows.mdx +++ b/docs/jobs-queue/workflows.mdx @@ -2,7 +2,7 @@ title: Workflows label: Workflows order: 30 -desc: A Task is a distinct function declaration that can be run within Payload's Jobs Queue. +desc: A Task is a distinct function declaration that can be run within Payload's Jobs Queue. keywords: jobs queue, application framework, typescript, node, react, nextjs --- @@ -30,6 +30,7 @@ To define a JS-based workflow, simply add a workflow to the `jobs.wokflows` arra | `interfaceName` | You can use interfaceName to change the name of the interface that is generated for this workflow. By default, this is "Workflow" + the capitalized workflow slug. | | `label` | Define a human-friendly label for this workflow. | | `queue` | Optionally, define the queue name that this workflow should be tied to. Defaults to "default". | +| `retries` | You can define `retries` on the workflow level, which will enforce that the workflow can only fail up to that number of retries. If a task does not have retries specified, it will inherit the retry count as specified on the workflow. You can specify `0` as `workflow` retries, which will disregard all `task` retry specifications and fail the entire workflow on any task failure. You can leave `workflow` retries as undefined, in which case, the workflow will respect what each task dictates as their own retry count. By default this is undefined, meaning workflows retries are defined by their tasks | Example: diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index dc5753c29..90439e805 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -60,7 +60,14 @@ export type TaskHandlerResults = { // Helper type to create correct argument type for the function corresponding to each task. export type RunTaskFunctionArgs = { input?: TaskInput - retries?: number | RetryConfig + /** + * Specify the number of times that this task should be retried if it fails for any reason. + * If this is undefined, the task will either inherit the retries from the workflow or have no retries. + * If this is 0, the task will not be retried. + * + * @default By default, tasks are not retried and `retries` is `undefined`. + */ + retries?: number | RetryConfig | undefined } export type RunTaskFunction = ( @@ -76,7 +83,14 @@ export type RunInlineTaskFunction = ; req: PayloadRequest }) => | { @@ -162,8 +176,12 @@ export type TaskConfig< outputSchema?: Field[] /** * Specify the number of times that this step should be retried if it fails. + * If this is undefined, the task will either inherit the retries from the workflow or have no retries. + * If this is 0, the task will not be retried. + * + * @default By default, tasks are not retried and `retries` is `undefined`. */ - retries?: number | RetryConfig + retries?: number | RetryConfig | undefined /** * Define a slug-based name for this job. This slug needs to be unique among both tasks and workflows. */ diff --git a/packages/payload/src/queues/config/types/workflowJSONTypes.ts b/packages/payload/src/queues/config/types/workflowJSONTypes.ts index 001746851..7de9f03da 100644 --- a/packages/payload/src/queues/config/types/workflowJSONTypes.ts +++ b/packages/payload/src/queues/config/types/workflowJSONTypes.ts @@ -14,6 +14,11 @@ export type WorkflowStep< * Each task needs to have a unique ID to track its status */ id: string + /** + * Specify the number of times that this workflow should be retried if it fails for any reason. + * + * @default By default, workflows are not retried and `retries` is `0`. + */ retries?: number | RetryConfig } & ( | { diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index 3ee703c77..9d8f85b39 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -114,9 +114,14 @@ export type WorkflowConfig(workflowConfig.handler) if (!workflowHandler) { - const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${job.workflowSlug}.` + const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` + const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.` req.payload.logger.error(errorMessage) await updateJob({ diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 4553f97e2..6ba74134c 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -107,7 +107,7 @@ export async function handleTaskFailed({ } } - if (taskStatus && !taskStatus.complete && taskStatus.totalTried >= maxRetries) { + if (!taskStatus?.complete && (taskStatus?.totalTried ?? 0) >= maxRetries) { state.reachedMaxRetries = true await updateJob({ @@ -182,9 +182,21 @@ export const getRunTaskFunction = ( throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`) } } - const maxRetries: number = + let maxRetries: number = typeof retriesConfig === 'object' ? retriesConfig?.attempts : retriesConfig + if (maxRetries === undefined || maxRetries === null) { + // Inherit retries from workflow config, if they are undefined and the workflow config has retries configured + if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) { + maxRetries = + typeof workflowConfig.retries === 'object' + ? workflowConfig.retries.attempts + : workflowConfig.retries + } else { + maxRetries = 0 + } + } + const taskStatus: null | SingleTaskStatus = job?.taskStatus?.[taskSlug] ? job.taskStatus[taskSlug][taskID] : null diff --git a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts index 15f586221..e0396d966 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/handleWorkflowError.ts @@ -23,13 +23,25 @@ export function handleWorkflowError({ }): { hasFinalError: boolean } { + const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` + let hasFinalError = state.reachedMaxRetries // If any TASK reached max retries, the job has an error - const maxRetries = - typeof workflowConfig.retries === 'object' + const maxWorkflowRetries: number = + (typeof workflowConfig.retries === 'object' ? workflowConfig.retries.attempts - : workflowConfig.retries + : workflowConfig.retries) ?? undefined + + if ( + maxWorkflowRetries !== undefined && + maxWorkflowRetries !== null && + job.totalTried >= maxWorkflowRetries + ) { + hasFinalError = true + state.reachedMaxRetries = true + } + // Now let's handle workflow retries - if (!hasFinalError && workflowConfig.retries) { + if (!hasFinalError) { if (job.waitUntil) { // Check if waitUntil is in the past const waitUntil = new Date(job.waitUntil) @@ -38,26 +50,22 @@ export function handleWorkflowError({ delete job.waitUntil } } - if (job.totalTried >= maxRetries) { - state.reachedMaxRetries = true - hasFinalError = true - } else { - // Job will retry. Let's determine when! - const waitUntil: Date = calculateBackoffWaitUntil({ - retriesConfig: workflowConfig.retries, - totalTried: job.totalTried ?? 0, - }) - // Update job's waitUntil only if this waitUntil is later than the current one - if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) { - job.waitUntil = waitUntil.toISOString() - } + // Job will retry. Let's determine when! + const waitUntil: Date = calculateBackoffWaitUntil({ + retriesConfig: workflowConfig.retries, + totalTried: job.totalTried ?? 0, + }) + + // Update job's waitUntil only if this waitUntil is later than the current one + if (!job.waitUntil || waitUntil > new Date(job.waitUntil)) { + job.waitUntil = waitUntil.toISOString() } } req.payload.logger.error({ err: error, - msg: `Error running job ${job.workflowSlug} ${job.taskSlug} id: ${job.id} attempt ${job.totalTried}/${maxRetries}`, + msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, }) return { diff --git a/test/live-preview/payload-types.ts b/test/live-preview/payload-types.ts index de95477c8..6a9157119 100644 --- a/test/live-preview/payload-types.ts +++ b/test/live-preview/payload-types.ts @@ -38,7 +38,7 @@ export interface Config { 'payload-migrations': PayloadMigrationsSelect | PayloadMigrationsSelect; }; db: { - defaultIDType: number; + defaultIDType: string; }; globals: { header: Header; @@ -80,7 +80,7 @@ export interface UserAuthOperations { * via the `definition` "users". */ export interface User { - id: number; + id: string; updatedAt: string; createdAt: string; email: string; @@ -97,9 +97,9 @@ export interface User { * via the `definition` "pages". */ export interface Page { - id: number; + id: string; slug: string; - tenant?: (number | null) | Tenant; + tenant?: (string | null) | Tenant; title: string; hero: { type: 'none' | 'highImpact' | 'lowImpact'; @@ -108,7 +108,7 @@ export interface Page { [k: string]: unknown; }[] | null; - media?: (number | null) | Media; + media?: (string | null) | Media; }; layout?: | ( @@ -127,11 +127,11 @@ export interface Page { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -161,11 +161,11 @@ export interface Page { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -181,7 +181,7 @@ export interface Page { | { invertBackground?: boolean | null; position?: ('default' | 'fullscreen') | null; - media: number | Media; + media: string | Media; id?: string | null; blockName?: string | null; blockType: 'mediaBlock'; @@ -194,18 +194,18 @@ export interface Page { | null; populateBy?: ('collection' | 'selection') | null; relationTo?: 'posts' | null; - categories?: (number | Category)[] | null; + categories?: (string | Category)[] | null; limit?: number | null; selectedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocsTotal?: number | null; @@ -216,7 +216,7 @@ export interface Page { )[] | null; localizedTitle?: string | null; - relationToLocalized?: (number | null) | Post; + relationToLocalized?: (string | null) | Post; richTextSlate?: | { [k: string]: unknown; @@ -237,22 +237,22 @@ export interface Page { }; [k: string]: unknown; } | null; - relationshipAsUpload?: (number | null) | Media; - relationshipMonoHasOne?: (number | null) | Post; - relationshipMonoHasMany?: (number | Post)[] | null; + relationshipAsUpload?: (string | null) | Media; + relationshipMonoHasOne?: (string | null) | Post; + relationshipMonoHasMany?: (string | Post)[] | null; relationshipPolyHasOne?: { relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null; relationshipPolyHasMany?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; arrayOfRelationships?: | { - uploadInArray?: (number | null) | Media; + uploadInArray?: (string | null) | Media; richTextInArray?: { root: { type: string; @@ -268,28 +268,28 @@ export interface Page { }; [k: string]: unknown; } | null; - relationshipInArrayMonoHasOne?: (number | null) | Post; - relationshipInArrayMonoHasMany?: (number | Post)[] | null; + relationshipInArrayMonoHasOne?: (string | null) | Post; + relationshipInArrayMonoHasMany?: (string | Post)[] | null; relationshipInArrayPolyHasOne?: { relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null; relationshipInArrayPolyHasMany?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; id?: string | null; }[] | null; tab?: { - relationshipInTab?: (number | null) | Post; + relationshipInTab?: (string | null) | Post; }; meta?: { title?: string | null; description?: string | null; - image?: (number | null) | Media; + image?: (string | null) | Media; }; updatedAt: string; createdAt: string; @@ -299,7 +299,7 @@ export interface Page { * via the `definition` "tenants". */ export interface Tenant { - id: number; + id: string; title: string; clientURL: string; updatedAt: string; @@ -310,7 +310,7 @@ export interface Tenant { * via the `definition` "media". */ export interface Media { - id: number; + id: string; alt: string; updatedAt: string; createdAt: string; @@ -329,9 +329,9 @@ export interface Media { * via the `definition` "posts". */ export interface Post { - id: number; + id: string; slug: string; - tenant?: (number | null) | Tenant; + tenant?: (string | null) | Tenant; title: string; hero: { type: 'none' | 'highImpact' | 'lowImpact'; @@ -340,7 +340,7 @@ export interface Post { [k: string]: unknown; }[] | null; - media?: (number | null) | Media; + media?: (string | null) | Media; }; layout?: | ( @@ -359,11 +359,11 @@ export interface Post { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -393,11 +393,11 @@ export interface Post { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -413,7 +413,7 @@ export interface Post { | { invertBackground?: boolean | null; position?: ('default' | 'fullscreen') | null; - media: number | Media; + media: string | Media; id?: string | null; blockName?: string | null; blockType: 'mediaBlock'; @@ -426,18 +426,18 @@ export interface Post { | null; populateBy?: ('collection' | 'selection') | null; relationTo?: 'posts' | null; - categories?: (number | Category)[] | null; + categories?: (string | Category)[] | null; limit?: number | null; selectedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocsTotal?: number | null; @@ -447,12 +447,12 @@ export interface Post { } )[] | null; - relatedPosts?: (number | Post)[] | null; + relatedPosts?: (string | Post)[] | null; localizedTitle?: string | null; meta?: { title?: string | null; description?: string | null; - image?: (number | null) | Media; + image?: (string | null) | Media; }; updatedAt: string; createdAt: string; @@ -462,7 +462,7 @@ export interface Post { * via the `definition` "categories". */ export interface Category { - id: number; + id: string; title?: string | null; updatedAt: string; createdAt: string; @@ -472,9 +472,9 @@ export interface Category { * via the `definition` "ssr". */ export interface Ssr { - id: number; + id: string; slug: string; - tenant?: (number | null) | Tenant; + tenant?: (string | null) | Tenant; title: string; hero: { type: 'none' | 'highImpact' | 'lowImpact'; @@ -483,7 +483,7 @@ export interface Ssr { [k: string]: unknown; }[] | null; - media?: (number | null) | Media; + media?: (string | null) | Media; }; layout?: | ( @@ -502,11 +502,11 @@ export interface Ssr { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -536,11 +536,11 @@ export interface Ssr { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -556,7 +556,7 @@ export interface Ssr { | { invertBackground?: boolean | null; position?: ('default' | 'fullscreen') | null; - media: number | Media; + media: string | Media; id?: string | null; blockName?: string | null; blockType: 'mediaBlock'; @@ -569,18 +569,18 @@ export interface Ssr { | null; populateBy?: ('collection' | 'selection') | null; relationTo?: 'posts' | null; - categories?: (number | Category)[] | null; + categories?: (string | Category)[] | null; limit?: number | null; selectedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocsTotal?: number | null; @@ -593,7 +593,7 @@ export interface Ssr { meta?: { title?: string | null; description?: string | null; - image?: (number | null) | Media; + image?: (string | null) | Media; }; updatedAt: string; createdAt: string; @@ -603,9 +603,9 @@ export interface Ssr { * via the `definition` "ssr-autosave". */ export interface SsrAutosave { - id: number; + id: string; slug: string; - tenant?: (number | null) | Tenant; + tenant?: (string | null) | Tenant; title: string; hero: { type: 'none' | 'highImpact' | 'lowImpact'; @@ -614,7 +614,7 @@ export interface SsrAutosave { [k: string]: unknown; }[] | null; - media?: (number | null) | Media; + media?: (string | null) | Media; }; layout?: | ( @@ -633,11 +633,11 @@ export interface SsrAutosave { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -667,11 +667,11 @@ export interface SsrAutosave { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -687,7 +687,7 @@ export interface SsrAutosave { | { invertBackground?: boolean | null; position?: ('default' | 'fullscreen') | null; - media: number | Media; + media: string | Media; id?: string | null; blockName?: string | null; blockType: 'mediaBlock'; @@ -700,18 +700,18 @@ export interface SsrAutosave { | null; populateBy?: ('collection' | 'selection') | null; relationTo?: 'posts' | null; - categories?: (number | Category)[] | null; + categories?: (string | Category)[] | null; limit?: number | null; selectedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocs?: | { relationTo: 'posts'; - value: number | Post; + value: string | Post; }[] | null; populatedDocsTotal?: number | null; @@ -724,7 +724,7 @@ export interface SsrAutosave { meta?: { title?: string | null; description?: string | null; - image?: (number | null) | Media; + image?: (string | null) | Media; }; updatedAt: string; createdAt: string; @@ -735,44 +735,44 @@ export interface SsrAutosave { * via the `definition` "payload-locked-documents". */ export interface PayloadLockedDocument { - id: number; + id: string; document?: | ({ relationTo: 'users'; - value: number | User; + value: string | User; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null) | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'ssr'; - value: number | Ssr; + value: string | Ssr; } | null) | ({ relationTo: 'ssr-autosave'; - value: number | SsrAutosave; + value: string | SsrAutosave; } | null) | ({ relationTo: 'tenants'; - value: number | Tenant; + value: string | Tenant; } | null) | ({ relationTo: 'categories'; - value: number | Category; + value: string | Category; } | null) | ({ relationTo: 'media'; - value: number | Media; + value: string | Media; } | null); globalSlug?: string | null; user: { relationTo: 'users'; - value: number | User; + value: string | User; }; updatedAt: string; createdAt: string; @@ -782,10 +782,10 @@ export interface PayloadLockedDocument { * via the `definition` "payload-preferences". */ export interface PayloadPreference { - id: number; + id: string; user: { relationTo: 'users'; - value: number | User; + value: string | User; }; key?: string | null; value?: @@ -805,7 +805,7 @@ export interface PayloadPreference { * via the `definition` "payload-migrations". */ export interface PayloadMigration { - id: number; + id: string; name?: string | null; batch?: number | null; updatedAt: string; @@ -1328,7 +1328,7 @@ export interface PayloadMigrationsSelect { * via the `definition` "header". */ export interface Header { - id: number; + id: string; navItems?: | { link: { @@ -1337,11 +1337,11 @@ export interface Header { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; @@ -1358,7 +1358,7 @@ export interface Header { * via the `definition` "footer". */ export interface Footer { - id: number; + id: string; navItems?: | { link: { @@ -1367,11 +1367,11 @@ export interface Footer { reference?: | ({ relationTo: 'posts'; - value: number | Post; + value: string | Post; } | null) | ({ relationTo: 'pages'; - value: number | Page; + value: string | Page; } | null); url?: string | null; label: string; diff --git a/test/queues/config.ts b/test/queues/config.ts index d758301e8..1db0c9b81 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -173,6 +173,83 @@ export default buildConfigWithDefaults({ } }, } as TaskConfig<'CreateSimple'>, + { + slug: 'CreateSimpleRetriesUndefined', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if (input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, + } as TaskConfig<'CreateSimpleRetriesUndefined'>, + { + slug: 'CreateSimpleRetries0', + retries: 0, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if (input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, + } as TaskConfig<'CreateSimpleRetries0'>, { retries: 2, slug: 'CreateSimpleWithDuplicateMessage', @@ -445,6 +522,209 @@ export default buildConfigWithDefaults({ // This will never be reached }, } as WorkflowConfig<'retriesWorkflowLevelTest'>, + { + slug: 'workflowNoRetriesSet', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } as WorkflowConfig<'workflowNoRetriesSet'>, + { + slug: 'workflowRetries0', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + retries: 0, + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimple('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimple('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } as WorkflowConfig<'workflowRetries0'>, + { + slug: 'workflowAndTasksRetriesUndefined', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetriesUndefined('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetriesUndefined('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } as WorkflowConfig<'workflowAndTasksRetriesUndefined'>, + { + slug: 'workflowRetries2TasksRetriesUndefined', + retries: 2, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetriesUndefined('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetriesUndefined('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } as WorkflowConfig<'workflowRetries2TasksRetriesUndefined'>, + { + slug: 'workflowRetries2TasksRetries0', + retries: 2, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, tasks, req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountRetried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountRetried !== undefined ? job.input.amountRetried + 1 : 0, + }, + }, + id: job.id, + }) + + await tasks.CreateSimpleRetries0('1', { + input: { + message: job.input.message, + }, + }) + + // At this point there should always be one post created. + // job.input.amountRetried will go up to 2 as CreatePost has 2 retries + await tasks.CreateSimpleRetries0('2', { + input: { + message: job.input.message, + shouldFail: true, + }, + }) + // This will never be reached + }, + } as WorkflowConfig<'workflowRetries2TasksRetries0'>, { slug: 'inlineTaskTest', inputSchema: [ diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 8390b4b14..05324edb2 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -224,6 +224,191 @@ describe('Queues', () => { payload.config.jobs.deleteJobOnComplete = true }) + it('ensure workflows dont limit retries if no retries property is sett', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'workflowNoRetriesSet', + input: { + message: 'hello', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error amountRetried is new arbitrary data and not in the type + expect(jobAfterRun.input.amountRetried).toBe(3) + + payload.config.jobs.deleteJobOnComplete = true + }) + + it('ensure workflows dont retry if retries set to 0, even if individual tasks have retries > 0 set', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'workflowRetries0', + input: { + message: 'hello', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error amountRetried is new arbitrary data and not in the type + expect(jobAfterRun.input.amountRetried).toBe(0) + + payload.config.jobs.deleteJobOnComplete = true + }) + + it('ensure workflows dont retry if neither workflows nor tasks have retries set', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'workflowAndTasksRetriesUndefined', + input: { + message: 'hello', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error amountRetried is new arbitrary data and not in the type + expect(jobAfterRun.input.amountRetried).toBe(0) + + payload.config.jobs.deleteJobOnComplete = true + }) + + it('ensure workflows retry if workflows have retries set and tasks do not have retries set, due to tasks inheriting workflow retries', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'workflowRetries2TasksRetriesUndefined', + input: { + message: 'hello', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error amountRetried is new arbitrary data and not in the type + expect(jobAfterRun.input.amountRetried).toBe(2) + + payload.config.jobs.deleteJobOnComplete = true + }) + + it('ensure workflows do not retry if workflows have retries set and tasks have retries set to 0', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'workflowRetries2TasksRetries0', + input: { + message: 'hello', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error amountRetried is new arbitrary data and not in the type + expect(jobAfterRun.input.amountRetried).toBe(0) + + payload.config.jobs.deleteJobOnComplete = true + }) + /* // Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues it('ensure failed tasks are rolled back via transactions', async () => { @@ -583,9 +768,10 @@ describe('Queues', () => { expect(allSimples.docs[7].title).toBe('from single task') }) - it('can queue single tasks 500 times', async () => { + it('can queue single tasks 150 times', async () => { + // TODO: Ramp up the limit from 150 to 500 or 1000, to test reliability of the database payload.config.jobs.deleteJobOnComplete = false - for (let i = 0; i < 500; i++) { + for (let i = 0; i < 150; i++) { await payload.jobs.queue({ task: 'CreateSimple', input: { @@ -603,14 +789,14 @@ describe('Queues', () => { limit: 1000, }) - expect(allSimples.totalDocs).toBe(500) // Default limit: 10 + expect(allSimples.totalDocs).toBe(150) // Default limit: 10 expect(allSimples.docs[0].title).toBe('from single task') - expect(allSimples.docs[490].title).toBe('from single task') + expect(allSimples.docs[140].title).toBe('from single task') payload.config.jobs.deleteJobOnComplete = true }) it('ensure default jobs run limit of 10 works', async () => { - for (let i = 0; i < 500; i++) { + for (let i = 0; i < 65; i++) { await payload.jobs.queue({ task: 'CreateSimple', input: { @@ -632,7 +818,7 @@ describe('Queues', () => { }) it('ensure jobs run limit can be customized', async () => { - for (let i = 0; i < 500; i++) { + for (let i = 0; i < 110; i++) { await payload.jobs.queue({ task: 'CreateSimple', input: { diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 963d164b6..876de22b6 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -43,6 +43,8 @@ export interface Config { UpdatePost: MyUpdatePostType; UpdatePostStep2: TaskUpdatePostStep2; CreateSimple: TaskCreateSimple; + CreateSimpleRetriesUndefined: TaskCreateSimpleRetriesUndefined; + CreateSimpleRetries0: TaskCreateSimpleRetries0; CreateSimpleWithDuplicateMessage: TaskCreateSimpleWithDuplicateMessage; ExternalTask: TaskExternalTask; inline: { @@ -56,6 +58,11 @@ export interface Config { retriesTest: WorkflowRetriesTest; retriesRollbackTest: WorkflowRetriesRollbackTest; retriesWorkflowLevelTest: WorkflowRetriesWorkflowLevelTest; + workflowNoRetriesSet: WorkflowWorkflowNoRetriesSet; + workflowRetries0: WorkflowWorkflowRetries0; + workflowAndTasksRetriesUndefined: WorkflowWorkflowAndTasksRetriesUndefined; + workflowRetries2TasksRetriesUndefined: WorkflowWorkflowRetries2TasksRetriesUndefined; + workflowRetries2TasksRetries0: WorkflowWorkflowRetries2TasksRetries0; inlineTaskTest: WorkflowInlineTaskTest; externalWorkflow: WorkflowExternalWorkflow; retriesBackoffTest: WorkflowRetriesBackoffTest; @@ -179,6 +186,8 @@ export interface PayloadJob { | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' + | 'CreateSimpleRetriesUndefined' + | 'CreateSimpleRetries0' | 'CreateSimpleWithDuplicateMessage' | 'ExternalTask'; taskID: string; @@ -220,6 +229,11 @@ export interface PayloadJob { | 'retriesTest' | 'retriesRollbackTest' | 'retriesWorkflowLevelTest' + | 'workflowNoRetriesSet' + | 'workflowRetries0' + | 'workflowAndTasksRetriesUndefined' + | 'workflowRetries2TasksRetriesUndefined' + | 'workflowRetries2TasksRetries0' | 'inlineTaskTest' | 'externalWorkflow' | 'retriesBackoffTest' @@ -231,6 +245,8 @@ export interface PayloadJob { | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' + | 'CreateSimpleRetriesUndefined' + | 'CreateSimpleRetries0' | 'CreateSimpleWithDuplicateMessage' | 'ExternalTask' ) @@ -443,6 +459,32 @@ export interface TaskCreateSimple { simpleID: string; }; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskCreateSimpleRetriesUndefined". + */ +export interface TaskCreateSimpleRetriesUndefined { + input: { + message: string; + shouldFail?: boolean | null; + }; + output: { + simpleID: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskCreateSimpleRetries0". + */ +export interface TaskCreateSimpleRetries0 { + input: { + message: string; + shouldFail?: boolean | null; + }; + output: { + simpleID: string; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "TaskCreateSimpleWithDuplicateMessage". @@ -515,6 +557,51 @@ export interface WorkflowRetriesWorkflowLevelTest { message: string; }; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowWorkflowNoRetriesSet". + */ +export interface WorkflowWorkflowNoRetriesSet { + input: { + message: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowWorkflowRetries0". + */ +export interface WorkflowWorkflowRetries0 { + input: { + message: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowWorkflowAndTasksRetriesUndefined". + */ +export interface WorkflowWorkflowAndTasksRetriesUndefined { + input: { + message: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowWorkflowRetries2TasksRetriesUndefined". + */ +export interface WorkflowWorkflowRetries2TasksRetriesUndefined { + input: { + message: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowWorkflowRetries2TasksRetries0". + */ +export interface WorkflowWorkflowRetries2TasksRetries0 { + input: { + message: string; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "WorkflowInlineTaskTest".