fix: allow specifying queue (#9151)
Allows user to specify a queue when calling `payload.jobs.queue()`. Closes #9133
This commit is contained in:
@@ -13,14 +13,8 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
|
||||
const workflowSlugs: Set<string> = new Set()
|
||||
const taskSlugs: Set<string> = new Set(['inline'])
|
||||
|
||||
const queueNames: Set<string> = new Set(['default'])
|
||||
|
||||
config.jobs?.workflows.forEach((workflow) => {
|
||||
workflowSlugs.add(workflow.slug)
|
||||
|
||||
if (workflow.queue) {
|
||||
queueNames.add(workflow.queue)
|
||||
}
|
||||
})
|
||||
|
||||
config.jobs.tasks.forEach((task) => {
|
||||
@@ -168,13 +162,12 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu
|
||||
},
|
||||
{
|
||||
name: 'queue',
|
||||
type: 'select',
|
||||
type: 'text',
|
||||
admin: {
|
||||
position: 'sidebar',
|
||||
},
|
||||
defaultValue: 'default',
|
||||
index: true,
|
||||
options: [...queueNames],
|
||||
},
|
||||
{
|
||||
name: 'waitUntil',
|
||||
|
||||
@@ -108,8 +108,9 @@ export type WorkflowConfig<TWorkflowSlugOrInput extends keyof TypedJobs['workflo
|
||||
*/
|
||||
label?: string
|
||||
/**
|
||||
* Optionally, define the queue name that this workflow should be tied to.
|
||||
* Optionally, define the default queue name that this workflow should be tied to.
|
||||
* Defaults to "default".
|
||||
* Can be overridden when queuing jobs via Local API.
|
||||
*/
|
||||
queue?: string
|
||||
/**
|
||||
|
||||
@@ -17,6 +17,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
args:
|
||||
| {
|
||||
input: TypedJobs['tasks'][TTaskOrWorkflowSlug]['input']
|
||||
queue?: string
|
||||
req?: PayloadRequest
|
||||
// TTaskOrWorkflowlug with keyof TypedJobs['workflows'] removed:
|
||||
task: TTaskOrWorkflowSlug extends keyof TypedJobs['tasks'] ? TTaskOrWorkflowSlug : never
|
||||
@@ -24,6 +25,7 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
}
|
||||
| {
|
||||
input: TypedJobs['workflows'][TTaskOrWorkflowSlug]['input']
|
||||
queue?: string
|
||||
req?: PayloadRequest
|
||||
task?: never
|
||||
workflow: TTaskOrWorkflowSlug extends keyof TypedJobs['workflows']
|
||||
@@ -35,10 +37,25 @@ export const getJobsLocalAPI = (payload: Payload) => ({
|
||||
? RunningJob<TTaskOrWorkflowSlug>
|
||||
: RunningJobFromTask<TTaskOrWorkflowSlug>
|
||||
> => {
|
||||
let queue: string
|
||||
|
||||
// If user specifies queue, use that
|
||||
if (args.queue) {
|
||||
queue = args.queue
|
||||
} else if (args.workflow) {
|
||||
// Otherwise, if there is a workflow specified, and it has a default queue to use,
|
||||
// use that
|
||||
const workflow = payload.config.jobs?.workflows?.find(({ slug }) => slug === args.workflow)
|
||||
if (workflow?.queue) {
|
||||
queue = workflow.queue
|
||||
}
|
||||
}
|
||||
|
||||
return (await payload.create({
|
||||
collection: 'payload-jobs',
|
||||
data: {
|
||||
input: args.input,
|
||||
queue,
|
||||
taskSlug: 'task' in args ? args.task : undefined,
|
||||
workflowSlug: 'workflow' in args ? args.workflow : undefined,
|
||||
},
|
||||
|
||||
@@ -153,6 +153,7 @@ describe('Queues', () => {
|
||||
it('ensure job retrying works', async () => {
|
||||
const job = await payload.jobs.queue({
|
||||
workflow: 'retriesTest',
|
||||
queue: 'default',
|
||||
input: {
|
||||
message: 'hello',
|
||||
},
|
||||
|
||||
Reference in New Issue
Block a user