diff --git a/packages/payload/src/queues/config/jobsCollection.ts b/packages/payload/src/queues/config/jobsCollection.ts index a311d2b7b..88304ff7c 100644 --- a/packages/payload/src/queues/config/jobsCollection.ts +++ b/packages/payload/src/queues/config/jobsCollection.ts @@ -13,14 +13,8 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu const workflowSlugs: Set = new Set() const taskSlugs: Set = new Set(['inline']) - const queueNames: Set = new Set(['default']) - config.jobs?.workflows.forEach((workflow) => { workflowSlugs.add(workflow.slug) - - if (workflow.queue) { - queueNames.add(workflow.queue) - } }) config.jobs.tasks.forEach((task) => { @@ -168,13 +162,12 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu }, { name: 'queue', - type: 'select', + type: 'text', admin: { position: 'sidebar', }, defaultValue: 'default', index: true, - options: [...queueNames], }, { name: 'waitUntil', diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index fc58f5764..3ee703c77 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -108,8 +108,9 @@ export type WorkflowConfig ({ args: | { input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input'] + queue?: string req?: PayloadRequest // TTaskOrWorkflowlug with keyof TypedJobs['workflows'] removed: task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never @@ -24,6 +25,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({ } | { input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input'] + queue?: string req?: PayloadRequest task?: never workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows'] @@ -35,10 +37,25 @@ export const getJobsLocalAPI = (payload: Payload) => ({ ? RunningJob : RunningJobFromTask > => { + let queue: string + + // If user specifies queue, use that + if (args.queue) { + queue = args.queue + } else if (args.workflow) { + // Otherwise, if there is a workflow specified, and it has a default queue to use, + // use that + const workflow = payload.config.jobs?.workflows?.find(({ slug }) => slug === args.workflow) + if (workflow?.queue) { + queue = workflow.queue + } + } + return (await payload.create({ collection: 'payload-jobs', data: { input: args.input, + queue, taskSlug: 'task' in args ? args.task : undefined, workflowSlug: 'workflow' in args ? args.workflow : undefined, }, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 186379af3..08cc35ec7 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -153,6 +153,7 @@ describe('Queues', () => { it('ensure job retrying works', async () => { const job = await payload.jobs.queue({ workflow: 'retriesTest', + queue: 'default', input: { message: 'hello', },