diff --git a/packages/drizzle/src/updateJobs.ts b/packages/drizzle/src/updateJobs.ts index 885a0a658..6bfec911a 100644 --- a/packages/drizzle/src/updateJobs.ts +++ b/packages/drizzle/src/updateJobs.ts @@ -6,6 +6,7 @@ import type { DrizzleAdapter } from './types.js' import { findMany } from './find/findMany.js' import { upsertRow } from './upsertRow/index.js' +import { shouldUseOptimizedUpsertRow } from './upsertRow/shouldUseOptimizedUpsertRow.js' import { getTransaction } from './utilities/getTransaction.js' export const updateJobs: UpdateJobs = async function updateMany( @@ -23,6 +24,27 @@ export const updateJobs: UpdateJobs = async function updateMany( const tableName = this.tableNameMap.get(toSnakeCase(collection.slug)) const sort = sortArg !== undefined && sortArg !== null ? sortArg : collection.defaultSort + const useOptimizedUpsertRow = shouldUseOptimizedUpsertRow({ + data, + fields: collection.flattenedFields, + }) + + if (useOptimizedUpsertRow && id) { + const result = await upsertRow({ + id, + adapter: this, + data, + db, + fields: collection.flattenedFields, + ignoreResult: returning === false, + operation: 'update', + req, + tableName, + }) + + return returning === false ? null : [result] + } + const jobs = await findMany({ adapter: this, collectionSlug: 'payload-jobs', @@ -42,10 +64,12 @@ export const updateJobs: UpdateJobs = async function updateMany( // TODO: We need to batch this to reduce the amount of db calls. This can get very slow if we are updating a lot of rows. for (const job of jobs.docs) { - const updateData = { - ...job, - ...data, - } + const updateData = useOptimizedUpsertRow + ? data + : { + ...job, + ...data, + } const result = await upsertRow({ id: job.id, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/index.ts b/packages/payload/src/queues/operations/runJobs/runJob/index.ts index c92599e29..2195c5e02 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/index.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/index.ts @@ -89,9 +89,11 @@ export const runJob = async ({ } // Workflow has completed successfully + // Do not update the job log here, as that would result in unnecessary db calls when using postgres. + // Solely updating simple fields here will result in optimized db calls. + // Job log modifications are already updated at the end of the runTask function. await updateJob({ completedAt: getCurrentDate().toISOString(), - log: job.log, processing: false, totalTried: (job.totalTried ?? 0) + 1, }) diff --git a/test/queues/config.postgreslogs.ts b/test/queues/config.postgreslogs.ts new file mode 100644 index 000000000..d47ee88d8 --- /dev/null +++ b/test/queues/config.postgreslogs.ts @@ -0,0 +1,19 @@ +/* eslint-disable no-restricted-exports */ +import { buildConfigWithDefaults } from '../buildConfigWithDefaults.js' +import { getConfig } from './getConfig.js' + +const config = getConfig() + +import { postgresAdapter } from '@payloadcms/db-postgres' + +export const databaseAdapter = postgresAdapter({ + pool: { + connectionString: process.env.POSTGRES_URL || 'postgres://127.0.0.1:5432/payloadtests', + }, + logger: true, +}) + +export default buildConfigWithDefaults({ + ...config, + db: databaseAdapter, +}) diff --git a/test/queues/getConfig.ts b/test/queues/getConfig.ts index f1c8126e3..a5edc5912 100644 --- a/test/queues/getConfig.ts +++ b/test/queues/getConfig.ts @@ -10,6 +10,7 @@ import { CreateSimpleRetries0Task } from './tasks/CreateSimpleRetries0Task.js' import { CreateSimpleRetriesUndefinedTask } from './tasks/CreateSimpleRetriesUndefinedTask.js' import { CreateSimpleTask } from './tasks/CreateSimpleTask.js' import { CreateSimpleWithDuplicateMessageTask } from './tasks/CreateSimpleWithDuplicateMessageTask.js' +import { DoNothingTask } from './tasks/DoNothingTask.js' import { ExternalTask } from './tasks/ExternalTask.js' import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js' import { ReturnErrorTask } from './tasks/ReturnErrorTask.js' @@ -141,6 +142,7 @@ export const getConfig: () => Partial = () => ({ ThrowErrorTask, ReturnErrorTask, ReturnCustomErrorTask, + DoNothingTask, ], workflows: [ updatePostWorkflow, diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 19e3d8782..580bf0b64 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -88,20 +88,14 @@ export interface Config { db: { defaultIDType: string; }; - globals: { - 'payload-jobs-stats': PayloadJobsStat; - }; - globalsSelect: { - 'payload-jobs-stats': PayloadJobsStatsSelect | PayloadJobsStatsSelect; - }; + globals: {}; + globalsSelect: {}; locale: null; user: User & { collection: 'users'; }; jobs: { tasks: { - EverySecond: TaskEverySecond; - EverySecondMax2: TaskEverySecondMax2; UpdatePost: MyUpdatePostType; UpdatePostStep2: TaskUpdatePostStep2; CreateSimple: TaskCreateSimple; @@ -112,6 +106,7 @@ export interface Config { ThrowError: TaskThrowError; ReturnError: TaskReturnError; ReturnCustomError: TaskReturnCustomError; + DoNothingTask: TaskDoNothingTask; inline: { input: unknown; output: unknown; @@ -210,6 +205,13 @@ export interface User { hash?: string | null; loginAttempts?: number | null; lockUntil?: string | null; + sessions?: + | { + id: string; + createdAt?: string | null; + expiresAt: string; + }[] + | null; password?: string | null; } /** @@ -266,8 +268,6 @@ export interface PayloadJob { completedAt: string; taskSlug: | 'inline' - | 'EverySecond' - | 'EverySecondMax2' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -277,7 +277,8 @@ export interface PayloadJob { | 'ExternalTask' | 'ThrowError' | 'ReturnError' - | 'ReturnCustomError'; + | 'ReturnCustomError' + | 'DoNothingTask'; taskID: string; input?: | { @@ -336,8 +337,6 @@ export interface PayloadJob { taskSlug?: | ( | 'inline' - | 'EverySecond' - | 'EverySecondMax2' | 'UpdatePost' | 'UpdatePostStep2' | 'CreateSimple' @@ -348,20 +347,12 @@ export interface PayloadJob { | 'ThrowError' | 'ReturnError' | 'ReturnCustomError' + | 'DoNothingTask' ) | null; queue?: string | null; waitUntil?: string | null; processing?: boolean | null; - meta?: - | { - [k: string]: unknown; - } - | unknown[] - | string - | number - | boolean - | null; updatedAt: string; createdAt: string; } @@ -465,6 +456,13 @@ export interface UsersSelect { hash?: T; loginAttempts?: T; lockUntil?: T; + sessions?: + | T + | { + id?: T; + createdAt?: T; + expiresAt?: T; + }; } /** * This interface was referenced by `Config`'s JSON-Schema @@ -495,7 +493,6 @@ export interface PayloadJobsSelect { queue?: T; waitUntil?: T; processing?: T; - meta?: T; updatedAt?: T; createdAt?: T; } @@ -531,54 +528,6 @@ export interface PayloadMigrationsSelect { updatedAt?: T; createdAt?: T; } -/** - * This interface was referenced by `Config`'s JSON-Schema - * via the `definition` "payload-jobs-stats". - */ -export interface PayloadJobsStat { - id: string; - stats?: - | { - [k: string]: unknown; - } - | unknown[] - | string - | number - | boolean - | null; - updatedAt?: string | null; - createdAt?: string | null; -} -/** - * This interface was referenced by `Config`'s JSON-Schema - * via the `definition` "payload-jobs-stats_select". - */ -export interface PayloadJobsStatsSelect { - stats?: T; - updatedAt?: T; - createdAt?: T; - globalType?: T; -} -/** - * This interface was referenced by `Config`'s JSON-Schema - * via the `definition` "TaskEverySecond". - */ -export interface TaskEverySecond { - input: { - message: string; - }; - output?: unknown; -} -/** - * This interface was referenced by `Config`'s JSON-Schema - * via the `definition` "TaskEverySecondMax2". - */ -export interface TaskEverySecondMax2 { - input: { - message: string; - }; - output?: unknown; -} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "MyUpdatePostType". @@ -693,6 +642,16 @@ export interface TaskReturnCustomError { }; output?: unknown; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskDoNothingTask". + */ +export interface TaskDoNothingTask { + input: { + message: string; + }; + output?: unknown; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "MyUpdatePostWorkflowType". diff --git a/test/queues/postgres-logs.int.spec.ts b/test/queues/postgres-logs.int.spec.ts new file mode 100644 index 000000000..7a10cb84a --- /dev/null +++ b/test/queues/postgres-logs.int.spec.ts @@ -0,0 +1,59 @@ +import type { Payload } from 'payload' + +/* eslint-disable jest/require-top-level-describe */ +import assert from 'assert' +import path from 'path' +import { fileURLToPath } from 'url' + +import { initPayloadInt } from '../helpers/initPayloadInt.js' +import { withoutAutoRun } from './utilities.js' + +const filename = fileURLToPath(import.meta.url) +const dirname = path.dirname(filename) + +const describePostgres = process.env.PAYLOAD_DATABASE?.startsWith('postgres') + ? describe + : describe.skip + +let payload: Payload + +describePostgres('queues - postgres logs', () => { + beforeAll(async () => { + const initialized = await initPayloadInt( + dirname, + undefined, + undefined, + 'config.postgreslogs.ts', + ) + assert(initialized.payload) + assert(initialized.restClient) + ;({ payload } = initialized) + }) + + afterAll(async () => { + await payload.destroy() + }) + + it('ensure running jobs uses minimal db calls', async () => { + await withoutAutoRun(async () => { + await payload.jobs.queue({ + task: 'DoNothingTask', + input: { + message: 'test', + }, + }) + + // Count every console log (= db call) + const consoleCount = jest.spyOn(console, 'log').mockImplementation(() => {}) + + const res = await payload.jobs.run({}) + + expect(res).toEqual({ + jobStatus: { '1': { status: 'success' } }, + remainingJobsFromQueried: 0, + }) + expect(consoleCount).toHaveBeenCalledTimes(17) // Should be 17 sql calls if the optimizations are used. If not, this would be 22 calls + consoleCount.mockRestore() + }) + }) +}) diff --git a/test/queues/tasks/DoNothingTask.ts b/test/queues/tasks/DoNothingTask.ts new file mode 100644 index 000000000..4f9b018be --- /dev/null +++ b/test/queues/tasks/DoNothingTask.ts @@ -0,0 +1,23 @@ +/* eslint-disable @typescript-eslint/require-await */ +import type { TaskConfig } from 'payload' + +export const DoNothingTask: TaskConfig<'DoNothingTask'> = { + retries: 2, + slug: 'DoNothingTask', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + outputSchema: [], + handler: async ({ input }) => { + return { + state: 'succeeded', + output: { + message: input.message, + }, + } + }, +}