fix: cron jobs running when calling bin scripts, leading to db errors (#13135)
Previously, we were always initializing cronjobs when calling
`getPayload` or `payload.init`.
This is undesired in bin scripts - we don't want cron jobs to start
triggering db calls while we're running an initial migration using
`payload migrate` for example. This has previously led to a race
condition, triggering the following, occasional error, if job autoruns
were enabled:
```ts
DrizzleQueryError: Failed query: select "payload_jobs"."id", "payload_jobs"."input", "payload_jobs"."completed_at", "payload_jobs"."total_tried", "payload_jobs"."has_error", "payload_jobs"."error", "payload_jobs"."workflow_slug", "payload_jobs"."task_slug", "payload_jobs"."queue", "payload_jobs"."wait_until", "payload_jobs"."processing", "payload_jobs"."updated_at", "payload_jobs"."created_at", "payload_jobs_log"."data" as "log" from "payload_jobs" "payload_jobs" left join lateral (select coalesce(json_agg(json_build_array("payload_jobs_log"."_order", "payload_jobs_log"."id", "payload_jobs_log"."executed_at", "payload_jobs_log"."completed_at", "payload_jobs_log"."task_slug", "payload_jobs_log"."task_i_d", "payload_jobs_log"."input", "payload_jobs_log"."output", "payload_jobs_log"."state", "payload_jobs_log"."error") order by "payload_jobs_log"."_order" asc), '[]'::json) as "data" from (select * from "payload_jobs_log" "payload_jobs_log" where "payload_jobs_log"."_parent_id" = "payload_jobs"."id" order by "payload_jobs_log"."_order" asc) "payload_jobs_log") "payload_jobs_log" on true where ("payload_jobs"."completed_at" is null and ("payload_jobs"."has_error" is null or "payload_jobs"."has_error" <> $1) and "payload_jobs"."processing" = $2 and ("payload_jobs"."wait_until" is null or "payload_jobs"."wait_until" < $3) and "payload_jobs"."queue" = $4) order by "payload_jobs"."created_at" asc limit $5
params: true,false,2025-07-10T21:25:03.002Z,autorunSecond,100
at NodePgPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/pg-core/session.ts:74:11)
at processTicksAndRejections (node:internal/process/task_queues:105:5)
at /Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/node-postgres/session.ts:154:19
... 6 lines matching cause stack trace ...
at N._trigger (/Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/croner@9.0.0/node_modules/croner/dist/croner.cjs:1:16806) {
query: `select "payload_jobs"."id", "payload_jobs"."input", "payload_jobs"."completed_at", "payload_jobs"."total_tried", "payload_jobs"."has_error", "payload_jobs"."error", "payload_jobs"."workflow_slug", "payload_jobs"."task_slug", "payload_jobs"."queue", "payload_jobs"."wait_until", "payload_jobs"."processing", "payload_jobs"."updated_at", "payload_jobs"."created_at", "payload_jobs_log"."data" as "log" from "payload_jobs" "payload_jobs" left join lateral (select coalesce(json_agg(json_build_array("payload_jobs_log"."_order", "payload_jobs_log"."id", "payload_jobs_log"."executed_at", "payload_jobs_log"."completed_at", "payload_jobs_log"."task_slug", "payload_jobs_log"."task_i_d", "payload_jobs_log"."input", "payload_jobs_log"."output", "payload_jobs_log"."state", "payload_jobs_log"."error") order by "payload_jobs_log"."_order" asc), '[]'::json) as "data" from (select * from "payload_jobs_log" "payload_jobs_log" where "payload_jobs_log"."_parent_id" = "payload_jobs"."id" order by "payload_jobs_log"."_order" asc) "payload_jobs_log") "payload_jobs_log" on true where ("payload_jobs"."completed_at" is null and ("payload_jobs"."has_error" is null or "payload_jobs"."has_error" <> $1) and "payload_jobs"."processing" = $2 and ("payload_jobs"."wait_until" is null or "payload_jobs"."wait_until" < $3) and "payload_jobs"."queue" = $4) order by "payload_jobs"."created_at" asc limit $5`,
params: [ true, false, '2025-07-10T21:25:03.002Z', 'autorunSecond', 100 ],
cause: error: relation "payload_jobs" does not exist
at /Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/pg@8.16.3/node_modules/pg/lib/client.js:545:17
at processTicksAndRejections (node:internal/process/task_queues:105:5)
at /Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/node-postgres/session.ts:161:13
at NodePgPreparedQuery.queryWithCache (/Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/pg-core/session.ts:72:12)
at /Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/drizzle-orm@0.44.2_@libsql+client@0.14.0_bufferutil@4.0.8_utf-8-validate@6.0.5__@opentelemetr_asjmtflojkxlnxrshoh4fj5f6u/node_modules/src/node-postgres/session.ts:154:19
at find (/Users/alessio/Documents/GitHub/payload2/packages/drizzle/src/find/findMany.ts:162:19)
at Object.updateMany (/Users/alessio/Documents/GitHub/payload2/packages/drizzle/src/updateJobs.ts:26:16)
at updateJobs (/Users/alessio/Documents/GitHub/payload2/packages/payload/src/queues/utilities/updateJob.ts:102:37)
at runJobs (/Users/alessio/Documents/GitHub/payload2/packages/payload/src/queues/operations/runJobs/index.ts:181:25)
at Object.run (/Users/alessio/Documents/GitHub/payload2/packages/payload/src/queues/localAPI.ts:137:12)
at N.fn (/Users/alessio/Documents/GitHub/payload2/packages/payload/src/index.ts:866:13)
at N._trigger (/Users/alessio/Documents/GitHub/payload2/node_modules/.pnpm/croner@9.0.0/node_modules/croner/dist/croner.cjs:1:16806) {
length: 112,
severity: 'ERROR',
code: '42P01',
detail: undefined,
hint: undefined,
position: '406',
internalPosition: undefined,
internalQuery: undefined,
where: undefined,
schema: undefined,
table: undefined,
column: undefined,
dataType: undefined,
constraint: undefined,
file: 'parse_relation.c',
line: '1449',
routine: 'parserOpenTable'
}
}
```
This PR makes running crons opt-in using a new `cron` flag. By default,
no cron jobs will be created.
This commit is contained in:
@@ -51,7 +51,7 @@ export default buildConfig({
|
||||
// add as many cron jobs as you want
|
||||
],
|
||||
shouldAutoRun: async (payload) => {
|
||||
// Tell Payload if it should run jobs or not.
|
||||
// Tell Payload if it should run jobs or not. This function is optional and will return true by default.
|
||||
// This function will be invoked each time Payload goes to pick up and run jobs.
|
||||
// If this function ever returns false, the cron schedule will be stopped.
|
||||
return true
|
||||
|
||||
@@ -27,7 +27,7 @@ export async function login({ collection, config, email, password, username }: L
|
||||
token?: string
|
||||
user: any
|
||||
}> {
|
||||
const payload = await getPayload({ config })
|
||||
const payload = await getPayload({ config, cron: true })
|
||||
|
||||
const authConfig = payload.collections[collection]?.config.auth
|
||||
|
||||
|
||||
@@ -14,7 +14,7 @@ export async function logout({
|
||||
allSessions?: boolean
|
||||
config: Promise<SanitizedConfig> | SanitizedConfig
|
||||
}) {
|
||||
const payload = await getPayload({ config })
|
||||
const payload = await getPayload({ config, cron: true })
|
||||
const headers = await nextHeaders()
|
||||
const authResult = await payload.auth({ headers })
|
||||
|
||||
|
||||
@@ -9,7 +9,7 @@ import { getExistingAuthToken } from '../utilities/getExistingAuthToken.js'
|
||||
import { setPayloadAuthCookie } from '../utilities/setPayloadAuthCookie.js'
|
||||
|
||||
export async function refresh({ config }: { config: any }) {
|
||||
const payload = await getPayload({ config })
|
||||
const payload = await getPayload({ config, cron: true })
|
||||
const headers = await nextHeaders()
|
||||
const result = await payload.auth({ headers })
|
||||
|
||||
|
||||
@@ -66,7 +66,7 @@ export const initReq = async function ({
|
||||
|
||||
const partialResult = await partialReqCache.get(async () => {
|
||||
const config = await configPromise
|
||||
const payload = await getPayload({ config, importMap })
|
||||
const payload = await getPayload({ config, cron: true, importMap })
|
||||
const languageCode = getRequestLanguage({
|
||||
config,
|
||||
cookies,
|
||||
|
||||
@@ -107,7 +107,7 @@ export const bin = async () => {
|
||||
}
|
||||
|
||||
if (script === 'jobs:run') {
|
||||
const payload = await getPayload({ config })
|
||||
const payload = await getPayload({ config }) // Do not setup crons here - this bin script can set up its own crons
|
||||
const limit = args.limit ? parseInt(args.limit, 10) : undefined
|
||||
const queue = args.queue ? args.queue : undefined
|
||||
const allQueues = !!args.allQueues
|
||||
|
||||
@@ -257,6 +257,13 @@ export type InitOptions = {
|
||||
* and the backend functionality
|
||||
*/
|
||||
config: Promise<SanitizedConfig> | SanitizedConfig
|
||||
/**
|
||||
* If set to `true`, payload will initialize crons for things like autorunning jobs on initialization.
|
||||
*
|
||||
* @default false
|
||||
*/
|
||||
cron?: boolean
|
||||
|
||||
/**
|
||||
* Disable connect to the database on init
|
||||
*/
|
||||
@@ -268,7 +275,6 @@ export type InitOptions = {
|
||||
disableOnInit?: boolean
|
||||
|
||||
importMap?: ImportMap
|
||||
|
||||
/**
|
||||
* A function that is called immediately following startup that receives the Payload instance as it's only argument.
|
||||
*/
|
||||
|
||||
@@ -836,7 +836,7 @@ export class BasePayload {
|
||||
throw error
|
||||
}
|
||||
|
||||
if (this.config.jobs.enabled && this.config.jobs.autoRun && !isNextBuild()) {
|
||||
if (this.config.jobs.enabled && this.config.jobs.autoRun && !isNextBuild() && options.cron) {
|
||||
const DEFAULT_CRON = '* * * * *'
|
||||
const DEFAULT_LIMIT = 10
|
||||
|
||||
@@ -974,7 +974,7 @@ export const reload = async (
|
||||
}
|
||||
|
||||
export const getPayload = async (
|
||||
options: Pick<InitOptions, 'config' | 'importMap'>,
|
||||
options: Pick<InitOptions, 'config' | 'cron' | 'importMap'>,
|
||||
): Promise<Payload> => {
|
||||
if (!options?.config) {
|
||||
throw new Error('Error: the payload config is required for getPayload to work.')
|
||||
@@ -1109,6 +1109,8 @@ export { generateImportMap } from './bin/generateImportMap/index.js'
|
||||
|
||||
export type { ImportMap } from './bin/generateImportMap/index.js'
|
||||
export { genImportMapIterateFields } from './bin/generateImportMap/iterateFields.js'
|
||||
export { migrate as migrateCLI } from './bin/migrate.js'
|
||||
|
||||
export {
|
||||
type ClientCollectionConfig,
|
||||
createClientCollectionConfig,
|
||||
@@ -1155,7 +1157,6 @@ export type {
|
||||
} from './collections/config/types.js'
|
||||
|
||||
export type { CompoundIndex } from './collections/config/types.js'
|
||||
|
||||
export type { SanitizedCompoundIndex } from './collections/config/types.js'
|
||||
export { createDataloaderCacheKey, getDataLoader } from './collections/dataloader.js'
|
||||
export { countOperation } from './collections/operations/count.js'
|
||||
@@ -1171,6 +1172,7 @@ export { findVersionsOperation } from './collections/operations/findVersions.js'
|
||||
export { restoreVersionOperation } from './collections/operations/restoreVersion.js'
|
||||
export { updateOperation } from './collections/operations/update.js'
|
||||
export { updateByIDOperation } from './collections/operations/updateByID.js'
|
||||
|
||||
export { buildConfig } from './config/build.js'
|
||||
|
||||
export {
|
||||
@@ -1180,7 +1182,6 @@ export {
|
||||
serverOnlyConfigProperties,
|
||||
type UnsanitizedClientConfig,
|
||||
} from './config/client.js'
|
||||
|
||||
export { defaults } from './config/defaults.js'
|
||||
export { type OrderableEndpointBody } from './config/orderable/index.js'
|
||||
export { sanitizeConfig } from './config/sanitize.js'
|
||||
@@ -1297,10 +1298,11 @@ export {
|
||||
ValidationError,
|
||||
ValidationErrorName,
|
||||
} from './errors/index.js'
|
||||
export type { ValidationFieldError } from './errors/index.js'
|
||||
|
||||
export type { ValidationFieldError } from './errors/index.js'
|
||||
export { baseBlockFields } from './fields/baseFields/baseBlockFields.js'
|
||||
export { baseIDField } from './fields/baseFields/baseIDField.js'
|
||||
|
||||
export {
|
||||
createClientField,
|
||||
createClientFields,
|
||||
@@ -1308,10 +1310,10 @@ export {
|
||||
type ServerOnlyFieldProperties,
|
||||
} from './fields/config/client.js'
|
||||
|
||||
export { sanitizeFields } from './fields/config/sanitize.js'
|
||||
|
||||
export interface FieldCustom extends Record<string, any> {}
|
||||
|
||||
export { sanitizeFields } from './fields/config/sanitize.js'
|
||||
|
||||
export type {
|
||||
AdminClient,
|
||||
ArrayField,
|
||||
@@ -1421,14 +1423,13 @@ export type {
|
||||
} from './fields/config/types.js'
|
||||
|
||||
export { getDefaultValue } from './fields/getDefaultValue.js'
|
||||
|
||||
export { traverseFields as afterChangeTraverseFields } from './fields/hooks/afterChange/traverseFields.js'
|
||||
export { promise as afterReadPromise } from './fields/hooks/afterRead/promise.js'
|
||||
export { traverseFields as afterReadTraverseFields } from './fields/hooks/afterRead/traverseFields.js'
|
||||
export { traverseFields as beforeChangeTraverseFields } from './fields/hooks/beforeChange/traverseFields.js'
|
||||
export { traverseFields as beforeValidateTraverseFields } from './fields/hooks/beforeValidate/traverseFields.js'
|
||||
export { sortableFieldTypes } from './fields/sortableFieldTypes.js'
|
||||
|
||||
export { sortableFieldTypes } from './fields/sortableFieldTypes.js'
|
||||
export { validations } from './fields/validations.js'
|
||||
export type {
|
||||
ArrayFieldValidation,
|
||||
@@ -1481,8 +1482,8 @@ export type {
|
||||
GlobalConfig,
|
||||
SanitizedGlobalConfig,
|
||||
} from './globals/config/types.js'
|
||||
export { docAccessOperation as docAccessOperationGlobal } from './globals/operations/docAccess.js'
|
||||
|
||||
export { docAccessOperation as docAccessOperationGlobal } from './globals/operations/docAccess.js'
|
||||
export { findOneOperation } from './globals/operations/findOne.js'
|
||||
export { findVersionByIDOperation as findVersionByIDOperationGlobal } from './globals/operations/findVersionByID.js'
|
||||
export { findVersionsOperation as findVersionsOperationGlobal } from './globals/operations/findVersions.js'
|
||||
@@ -1505,8 +1506,8 @@ export type {
|
||||
} from './preferences/types.js'
|
||||
export type { QueryPreset } from './query-presets/types.js'
|
||||
export { jobAfterRead } from './queues/config/index.js'
|
||||
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'
|
||||
|
||||
export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config/types/index.js'
|
||||
export type {
|
||||
RunInlineTaskFunction,
|
||||
RunTaskFunction,
|
||||
@@ -1530,14 +1531,14 @@ export type {
|
||||
WorkflowHandler,
|
||||
WorkflowTypes,
|
||||
} from './queues/config/types/workflowTypes.js'
|
||||
export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js'
|
||||
|
||||
export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js'
|
||||
export { getLocalI18n } from './translations/getLocalI18n.js'
|
||||
export * from './types/index.js'
|
||||
export { getFileByPath } from './uploads/getFileByPath.js'
|
||||
export { _internal_safeFetchGlobal } from './uploads/safeFetch.js'
|
||||
export type * from './uploads/types.js'
|
||||
|
||||
export type * from './uploads/types.js'
|
||||
export { addDataAndFileToRequest } from './utilities/addDataAndFileToRequest.js'
|
||||
export { addLocalesToRequestFromData, sanitizeLocales } from './utilities/addLocalesToRequest.js'
|
||||
export { commitTransaction } from './utilities/commitTransaction.js'
|
||||
@@ -1609,8 +1610,8 @@ export { versionDefaults } from './versions/defaults.js'
|
||||
export { deleteCollectionVersions } from './versions/deleteCollectionVersions.js'
|
||||
export { appendVersionToQueryKey } from './versions/drafts/appendVersionToQueryKey.js'
|
||||
export { getQueryDraftsSort } from './versions/drafts/getQueryDraftsSort.js'
|
||||
export { enforceMaxVersions } from './versions/enforceMaxVersions.js'
|
||||
|
||||
export { enforceMaxVersions } from './versions/enforceMaxVersions.js'
|
||||
export { getLatestCollectionVersion } from './versions/getLatestCollectionVersion.js'
|
||||
export { getLatestGlobalVersion } from './versions/getLatestGlobalVersion.js'
|
||||
export { saveVersion } from './versions/saveVersion.js'
|
||||
|
||||
@@ -121,6 +121,7 @@ export type JobsConfig = {
|
||||
/**
|
||||
* A function that will be executed before Payload picks up jobs which are configured by the `jobs.autorun` function.
|
||||
* If this function returns true, jobs will be queried and picked up. If it returns false, jobs will not be run.
|
||||
* @default undefined - if this function is not defined, jobs will be run - as if () => true was passed.
|
||||
* @param payload
|
||||
* @returns boolean
|
||||
*/
|
||||
|
||||
@@ -27,7 +27,7 @@ export const createPayloadRequest = async ({
|
||||
request,
|
||||
}: Args): Promise<PayloadRequest> => {
|
||||
const cookies = parseCookies(request.headers)
|
||||
const payload = await getPayload({ config: configPromise })
|
||||
const payload = await getPayload({ config: configPromise, cron: true })
|
||||
|
||||
const { config } = payload
|
||||
const localization = config.localization
|
||||
|
||||
@@ -39,7 +39,7 @@ export const routeError = async ({
|
||||
|
||||
if (!payload) {
|
||||
try {
|
||||
payload = await getPayload({ config: configArg })
|
||||
payload = await getPayload({ config: configArg, cron: true })
|
||||
} catch (ignore) {
|
||||
return Response.json(
|
||||
{
|
||||
|
||||
@@ -29,7 +29,7 @@ export async function initPayloadInt<TInitializePayload extends boolean | undefi
|
||||
|
||||
console.log('starting payload')
|
||||
|
||||
const payload = await getPayload({ config })
|
||||
const payload = await getPayload({ config, cron: true })
|
||||
console.log('initializing rest client')
|
||||
const restClient = new NextRESTClient(payload.config)
|
||||
console.log('initPayloadInt done')
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
import type { JobTaskStatus, Payload } from 'payload'
|
||||
import type { JobTaskStatus, Payload, SanitizedConfig } from 'payload'
|
||||
|
||||
import path from 'path'
|
||||
import { migrateCLI } from 'payload'
|
||||
import { fileURLToPath } from 'url'
|
||||
|
||||
import type { NextRESTClient } from '../helpers/NextRESTClient.js'
|
||||
@@ -1434,3 +1435,24 @@ describe('Queues', () => {
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
describe('Queues - CLI', () => {
|
||||
let config: SanitizedConfig
|
||||
beforeAll(async () => {
|
||||
;({ config } = await initPayloadInt(dirname, undefined, false))
|
||||
})
|
||||
it('can run migrate CLI without jobs attempting to run', async () => {
|
||||
await migrateCLI({
|
||||
config,
|
||||
parsedArgs: {
|
||||
_: ['migrate'],
|
||||
},
|
||||
})
|
||||
|
||||
// Wait 3 seconds to let potential autorun crons trigger
|
||||
await new Promise((resolve) => setTimeout(resolve, 3000))
|
||||
|
||||
// Expect no errors. Previously, this would throw an "error: relation "payload_jobs" does not exist" error
|
||||
expect(true).toBe(true)
|
||||
})
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user