feat: add shouldRestore config to job queue tasks (#10059)
By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed. This PR allows you to configure this behavior through the `retries.shouldRestore` property. This property accepts a boolean or a function for more complex restore behaviors.
This commit is contained in:
@@ -141,3 +141,65 @@ export const createPostHandler: TaskHandler<'createPost'> = async ({ input, job,
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Configuring task restoration
|
||||
|
||||
By default, if a task has passed previously and a workflow is re-run, the task will not be re-run. Instead, the output from the previous task run will be returned. This is to prevent unnecessary re-runs of tasks that have already passed.
|
||||
|
||||
You can configure this behavior through the `retries.shouldRestore` property. This property accepts a boolean or a function.
|
||||
|
||||
If `shouldRestore` is set to true, the task will only be re-run if it previously failed. This is the default behavior.
|
||||
|
||||
If `shouldRestore` this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.
|
||||
|
||||
If `shouldRestore` is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic, e.g you may want to re-run a task up to X amount of times and then restore it for consecutive runs, or only re-run a task if the input has changed.
|
||||
|
||||
Example:
|
||||
|
||||
```ts
|
||||
export default buildConfig({
|
||||
// ...
|
||||
jobs: {
|
||||
tasks: [
|
||||
{
|
||||
slug: 'myTask',
|
||||
retries: {
|
||||
shouldRestore: false,
|
||||
}
|
||||
// ...
|
||||
} as TaskConfig<'myTask'>,
|
||||
]
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
Example - determine whether a task should be restored based on the input data:
|
||||
|
||||
```ts
|
||||
export default buildConfig({
|
||||
// ...
|
||||
jobs: {
|
||||
tasks: [
|
||||
{
|
||||
slug: 'myTask',
|
||||
inputSchema: [
|
||||
{
|
||||
name: 'someDate',
|
||||
type: 'date',
|
||||
required: true,
|
||||
},
|
||||
],
|
||||
retries: {
|
||||
shouldRestore: ({ input }) => {
|
||||
if(new Date(input.someDate) > new Date()) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
},
|
||||
}
|
||||
// ...
|
||||
} as TaskConfig<'myTask'>,
|
||||
]
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import type { Field, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
|
||||
import type { RunningJob, RunningJobSimple } from './workflowTypes.js'
|
||||
import type { BaseJob, RunningJob, RunningJobSimple, SingleTaskStatus } from './workflowTypes.js'
|
||||
|
||||
export type TaskInputOutput = {
|
||||
input: object
|
||||
@@ -101,8 +101,23 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
|
||||
},
|
||||
) => Promise<TTaskOutput>
|
||||
|
||||
export type ShouldRestoreFn = (args: {
|
||||
/**
|
||||
* Input data passed to the task
|
||||
*/
|
||||
input: object
|
||||
job: BaseJob
|
||||
req: PayloadRequest
|
||||
taskStatus: SingleTaskStatus<string>
|
||||
}) => boolean | Promise<boolean>
|
||||
|
||||
export type RetryConfig = {
|
||||
attempts: number
|
||||
/**
|
||||
* This controls how many times the task should be retried if it fails.
|
||||
*
|
||||
* @default undefined - attempts are either inherited from the workflow retry config or set to 0.
|
||||
*/
|
||||
attempts?: number
|
||||
/**
|
||||
* The backoff strategy to use when retrying the task. This determines how long to wait before retrying the task.
|
||||
*
|
||||
@@ -137,6 +152,19 @@ export type RetryConfig = {
|
||||
*/
|
||||
type: 'exponential' | 'fixed'
|
||||
}
|
||||
/**
|
||||
* This controls whether the task output should be restored if the task previously succeeded and the workflow is being retried.
|
||||
*
|
||||
* If this is set to false, the task will be re-run even if it previously succeeded, ignoring the maximum number of retries.
|
||||
*
|
||||
* If this is set to true, the task will only be re-run if it previously failed.
|
||||
*
|
||||
* If this is a function, the return value of the function will determine whether the task should be re-run. This can be used for more complex restore logic,
|
||||
* e.g you may want to re-run a task up until a certain point and then restore it, or only re-run a task if the input has changed.
|
||||
*
|
||||
* @default true - the task output will be restored if the task previously succeeded.
|
||||
*/
|
||||
shouldRestore?: boolean | ShouldRestoreFn
|
||||
}
|
||||
|
||||
export type TaskConfig<
|
||||
|
||||
@@ -170,39 +170,47 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
|
||||
inlineRunner = task
|
||||
}
|
||||
|
||||
let retriesConfig: number | RetryConfig = retries
|
||||
let taskConfig: TaskConfig<string>
|
||||
if (!isInline) {
|
||||
taskConfig = req.payload.config.jobs.tasks.find((t) => t.slug === taskSlug)
|
||||
if (!retriesConfig) {
|
||||
retriesConfig = taskConfig.retries
|
||||
}
|
||||
|
||||
if (!taskConfig) {
|
||||
throw new Error(`Task ${taskSlug} not found in workflow ${job.workflowSlug}`)
|
||||
}
|
||||
}
|
||||
let maxRetries: number =
|
||||
typeof retriesConfig === 'object' ? retriesConfig?.attempts : retriesConfig
|
||||
|
||||
if (maxRetries === undefined || maxRetries === null) {
|
||||
// Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
|
||||
if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
|
||||
maxRetries =
|
||||
typeof workflowConfig.retries === 'object'
|
||||
? workflowConfig.retries.attempts
|
||||
: workflowConfig.retries
|
||||
} else {
|
||||
maxRetries = 0
|
||||
}
|
||||
const retriesConfigFromPropsNormalized =
|
||||
retries == undefined || retries == null
|
||||
? {}
|
||||
: typeof retries === 'number'
|
||||
? { attempts: retries }
|
||||
: retries
|
||||
const retriesConfigFromTaskConfigNormalized = taskConfig
|
||||
? typeof taskConfig.retries === 'number'
|
||||
? { attempts: taskConfig.retries }
|
||||
: taskConfig.retries
|
||||
: {}
|
||||
|
||||
const finalRetriesConfig: RetryConfig = {
|
||||
...retriesConfigFromTaskConfigNormalized,
|
||||
...retriesConfigFromPropsNormalized, // Retry config from props takes precedence
|
||||
}
|
||||
|
||||
const taskStatus: null | SingleTaskStatus<string> = job?.taskStatus?.[taskSlug]
|
||||
? job.taskStatus[taskSlug][taskID]
|
||||
: null
|
||||
|
||||
// Handle restoration of task if it succeeded in a previous run
|
||||
if (taskStatus && taskStatus.complete === true) {
|
||||
return taskStatus.output
|
||||
let shouldRestore = true
|
||||
if (finalRetriesConfig?.shouldRestore === false) {
|
||||
shouldRestore = false
|
||||
} else if (typeof finalRetriesConfig?.shouldRestore === 'function') {
|
||||
shouldRestore = await finalRetriesConfig.shouldRestore({ input, job, req, taskStatus })
|
||||
}
|
||||
if (shouldRestore) {
|
||||
return taskStatus.output
|
||||
}
|
||||
}
|
||||
|
||||
let runner: TaskHandler<TaskType>
|
||||
@@ -245,6 +253,20 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
|
||||
|
||||
let output: object = {}
|
||||
|
||||
let maxRetries: number | undefined = finalRetriesConfig?.attempts
|
||||
|
||||
if (maxRetries === undefined || maxRetries === null) {
|
||||
// Inherit retries from workflow config, if they are undefined and the workflow config has retries configured
|
||||
if (workflowConfig.retries !== undefined && workflowConfig.retries !== null) {
|
||||
maxRetries =
|
||||
typeof workflowConfig.retries === 'object'
|
||||
? workflowConfig.retries.attempts
|
||||
: workflowConfig.retries
|
||||
} else {
|
||||
maxRetries = 0
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
const runnerOutput = await runner({
|
||||
input,
|
||||
@@ -260,7 +282,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
|
||||
maxRetries,
|
||||
output,
|
||||
req,
|
||||
retriesConfig,
|
||||
retriesConfig: finalRetriesConfig,
|
||||
runnerOutput,
|
||||
state,
|
||||
taskConfig,
|
||||
@@ -282,7 +304,7 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
|
||||
maxRetries,
|
||||
output,
|
||||
req,
|
||||
retriesConfig,
|
||||
retriesConfig: finalRetriesConfig,
|
||||
state,
|
||||
taskConfig,
|
||||
taskID,
|
||||
|
||||
Reference in New Issue
Block a user