From c08b2aea8971863369f23440a7069e2cea05e646 Mon Sep 17 00:00:00 2001 From: Alessio Gravili Date: Fri, 18 Jul 2025 03:48:27 -0700 Subject: [PATCH] feat: scheduling jobs (#12863) Adds a new `schedule` property to workflow and task configs that can be used to have Payload automatically _queue_ jobs following a certain _schedule_. Docs: https://payloadcms.com/docs/dynamic/jobs-queue/schedules?branch=feat/schedule-jobs ## API Example ```ts export default buildConfig({ // ... jobs: { // ... scheduler: 'manual', // Or `cron` if you're not using serverless. If `manual` is used, then user needs to set up running /api/payload-jobs/handleSchedules or payload.jobs.handleSchedules in regular intervals tasks: [ { schedule: [ { cron: '* * * * * *', queue: 'autorunSecond', // Hooks are optional hooks: { // Not an array, as providing and calling `defaultBeforeSchedule` would be more error-prone if this was an array beforeSchedule: async (args) => { // Handles verifying that there are no jobs already scheduled or processing. // You can override this behavior by not calling defaultBeforeSchedule, e.g. if you wanted // to allow a maximum of 3 scheduled jobs in the queue instead of 1, or add any additional conditions const result = await args.defaultBeforeSchedule(args) return { ...result, input: { message: 'This task runs every second', }, } }, afterSchedule: async (args) => { await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global args.req.payload.logger.info( 'EverySecond task scheduled: ' + (args.status === 'success' ? 
args.job.id : 'skipped or failed to schedule'), ) }, }, }, ], slug: 'EverySecond', inputSchema: [ { name: 'message', type: 'text', required: true, }, ], handler: ({ input, req }) => { req.payload.logger.info(input.message) return { output: {}, } }, } ] } }) ``` --- - To see the specific tasks where the Asana app for GitHub is being used, see below: - https://app.asana.com/0/0/1210495300843759 --- docs/jobs-queue/schedules.mdx | 156 +++++++ .../db-mongodb/src/utilities/transform.ts | 4 + packages/payload/package.json | 2 +- packages/payload/src/config/sanitize.ts | 28 +- .../payload/src/database/defaultUpdateJobs.ts | 2 +- packages/payload/src/database/types.ts | 3 + packages/payload/src/index.ts | 48 +- .../queues/config/{index.ts => collection.ts} | 33 +- packages/payload/src/queues/config/global.ts | 45 ++ .../payload/src/queues/config/types/index.ts | 139 +++++- .../src/queues/config/types/taskTypes.ts | 8 + .../src/queues/config/types/workflowTypes.ts | 15 + .../src/queues/endpoints/handleSchedules.ts | 66 +++ packages/payload/src/queues/endpoints/run.ts | 118 +++++ .../errors/calculateBackoffWaitUntil.ts | 12 +- .../src/queues/errors/handleTaskError.ts | 42 +- .../src/queues/errors/handleWorkflowError.ts | 24 +- packages/payload/src/queues/localAPI.ts | 56 ++- .../countRunnableOrActiveJobsForQueue.ts | 74 ++++ .../handleSchedules/defaultAfterSchedule.ts | 64 +++ .../handleSchedules/defaultBeforeSchedule.ts | 20 + .../handleSchedules/getQueuesWithSchedules.ts | 50 +++ .../operations/handleSchedules/index.ts | 223 ++++++++++ .../src/queues/operations/runJobs/index.ts | 44 +- .../operations/runJobs/runJSONJob/index.ts | 15 +- .../runJobs/runJob/getRunTaskFunction.ts | 5 +- .../queues/operations/runJobs/runJob/index.ts | 16 +- .../payload/src/queues/restEndpointRun.ts | 91 ---- .../src/queues/utilities/getCurrentDate.ts | 21 + .../payload/src/queues/utilities/updateJob.ts | 2 +- .../versions/deleteScheduledPublishJobs.ts | 2 +- pnpm-lock.yaml | 10 +- 
test/helpers/initPayloadInt.ts | 7 +- test/initDevAndTest.ts | 3 +- test/queues/config.schedules-autocron.ts | 22 + test/queues/config.schedules.ts | 22 + test/queues/config.ts | 416 +----------------- test/queues/getConfig.ts | 176 ++++++++ test/queues/int.spec.ts | 148 ++++--- test/queues/payload-types.ts | 72 ++- test/queues/schedules-autocron.int.spec.ts | 105 +++++ test/queues/schedules.int.spec.ts | 341 ++++++++++++++ test/queues/tasks/CreateSimpleRetries0Task.ts | 41 ++ .../tasks/CreateSimpleRetriesUndefinedTask.ts | 40 ++ test/queues/tasks/CreateSimpleTask.ts | 41 ++ .../CreateSimpleWithDuplicateMessageTask.ts | 42 ++ test/queues/tasks/EverySecondMax2Task.ts | 67 +++ test/queues/tasks/EverySecondTask.ts | 54 +++ test/queues/tasks/ExternalTask.ts | 26 ++ test/queues/tasks/ReturnCustomErrorTask.ts | 20 + test/queues/tasks/ReturnErrorTask.ts | 13 + test/queues/tasks/ThrowErrorTask.ts | 11 + test/queues/tasks/UpdatePostStep2Task.ts | 23 + test/queues/tasks/UpdatePostTask.ts | 31 ++ test/queues/utilities.ts | 62 +++ test/runInit.ts | 3 +- 56 files changed, 2579 insertions(+), 645 deletions(-) create mode 100644 docs/jobs-queue/schedules.mdx rename packages/payload/src/queues/config/{index.ts => collection.ts} (84%) create mode 100644 packages/payload/src/queues/config/global.ts create mode 100644 packages/payload/src/queues/endpoints/handleSchedules.ts create mode 100644 packages/payload/src/queues/endpoints/run.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/getQueuesWithSchedules.ts create mode 100644 packages/payload/src/queues/operations/handleSchedules/index.ts delete mode 100644 packages/payload/src/queues/restEndpointRun.ts 
create mode 100644 packages/payload/src/queues/utilities/getCurrentDate.ts create mode 100644 test/queues/config.schedules-autocron.ts create mode 100644 test/queues/config.schedules.ts create mode 100644 test/queues/getConfig.ts create mode 100644 test/queues/schedules-autocron.int.spec.ts create mode 100644 test/queues/schedules.int.spec.ts create mode 100644 test/queues/tasks/CreateSimpleRetries0Task.ts create mode 100644 test/queues/tasks/CreateSimpleRetriesUndefinedTask.ts create mode 100644 test/queues/tasks/CreateSimpleTask.ts create mode 100644 test/queues/tasks/CreateSimpleWithDuplicateMessageTask.ts create mode 100644 test/queues/tasks/EverySecondMax2Task.ts create mode 100644 test/queues/tasks/EverySecondTask.ts create mode 100644 test/queues/tasks/ExternalTask.ts create mode 100644 test/queues/tasks/ReturnCustomErrorTask.ts create mode 100644 test/queues/tasks/ReturnErrorTask.ts create mode 100644 test/queues/tasks/ThrowErrorTask.ts create mode 100644 test/queues/tasks/UpdatePostStep2Task.ts create mode 100644 test/queues/tasks/UpdatePostTask.ts create mode 100644 test/queues/utilities.ts diff --git a/docs/jobs-queue/schedules.mdx b/docs/jobs-queue/schedules.mdx new file mode 100644 index 000000000..a2b7aa331 --- /dev/null +++ b/docs/jobs-queue/schedules.mdx @@ -0,0 +1,156 @@ +--- +title: Job Schedules +label: Schedules +order: 60 +desc: Payload allows you to schedule jobs to run periodically +keywords: jobs queue, application framework, typescript, node, react, nextjs, scheduling, cron, schedule +--- + +Payload's `schedule` property lets you enqueue Jobs regularly according to a cron schedule - daily, weekly, hourly, or any custom interval. This is ideal for tasks or workflows that must repeat automatically and without manual intervention. + +Scheduling Jobs differs significantly from running them: + +- **Queueing**: Scheduling only creates (enqueues) the Job according to your cron expression. It does not immediately execute any business logic. 
+- **Running**: Execution happens separately through your Jobs runner - such as autorun, or manual invocation using `payload.jobs.run()` or the `payload-jobs/run` endpoint. + +Use the `schedule` property specifically when you have recurring tasks or workflows. To enqueue a single Job to run once in the future, use the `waitUntil` property instead. + +## Example use cases + +**Regular emails or notifications** + +Send nightly digests, weekly newsletters, or hourly updates. + +**Batch processing during off-hours** + +Process analytics data or rebuild static sites during low-traffic times. + +**Periodic data synchronization** + +Regularly push or pull updates to or from external APIs. + +## Handling schedules + +Something needs to actually trigger the scheduling of jobs (execute the scheduling lifecycle seen below). By default, the `jobs.autoRun` configuration, as well as the `/api/payload-jobs/run` endpoint, will also handle scheduling for the queue specified in the `autoRun` configuration. + +You can disable this behavior by setting `disableScheduling: true` in your `autoRun` configuration, or by passing `disableScheduling=true` to the `/api/payload-jobs/run` endpoint. This is useful if you want to handle scheduling manually, for example, by using a cron job or a serverless function that calls the `/api/payload-jobs/handle-schedules` endpoint or the `payload.jobs.handleSchedules()` local API method.
+ +## Defining schedules on Tasks or Workflows + +Schedules are defined using the `schedule` property: + +```ts +export type ScheduleConfig = { + cron: string // required, supports seconds precision + queue: string // required, the queue to push Jobs onto + hooks?: { + // Optional hooks to customize scheduling behavior + beforeSchedule?: BeforeScheduleFn + afterSchedule?: AfterScheduleFn + } +} +``` + +### Example schedule + +The following example demonstrates scheduling a Job to enqueue every day at midnight: + +```ts +import type { TaskConfig } from 'payload' + +export const SendDigestEmail: TaskConfig<'SendDigestEmail'> = { + slug: 'SendDigestEmail', + schedule: [ + { + cron: '0 0 * * *', // Every day at midnight + queue: 'nightly', + }, + ], + handler: async () => { + await sendDigestToAllUsers() + }, +} +``` + +This configuration only queues the Job - it does not execute it immediately. To actually run the queued Job, you configure autorun in your Payload config (note that autorun should **not** be used on serverless platforms): + +```ts +export default buildConfig({ + jobs: { + scheduler: 'cron', + autoRun: [ + { + cron: '* * * * *', // Runs every minute + queue: 'nightly', + }, + ], + tasks: [SendDigestEmail], + }, +}) +``` + +That way, Payload's scheduler will automatically enqueue the job into the `nightly` queue every day at midnight. The autorun configuration will check the `nightly` queue every minute and execute any Jobs that are due to run. + +## Scheduling lifecycle + +Here's how the scheduling process operates in detail: + +1. **Cron evaluation**: Payload (or your external trigger in `manual` mode) identifies which schedules are due to run. To do that, it will + read the `payload-jobs-stats` global which contains information about the last time each scheduled task or workflow was run. +2. 
**BeforeSchedule hook**: + - The default beforeSchedule hook checks how many active or runnable jobs of the same type, queued by the scheduling system, currently exist. If such a job exists, it will skip scheduling a new one. + - You can provide your own `beforeSchedule` hook to customize this behavior. For example, you might want to allow multiple overlapping Jobs or dynamically set the Job input data. +3. **Enqueue Job**: Payload queues up a new job. This job will have `waitUntil` set to the next scheduled time based on the cron expression. +4. **AfterSchedule hook**: + - The default afterSchedule hook updates the `payload-jobs-stats` global metadata with the last scheduled time for the Job. + - You can provide your own afterSchedule hook for custom logging, metrics, or other post-scheduling actions. + +## Customizing concurrency and input (Advanced) + +You may want more control over concurrency or dynamically set Job inputs at scheduling time. For instance, allowing multiple overlapping Jobs to be scheduled, even if a previously scheduled job has not completed yet, or preparing dynamic data to pass to your Job handler: + +```ts +import { countRunnableOrActiveJobsForQueue } from 'payload' + +schedule: [ + { + cron: '* * * * *', // every minute + queue: 'reports', + hooks: { + beforeSchedule: async ({ queueable, req }) => { + const runnableOrActiveJobsForQueue = + await countRunnableOrActiveJobsForQueue({ + queue: queueable.scheduleConfig.queue, + req, + taskSlug: queueable.taskConfig?.slug, + workflowSlug: queueable.workflowConfig?.slug, + onlyScheduled: true, + }) + + // Allow up to 3 simultaneous scheduled jobs and set dynamic input + return { + shouldSchedule: runnableOrActiveJobsForQueue < 3, + input: { text: 'Hi there' }, + } + }, + }, + }, +] +``` + +This allows fine-grained control over how many Jobs can run simultaneously and provides dynamically computed input values each time a Job is scheduled.
+ +## Scheduling in serverless environments + +On serverless platforms, scheduling must be triggered externally since Payload does not automatically run cron schedules in ephemeral environments. You have three main ways to trigger scheduling manually: + +- **Invoke via Payload's API:** `payload.jobs.handleSchedules()` +- **Use the REST API endpoint:** `/api/payload-jobs/handle-schedules` +- **Use the run endpoint, which also handles scheduling by default:** `GET /api/payload-jobs/run` + +For example, on Vercel, you can set up a Vercel Cron to regularly trigger scheduling: + +- **Vercel Cron Job:** Configure Vercel Cron to periodically call `GET /api/payload-jobs/handle-schedules`. If you would like to auto-run your scheduled jobs as well, you can use the `GET /api/payload-jobs/run` endpoint. + +Once Jobs are queued, their execution depends entirely on your configured runner setup (e.g., autorun, or manual invocation). diff --git a/packages/db-mongodb/src/utilities/transform.ts b/packages/db-mongodb/src/utilities/transform.ts index 24113806a..35a271877 100644 --- a/packages/db-mongodb/src/utilities/transform.ts +++ b/packages/db-mongodb/src/utilities/transform.ts @@ -406,6 +406,10 @@ export const transform = ({ parentIsLocalized = false, validateRelationships = true, }: Args) => { + if (!data) { + return null + } + if (Array.isArray(data)) { for (const item of data) { transform({ $inc, adapter, data: item, fields, globalSlug, operation, validateRelationships }) diff --git a/packages/payload/package.json b/packages/payload/package.json index 3437d8d14..ed757a0d2 100644 --- a/packages/payload/package.json +++ b/packages/payload/package.json @@ -92,7 +92,7 @@ "busboy": "^1.6.0", "ci-info": "^4.1.0", "console-table-printer": "2.12.1", - "croner": "9.0.0", + "croner": "9.1.0", "dataloader": "2.2.3", "deepmerge": "4.3.1", "file-type": "19.3.0", diff --git a/packages/payload/src/config/sanitize.ts b/packages/payload/src/config/sanitize.ts index c90ee9703..043bb34d3 100644
--- a/packages/payload/src/config/sanitize.ts +++ b/packages/payload/src/config/sanitize.ts @@ -29,7 +29,8 @@ import { } from '../locked-documents/config.js' import { getPreferencesCollection, preferencesCollectionSlug } from '../preferences/config.js' import { getQueryPresetsConfig, queryPresetsCollectionSlug } from '../query-presets/config.js' -import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/index.js' +import { getDefaultJobsCollection, jobsCollectionSlug } from '../queues/config/collection.js' +import { getJobStatsGlobal } from '../queues/config/global.js' import { flattenBlock } from '../utilities/flattenAllFields.js' import { getSchedulePublishTask } from '../versions/schedule/job.js' import { addDefaultsToConfig } from './defaults.js' @@ -313,7 +314,28 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise task.schedule)) || + (config?.jobs?.workflows?.length && + config.jobs.workflows.some((workflow) => workflow.schedule)) + + if (hasScheduleProperty) { + config.jobs.scheduling = true + // Add payload-jobs-stats global for tracking when a job of a specific slug was last run + ;(config.globals ??= []).push( + await sanitizeGlobal( + config as unknown as Config, + getJobStatsGlobal(config as unknown as Config), + richTextSanitizationPromises, + validRelationships, + ), + ) + + config.jobs.stats = true + } + + let defaultJobsCollection = getDefaultJobsCollection(config.jobs) if (typeof config.jobs.jobsCollectionOverrides === 'function') { defaultJobsCollection = config.jobs.jobsCollectionOverrides({ @@ -342,7 +364,7 @@ export const sanitizeConfig = async (incomingConfig: Config): Promise { - const job = new Cron(cronConfig.cron ?? DEFAULT_CRON, async () => { + const jobAutorunCron = new Cron(cronConfig.cron ?? 
DEFAULT_CRON, async () => { + if ( + _internal_jobSystemGlobals.shouldAutoSchedule && + !cronConfig.disableScheduling && + this.config.jobs.scheduling + ) { + await this.jobs.handleSchedules({ + queue: cronConfig.queue, + }) + } + + if (!_internal_jobSystemGlobals.shouldAutoRun) { + return + } + if (typeof this.config.jobs.shouldAutoRun === 'function') { const shouldAutoRun = await this.config.jobs.shouldAutoRun(this) if (!shouldAutoRun) { - job.stop() - - return false + jobAutorunCron.stop() + return } } await this.jobs.run({ limit: cronConfig.limit ?? DEFAULT_LIMIT, queue: cronConfig.queue, + silent: cronConfig.silent, }) }) - this.crons.push(job) + this.crons.push(jobAutorunCron) }), ) } @@ -931,8 +946,10 @@ export const reload = async ( payload: Payload, skipImportMapGeneration?: boolean, ): Promise => { - await payload.destroy() - + if (typeof payload.db.destroy === 'function') { + // Only destroy db, as we then later only call payload.db.init and not payload.init + await payload.db.destroy() + } payload.config = config payload.collections = config.collections.reduce( @@ -1176,6 +1193,7 @@ export type { export type { CompoundIndex } from './collections/config/types.js' export type { SanitizedCompoundIndex } from './collections/config/types.js' + export { createDataloaderCacheKey, getDataLoader } from './collections/dataloader.js' export { countOperation } from './collections/operations/count.js' export { createOperation } from './collections/operations/create.js' @@ -1321,6 +1339,7 @@ export { export type { ValidationFieldError } from './errors/index.js' export { baseBlockFields } from './fields/baseFields/baseBlockFields.js' + export { baseIDField } from './fields/baseFields/baseIDField.js' export { @@ -1444,6 +1463,7 @@ export type { export { getDefaultValue } from './fields/getDefaultValue.js' export { traverseFields as afterChangeTraverseFields } from './fields/hooks/afterChange/traverseFields.js' + export { promise as afterReadPromise } from 
'./fields/hooks/afterRead/promise.js' export { traverseFields as afterReadTraverseFields } from './fields/hooks/afterRead/traverseFields.js' export { traverseFields as beforeChangeTraverseFields } from './fields/hooks/beforeChange/traverseFields.js' @@ -1451,6 +1471,7 @@ export { traverseFields as beforeValidateTraverseFields } from './fields/hooks/b export { sortableFieldTypes } from './fields/sortableFieldTypes.js' export { validations } from './fields/validations.js' + export type { ArrayFieldValidation, BlocksFieldValidation, @@ -1505,6 +1526,7 @@ export type { export { docAccessOperation as docAccessOperationGlobal } from './globals/operations/docAccess.js' export { findOneOperation } from './globals/operations/findOne.js' + export { findVersionByIDOperation as findVersionByIDOperationGlobal } from './globals/operations/findVersionByID.js' export { findVersionsOperation as findVersionsOperationGlobal } from './globals/operations/findVersions.js' export { restoreVersionOperation as restoreVersionOperationGlobal } from './globals/operations/restoreVersion.js' @@ -1525,8 +1547,7 @@ export type { TabsPreferences, } from './preferences/types.js' export type { QueryPreset } from './query-presets/types.js' -export { jobAfterRead } from './queues/config/index.js' - +export { jobAfterRead } from './queues/config/collection.js' export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js' export type { RunInlineTaskFunction, @@ -1541,6 +1562,7 @@ export type { TaskOutput, TaskType, } from './queues/config/types/taskTypes.js' + export type { BaseJob, JobLog, @@ -1551,8 +1573,14 @@ export type { WorkflowHandler, WorkflowTypes, } from './queues/config/types/workflowTypes.js' - +export { countRunnableOrActiveJobsForQueue } from './queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.js' export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js' + +export { + _internal_jobSystemGlobals, + 
_internal_resetJobSystemGlobals, + getCurrentDate, +} from './queues/utilities/getCurrentDate.js' export { getLocalI18n } from './translations/getLocalI18n.js' export * from './types/index.js' export { getFileByPath } from './uploads/getFileByPath.js' diff --git a/packages/payload/src/queues/config/index.ts b/packages/payload/src/queues/config/collection.ts similarity index 84% rename from packages/payload/src/queues/config/index.ts rename to packages/payload/src/queues/config/collection.ts index 9628a29bf..e9e925d80 100644 --- a/packages/payload/src/queues/config/index.ts +++ b/packages/payload/src/queues/config/collection.ts @@ -1,25 +1,28 @@ import type { CollectionConfig } from '../../collections/config/types.js' -import type { Config, SanitizedConfig } from '../../config/types.js' +import type { SanitizedConfig } from '../../config/types.js' import type { Field } from '../../fields/config/types.js' import type { Job } from '../../index.js' -import { runJobsEndpoint } from '../restEndpointRun.js' +import { handleSchedulesJobsEndpoint } from '../endpoints/handleSchedules.js' +import { runJobsEndpoint } from '../endpoints/run.js' import { getJobTaskStatus } from '../utilities/getJobTaskStatus.js' export const jobsCollectionSlug = 'payload-jobs' -export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (config) => { +export const getDefaultJobsCollection: (jobsConfig: SanitizedConfig['jobs']) => CollectionConfig = ( + jobsConfig, +) => { const workflowSlugs: Set = new Set() const taskSlugs: Set = new Set(['inline']) - if (config.jobs?.workflows?.length) { - config.jobs?.workflows.forEach((workflow) => { + if (jobsConfig.workflows?.length) { + jobsConfig.workflows.forEach((workflow) => { workflowSlugs.add(workflow.slug) }) } - if (config.jobs?.tasks?.length) { - config.jobs.tasks.forEach((task) => { + if (jobsConfig.tasks?.length) { + jobsConfig.tasks.forEach((task) => { if (workflowSlugs.has(task.slug)) { throw new Error( `Task slug 
"${task.slug}" is already used by a workflow. No tasks are allowed to have the same slug as a workflow.`, @@ -78,7 +81,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c }, ] - if (config?.jobs?.addParentToTaskLog) { + if (jobsConfig.addParentToTaskLog) { logFields.push({ name: 'parent', type: 'group', @@ -102,7 +105,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c group: 'System', hidden: true, }, - endpoints: [runJobsEndpoint], + endpoints: [runJobsEndpoint, handleSchedulesJobsEndpoint], fields: [ { name: 'input', @@ -198,6 +201,9 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c { name: 'waitUntil', type: 'date', + admin: { + date: { pickerAppearance: 'dayAndTime' }, + }, index: true, }, { @@ -237,6 +243,15 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig = (c lockDocuments: false, } + if (jobsConfig.stats) { + // TODO: In 4.0, this should be added by default. + // The meta field can be used to store arbitrary data about the job. The scheduling system uses this to store + // `scheduled: true` to indicate that the job was queued by the scheduling system. + jobsCollection.fields.push({ + name: 'meta', + type: 'json', + }) + } return jobsCollection } diff --git a/packages/payload/src/queues/config/global.ts b/packages/payload/src/queues/config/global.ts new file mode 100644 index 000000000..55c87d247 --- /dev/null +++ b/packages/payload/src/queues/config/global.ts @@ -0,0 +1,45 @@ +import type { Config } from '../../config/types.js' +import type { GlobalConfig } from '../../globals/config/types.js' +import type { TaskType } from './types/taskTypes.js' +import type { WorkflowTypes } from './types/workflowTypes.js' + +export const jobStatsGlobalSlug = 'payload-jobs-stats' + +/** + * Type for data stored in the payload-jobs-stats global. 
+ */ +export type JobStats = { + stats?: { + scheduledRuns?: { + queues?: { + [queueSlug: string]: { + tasks?: { + [taskSlug: TaskType]: { + lastScheduledRun: string + } + } + workflows?: { + [workflowSlug: WorkflowTypes]: { + lastScheduledRun: string + } + } + } + } + } + } +} + +/** + * Global config for job statistics. + */ +export const getJobStatsGlobal: (config: Config) => GlobalConfig = (config) => { + return { + slug: jobStatsGlobalSlug, + fields: [ + { + name: 'stats', + type: 'json', + }, + ], + } +} diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index 6bf730f44..9ea4ff223 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -1,10 +1,12 @@ -import type { CollectionConfig } from '../../../index.js' +import type { CollectionConfig, Job } from '../../../index.js' import type { Payload, PayloadRequest, Sort } from '../../../types/index.js' +import type { RunJobsSilent } from '../../localAPI.js' import type { RunJobsArgs } from '../../operations/runJobs/index.js' +import type { JobStats } from '../global.js' import type { TaskConfig } from './taskTypes.js' import type { WorkflowConfig } from './workflowTypes.js' -export type CronConfig = { +export type AutorunCronConfig = { /** * The cron schedule for the job. * @default '* * * * *' (every minute). @@ -26,6 +28,15 @@ export type CronConfig = { * - '* * * * * *' every second */ cron?: string + /** + * By default, the autorun will attempt to schedule jobs for tasks and workflows that have a `schedule` property, given + * the queue name is the same. + * + * Set this to `true` to disable the scheduling of jobs automatically. + * + * @default false + */ + disableScheduling?: boolean /** * The limit for the job. This can be overridden by the user. Defaults to 10. */ @@ -34,6 +45,15 @@ export type CronConfig = { * The queue name for the job. 
*/ queue?: string + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). + * + * @default false + */ + silent?: RunJobsSilent } export type RunJobAccessArgs = { @@ -48,6 +68,16 @@ export type SanitizedJobsConfig = { * This property is automatically set during sanitization. */ enabled?: boolean + /** + * If set to `true`, at least one task or workflow has scheduling enabled. + * This property is automatically set during sanitization. + */ + scheduling?: boolean + /** + * If set to `true`, a payload-job-stats global exists. + * This property is automatically set during sanitization. + */ + stats?: boolean } & JobsConfig export type JobsConfig = { /** @@ -73,7 +103,9 @@ export type JobsConfig = { * * @remark this property should not be used on serverless platforms like Vercel */ - autoRun?: ((payload: Payload) => CronConfig[] | Promise) | CronConfig[] + autoRun?: + | ((payload: Payload) => AutorunCronConfig[] | Promise) + | AutorunCronConfig[] /** * Determine whether or not to delete a job after it has successfully completed. 
*/ @@ -135,3 +167,104 @@ export type JobsConfig = { */ workflows?: WorkflowConfig[] } + +export type Queueable = { + scheduleConfig: ScheduleConfig + taskConfig?: TaskConfig + // If not set, queue it immediately + waitUntil?: Date + workflowConfig?: WorkflowConfig +} + +type OptionalPromise = Promise | T + +export type BeforeScheduleFn = (args: { + defaultBeforeSchedule: BeforeScheduleFn + /** + * payload-job-stats global data + */ + jobStats: JobStats + queueable: Queueable + req: PayloadRequest +}) => OptionalPromise<{ + input?: object + shouldSchedule: boolean + waitUntil?: Date +}> + +export type AfterScheduleFn = ( + args: { + defaultAfterSchedule: AfterScheduleFn + /** + * payload-job-stats global data. If the global does not exist, it will be null. + */ + jobStats: JobStats | null + queueable: Queueable + req: PayloadRequest + } & ( + | { + error: Error + job?: never + status: 'error' + } + | { + error?: never + job: Job + status: 'success' + } + | { + error?: never + job?: never + /** + * If the beforeSchedule hook returned `shouldSchedule: false`, this will be called with status `skipped`. + */ + status: 'skipped' + } + ), +) => OptionalPromise + +export type ScheduleConfig = { + /** + * The cron for scheduling the job. + * + * @example + * ┌───────────── (optional) second (0 - 59) + * │ ┌───────────── minute (0 - 59) + * │ │ ┌───────────── hour (0 - 23) + * │ │ │ ┌───────────── day of the month (1 - 31) + * │ │ │ │ ┌───────────── month (1 - 12) + * │ │ │ │ │ ┌───────────── day of the week (0 - 6) (Sunday to Saturday) + * │ │ │ │ │ │ + * │ │ │ │ │ │ + * - '* 0 * * * *' every hour at minute 0 + * - '* 0 0 * * *' daily at midnight + * - '* 0 0 * * 0' weekly at midnight on Sundays + * - '* 0 0 1 * *' monthly at midnight on the 1st day of the month + * - '* 0/5 * * * *' every 5 minutes + * - '* * * * * *' every second + */ + cron: string + hooks?: { + /** + * Functions that will be executed after the job has been successfully scheduled. 
+ * + * @default By default, this has one function that updates the `payload-jobs-stats` global with the last scheduled run time for this task or workflow. + */ + afterSchedule?: AfterScheduleFn + /** + * Functions that will be executed before the job is scheduled. + * You can use this to control whether or not the job should be scheduled, or what input + * data should be passed to the job. + * + * @default By default, this has one function that returns { shouldSchedule: true } if the following conditions are met: + * - There currently is no job of the same type in the specified queue that is currently running + * - There currently is no job of the same type in the specified queue that is scheduled to run in the future + * - There currently is no job of the same type in the specified queue that failed previously but can be retried + */ + beforeSchedule?: BeforeScheduleFn + } + /** + * Queue to which the scheduled job will be added. + */ + queue: string +} diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index fe9107b8f..48edff688 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -1,4 +1,5 @@ import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' +import type { ScheduleConfig } from './index.js' import type { SingleTaskStatus } from './workflowTypes.js' export type TaskInputOutput = { @@ -54,6 +55,9 @@ export type TaskHandler< args: TaskHandlerArgs, ) => Promise> | TaskHandlerResult +/** + * @todo rename to TaskSlug in 4.0, similar to CollectionSlug + */ export type TaskType = StringKeyOf // Extracts the type of `input` corresponding to each task @@ -233,6 +237,10 @@ export type TaskConfig< * @default By default, tasks are not retried and `retries` is `undefined`. */ retries?: number | RetryConfig | undefined + /** + * Allows automatically scheduling this task to run regularly at a specified interval.
+ */ + schedule?: ScheduleConfig[] /** * Define a slug-based name for this job. This slug needs to be unique among both tasks and workflows. */ diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index 6a4adc011..8c1682510 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -7,6 +7,7 @@ import type { TypedJobs, } from '../../../index.js' import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js' +import type { ScheduleConfig } from './index.js' import type { RetryConfig, RunInlineTaskFunction, @@ -53,6 +54,13 @@ export type BaseJob< ? TypedJobs['workflows'][TWorkflowSlugOrInput]['input'] : TWorkflowSlugOrInput log?: JobLog[] + meta?: { + [key: string]: unknown + /** + * If true, this job was queued by the scheduling system. + */ + scheduled?: boolean + } processing?: boolean queue?: string taskSlug?: null | TaskType @@ -63,6 +71,9 @@ export type BaseJob< workflowSlug?: null | WorkflowTypes } +/** + * @todo rename to WorkflowSlug in 4.0, similar to CollectionSlug + */ export type WorkflowTypes = StringKeyOf /** @@ -155,6 +166,10 @@ export type WorkflowConfig< * @default undefined. By default, workflows retries are defined by their tasks */ retries?: number | RetryConfig | undefined + /** + * Allows automatically scheduling this workflow to run regularly at a specified interval. + */ + schedule?: ScheduleConfig[] /** * Define a slug-based name for this job. 
*/ diff --git a/packages/payload/src/queues/endpoints/handleSchedules.ts b/packages/payload/src/queues/endpoints/handleSchedules.ts new file mode 100644 index 000000000..385cb496e --- /dev/null +++ b/packages/payload/src/queues/endpoints/handleSchedules.ts @@ -0,0 +1,66 @@ +import type { Endpoint } from '../../config/types.js' + +import { handleSchedules } from '../operations/handleSchedules/index.js' +import { configHasJobs } from './run.js' + +/** + * GET /api/payload-jobs/handle-schedules endpoint + * + * This endpoint is GET instead of POST to allow it to be used in a Vercel Cron. + */ +export const handleSchedulesJobsEndpoint: Endpoint = { + handler: async (req) => { + const jobsConfig = req.payload.config.jobs + + if (!configHasJobs(jobsConfig)) { + return Response.json( + { + message: 'No jobs to schedule.', + }, + { status: 200 }, + ) + } + + const accessFn = jobsConfig.access?.run ?? (() => true) + + const hasAccess = await accessFn({ req }) + + if (!hasAccess) { + return Response.json( + { + message: req.i18n.t('error:unauthorized'), + }, + { status: 401 }, + ) + } + + if (!jobsConfig.scheduling) { + // There is no reason to call the handleSchedules endpoint if the stats global is not enabled (= no schedules defined) + return Response.json( + { + message: + 'Cannot handle schedules because no tasks or workflows with schedules are defined.', + }, + { status: 500 }, + ) + } + + const { queue } = req.query as { + queue?: string + } + + const { errored, queued, skipped } = await handleSchedules({ queue, req }) + + return Response.json( + { + errored, + message: req.i18n.t('general:success'), + queued, + skipped, + }, + { status: 200 }, + ) + }, + method: 'get', + path: '/handle-schedules', +} diff --git a/packages/payload/src/queues/endpoints/run.ts b/packages/payload/src/queues/endpoints/run.ts new file mode 100644 index 000000000..a362a7d2c --- /dev/null +++ b/packages/payload/src/queues/endpoints/run.ts @@ -0,0 +1,118 @@ +import type { Endpoint } from 
'../../config/types.js' +import type { SanitizedJobsConfig } from '../config/types/index.js' + +import { runJobs, type RunJobsArgs } from '../operations/runJobs/index.js' + +/** + * /api/payload-jobs/run endpoint + * + * This endpoint is GET instead of POST to allow it to be used in a Vercel Cron. + */ +export const runJobsEndpoint: Endpoint = { + handler: async (req) => { + const jobsConfig = req.payload.config.jobs + + if (!configHasJobs(jobsConfig)) { + return Response.json( + { + message: 'No jobs to run.', + }, + { status: 200 }, + ) + } + + const accessFn = jobsConfig.access?.run ?? (() => true) + + const hasAccess = await accessFn({ req }) + + if (!hasAccess) { + return Response.json( + { + message: req.i18n.t('error:unauthorized'), + }, + { status: 401 }, + ) + } + + const { + allQueues, + disableScheduling: disableSchedulingParam, + limit, + queue, + silent: silentParam, + } = req.query as { + allQueues?: 'false' | 'true' + disableScheduling?: 'false' | 'true' + limit?: number + queue?: string + silent?: string + } + + const silent = silentParam === 'true' + + const shouldHandleSchedules = disableSchedulingParam !== 'true' + + const runAllQueues = allQueues && !(typeof allQueues === 'string' && allQueues === 'false') + + if (shouldHandleSchedules && jobsConfig.scheduling) { + // If should handle schedules and schedules are defined + await req.payload.jobs.handleSchedules({ queue: runAllQueues ? 
undefined : queue, req }) + } + + const runJobsArgs: RunJobsArgs = { + queue, + req, + // Access is validated above, so it's safe to override here + allQueues: runAllQueues, + overrideAccess: true, + silent, + } + + if (typeof queue === 'string') { + runJobsArgs.queue = queue + } + + const parsedLimit = Number(limit) + if (!isNaN(parsedLimit)) { + runJobsArgs.limit = parsedLimit + } + + let noJobsRemaining = false + let remainingJobsFromQueried = 0 + try { + const result = await runJobs(runJobsArgs) + noJobsRemaining = !!result.noJobsRemaining + remainingJobsFromQueried = result.remainingJobsFromQueried + } catch (err) { + req.payload.logger.error({ + err, + msg: 'There was an error running jobs:', + queue: runJobsArgs.queue, + }) + + return Response.json( + { + message: req.i18n.t('error:unknown'), + noJobsRemaining: true, + remainingJobsFromQueried, + }, + { status: 500 }, + ) + } + + return Response.json( + { + message: req.i18n.t('general:success'), + noJobsRemaining, + remainingJobsFromQueried, + }, + { status: 200 }, + ) + }, + method: 'get', + path: '/run', +} + +export const configHasJobs = (jobsConfig: SanitizedJobsConfig): boolean => { + return Boolean(jobsConfig.tasks?.length || jobsConfig.workflows?.length) +} diff --git a/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts b/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts index e8ff239e8..0214ecd14 100644 --- a/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts +++ b/packages/payload/src/queues/errors/calculateBackoffWaitUntil.ts @@ -1,5 +1,7 @@ import type { RetryConfig } from '../config/types/taskTypes.js' +import { getCurrentDate } from '../utilities/getCurrentDate.js' + export function calculateBackoffWaitUntil({ retriesConfig, totalTried, @@ -7,23 +9,23 @@ export function calculateBackoffWaitUntil({ retriesConfig: number | RetryConfig totalTried: number }): Date { - let waitUntil: Date = new Date() + let waitUntil: Date = getCurrentDate() if (typeof 
retriesConfig === 'object') { if (retriesConfig.backoff) { if (retriesConfig.backoff.type === 'fixed') { waitUntil = retriesConfig.backoff.delay - ? new Date(new Date().getTime() + retriesConfig.backoff.delay) - : new Date() + ? new Date(getCurrentDate().getTime() + retriesConfig.backoff.delay) + : getCurrentDate() } else if (retriesConfig.backoff.type === 'exponential') { // 2 ^ (attempts - 1) * delay (current attempt is not included in totalTried, thus no need for -1) const delay = retriesConfig.backoff.delay ? retriesConfig.backoff.delay : 0 - waitUntil = new Date(new Date().getTime() + Math.pow(2, totalTried) * delay) + waitUntil = new Date(getCurrentDate().getTime() + Math.pow(2, totalTried) * delay) } } } /* - const differenceInMSBetweenNowAndWaitUntil = waitUntil.getTime() - new Date().getTime() + const differenceInMSBetweenNowAndWaitUntil = waitUntil.getTime() - getCurrentDate().getTime() const differenceInSBetweenNowAndWaitUntil = differenceInMSBetweenNowAndWaitUntil / 1000 console.log('Calculated backoff', { diff --git a/packages/payload/src/queues/errors/handleTaskError.ts b/packages/payload/src/queues/errors/handleTaskError.ts index 3d6b491c9..3366b0a15 100644 --- a/packages/payload/src/queues/errors/handleTaskError.ts +++ b/packages/payload/src/queues/errors/handleTaskError.ts @@ -1,9 +1,11 @@ import ObjectIdImport from 'bson-objectid' import type { PayloadRequest } from '../../index.js' +import type { RunJobsSilent } from '../localAPI.js' import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' import type { TaskError } from './index.js' +import { getCurrentDate } from '../utilities/getCurrentDate.js' import { calculateBackoffWaitUntil } from './calculateBackoffWaitUntil.js' import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js' @@ -13,10 +15,20 @@ const ObjectId = (ObjectIdImport.default || export async function handleTaskError({ error, req, + silent = false, updateJob, }: { error: TaskError req: 
PayloadRequest + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). + * + * @default false + */ + silent?: RunJobsSilent updateJob: UpdateJobFunction }): Promise<{ hasFinalError: boolean @@ -46,7 +58,7 @@ export async function handleTaskError({ stack: error.stack, } - const currentDate = new Date() + const currentDate = getCurrentDate() ;(job.log ??= []).push({ id: new ObjectId().toHexString(), @@ -102,12 +114,14 @@ export async function handleTaskError({ waitUntil: job.waitUntil, }) - req.payload.logger.error({ - err: error, - job, - msg: `Error running task ${taskID}. Attempt ${job.totalTried} - max retries reached`, - taskSlug, - }) + if (!silent || (typeof silent === 'object' && !silent.error)) { + req.payload.logger.error({ + err: error, + job, + msg: `Error running task ${taskID}. Attempt ${job.totalTried} - max retries reached`, + taskSlug, + }) + } return { hasFinalError: true, } @@ -135,12 +149,14 @@ export async function handleTaskError({ retriesConfig: workflowConfig.retries, }) - req.payload.logger.error({ - err: error, - job, - msg: `Error running task ${taskID}. Attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, - taskSlug, - }) + if (!silent || (typeof silent === 'object' && !silent.error)) { + req.payload.logger.error({ + err: error, + job, + msg: `Error running task ${taskID}. Attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? 
'/' + (maxWorkflowRetries + 1) : ''}`, + taskSlug, + }) + } // Update job's waitUntil only if this waitUntil is later than the current one if (waitUntil && (!job.waitUntil || waitUntil > new Date(job.waitUntil))) { diff --git a/packages/payload/src/queues/errors/handleWorkflowError.ts b/packages/payload/src/queues/errors/handleWorkflowError.ts index 6c5fed8da..2716aebde 100644 --- a/packages/payload/src/queues/errors/handleWorkflowError.ts +++ b/packages/payload/src/queues/errors/handleWorkflowError.ts @@ -1,7 +1,9 @@ import type { PayloadRequest } from '../../index.js' +import type { RunJobsSilent } from '../localAPI.js' import type { UpdateJobFunction } from '../operations/runJobs/runJob/getUpdateJobFunction.js' import type { WorkflowError } from './index.js' +import { getCurrentDate } from '../utilities/getCurrentDate.js' import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js' /** @@ -15,10 +17,20 @@ import { getWorkflowRetryBehavior } from './getWorkflowRetryBehavior.js' export async function handleWorkflowError({ error, req, + silent = false, updateJob, }: { error: WorkflowError req: PayloadRequest + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). 
+ * + * @default false + */ + silent?: RunJobsSilent updateJob: UpdateJobFunction }): Promise<{ hasFinalError: boolean @@ -41,7 +53,7 @@ export async function handleWorkflowError({ if (job.waitUntil) { // Check if waitUntil is in the past const waitUntil = new Date(job.waitUntil) - if (waitUntil < new Date()) { + if (waitUntil < getCurrentDate()) { // Outdated waitUntil, remove it delete job.waitUntil } @@ -55,10 +67,12 @@ export async function handleWorkflowError({ const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` - req.payload.logger.error({ - err: error, - msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, - }) + if (!silent || (typeof silent === 'object' && !silent.error)) { + req.payload.logger.error({ + err: error, + msg: `Error running job ${jobLabel} id: ${job.id} attempt ${job.totalTried + 1}${maxWorkflowRetries !== undefined ? '/' + (maxWorkflowRetries + 1) : ''}`, + }) + } // Tasks update the job if they error - but in case there is an unhandled error (e.g. 
in the workflow itself, not in a task) // we need to ensure the job is updated to reflect the error diff --git a/packages/payload/src/queues/localAPI.ts b/packages/payload/src/queues/localAPI.ts index f1449c566..c38a64868 100644 --- a/packages/payload/src/queues/localAPI.ts +++ b/packages/payload/src/queues/localAPI.ts @@ -1,4 +1,4 @@ -import type { RunningJobFromTask } from './config/types/workflowTypes.js' +import type { BaseJob, RunningJobFromTask } from './config/types/workflowTypes.js' import { createLocalReq, @@ -9,11 +9,37 @@ import { type TypedJobs, type Where, } from '../index.js' -import { jobAfterRead, jobsCollectionSlug } from './config/index.js' +import { jobAfterRead, jobsCollectionSlug } from './config/collection.js' +import { handleSchedules, type HandleSchedulesResult } from './operations/handleSchedules/index.js' import { runJobs } from './operations/runJobs/index.js' import { updateJob, updateJobs } from './utilities/updateJob.js' +export type RunJobsSilent = + | { + error?: boolean + info?: boolean + } + | boolean export const getJobsLocalAPI = (payload: Payload) => ({ + handleSchedules: async (args?: { + // By default, schedule all queues - only scheduling jobs scheduled to be added to the `default` queue would not make sense + // here, as you'd usually specify a different queue than `default` here, especially if this is used in combination with autorun. + // The `queue` property for setting up schedules is required, and not optional. + /** + * If you want to only schedule jobs that are set to schedule in a specific queue, set this to the queue name. + * + * @default all jobs for all queues will be scheduled. + */ + queue?: string + req?: PayloadRequest + }): Promise => { + const newReq: PayloadRequest = args?.req ?? 
(await createLocalReq({}, payload)) + + return await handleSchedules({ + queue: args?.queue, + req: newReq, + }) + }, queue: async < // eslint-disable-next-line @typescript-eslint/no-duplicate-type-constituents TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] | keyof TypedJobs['workflows'], @@ -21,6 +47,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ args: | { input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input'] + meta?: BaseJob['meta'] queue?: string req?: PayloadRequest // TTaskOrWorkflowlug with keyof TypedJobs['workflows'] removed: @@ -30,6 +57,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ } | { input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input'] + meta?: BaseJob['meta'] queue?: string req?: PayloadRequest task?: never @@ -74,6 +102,10 @@ export const getJobsLocalAPI = (payload: Payload) => ({ data.taskSlug = args.task as string } + if (args.meta) { + data.meta = args.meta + } + type ReturnType = TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] ? Job : RunningJobFromTask // Type assertion is still needed here @@ -130,6 +162,15 @@ export const getJobsLocalAPI = (payload: Payload) => ({ * If you want to run them in sequence, set this to true. */ sequential?: boolean + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). + * + * @default false + */ + silent?: RunJobsSilent where?: Where }): Promise> => { const newReq: PayloadRequest = args?.req ?? 
(await createLocalReq({}, payload)) @@ -142,6 +183,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ queue: args?.queue, req: newReq, sequential: args?.sequential, + silent: args?.silent, where: args?.where, }) }, @@ -150,6 +192,15 @@ export const getJobsLocalAPI = (payload: Payload) => ({ id: number | string overrideAccess?: boolean req?: PayloadRequest + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). + * + * @default false + */ + silent?: RunJobsSilent }): Promise> => { const newReq: PayloadRequest = args.req ?? (await createLocalReq({}, payload)) @@ -157,6 +208,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ id: args.id, overrideAccess: args.overrideAccess !== false, req: newReq, + silent: args.silent, }) }, diff --git a/packages/payload/src/queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.ts b/packages/payload/src/queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.ts new file mode 100644 index 000000000..713cfa204 --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.ts @@ -0,0 +1,74 @@ +import type { PayloadRequest, Where } from '../../../types/index.js' +import type { TaskType } from '../../config/types/taskTypes.js' +import type { WorkflowTypes } from '../../config/types/workflowTypes.js' + +/** + * Gets all queued jobs that can be run. 
This means they either: + * - failed but do not have a definitive error => can be retried + * - are currently processing + * - have not been started yet + */ +export async function countRunnableOrActiveJobsForQueue({ + onlyScheduled = false, + queue, + req, + taskSlug, + workflowSlug, +}: { + /** + * If true, this counts only jobs that have been created through the scheduling system. + * + * @default false + */ + onlyScheduled?: boolean + queue: string + req: PayloadRequest + taskSlug?: TaskType + workflowSlug?: WorkflowTypes +}): Promise { + const and: Where[] = [ + { + queue: { + equals: queue, + }, + }, + + { + completedAt: { exists: false }, + }, + { + error: { exists: false }, + }, + ] + + if (taskSlug) { + and.push({ + taskSlug: { + equals: taskSlug, + }, + }) + } else if (workflowSlug) { + and.push({ + workflowSlug: { + equals: workflowSlug, + }, + }) + } + if (onlyScheduled) { + and.push({ + 'meta.scheduled': { + equals: true, + }, + }) + } + + const runnableOrActiveJobsForQueue = await req.payload.db.count({ + collection: 'payload-jobs', + req, + where: { + and, + }, + }) + + return runnableOrActiveJobsForQueue.totalDocs +} diff --git a/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts new file mode 100644 index 000000000..4627c407c --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/defaultAfterSchedule.ts @@ -0,0 +1,64 @@ +import type { AfterScheduleFn } from '../../config/types/index.js' + +import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' +import { getCurrentDate } from '../../utilities/getCurrentDate.js' + +type JobStatsScheduledRuns = NonNullable< + NonNullable['scheduledRuns']>['queues'] +>[string] + +export const defaultAfterSchedule: AfterScheduleFn = async ({ jobStats, queueable, req }) => { + const existingQueuesConfig = + 
jobStats?.stats?.scheduledRuns?.queues?.[queueable.scheduleConfig.queue] || {} + + const queueConfig: JobStatsScheduledRuns = { + ...existingQueuesConfig, + } + if (queueable.taskConfig) { + ;(queueConfig.tasks ??= {})[queueable.taskConfig.slug] = { + lastScheduledRun: getCurrentDate().toISOString(), + } + } else if (queueable.workflowConfig) { + ;(queueConfig.workflows ??= {})[queueable.workflowConfig.slug] = { + lastScheduledRun: getCurrentDate().toISOString(), + } + } + + // Add to payload-jobs-stats global regardless of the status + if (jobStats) { + await req.payload.db.updateGlobal({ + slug: jobStatsGlobalSlug, + data: { + ...(jobStats || {}), + stats: { + ...(jobStats?.stats || {}), + scheduledRuns: { + ...(jobStats?.stats?.scheduledRuns || {}), + queues: { + ...(jobStats?.stats?.scheduledRuns?.queues || {}), + [queueable.scheduleConfig.queue]: queueConfig, + }, + }, + }, + } as JobStats, + req, + returning: false, + }) + } else { + await req.payload.db.createGlobal({ + slug: jobStatsGlobalSlug, + data: { + createdAt: getCurrentDate().toISOString(), + stats: { + scheduledRuns: { + queues: { + [queueable.scheduleConfig.queue]: queueConfig, + }, + }, + }, + } as JobStats, + req, + returning: false, + }) + } +} diff --git a/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts b/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts new file mode 100644 index 000000000..96b809225 --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/defaultBeforeSchedule.ts @@ -0,0 +1,20 @@ +import type { BeforeScheduleFn } from '../../config/types/index.js' + +import { countRunnableOrActiveJobsForQueue } from './countRunnableOrActiveJobsForQueue.js' + +export const defaultBeforeSchedule: BeforeScheduleFn = async ({ queueable, req }) => { + // All tasks in that queue that are either currently processing or can be run + const runnableOrActiveJobsForQueue = await countRunnableOrActiveJobsForQueue({ + 
onlyScheduled: true, + queue: queueable.scheduleConfig.queue, + req, + taskSlug: queueable.taskConfig?.slug, + workflowSlug: queueable.workflowConfig?.slug, + }) + + return { + input: {}, + shouldSchedule: runnableOrActiveJobsForQueue === 0, + waitUntil: queueable.waitUntil, + } +} diff --git a/packages/payload/src/queues/operations/handleSchedules/getQueuesWithSchedules.ts b/packages/payload/src/queues/operations/handleSchedules/getQueuesWithSchedules.ts new file mode 100644 index 000000000..817de244f --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/getQueuesWithSchedules.ts @@ -0,0 +1,50 @@ +import type { SanitizedJobsConfig, ScheduleConfig } from '../../config/types/index.js' +import type { TaskConfig } from '../../config/types/taskTypes.js' +import type { WorkflowConfig } from '../../config/types/workflowTypes.js' + +type QueuesWithSchedules = { + [queue: string]: { + schedules: { + scheduleConfig: ScheduleConfig + taskConfig?: TaskConfig + workflowConfig?: WorkflowConfig + }[] + } +} + +export const getQueuesWithSchedules = ({ + jobsConfig, +}: { + jobsConfig: SanitizedJobsConfig +}): QueuesWithSchedules => { + const tasksWithSchedules = + jobsConfig.tasks?.filter((task) => { + return task.schedule?.length + }) ?? [] + + const workflowsWithSchedules = + jobsConfig.workflows?.filter((workflow) => { + return workflow.schedule?.length + }) ?? [] + + const queuesWithSchedules: QueuesWithSchedules = {} + + for (const task of tasksWithSchedules) { + for (const schedule of task.schedule ?? []) { + ;(queuesWithSchedules[schedule.queue] ??= { schedules: [] }).schedules.push({ + scheduleConfig: schedule, + taskConfig: task, + }) + } + } + for (const workflow of workflowsWithSchedules) { + for (const schedule of workflow.schedule ?? 
[]) { + ;(queuesWithSchedules[schedule.queue] ??= { schedules: [] }).schedules.push({ + scheduleConfig: schedule, + workflowConfig: workflow, + }) + } + } + + return queuesWithSchedules +} diff --git a/packages/payload/src/queues/operations/handleSchedules/index.ts b/packages/payload/src/queues/operations/handleSchedules/index.ts new file mode 100644 index 000000000..b5daefccb --- /dev/null +++ b/packages/payload/src/queues/operations/handleSchedules/index.ts @@ -0,0 +1,223 @@ +import { Cron } from 'croner' + +import type { Job, TaskConfig, WorkflowConfig } from '../../../index.js' +import type { PayloadRequest } from '../../../types/index.js' +import type { BeforeScheduleFn, Queueable, ScheduleConfig } from '../../config/types/index.js' + +import { type JobStats, jobStatsGlobalSlug } from '../../config/global.js' +import { defaultAfterSchedule } from './defaultAfterSchedule.js' +import { defaultBeforeSchedule } from './defaultBeforeSchedule.js' +import { getQueuesWithSchedules } from './getQueuesWithSchedules.js' + +export type HandleSchedulesResult = { + errored: Queueable[] + queued: Queueable[] + skipped: Queueable[] +} + +/** + * On vercel, we cannot auto-schedule jobs using a Cron - instead, we'll use this same endpoint that can + * also be called from Vercel Cron for auto-running jobs. + * + * The benefit of doing it like this instead of a separate endpoint is that we can run jobs immediately + * after they are scheduled + */ +export async function handleSchedules({ + queue, + req, +}: { + /** + * If you want to only schedule jobs that are set to schedule in a specific queue, set this to the queue name. + * + * @default all jobs for all queues will be scheduled. 
+ */ + queue?: string + req: PayloadRequest +}): Promise { + const jobsConfig = req.payload.config.jobs + const queuesWithSchedules = getQueuesWithSchedules({ + jobsConfig, + }) + + const stats: JobStats = await req.payload.db.findGlobal({ + slug: jobStatsGlobalSlug, + req, + }) + + /** + * Almost last step! Tasks and Workflows added here just need to be constraint-checked (e.g max. 1 running task etc.), + * before we can queue them + */ + const queueables: Queueable[] = [] + + // Need to know when that particular job was last scheduled in that particular queue + + for (const [queueName, { schedules }] of Object.entries(queuesWithSchedules)) { + if (queue && queueName !== queue) { + // If a queue is specified, only schedule jobs for that queue + continue + } + for (const schedulable of schedules) { + const queuable = checkQueueableTimeConstraints({ + queue: queueName, + scheduleConfig: schedulable.scheduleConfig, + stats, + taskConfig: schedulable.taskConfig, + workflowConfig: schedulable.workflowConfig, + }) + if (queuable) { + queueables.push(queuable) + } + } + } + + const queued: Queueable[] = [] + const skipped: Queueable[] = [] + const errored: Queueable[] = [] + + /** + * Now queue, but check for constraints (= beforeSchedule) first. + * Default constraint (= defaultBeforeSchedule): max. 
1 running / scheduled task or workflow per queue + */ + for (const queueable of queueables) { + const { status } = await scheduleQueueable({ + queueable, + req, + stats, + }) + switch (status) { + case 'error': + errored.push(queueable) + break + case 'skipped': + skipped.push(queueable) + break + case 'success': + queued.push(queueable) + break + } + } + return { + errored, + queued, + skipped, + } +} + +export function checkQueueableTimeConstraints({ + queue, + scheduleConfig, + stats, + taskConfig, + workflowConfig, +}: { + queue: string + scheduleConfig: ScheduleConfig + stats: JobStats + taskConfig?: TaskConfig + workflowConfig?: WorkflowConfig +}): false | Queueable { + const queueScheduleStats = stats?.stats?.scheduledRuns?.queues?.[queue] + + const lastScheduledRun = taskConfig + ? queueScheduleStats?.tasks?.[taskConfig.slug]?.lastScheduledRun + : queueScheduleStats?.workflows?.[workflowConfig?.slug ?? '']?.lastScheduledRun + + const nextRun = new Cron(scheduleConfig.cron).nextRun(lastScheduledRun ?? undefined) + + if (!nextRun) { + return false + } + return { + scheduleConfig, + taskConfig, + waitUntil: nextRun, + workflowConfig, + } +} + +export async function scheduleQueueable({ + queueable, + req, + stats, +}: { + queueable: Queueable + req: PayloadRequest + stats: JobStats +}): Promise<{ + job?: Job + status: 'error' | 'skipped' | 'success' +}> { + if (!queueable.taskConfig && !queueable.workflowConfig) { + return { + status: 'error', + } + } + + const beforeScheduleFn = queueable.scheduleConfig.hooks?.beforeSchedule + const afterScheduleFN = queueable.scheduleConfig.hooks?.afterSchedule + + try { + const beforeScheduleResult: Awaited> = await ( + beforeScheduleFn ?? defaultBeforeSchedule + )({ + // @ts-expect-error we know defaultBeforeSchedule will never call itself => pass null + defaultBeforeSchedule: beforeScheduleFn ? 
defaultBeforeSchedule : null, + jobStats: stats, + queueable, + req, + }) + + if (!beforeScheduleResult.shouldSchedule) { + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterSchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? defaultAfterSchedule : null, + jobStats: stats, + queueable, + req, + status: 'skipped', + }) + return { + status: 'skipped', + } + } + + const job = (await req.payload.jobs.queue({ + input: beforeScheduleResult.input ?? {}, + meta: { + scheduled: true, + }, + queue: queueable.scheduleConfig.queue, + req, + task: queueable?.taskConfig?.slug, + waitUntil: beforeScheduleResult.waitUntil, + workflow: queueable.workflowConfig?.slug, + } as Parameters[0])) as unknown as Job + + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterSchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? defaultAfterSchedule : null, + job, + jobStats: stats, + queueable, + req, + status: 'success', + }) + return { + status: 'success', + } + } catch (error) { + await (afterScheduleFN ?? defaultAfterSchedule)({ + // @ts-expect-error we know defaultAfterSchedule will never call itself => pass null + defaultAfterSchedule: afterScheduleFN ? 
defaultAfterSchedule : null, + error: error as Error, + jobStats: stats, + queueable, + req, + status: 'error', + }) + return { + status: 'error', + } + } +} diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index c62610346..953059478 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -2,12 +2,14 @@ import type { Job } from '../../../index.js' import type { PayloadRequest, Sort, Where } from '../../../types/index.js' import type { WorkflowJSON } from '../../config/types/workflowJSONTypes.js' import type { WorkflowConfig, WorkflowHandler } from '../../config/types/workflowTypes.js' +import type { RunJobsSilent } from '../../localAPI.js' import type { RunJobResult } from './runJob/index.js' import { Forbidden } from '../../../errors/Forbidden.js' import { isolateObjectProperty } from '../../../utilities/isolateObjectProperty.js' -import { jobsCollectionSlug } from '../../config/index.js' +import { jobsCollectionSlug } from '../../config/collection.js' import { JobCancelledError } from '../../errors/index.js' +import { getCurrentDate } from '../../utilities/getCurrentDate.js' import { updateJob, updateJobs } from '../../utilities/updateJob.js' import { getUpdateJobFunction } from './runJob/getUpdateJobFunction.js' import { importHandlerPath } from './runJob/importHandlerPath.js' @@ -53,6 +55,15 @@ export type RunJobsArgs = { * If you want to run them in sequence, set this to true. */ sequential?: boolean + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). 
+ * + * @default false + */ + silent?: RunJobsSilent where?: Where } @@ -84,6 +95,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { }, }, sequential, + silent = false, where: whereFromProps, } = args @@ -119,7 +131,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { }, { waitUntil: { - less_than: new Date().toISOString(), + less_than: getCurrentDate().toISOString(), }, }, ], @@ -219,11 +231,13 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } } - payload.logger.info({ - msg: `Running ${jobs.length} jobs.`, - new: newJobs?.length, - retrying: existingJobs?.length, - }) + if (!silent || (typeof silent === 'object' && !silent.info)) { + payload.logger.info({ + msg: `Running ${jobs.length} jobs.`, + new: newJobs?.length, + retrying: existingJobs?.length, + }) + } const successfullyCompletedJobs: (number | string)[] = [] @@ -277,7 +291,9 @@ export const runJobs = async (args: RunJobsArgs): Promise => { if (!workflowHandler) { const jobLabel = job.workflowSlug || `Task: ${job.taskSlug}` const errorMessage = `Can't find runner while importing with the path ${workflowConfig.handler} in job type ${jobLabel}.` - payload.logger.error(errorMessage) + if (!silent || (typeof silent === 'object' && !silent.error)) { + payload.logger.error(errorMessage) + } await updateJob({ error: { @@ -300,6 +316,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { const result = await runJob({ job, req: jobReq, + silent, updateJob, workflowConfig, workflowHandler, @@ -314,6 +331,7 @@ export const runJobs = async (args: RunJobsArgs): Promise => { const result = await runJSONJob({ job, req: jobReq, + silent, updateJob, workflowConfig, workflowHandler, @@ -370,10 +388,12 @@ export const runJobs = async (args: RunJobsArgs): Promise => { }) } } catch (err) { - payload.logger.error({ - err, - msg: `Failed to delete jobs ${successfullyCompletedJobs.join(', ')} on complete`, - }) + if (!silent || (typeof silent === 'object' && 
!silent.error)) { + payload.logger.error({ + err, + msg: `Failed to delete jobs ${successfullyCompletedJobs.join(', ')} on complete`, + }) + } } } diff --git a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts index 66b015672..87f599590 100644 --- a/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJSONJob/index.ts @@ -2,16 +2,27 @@ import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' import type { WorkflowJSON, WorkflowStep } from '../../../config/types/workflowJSONTypes.js' import type { WorkflowConfig } from '../../../config/types/workflowTypes.js' +import type { RunJobsSilent } from '../../../localAPI.js' import type { UpdateJobFunction } from '../runJob/getUpdateJobFunction.js' import type { JobRunStatus } from '../runJob/index.js' import { handleWorkflowError } from '../../../errors/handleWorkflowError.js' import { WorkflowError } from '../../../errors/index.js' +import { getCurrentDate } from '../../../utilities/getCurrentDate.js' import { getRunTaskFunction } from '../runJob/getRunTaskFunction.js' type Args = { job: Job req: PayloadRequest + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). 
+ * + * @default false + */ + silent?: RunJobsSilent updateJob: UpdateJobFunction workflowConfig: WorkflowConfig workflowHandler: WorkflowJSON @@ -24,6 +35,7 @@ export type RunJSONJobResult = { export const runJSONJob = async ({ job, req, + silent = false, updateJob, workflowConfig, workflowHandler, @@ -79,6 +91,7 @@ export const runJSONJob = async ({ : 'An unhandled error occurred', workflowConfig, }), + silent, req, updateJob, @@ -111,7 +124,7 @@ export const runJSONJob = async ({ if (workflowCompleted) { await updateJob({ - completedAt: new Date().toISOString(), + completedAt: getCurrentDate().toISOString(), processing: false, totalTried: (job.totalTried ?? 0) + 1, }) diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 868ac5602..aa9f17156 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -20,6 +20,7 @@ import type { import type { UpdateJobFunction } from './getUpdateJobFunction.js' import { TaskError } from '../../../errors/index.js' +import { getCurrentDate } from '../../../utilities/getCurrentDate.js' import { getTaskHandlerFromConfig } from './importHandlerPath.js' const ObjectId = (ObjectIdImport.default || @@ -54,7 +55,7 @@ export const getRunTaskFunction = ( task, }: Parameters[1] & Parameters>[1], ) => { - const executedAt = new Date() + const executedAt = getCurrentDate() let taskConfig: TaskConfig | undefined if (!isInline) { @@ -186,7 +187,7 @@ export const getRunTaskFunction = ( ;(job.log ??= []).push({ id: new ObjectId().toHexString(), - completedAt: new Date().toISOString(), + completedAt: getCurrentDate().toISOString(), executedAt: executedAt.toISOString(), input, output, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/index.ts 
b/packages/payload/src/queues/operations/runJobs/runJob/index.ts index fe8f6256e..c92599e29 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/index.ts @@ -1,16 +1,27 @@ import type { Job } from '../../../../index.js' import type { PayloadRequest } from '../../../../types/index.js' import type { WorkflowConfig, WorkflowHandler } from '../../../config/types/workflowTypes.js' +import type { RunJobsSilent } from '../../../localAPI.js' import type { UpdateJobFunction } from './getUpdateJobFunction.js' import { handleTaskError } from '../../../errors/handleTaskError.js' import { handleWorkflowError } from '../../../errors/handleWorkflowError.js' import { JobCancelledError, TaskError, WorkflowError } from '../../../errors/index.js' +import { getCurrentDate } from '../../../utilities/getCurrentDate.js' import { getRunTaskFunction } from './getRunTaskFunction.js' type Args = { job: Job req: PayloadRequest + /** + * If set to true, the job system will not log any output to the console (for both info and error logs). + * Can be an option for more granular control over logging. + * + * This will not automatically affect user-configured logs (e.g. if you call `console.log` or `payload.logger.info` in your job code). 
+ * + * @default false + */ + silent?: RunJobsSilent updateJob: UpdateJobFunction workflowConfig: WorkflowConfig workflowHandler: WorkflowHandler @@ -25,6 +36,7 @@ export type RunJobResult = { export const runJob = async ({ job, req, + silent, updateJob, workflowConfig, workflowHandler, @@ -45,6 +57,7 @@ export const runJob = async ({ const { hasFinalError } = await handleTaskError({ error, req, + silent, updateJob, }) @@ -66,6 +79,7 @@ export const runJob = async ({ workflowConfig, }), req, + silent, updateJob, }) @@ -76,7 +90,7 @@ export const runJob = async ({ // Workflow has completed successfully await updateJob({ - completedAt: new Date().toISOString(), + completedAt: getCurrentDate().toISOString(), log: job.log, processing: false, totalTried: (job.totalTried ?? 0) + 1, diff --git a/packages/payload/src/queues/restEndpointRun.ts b/packages/payload/src/queues/restEndpointRun.ts deleted file mode 100644 index 14c425a94..000000000 --- a/packages/payload/src/queues/restEndpointRun.ts +++ /dev/null @@ -1,91 +0,0 @@ -import type { Endpoint, SanitizedConfig } from '../config/types.js' - -import { runJobs, type RunJobsArgs } from './operations/runJobs/index.js' - -const configHasJobs = (config: SanitizedConfig): boolean => { - return Boolean(config.jobs?.tasks?.length || config.jobs?.workflows?.length) -} - -/** - * /api/payload-jobs/run endpoint - */ -export const runJobsEndpoint: Endpoint = { - handler: async (req) => { - if (!configHasJobs(req.payload.config)) { - return Response.json( - { - message: 'No jobs to run.', - }, - { status: 200 }, - ) - } - - const accessFn = req.payload.config.jobs?.access?.run ?? 
(() => true) - - const hasAccess = await accessFn({ req }) - - if (!hasAccess) { - return Response.json( - { - message: req.i18n.t('error:unauthorized'), - }, - { status: 401 }, - ) - } - - const { allQueues, limit, queue } = req.query as { - allQueues?: boolean - limit?: number - queue?: string - } - - const runJobsArgs: RunJobsArgs = { - queue, - req, - // We are checking access above, so we can override it here - overrideAccess: true, - } - - if (typeof limit !== 'undefined') { - runJobsArgs.limit = Number(limit) - } - - if (allQueues && !(typeof allQueues === 'string' && allQueues === 'false')) { - runJobsArgs.allQueues = true - } - - let noJobsRemaining = false - let remainingJobsFromQueried = 0 - try { - const result = await runJobs(runJobsArgs) - noJobsRemaining = !!result.noJobsRemaining - remainingJobsFromQueried = result.remainingJobsFromQueried - } catch (err) { - req.payload.logger.error({ - err, - msg: 'There was an error running jobs:', - queue: runJobsArgs.queue, - }) - - return Response.json( - { - message: req.i18n.t('error:unknown'), - noJobsRemaining: true, - remainingJobsFromQueried, - }, - { status: 500 }, - ) - } - - return Response.json( - { - message: req.i18n.t('general:success'), - noJobsRemaining, - remainingJobsFromQueried, - }, - { status: 200 }, - ) - }, - method: 'get', - path: '/run', -} diff --git a/packages/payload/src/queues/utilities/getCurrentDate.ts b/packages/payload/src/queues/utilities/getCurrentDate.ts new file mode 100644 index 000000000..6e0d67af3 --- /dev/null +++ b/packages/payload/src/queues/utilities/getCurrentDate.ts @@ -0,0 +1,21 @@ +/** + * Globals that are used by our integration tests to modify the behavior of the job system during runtime. + * This is useful to avoid having to wait for the cron jobs to run, or to pause auto-running jobs. 
+ */ +export const _internal_jobSystemGlobals = { + getCurrentDate: () => { + return new Date() + }, + shouldAutoRun: true, + shouldAutoSchedule: true, +} + +export function _internal_resetJobSystemGlobals() { + _internal_jobSystemGlobals.getCurrentDate = () => new Date() + _internal_jobSystemGlobals.shouldAutoRun = true + _internal_jobSystemGlobals.shouldAutoSchedule = true +} + +export const getCurrentDate: () => Date = () => { + return _internal_jobSystemGlobals.getCurrentDate() +} diff --git a/packages/payload/src/queues/utilities/updateJob.ts b/packages/payload/src/queues/utilities/updateJob.ts index 6ce4479ea..a8a4ff69e 100644 --- a/packages/payload/src/queues/utilities/updateJob.ts +++ b/packages/payload/src/queues/utilities/updateJob.ts @@ -3,7 +3,7 @@ import type { UpdateJobsArgs } from '../../database/types.js' import type { Job } from '../../index.js' import type { PayloadRequest, Sort, Where } from '../../types/index.js' -import { jobAfterRead, jobsCollectionSlug } from '../config/index.js' +import { jobAfterRead, jobsCollectionSlug } from '../config/collection.js' type BaseArgs = { data: Partial diff --git a/packages/payload/src/versions/deleteScheduledPublishJobs.ts b/packages/payload/src/versions/deleteScheduledPublishJobs.ts index 4020ad4fd..6ce4199f8 100644 --- a/packages/payload/src/versions/deleteScheduledPublishJobs.ts +++ b/packages/payload/src/versions/deleteScheduledPublishJobs.ts @@ -1,7 +1,7 @@ import type { PayloadRequest } from '../types/index.js' import { type Payload } from '../index.js' -import { jobsCollectionSlug } from '../queues/config/index.js' +import { jobsCollectionSlug } from '../queues/config/collection.js' type Args = { id?: number | string diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 9f7e558f3..8012d5e44 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -823,8 +823,8 @@ importers: specifier: 2.12.1 version: 2.12.1 croner: - specifier: 9.0.0 - version: 9.0.0 + specifier: 9.1.0 + version: 9.1.0 dataloader: specifier: 
2.2.3 version: 2.2.3 @@ -7401,8 +7401,8 @@ packages: engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} hasBin: true - croner@9.0.0: - resolution: {integrity: sha512-onMB0OkDjkXunhdW9htFjEhqrD54+M94i6ackoUkjHKbRnXdyEyKRelp4nJ1kAz32+s27jP1FsebpJCVl0BsvA==} + croner@9.1.0: + resolution: {integrity: sha512-p9nwwR4qyT5W996vBZhdvBCnMhicY5ytZkR4D1Xj0wuTDEiMnjwR57Q3RXYY/s0EpX6Ay3vgIcfaR+ewGHsi+g==} engines: {node: '>=18.0'} cross-env@7.0.3: @@ -18799,7 +18799,7 @@ snapshots: - supports-color - ts-node - croner@9.0.0: {} + croner@9.1.0: {} cross-env@7.0.3: dependencies: diff --git a/test/helpers/initPayloadInt.ts b/test/helpers/initPayloadInt.ts index 2801b5985..ffd2584ef 100644 --- a/test/helpers/initPayloadInt.ts +++ b/test/helpers/initPayloadInt.ts @@ -13,15 +13,16 @@ export async function initPayloadInt { const testSuiteName = testSuiteNameOverride ?? path.basename(dirname) - await runInit(testSuiteName, false, true) - console.log('importing config', path.resolve(dirname, 'config.ts')) - const { default: config } = await import(path.resolve(dirname, 'config.ts')) + await runInit(testSuiteName, false, true, configFile) + console.log('importing config', path.resolve(dirname, configFile ?? 'config.ts')) + const { default: config } = await import(path.resolve(dirname, configFile ?? 
'config.ts')) if (initializePayload === false) { return { config: await config } as any diff --git a/test/initDevAndTest.ts b/test/initDevAndTest.ts index 791fea9ef..2d1039728 100644 --- a/test/initDevAndTest.ts +++ b/test/initDevAndTest.ts @@ -17,6 +17,7 @@ export async function initDevAndTest( testSuiteArg: string, writeDBAdapter: string, skipGenImportMap: string, + configFile?: string, ): Promise { const importMapPath: string = path.resolve( getNextRootDir(testSuiteArg).rootDir, @@ -44,7 +45,7 @@ export async function initDevAndTest( const testDir = path.resolve(dirname, testSuiteArg) console.log('Generating import map for config:', testDir) - const configUrl = pathToFileURL(path.resolve(testDir, 'config.ts')).href + const configUrl = pathToFileURL(path.resolve(testDir, configFile ?? 'config.ts')).href const config: SanitizedConfig = await (await import(configUrl)).default process.env.ROOT_DIR = getNextRootDir(testSuiteArg).rootDir diff --git a/test/queues/config.schedules-autocron.ts b/test/queues/config.schedules-autocron.ts new file mode 100644 index 000000000..7c5a06326 --- /dev/null +++ b/test/queues/config.schedules-autocron.ts @@ -0,0 +1,22 @@ +/* eslint-disable no-restricted-exports */ +import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js' +import { getConfig } from './getConfig.js' +import { EverySecondMax2Task } from './tasks/EverySecondMax2Task.js' +import { EverySecondTask } from './tasks/EverySecondTask.js' + +const config = getConfig() + +export default buildConfigWithDefaults({ + ...config, + jobs: { + ...config.jobs, + tasks: [...(config?.jobs?.tasks || []), EverySecondTask, EverySecondMax2Task], + autoRun: [ + { + // @ts-expect-error not undefined + ...config.jobs.autoRun[0], + disableScheduling: false, + }, + ], + }, +}) diff --git a/test/queues/config.schedules.ts b/test/queues/config.schedules.ts new file mode 100644 index 000000000..a5435e995 --- /dev/null +++ b/test/queues/config.schedules.ts @@ -0,0 +1,22 @@ +/* 
eslint-disable no-restricted-exports */ +import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js' +import { getConfig } from './getConfig.js' +import { EverySecondMax2Task } from './tasks/EverySecondMax2Task.js' +import { EverySecondTask } from './tasks/EverySecondTask.js' + +const config = getConfig() + +export default buildConfigWithDefaults({ + ...config, + jobs: { + ...config.jobs, + tasks: [...(config?.jobs?.tasks || []), EverySecondTask, EverySecondMax2Task], + autoRun: [ + { + // @ts-expect-error not undefined + ...config.jobs.autoRun[0], + disableScheduling: true, + }, + ], + }, +}) diff --git a/test/queues/config.ts b/test/queues/config.ts index 331d89fd1..6c16a4bd2 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -1,416 +1,4 @@ -import type { TaskConfig } from 'payload' - -import { lexicalEditor } from '@payloadcms/richtext-lexical' -import { fileURLToPath } from 'node:url' -import path from 'path' - import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js' -import { devUser } from '../credentials.js' -import { updatePostStep1, updatePostStep2 } from './runners/updatePost.js' -import { seed } from './seed.js' -import { externalWorkflow } from './workflows/externalWorkflow.js' -import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js' -import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' -import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' -import { longRunningWorkflow } from './workflows/longRunning.js' -import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js' -import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js' -import { retries0Workflow } from './workflows/retries0.js' -import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js' -import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js' -import { retriesTestWorkflow } from './workflows/retriesTest.js' -import { 
retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js' -import { subTaskWorkflow } from './workflows/subTask.js' -import { subTaskFailsWorkflow } from './workflows/subTaskFails.js' -import { updatePostWorkflow } from './workflows/updatePost.js' -import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js' -import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js' -import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js' -import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js' +import { getConfig } from './getConfig.js' -const filename = fileURLToPath(import.meta.url) -const dirname = path.dirname(filename) - -// eslint-disable-next-line no-restricted-exports -export default buildConfigWithDefaults({ - collections: [ - { - slug: 'posts', - admin: { - useAsTitle: 'title', - }, - hooks: { - afterChange: [ - async ({ req, doc, context }) => { - await req.payload.jobs.queue({ - workflow: context.useJSONWorkflow ? 
'updatePostJSONWorkflow' : 'updatePost', - input: { - post: doc.id, - message: 'hello', - }, - req, - }) - }, - ], - }, - fields: [ - { - name: 'title', - type: 'text', - required: true, - }, - { - name: 'content', - type: 'richText', - }, - { - name: 'jobStep1Ran', - type: 'text', - }, - { - name: 'jobStep2Ran', - type: 'text', - }, - ], - }, - { - slug: 'simple', - admin: { - useAsTitle: 'title', - }, - fields: [ - { - name: 'title', - type: 'text', - required: true, - }, - ], - }, - ], - admin: { - importMap: { - baseDir: path.resolve(dirname), - }, - autoLogin: { - prefillOnly: true, - email: devUser.email, - password: devUser.password, - }, - }, - jobs: { - autoRun: [ - { - // Every second - cron: '* * * * * *', - limit: 100, - queue: 'autorunSecond', // name of the queue - }, - // add as many cron jobs as you want - ], - shouldAutoRun: () => true, - jobsCollectionOverrides: ({ defaultJobsCollection }) => { - return { - ...defaultJobsCollection, - admin: { - ...(defaultJobsCollection?.admin || {}), - hidden: false, - }, - } - }, - processingOrder: { - queues: { - lifo: '-createdAt', - }, - }, - tasks: [ - { - retries: 2, - slug: 'UpdatePost', - interfaceName: 'MyUpdatePostType', - inputSchema: [ - { - name: 'post', - type: 'relationship', - relationTo: 'posts', - maxDepth: 0, - required: true, - }, - { - name: 'message', - type: 'text', - required: true, - }, - ], - outputSchema: [ - { - name: 'messageTwice', - type: 'text', - required: true, - }, - ], - handler: updatePostStep1, - } as TaskConfig<'UpdatePost'>, - { - retries: 2, - slug: 'UpdatePostStep2', - inputSchema: [ - { - name: 'post', - type: 'relationship', - relationTo: 'posts', - maxDepth: 0, - required: true, - }, - { - name: 'messageTwice', - type: 'text', - required: true, - }, - ], - handler: updatePostStep2, - } as TaskConfig<'UpdatePostStep2'>, - { - retries: 3, - slug: 'CreateSimple', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - { - name: 'shouldFail', - 
type: 'checkbox', - }, - ], - outputSchema: [ - { - name: 'simpleID', - type: 'text', - required: true, - }, - ], - handler: async ({ input, req }) => { - if (input.shouldFail) { - throw new Error('Failed on purpose') - } - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - } as TaskConfig<'CreateSimple'>, - { - slug: 'CreateSimpleRetriesUndefined', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - { - name: 'shouldFail', - type: 'checkbox', - }, - ], - outputSchema: [ - { - name: 'simpleID', - type: 'text', - required: true, - }, - ], - handler: async ({ input, req }) => { - if (input.shouldFail) { - throw new Error('Failed on purpose') - } - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - } as TaskConfig<'CreateSimpleRetriesUndefined'>, - { - slug: 'CreateSimpleRetries0', - retries: 0, - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - { - name: 'shouldFail', - type: 'checkbox', - }, - ], - outputSchema: [ - { - name: 'simpleID', - type: 'text', - required: true, - }, - ], - handler: async ({ input, req }) => { - if (input.shouldFail) { - throw new Error('Failed on purpose') - } - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - } as TaskConfig<'CreateSimpleRetries0'>, - { - retries: 2, - slug: 'CreateSimpleWithDuplicateMessage', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - { - name: 'shouldFail', - type: 'checkbox', - }, - ], - outputSchema: [ - { - name: 'simpleID', - type: 'text', - required: true, - }, - ], - handler: async ({ input, req }) => { - if 
(input.shouldFail) { - throw new Error('Failed on purpose') - } - const newSimple = await req.payload.create({ - collection: 'simple', - req, - data: { - title: input.message + input.message, - }, - }) - return { - output: { - simpleID: newSimple.id, - }, - } - }, - } as TaskConfig<'CreateSimpleWithDuplicateMessage'>, - { - retries: 2, - slug: 'ExternalTask', - inputSchema: [ - { - name: 'message', - type: 'text', - required: true, - }, - ], - outputSchema: [ - { - name: 'simpleID', - type: 'text', - required: true, - }, - ], - handler: path.resolve(dirname, 'runners/externalTask.ts') + '#externalTaskHandler', - } as TaskConfig<'ExternalTask'>, - { - retries: 0, - slug: 'ThrowError', - inputSchema: [], - outputSchema: [], - handler: () => { - throw new Error('failed') - }, - } as TaskConfig<'ThrowError'>, - { - retries: 0, - slug: 'ReturnError', - inputSchema: [], - outputSchema: [], - handler: () => { - return { - state: 'failed', - } - }, - } as TaskConfig<'ReturnError'>, - { - retries: 0, - slug: 'ReturnCustomError', - inputSchema: [ - { - name: 'errorMessage', - type: 'text', - required: true, - }, - ], - outputSchema: [], - handler: ({ input }) => { - return { - state: 'failed', - errorMessage: input.errorMessage, - } - }, - } as TaskConfig<'ReturnCustomError'>, - ], - workflows: [ - updatePostWorkflow, - updatePostJSONWorkflow, - retriesTestWorkflow, - retriesRollbackTestWorkflow, - retriesWorkflowLevelTestWorkflow, - noRetriesSetWorkflow, - retries0Workflow, - workflowAndTasksRetriesUndefinedWorkflow, - workflowRetries2TasksRetriesUndefinedWorkflow, - workflowRetries2TasksRetries0Workflow, - inlineTaskTestWorkflow, - failsImmediatelyWorkflow, - inlineTaskTestDelayedWorkflow, - externalWorkflow, - retriesBackoffTestWorkflow, - subTaskWorkflow, - subTaskFailsWorkflow, - longRunningWorkflow, - parallelTaskWorkflow, - ], - }, - editor: lexicalEditor(), - onInit: async (payload) => { - if (process.env.SEED_IN_CONFIG_ONINIT !== 'false') { - await seed(payload) - } 
- }, - typescript: { - outputFile: path.resolve(dirname, 'payload-types.ts'), - }, -}) +export default buildConfigWithDefaults(getConfig()) diff --git a/test/queues/getConfig.ts b/test/queues/getConfig.ts new file mode 100644 index 000000000..f1c8126e3 --- /dev/null +++ b/test/queues/getConfig.ts @@ -0,0 +1,176 @@ +import type { Config } from 'payload' + +import { lexicalEditor } from '@payloadcms/richtext-lexical' +import { fileURLToPath } from 'node:url' +import path from 'path' + +import { devUser } from '../credentials.js' +import { seed } from './seed.js' +import { CreateSimpleRetries0Task } from './tasks/CreateSimpleRetries0Task.js' +import { CreateSimpleRetriesUndefinedTask } from './tasks/CreateSimpleRetriesUndefinedTask.js' +import { CreateSimpleTask } from './tasks/CreateSimpleTask.js' +import { CreateSimpleWithDuplicateMessageTask } from './tasks/CreateSimpleWithDuplicateMessageTask.js' +import { ExternalTask } from './tasks/ExternalTask.js' +import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js' +import { ReturnErrorTask } from './tasks/ReturnErrorTask.js' +import { ThrowErrorTask } from './tasks/ThrowErrorTask.js' +import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js' +import { UpdatePostTask } from './tasks/UpdatePostTask.js' +import { externalWorkflow } from './workflows/externalWorkflow.js' +import { failsImmediatelyWorkflow } from './workflows/failsImmediately.js' +import { inlineTaskTestWorkflow } from './workflows/inlineTaskTest.js' +import { inlineTaskTestDelayedWorkflow } from './workflows/inlineTaskTestDelayed.js' +import { longRunningWorkflow } from './workflows/longRunning.js' +import { noRetriesSetWorkflow } from './workflows/noRetriesSet.js' +import { parallelTaskWorkflow } from './workflows/parallelTaskWorkflow.js' +import { retries0Workflow } from './workflows/retries0.js' +import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js' +import { retriesRollbackTestWorkflow } from 
'./workflows/retriesRollbackTest.js' +import { retriesTestWorkflow } from './workflows/retriesTest.js' +import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js' +import { subTaskWorkflow } from './workflows/subTask.js' +import { subTaskFailsWorkflow } from './workflows/subTaskFails.js' +import { updatePostWorkflow } from './workflows/updatePost.js' +import { updatePostJSONWorkflow } from './workflows/updatePostJSON.js' +import { workflowAndTasksRetriesUndefinedWorkflow } from './workflows/workflowAndTasksRetriesUndefined.js' +import { workflowRetries2TasksRetries0Workflow } from './workflows/workflowRetries2TasksRetries0.js' +import { workflowRetries2TasksRetriesUndefinedWorkflow } from './workflows/workflowRetries2TasksRetriesUndefined.js' + +const dirname = path.dirname(fileURLToPath(import.meta.url)) + +// Needs to be a function to prevent object reference issues due to duplicative configs +export const getConfig: () => Partial = () => ({ + collections: [ + { + slug: 'posts', + admin: { + useAsTitle: 'title', + }, + hooks: { + afterChange: [ + async ({ req, doc, context }) => { + await req.payload.jobs.queue({ + workflow: context.useJSONWorkflow ? 
'updatePostJSONWorkflow' : 'updatePost', + input: { + post: doc.id, + message: 'hello', + }, + req, + }) + }, + ], + }, + fields: [ + { + name: 'title', + type: 'text', + required: true, + }, + { + name: 'content', + type: 'richText', + }, + { + name: 'jobStep1Ran', + type: 'text', + }, + { + name: 'jobStep2Ran', + type: 'text', + }, + ], + }, + { + slug: 'simple', + admin: { + useAsTitle: 'title', + }, + fields: [ + { + name: 'title', + type: 'text', + required: true, + }, + ], + }, + ], + admin: { + importMap: { + baseDir: path.resolve(dirname), + }, + autoLogin: { + prefillOnly: true, + email: devUser.email, + password: devUser.password, + }, + }, + jobs: { + autoRun: [ + { + silent: true, + // Every second + cron: '* * * * * *', + limit: 100, + queue: 'autorunSecond', + }, + // add as many cron jobs as you want + ], + shouldAutoRun: () => true, + jobsCollectionOverrides: ({ defaultJobsCollection }) => { + return { + ...defaultJobsCollection, + admin: { + ...(defaultJobsCollection?.admin || {}), + hidden: false, + }, + } + }, + processingOrder: { + queues: { + lifo: '-createdAt', + }, + }, + tasks: [ + UpdatePostTask, + UpdatePostStep2Task, + CreateSimpleTask, + CreateSimpleRetriesUndefinedTask, + CreateSimpleRetries0Task, + CreateSimpleWithDuplicateMessageTask, + ExternalTask, + ThrowErrorTask, + ReturnErrorTask, + ReturnCustomErrorTask, + ], + workflows: [ + updatePostWorkflow, + updatePostJSONWorkflow, + retriesTestWorkflow, + retriesRollbackTestWorkflow, + retriesWorkflowLevelTestWorkflow, + noRetriesSetWorkflow, + retries0Workflow, + workflowAndTasksRetriesUndefinedWorkflow, + workflowRetries2TasksRetriesUndefinedWorkflow, + workflowRetries2TasksRetries0Workflow, + inlineTaskTestWorkflow, + failsImmediatelyWorkflow, + inlineTaskTestDelayedWorkflow, + externalWorkflow, + retriesBackoffTestWorkflow, + subTaskWorkflow, + subTaskFailsWorkflow, + longRunningWorkflow, + parallelTaskWorkflow, + ], + }, + editor: lexicalEditor(), + onInit: async (payload) => { + if 
(process.env.SEED_IN_CONFIG_ONINIT !== 'false') { + await seed(payload) + } + }, + typescript: { + outputFile: path.resolve(dirname, 'payload-types.ts'), + }, +}) diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 0bb23d474..e5b55963b 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -1,7 +1,13 @@ -import type { JobTaskStatus, Payload, SanitizedConfig } from 'payload' - import path from 'path' +import { + _internal_jobSystemGlobals, + _internal_resetJobSystemGlobals, + type JobTaskStatus, + type Payload, + type SanitizedConfig, +} from 'payload' import { migrateCLI } from 'payload' +import { wait } from 'payload/shared' import { fileURLToPath } from 'url' import type { NextRESTClient } from '../helpers/NextRESTClient.js' @@ -9,6 +15,7 @@ import type { NextRESTClient } from '../helpers/NextRESTClient.js' import { devUser } from '../credentials.js' import { initPayloadInt } from '../helpers/initPayloadInt.js' import { clearAndSeedEverything } from './seed.js' +import { waitUntilAutorunIsDone } from './utilities.js' let payload: Payload let restClient: NextRESTClient @@ -25,10 +32,25 @@ describe('Queues', () => { }) afterAll(async () => { + // Ensure no new crons are scheduled + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false + // Wait 3 seconds to ensure all currently-running crons are done. 
If we shut down the db while a function is running, it can cause issues + // Cron function runs may persist after a test has finished + await wait(3000) + // Now we can destroy the payload instance await payload.destroy() + _internal_resetJobSystemGlobals() + }) + + afterEach(() => { + _internal_resetJobSystemGlobals() }) beforeEach(async () => { + // Set autorun to false during seed process to ensure no crons are scheduled, which may affect the tests + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false await clearAndSeedEverything(payload) const data = await restClient .POST('/users/login', { @@ -43,10 +65,12 @@ describe('Queues', () => { token = data.token } payload.config.jobs.deleteJobOnComplete = true + _internal_jobSystemGlobals.shouldAutoRun = true + _internal_jobSystemGlobals.shouldAutoSchedule = true }) it('will run access control on jobs runner', async () => { - const response = await restClient.GET('/payload-jobs/run', { + const response = await restClient.GET('/payload-jobs/run?silent=true', { headers: { // Authorization: `JWT ${token}`, }, @@ -55,7 +79,7 @@ describe('Queues', () => { }) it('will return 200 from jobs runner', async () => { - const response = await restClient.GET('/payload-jobs/run', { + const response = await restClient.GET('/payload-jobs/run?silent=true', { headers: { Authorization: `JWT ${token}`, }, @@ -109,7 +133,7 @@ describe('Queues', () => { expect(retrievedPost.jobStep1Ran).toBeFalsy() expect(retrievedPost.jobStep2Ran).toBeFalsy() - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const postAfterJobs = await payload.findByID({ collection: 'posts', @@ -139,7 +163,7 @@ describe('Queues', () => { expect(retrievedPost.jobStep1Ran).toBeFalsy() expect(retrievedPost.jobStep2Ran).toBeFalsy() - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const postAfterJobs = await payload.findByID({ collection: 'posts', @@ -163,7 +187,7 @@ describe('Queues', 
() => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -198,7 +222,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -221,7 +245,7 @@ describe('Queues', () => { expect(jobAfterRun.input.amountRetried).toBe(2) }) - it('ensure workflows dont limit retries if no retries property is sett', async () => { + it('ensure workflows dont limit retries if no retries property is set', async () => { payload.config.jobs.deleteJobOnComplete = false const job = await payload.jobs.queue({ workflow: 'workflowNoRetriesSet', @@ -233,7 +257,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -268,7 +292,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -303,7 +327,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -338,7 +362,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -373,7 +397,7 @@ describe('Queues', () => { let 
hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -409,7 +433,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({silent: true}) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -453,7 +477,7 @@ describe('Queues', () => { !firstGotNoJobs || new Date().getTime() - firstGotNoJobs.getTime() < 3000 ) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { if (hasJobsRemaining) { @@ -537,6 +561,7 @@ describe('Queues', () => { await payload.jobs.run({ sequential: true, + silent: true, }) const allSimples = await payload.find({ @@ -569,6 +594,7 @@ describe('Queues', () => { await payload.jobs.run({ sequential: true, + silent: true, processingOrder: '-createdAt', }) @@ -604,6 +630,7 @@ describe('Queues', () => { await payload.jobs.run({ sequential: true, + silent: true, queue: 'lifo', }) @@ -626,7 +653,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -637,29 +664,6 @@ describe('Queues', () => { expect(allSimples.docs[0]?.title).toBe('hello!') }) - it('can create and autorun jobs', async () => { - await payload.jobs.queue({ - workflow: 'inlineTaskTest', - queue: 'autorunSecond', - input: { - message: 'hello!', - }, - }) - - // Do not call payload.jobs.run() - - // Autorun runs every second - so should definitely be done if we wait 2 seconds - await new Promise((resolve) => setTimeout(resolve, 2000)) - - const allSimples = await payload.find({ - collection: 'simple', - limit: 100, - }) - - expect(allSimples.totalDocs).toBe(1) - expect(allSimples?.docs?.[0]?.title).toBe('hello!') - 
}) - it('should respect deleteJobOnComplete true default configuration', async () => { const { id } = await payload.jobs.queue({ workflow: 'inlineTaskTest', @@ -671,7 +675,7 @@ describe('Queues', () => { const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(before?.id).toBe(id) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(after).toBeNull() @@ -686,7 +690,7 @@ describe('Queues', () => { const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(before?.id).toBe(id) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(after?.id).toBe(id) @@ -704,7 +708,7 @@ describe('Queues', () => { const before = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(before?.id).toBe(id) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const after = await payload.findByID({ collection: 'payload-jobs', id, disableErrors: true }) expect(after?.id).toBe(id) @@ -718,7 +722,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -739,7 +743,7 @@ describe('Queues', () => { }, }) - await restClient.GET('/payload-jobs/run', { + await restClient.GET('/payload-jobs/run?silent=true', { headers: { Authorization: `JWT ${token}`, }, @@ -877,7 +881,7 @@ describe('Queues', () => { }) } - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -903,6 +907,7 @@ describe('Queues', () => { } await payload.jobs.run({ + silent: true, limit: numberOfTasks, }) @@ -926,7 +931,7 @@ describe('Queues', () => { }) } - await 
payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -950,6 +955,7 @@ describe('Queues', () => { await payload.jobs.run({ limit: 42, + silent: true, }) const allSimples = await payload.find({ @@ -985,7 +991,7 @@ describe('Queues', () => { }) } - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -1017,7 +1023,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -1036,7 +1042,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -1066,6 +1072,7 @@ describe('Queues', () => { await payload.jobs.runByID({ id: lastJobID, + silent: true, }) const allSimples = await payload.find({ @@ -1108,6 +1115,7 @@ describe('Queues', () => { } await payload.jobs.run({ + silent: true, where: { id: { equals: lastJobID, @@ -1150,6 +1158,7 @@ describe('Queues', () => { } await payload.jobs.run({ + silent: true, where: { 'input.message': { equals: 'from single task 2', @@ -1188,7 +1197,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const allSimples = await payload.find({ collection: 'simple', @@ -1229,7 +1238,7 @@ describe('Queues', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run() + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -1262,7 +1271,7 @@ describe('Queues', () => { workflow: 'longRunning', input: {}, }) - void payload.jobs.run().catch((_ignored) => {}) + void payload.jobs.run({ silent: true }).catch((_ignored) => {}) await new Promise((resolve) => setTimeout(resolve, 1000)) // Should be in processing - 
cancel job @@ -1296,7 +1305,7 @@ describe('Queues', () => { workflow: 'longRunning', input: {}, }) - void payload.jobs.run().catch((_ignored) => {}) + void payload.jobs.run({ silent: true }).catch((_ignored) => {}) await new Promise((resolve) => setTimeout(resolve, 1000)) // Cancel all jobs @@ -1335,7 +1344,7 @@ describe('Queues', () => { input: {}, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', @@ -1356,7 +1365,7 @@ describe('Queues', () => { input: {}, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', @@ -1379,7 +1388,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', @@ -1408,7 +1417,7 @@ describe('Queues', () => { }, }) - await payload.jobs.run() + await payload.jobs.run({ silent: true }) const jobAfterRun = await payload.findByID({ collection: 'payload-jobs', @@ -1434,6 +1443,29 @@ describe('Queues', () => { expect((logEntry?.output as any)?.simpleID).toBe(simpleDoc?.id) } }) + + it('can create and autorun jobs', async () => { + await payload.jobs.queue({ + workflow: 'inlineTaskTest', + queue: 'autorunSecond', + input: { + message: 'hello!', + }, + }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples?.docs?.[0]?.title).toBe('hello!') + }) }) describe('Queues - CLI', () => { diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 54945be89..19e3d8782 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -88,14 +88,20 @@ export interface Config { db: { defaultIDType: string; }; - globals: {}; - globalsSelect: {}; + globals: 
{ + 'payload-jobs-stats': PayloadJobsStat; + }; + globalsSelect: { + 'payload-jobs-stats': PayloadJobsStatsSelect | PayloadJobsStatsSelect; + }; locale: null; user: User & { collection: 'users'; }; jobs: { tasks: { + EverySecond: TaskEverySecond; + EverySecondMax2: TaskEverySecondMax2; UpdatePost: MyUpdatePostType; UpdatePostStep2: TaskUpdatePostStep2; CreateSimple: TaskCreateSimple; @@ -260,6 +266,8 @@ export interface PayloadJob { completedAt: string; taskSlug: | 'inline' + | 'EverySecond' + | 'EverySecondMax2' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -328,6 +336,8 @@ export interface PayloadJob { taskSlug?: | ( | 'inline' + | 'EverySecond' + | 'EverySecondMax2' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -343,6 +353,15 @@ export interface PayloadJob { queue?: string | null; waitUntil?: string | null; processing?: boolean | null; + meta?: + | { + [k: string]: unknown; + } + | unknown[] + | string + | number + | boolean + | null; updatedAt: string; createdAt: string; } @@ -476,6 +495,7 @@ export interface PayloadJobsSelect { queue?: T; waitUntil?: T; processing?: T; + meta?: T; updatedAt?: T; createdAt?: T; } @@ -511,6 +531,54 @@ export interface PayloadMigrationsSelect { updatedAt?: T; createdAt?: T; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "payload-jobs-stats". + */ +export interface PayloadJobsStat { + id: string; + stats?: + | { + [k: string]: unknown; + } + | unknown[] + | string + | number + | boolean + | null; + updatedAt?: string | null; + createdAt?: string | null; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "payload-jobs-stats_select". + */ +export interface PayloadJobsStatsSelect { + stats?: T; + updatedAt?: T; + createdAt?: T; + globalType?: T; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskEverySecond". 
+ */ +export interface TaskEverySecond { + input: { + message: string; + }; + output?: unknown; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskEverySecondMax2". + */ +export interface TaskEverySecondMax2 { + input: { + message: string; + }; + output?: unknown; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "MyUpdatePostType". diff --git a/test/queues/schedules-autocron.int.spec.ts b/test/queues/schedules-autocron.int.spec.ts new file mode 100644 index 000000000..1e9773fe1 --- /dev/null +++ b/test/queues/schedules-autocron.int.spec.ts @@ -0,0 +1,105 @@ +import path from 'path' +import { _internal_jobSystemGlobals, _internal_resetJobSystemGlobals, type Payload } from 'payload' +import { wait } from 'payload/shared' +import { fileURLToPath } from 'url' + +import type { NextRESTClient } from '../helpers/NextRESTClient.js' + +import { devUser } from '../credentials.js' +import { initPayloadInt } from '../helpers/initPayloadInt.js' +import { clearAndSeedEverything } from './seed.js' + +let payload: Payload +let restClient: NextRESTClient +let token: string + +const { email, password } = devUser +const filename = fileURLToPath(import.meta.url) +const dirname = path.dirname(filename) + +describe('Queues - scheduling, with automatic scheduling handling', () => { + beforeAll(async () => { + process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit + ;({ payload, restClient } = await initPayloadInt( + dirname, + undefined, + undefined, + 'config.schedules-autocron.ts', + )) + }) + + afterAll(async () => { + // Ensure no new crons are scheduled + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false + // Wait 3 seconds to ensure all currently-running crons are done. 
If we shut down the db while a function is running, it can cause issues + // Cron function runs may persist after a test has finished + await wait(3000) + // Now we can destroy the payload instance + await payload.destroy() + _internal_resetJobSystemGlobals() + }) + + afterEach(() => { + _internal_resetJobSystemGlobals() + }) + + beforeEach(async () => { + // Set autorun to false during seed process to ensure no crons are scheduled, which may affect the tests + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false + + await clearAndSeedEverything(payload) + const data = await restClient + .POST('/users/login', { + body: JSON.stringify({ + email, + password, + }), + }) + .then((res) => res.json()) + + if (data.token) { + token = data.token + } + payload.config.jobs.deleteJobOnComplete = true + _internal_jobSystemGlobals.shouldAutoRun = true + _internal_jobSystemGlobals.shouldAutoSchedule = true + }) + + it('can auto-schedule through automatic crons and autorun jobs', async () => { + // Do not call payload.jobs.run() or payload.jobs.handleSchedules() - payload should automatically schedule crons for auto-scheduling + + // Autorun and Autoschedule runs every second - so should have autorun at least twice after 3.5 seconds. 
Case with the lowest amount of jobs completed, + // if autoschedule runs after the first autorun: + // Second 1: Autorun runs => no jobs + // Second 1: Autoschedule runs => schedules 1 job + // Second 2: Autorun runs => runs 1 job => 1 + // Second 2: Autoschedule runs => schedules 1 job + // Second 3: Autorun runs => runs 1 job => 2 + // Second 3: Autoschedule runs => schedules 1 job + // Status after 3.5 seconds: 2 jobs running, 1 job scheduled + + // Best case - most jobs completed: + // Second 1: Autoschedule runs => schedules 1 job + // Second 1: Autorun runs => runs 1 job => 1 + // Second 2: Autoschedule runs => schedules 1 job + // Second 2: Autorun runs => runs 1 job => 2 + // Second 3: Autoschedule runs => schedules 1 job + // Second 3: Autorun runs => runs 1 job => 3 + // Status after 3.5 seconds: 3 jobs running, no jobs scheduled + const minJobsCompleted = 2 + const maxJobsCompleted = 3 + + await new Promise((resolve) => setTimeout(resolve, 3500)) // 3 seconds + 0.5 seconds to ensure the last job has been completed + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBeGreaterThanOrEqual(minJobsCompleted) + expect(allSimples.totalDocs).toBeLessThanOrEqual(maxJobsCompleted) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) +}) diff --git a/test/queues/schedules.int.spec.ts b/test/queues/schedules.int.spec.ts new file mode 100644 index 000000000..f4fa8a3ed --- /dev/null +++ b/test/queues/schedules.int.spec.ts @@ -0,0 +1,341 @@ +import path from 'path' +import { _internal_jobSystemGlobals, _internal_resetJobSystemGlobals, type Payload } from 'payload' +import { wait } from 'payload/shared' +import { fileURLToPath } from 'url' + +import type { NextRESTClient } from '../helpers/NextRESTClient.js' + +import { devUser } from '../credentials.js' + +import { initPayloadInt } from '../helpers/initPayloadInt.js' +import { clearAndSeedEverything } from
'./seed.js' +import { timeFreeze, timeTravel, waitUntilAutorunIsDone, withoutAutoRun } from './utilities.js' + +let payload: Payload +let restClient: NextRESTClient +let token: string + +const { email, password } = devUser +const filename = fileURLToPath(import.meta.url) +const dirname = path.dirname(filename) + +describe('Queues - scheduling, without automatic scheduling handling', () => { + beforeAll(async () => { + process.env.SEED_IN_CONFIG_ONINIT = 'false' // Makes it so the payload config onInit seed is not run. Otherwise, the seed would be run unnecessarily twice for the initial test run - once for beforeEach and once for onInit + ;({ payload, restClient } = await initPayloadInt( + dirname, + undefined, + undefined, + 'config.schedules.ts', + )) + }) + + afterAll(async () => { + // Ensure no new crons are scheduled + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false + // Wait 3 seconds to ensure all currently-running crons are done. 
If we shut down the db while a function is running, it can cause issues + // Cron function runs may persist after a test has finished + await wait(3000) + // Now we can destroy the payload instance + await payload.destroy() + _internal_resetJobSystemGlobals() + }) + + afterEach(() => { + _internal_resetJobSystemGlobals() + }) + + beforeEach(async () => { + // Set autorun to false during seed process to ensure no crons are scheduled, which may affect the tests + _internal_jobSystemGlobals.shouldAutoRun = false + _internal_jobSystemGlobals.shouldAutoSchedule = false + await clearAndSeedEverything(payload) + const data = await restClient + .POST('/users/login', { + body: JSON.stringify({ + email, + password, + }), + }) + .then((res) => res.json()) + + if (data.token) { + token = data.token + } + payload.config.jobs.deleteJobOnComplete = true + _internal_jobSystemGlobals.shouldAutoRun = true + _internal_jobSystemGlobals.shouldAutoSchedule = true + }) + + it('can auto-schedule through local API and autorun jobs', async () => { + // Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here + await payload.jobs.handleSchedules() + + // Do not call payload.jobs.run({ silent: true }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('can auto-schedule through handleSchedules REST API and autorun jobs', async () => { + // Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here + await restClient.GET('/payload-jobs/handle-schedules', { + headers: { + Authorization: `JWT ${token}`, + }, + }) + + // Do not call payload.jobs.run({silent: true}) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await
payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('can auto-schedule through run REST API and autorun jobs', async () => { + // Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here + await restClient.GET('/payload-jobs/run?silent=true', { + headers: { + Authorization: `JWT ${token}`, + }, + }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('do not auto-schedule through run REST API when passing disableScheduling=true', async () => { + // Do not call payload.jobs.queue() - the `EverySecond` task should be scheduled here + await restClient.GET('/payload-jobs/run?silent=true&disableScheduling=true', { + headers: { + Authorization: `JWT ${token}`, + }, + }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(0) + }) + + it('ensure scheduler does not schedule more jobs than needed if executed sequentially', async () => { + await withoutAutoRun(async () => { + for (let i = 0; i < 3; i++) { + await payload.jobs.handleSchedules() + } + }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('ensure scheduler max-one-job condition, by default, ignores jobs not scheduled by scheduler', async () => { + await 
withoutAutoRun(async () => { + for (let i = 0; i < 2; i++) { + await payload.jobs.queue({ + task: 'EverySecond', + queue: 'autorunSecond', + input: { + message: 'This task runs every second', + }, + }) + } + for (let i = 0; i < 3; i++) { + await payload.jobs.handleSchedules() + } + }) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(3) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('ensure scheduler max-one-job condition, respects jobs not scheduled by scheduler due to task setting onlyScheduled: false', async () => { + timeFreeze() + await withoutAutoRun(async () => { + for (let i = 0; i < 2; i++) { + await payload.jobs.queue({ + task: 'EverySecondMax2', + input: { + message: 'This task runs every second - max 2 per second', + }, + }) + } + for (let i = 0; i < 3; i++) { + await payload.jobs.handleSchedules({ queue: 'default' }) + } + }) + + timeTravel(20) // Advance time to satisfy the waitUntil of newly scheduled jobs + + await payload.jobs.run({ + limit: 100, + silent: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(2) // Would be 4 by default, if only scheduled jobs were respected in handleSchedules condition + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second - max 2 per second') + }) + + it('ensure scheduler does not schedule more jobs than needed if executed sequentially - max. 
2 jobs configured', async () => { + timeFreeze() + for (let i = 0; i < 3; i++) { + await payload.jobs.handleSchedules({ queue: 'default' }) + } + + // Advance time to satisfy the waitUntil of newly scheduled jobs + timeTravel(20) + + // default queue is not scheduled to autorun + await payload.jobs.run({ + silent: true, + }) + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(2) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second - max 2 per second') + }) + + it('ensure job is scheduled every second', async () => { + timeFreeze() + for (let i = 0; i < 3; i++) { + await withoutAutoRun(async () => { + // Call it twice to test that it only schedules one + await payload.jobs.handleSchedules() + await payload.jobs.handleSchedules() + }) + // Advance time to satisfy the waitUntil of newly scheduled jobs + timeTravel(20) + + await waitUntilAutorunIsDone({ + payload, + queue: 'autorunSecond', + onlyScheduled: true, + }) + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(3) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second') + }) + + it('ensure job is scheduled every second - max. 
2 jobs configured', async () => { + timeFreeze() + + for (let i = 0; i < 3; i++) { + await withoutAutoRun(async () => { + // Call it 3x to test that it only schedules two + await payload.jobs.handleSchedules({ queue: 'default' }) + await payload.jobs.handleSchedules({ queue: 'default' }) + await payload.jobs.handleSchedules({ queue: 'default' }) + }) + + // Advance time to satisfy the waitUntil of newly scheduled jobs + timeTravel(20) + + // default queue is not scheduled to autorun => run manually + await payload.jobs.run({ + silent: true, + }) + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + where: { + title: { + equals: 'This task runs every second - max 2 per second', + }, + }, + }) + + expect(allSimples.totalDocs).toBe(6) + expect(allSimples?.docs?.[0]?.title).toBe('This task runs every second - max 2 per second') + }) + + it('should not auto-schedule through automatic crons if scheduler set to manual', async () => { + // Autorun runs every second - so should definitely be done if we wait 2 seconds + await new Promise((resolve) => setTimeout(resolve, 2000)) // Should not flake, as we are expecting nothing to happen + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(0) + }) +}) diff --git a/test/queues/tasks/CreateSimpleRetries0Task.ts b/test/queues/tasks/CreateSimpleRetries0Task.ts new file mode 100644 index 000000000..cc85b26cf --- /dev/null +++ b/test/queues/tasks/CreateSimpleRetries0Task.ts @@ -0,0 +1,41 @@ +import type { TaskConfig } from 'payload' + +export const CreateSimpleRetries0Task: TaskConfig<'CreateSimpleRetries0'> = { + slug: 'CreateSimpleRetries0', + retries: 0, + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if 
(input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, +} diff --git a/test/queues/tasks/CreateSimpleRetriesUndefinedTask.ts b/test/queues/tasks/CreateSimpleRetriesUndefinedTask.ts new file mode 100644 index 000000000..267150005 --- /dev/null +++ b/test/queues/tasks/CreateSimpleRetriesUndefinedTask.ts @@ -0,0 +1,40 @@ +import type { TaskConfig } from 'payload' + +export const CreateSimpleRetriesUndefinedTask: TaskConfig<'CreateSimpleRetriesUndefined'> = { + slug: 'CreateSimpleRetriesUndefined', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if (input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, +} diff --git a/test/queues/tasks/CreateSimpleTask.ts b/test/queues/tasks/CreateSimpleTask.ts new file mode 100644 index 000000000..8279aa1fd --- /dev/null +++ b/test/queues/tasks/CreateSimpleTask.ts @@ -0,0 +1,41 @@ +import type { TaskConfig } from 'payload' + +export const CreateSimpleTask: TaskConfig<'CreateSimple'> = { + retries: 3, + slug: 'CreateSimple', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if (input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: 
input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, +} diff --git a/test/queues/tasks/CreateSimpleWithDuplicateMessageTask.ts b/test/queues/tasks/CreateSimpleWithDuplicateMessageTask.ts new file mode 100644 index 000000000..887a9060f --- /dev/null +++ b/test/queues/tasks/CreateSimpleWithDuplicateMessageTask.ts @@ -0,0 +1,42 @@ +import type { TaskConfig } from 'payload' + +export const CreateSimpleWithDuplicateMessageTask: TaskConfig<'CreateSimpleWithDuplicateMessage'> = + { + retries: 2, + slug: 'CreateSimpleWithDuplicateMessage', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + { + name: 'shouldFail', + type: 'checkbox', + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + if (input.shouldFail) { + throw new Error('Failed on purpose') + } + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message + input.message, + }, + }) + return { + output: { + simpleID: newSimple.id, + }, + } + }, + } diff --git a/test/queues/tasks/EverySecondMax2Task.ts b/test/queues/tasks/EverySecondMax2Task.ts new file mode 100644 index 000000000..b307f5dfd --- /dev/null +++ b/test/queues/tasks/EverySecondMax2Task.ts @@ -0,0 +1,67 @@ +import { + countRunnableOrActiveJobsForQueue, + type TaskConfig, + type TaskType, + type WorkflowTypes, +} from 'payload' + +export const EverySecondMax2Task: TaskConfig<'EverySecondMax2'> = { + schedule: [ + { + cron: '* * * * * *', + queue: 'default', + hooks: { + beforeSchedule: async ({ queueable, req }) => { + const runnableOrActiveJobsForQueue = await countRunnableOrActiveJobsForQueue({ + queue: queueable.scheduleConfig.queue, + req, + taskSlug: queueable.taskConfig?.slug as TaskType, + workflowSlug: queueable.workflowConfig?.slug as WorkflowTypes, + onlyScheduled: false, // Set to false, used to test it + }) + + return { + input: { + message: 'This task 
runs every second - max 2 per second', + }, + shouldSchedule: runnableOrActiveJobsForQueue <= 1, + waitUntil: queueable.waitUntil, + } + }, + afterSchedule: async (args) => { + await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global + args.req.payload.logger.info( + 'EverySecondMax2 task scheduled: ' + + (args.status === 'success' + ? String(args.job.id) + : args.status === 'skipped' + ? 'skipped' + : 'error'), + ) + }, + }, + }, + ], + slug: 'EverySecondMax2', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + req.payload.logger.info(input.message) + + await req.payload.create({ + collection: 'simple', + data: { + title: input.message, + }, + req, + }) + return { + output: {}, + } + }, +} diff --git a/test/queues/tasks/EverySecondTask.ts b/test/queues/tasks/EverySecondTask.ts new file mode 100644 index 000000000..d79a3ee96 --- /dev/null +++ b/test/queues/tasks/EverySecondTask.ts @@ -0,0 +1,54 @@ +import type { TaskConfig } from 'payload' + +export const EverySecondTask: TaskConfig<'EverySecond'> = { + schedule: [ + { + cron: '* * * * * *', + queue: 'autorunSecond', + hooks: { + beforeSchedule: async (args) => { + const result = await args.defaultBeforeSchedule(args) // Handles verifying that there are no jobs already scheduled or processing + return { + ...result, + input: { + message: 'This task runs every second', + }, + } + }, + afterSchedule: async (args) => { + await args.defaultAfterSchedule(args) // Handles updating the payload-jobs-stats global + args.req.payload.logger.info( + 'EverySecond task scheduled: ' + + (args.status === 'success' + ? String(args.job.id) + : args.status === 'skipped' + ? 
'skipped' + : 'error'), + ) + }, + }, + }, + ], + slug: 'EverySecond', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ input, req }) => { + req.payload.logger.info(input.message) + + await req.payload.create({ + collection: 'simple', + data: { + title: input.message, + }, + req, + }) + return { + output: {}, + } + }, +} diff --git a/test/queues/tasks/ExternalTask.ts b/test/queues/tasks/ExternalTask.ts new file mode 100644 index 000000000..9eda7dc0d --- /dev/null +++ b/test/queues/tasks/ExternalTask.ts @@ -0,0 +1,26 @@ +import type { TaskConfig } from 'payload' + +import path from 'path' +import { fileURLToPath } from 'url' + +const dirname = path.dirname(fileURLToPath(import.meta.url)) + +export const ExternalTask: TaskConfig<'ExternalTask'> = { + retries: 2, + slug: 'ExternalTask', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + outputSchema: [ + { + name: 'simpleID', + type: 'text', + required: true, + }, + ], + handler: path.resolve(dirname, '../runners/externalTask.ts') + '#externalTaskHandler', +} diff --git a/test/queues/tasks/ReturnCustomErrorTask.ts b/test/queues/tasks/ReturnCustomErrorTask.ts new file mode 100644 index 000000000..0c7253d26 --- /dev/null +++ b/test/queues/tasks/ReturnCustomErrorTask.ts @@ -0,0 +1,20 @@ +import type { TaskConfig } from 'payload' + +export const ReturnCustomErrorTask: TaskConfig<'ReturnCustomError'> = { + retries: 0, + slug: 'ReturnCustomError', + inputSchema: [ + { + name: 'errorMessage', + type: 'text', + required: true, + }, + ], + outputSchema: [], + handler: ({ input }) => { + return { + state: 'failed', + errorMessage: input.errorMessage, + } + }, +} diff --git a/test/queues/tasks/ReturnErrorTask.ts b/test/queues/tasks/ReturnErrorTask.ts new file mode 100644 index 000000000..661551ddd --- /dev/null +++ b/test/queues/tasks/ReturnErrorTask.ts @@ -0,0 +1,13 @@ +import type { TaskConfig } from 'payload' + +export const 
ReturnErrorTask: TaskConfig<'ReturnError'> = { + retries: 0, + slug: 'ReturnError', + inputSchema: [], + outputSchema: [], + handler: () => { + return { + state: 'failed', + } + }, +} diff --git a/test/queues/tasks/ThrowErrorTask.ts b/test/queues/tasks/ThrowErrorTask.ts new file mode 100644 index 000000000..fa6f9ea30 --- /dev/null +++ b/test/queues/tasks/ThrowErrorTask.ts @@ -0,0 +1,11 @@ +import type { TaskConfig } from 'payload' + +export const ThrowErrorTask: TaskConfig<'ThrowError'> = { + retries: 0, + slug: 'ThrowError', + inputSchema: [], + outputSchema: [], + handler: () => { + throw new Error('failed') + }, +} diff --git a/test/queues/tasks/UpdatePostStep2Task.ts b/test/queues/tasks/UpdatePostStep2Task.ts new file mode 100644 index 000000000..de7b31045 --- /dev/null +++ b/test/queues/tasks/UpdatePostStep2Task.ts @@ -0,0 +1,23 @@ +import type { TaskConfig } from 'payload' + +import { updatePostStep2 } from '../runners/updatePost.js' + +export const UpdatePostStep2Task: TaskConfig<'UpdatePostStep2'> = { + retries: 2, + slug: 'UpdatePostStep2', + inputSchema: [ + { + name: 'post', + type: 'relationship', + relationTo: 'posts', + maxDepth: 0, + required: true, + }, + { + name: 'messageTwice', + type: 'text', + required: true, + }, + ], + handler: updatePostStep2, +} diff --git a/test/queues/tasks/UpdatePostTask.ts b/test/queues/tasks/UpdatePostTask.ts new file mode 100644 index 000000000..b8cdfd52a --- /dev/null +++ b/test/queues/tasks/UpdatePostTask.ts @@ -0,0 +1,31 @@ +import type { TaskConfig } from 'payload' + +import { updatePostStep1 } from '../runners/updatePost.js' + +export const UpdatePostTask: TaskConfig<'UpdatePost'> = { + retries: 2, + slug: 'UpdatePost', + interfaceName: 'MyUpdatePostType', + inputSchema: [ + { + name: 'post', + type: 'relationship', + relationTo: 'posts', + maxDepth: 0, + required: true, + }, + { + name: 'message', + type: 'text', + required: true, + }, + ], + outputSchema: [ + { + name: 'messageTwice', + type: 'text', + 
required: true, + }, + ], + handler: updatePostStep1, +} diff --git a/test/queues/utilities.ts b/test/queues/utilities.ts new file mode 100644 index 000000000..bda1e0ac8 --- /dev/null +++ b/test/queues/utilities.ts @@ -0,0 +1,62 @@ +import { + _internal_jobSystemGlobals, + countRunnableOrActiveJobsForQueue, + createLocalReq, + type Payload, +} from 'payload' + +export async function waitUntilAutorunIsDone({ + payload, + queue, + onlyScheduled = false, +}: { + onlyScheduled?: boolean + payload: Payload + queue: string +}): Promise { + const req = await createLocalReq({}, payload) + + return new Promise((resolve) => { + const interval = setInterval(async () => { + const count = await countRunnableOrActiveJobsForQueue({ + queue, + req, + onlyScheduled, + }) + if (count === 0) { + clearInterval(interval) + resolve() + } + }, 200) + }) +} + +export function timeFreeze() { + const curDate = new Date() + _internal_jobSystemGlobals.getCurrentDate = () => curDate +} + +export function timeTravel(seconds: number) { + const curDate = _internal_jobSystemGlobals.getCurrentDate() + _internal_jobSystemGlobals.getCurrentDate = () => new Date(curDate.getTime() + seconds * 1000) +} + +export async function withoutAutoRun(fn: () => Promise): Promise { + const originalValue = _internal_jobSystemGlobals.shouldAutoRun + _internal_jobSystemGlobals.shouldAutoRun = false + try { + return await fn() + } finally { + _internal_jobSystemGlobals.shouldAutoRun = originalValue + } +} + +export async function withoutAutoSchedule(fn: () => Promise): Promise { + const originalValue = _internal_jobSystemGlobals.shouldAutoSchedule + _internal_jobSystemGlobals.shouldAutoSchedule = false + try { + return await fn() + } finally { + _internal_jobSystemGlobals.shouldAutoSchedule = originalValue + } +} diff --git a/test/runInit.ts b/test/runInit.ts index 46b38b287..3ff6c7b73 100644 --- a/test/runInit.ts +++ b/test/runInit.ts @@ -4,6 +4,7 @@ export async function runInit( testSuiteArg: string, 
writeDBAdapter: boolean, skipGenImportMap: boolean = false, + configFile?: string, ): Promise { - await initDevAndTest(testSuiteArg, String(writeDBAdapter), String(skipGenImportMap)) + await initDevAndTest(testSuiteArg, String(writeDBAdapter), String(skipGenImportMap), configFile) }