Oban.Pro.Plugins.DynamicPruner (Oban Pro v1.4.2)
DynamicPruner enhances the default Pruner plugin's behaviour by allowing you to customize how jobs are retained in the jobs table. Where the Pruner operates on a fixed schedule and treats all jobs the same, with the DynamicPruner, you can use a flexible CRON schedule and provide custom rules for specific queues, workers, and job states.
Using the Plugin
To start using the DynamicPruner, add the module to your list of Oban plugins in config.exs:
config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicPruner]
  ...
Without any additional options the pruner operates in maximum length mode (max_len) and retains a conservative 1,000 completed, cancelled, or discarded jobs. To increase the number of jobs retained you can provide your own mode configuration:
plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_len, 50_000}}]
Now the pruner will retain the most recent 50,000 jobs instead.
A fixed limit on the number of jobs isn't always ideal. Often you want to retain jobs based on their age instead. For example, if your application needs to ensure that a duplicate job hasn't been enqueued within the past 24 hours, you need to retain jobs for at least 24 hours; a fixed limit simply won't work. For that we can use maximum age (max_age) mode instead:
plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, 60 * 60 * 48}}]
Here we've specified max_age using seconds, where 60 * 60 * 48 is the number of seconds in two days.
Calculating the number of seconds in a period isn't especially readable, particularly when you have numerous max_age declarations in overrides (see below). For clarity you can specify the age's time unit as :second, :minute, :hour, :day, :week or :month (plural forms such as :days are accepted as well). Here is the same 48 hour configuration from above, but specified in terms of days:
plugins: [{Oban.Pro.Plugins.DynamicPruner, mode: {:max_age, {2, :days}}}]
Now you can tell exactly how long jobs should be retained, without reverse calculating how many seconds an expression represents.
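To make the equivalence between the two notations concrete, here is a minimal sketch of a conversion helper. RetentionSketch is a hypothetical module written for illustration only; it is not part of Oban Pro and says nothing about how the plugin converts units internally. Months are left out because this page doesn't state how long the plugin considers a month to be.

```elixir
defmodule RetentionSketch do
  # Hypothetical helper, not part of Oban Pro: converts a fixed-length
  # period to seconds so the two max_age notations can be compared.
  @seconds_per_unit %{
    second: 1,
    seconds: 1,
    minute: 60,
    minutes: 60,
    hour: 3_600,
    hours: 3_600,
    day: 86_400,
    days: 86_400,
    week: 604_800,
    weeks: 604_800
  }

  # A bare integer is already a number of seconds.
  def to_seconds(seconds) when is_integer(seconds), do: seconds

  # A {count, unit} tuple is multiplied out; unknown units raise a KeyError.
  def to_seconds({count, unit}) when is_integer(count) do
    count * Map.fetch!(@seconds_per_unit, unit)
  end
end

RetentionSketch.to_seconds({2, :days}) == RetentionSketch.to_seconds(60 * 60 * 48)
```

Both calls return 172,800 seconds, which is why the two configurations above retain jobs for the same length of time.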
Providing Overrides
The mode option is indiscriminate when determining which jobs to prune. It pays no attention to which queue they are in, what worker the job is for, or which state they landed in. The DynamicPruner allows you to specify per-queue, per-worker and per-state overrides that fine tune pruning.
We'll start with a simple example of limiting the total number of retained jobs in the events queue:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  queue_overrides: [events: {:max_len, 1_000}]
}]
With this configuration most jobs will be retained for seven days, but we'll only keep the latest 1,000 jobs in the events queue. We can extend this further and override all of our queues (and omit the default mode entirely):
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  queue_overrides: [
    default: {:max_age, {6, :hours}},
    analysis: {:max_age, {1, :day}},
    events: {:max_age, {10, :minutes}},
    mailers: {:max_age, {2, :weeks}},
    media: {:max_age, {2, :months}}
  ]
}]
When pruning by queue isn't granular enough you can provide overrides by worker instead:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  worker_overrides: [
    "MyApp.BusyWorker": {:max_age, {1, :day}},
    "MyApp.SecretWorker": {:max_age, {1, :second}},
    "MyApp.HistoricWorker": {:max_age, {1, :month}}
  ]
}]
You can also override by state, which allows you to keep discarded jobs for inspection while quickly purging cancelled or successfully completed jobs:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  state_overrides: [
    cancelled: {:max_age, {1, :hour}},
    completed: {:max_age, {1, :day}},
    discarded: {:max_age, {1, :month}}
  ]
}]
Naturally you can mix and match overrides to finely control job retention:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  queue_overrides: [events: {:max_age, {10, :minutes}}],
  state_overrides: [discarded: {:max_age, {2, :days}}],
  worker_overrides: ["MyApp.SecretWorker": {:max_age, {1, :second}}]
}]
Override Precedence
Overrides are applied sequentially, in this order:
- Queue
- State
- Worker
- Default
Using the example above, jobs in the events queue are deleted first, followed by jobs in the discarded state, then the MyApp.SecretWorker worker, and finally any other jobs older than 7 days that weren't covered by any overrides.
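The ordering can be sketched as a small function that lists the pruning passes for a given set of options. PruneOrderSketch is a hypothetical helper written for illustration; it only models the documented order (queue, state, worker, default) and makes no claim about how the plugin builds or runs its queries. It requires a :mode option because it mirrors the mixed example above.

```elixir
defmodule PruneOrderSketch do
  # Hypothetical helper, not part of Oban Pro. The keys are listed in the
  # documented order of application: queue, then state, then worker.
  @override_keys [:queue_overrides, :state_overrides, :worker_overrides]

  def passes(opts) do
    overrides =
      for key <- @override_keys, {name, mode} <- Keyword.get(opts, key, []) do
        {key, name, mode}
      end

    # The default mode comes last and covers whatever the overrides didn't.
    overrides ++ [{:default, :remaining, Keyword.fetch!(opts, :mode)}]
  end
end

PruneOrderSketch.passes(
  mode: {:max_age, {7, :days}},
  queue_overrides: [events: {:max_age, {10, :minutes}}],
  state_overrides: [discarded: {:max_age, {2, :days}}],
  worker_overrides: ["MyApp.SecretWorker": {:max_age, {1, :second}}]
)
```

For these options the list starts with the events queue rule, continues with the discarded state rule and the MyApp.SecretWorker rule, and ends with the seven day default.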
Using per State Timestamps
Pruning checks the scheduled_at timestamp for optimized, index-backed queries by default. In rare situations where the scheduled_at timestamp isn't accurate enough to identify prunable jobs, e.g. cancelling large swaths of jobs scheduled far into the future, you can use the by_state_timestamp option for increased accuracy.
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  by_state_timestamp: true,
  mode: {:max_age, {7, :days}}
}]
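To see why scheduled_at can be misleading, consider a job that was scheduled weeks ahead and then cancelled. The sketch below is a hypothetical, in-memory illustration and not the plugin's query. It assumes that by_state_timestamp compares the timestamp matching the job's final state (completed_at, cancelled_at, or discarded_at, all fields on Oban.Job); the dates are made up.

```elixir
defmodule PruneTimestampSketch do
  # Hypothetical helper, not part of Oban Pro: picks the timestamp an age
  # check would compare under each setting (an assumption, see above).
  def timestamp(job, true) do
    case job.state do
      "completed" -> job.completed_at
      "cancelled" -> job.cancelled_at
      "discarded" -> job.discarded_at
    end
  end

  def timestamp(job, false), do: job.scheduled_at

  # True when the chosen timestamp is strictly before the cutoff.
  def older_than?(job, cutoff, by_state_timestamp?) do
    DateTime.compare(timestamp(job, by_state_timestamp?), cutoff) == :lt
  end
end

# Seven days before a made-up "now" of 2024-01-31.
cutoff = ~U[2024-01-24 00:00:00Z]

# Cancelled eleven days ago, but originally scheduled for March.
job = %{
  state: "cancelled",
  scheduled_at: ~U[2024-03-01 00:00:00Z],
  cancelled_at: ~U[2024-01-20 00:00:00Z]
}

PruneTimestampSketch.older_than?(job, cutoff, false)
PruneTimestampSketch.older_than?(job, cutoff, true)
```

Judged by scheduled_at the job is not yet past the cutoff, while judged by cancelled_at it is, which is the situation the option is meant to address.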
Preventing Timeouts with Overrides
Worker override queries aren't able to use any of Oban's standard indexes. If you're processing a high volume of jobs, pruning with worker overrides may be extremely slow due to sequential scans. To prevent timeouts, and speed up pruning altogether, you should add a compound index to the oban_jobs table:
create_if_not_exists index(:oban_jobs, [:worker, :state, :id], concurrently: true)
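For context, a complete migration around that line might look like the following sketch. The module name is a placeholder. The two module attributes are standard Ecto.Migration settings: PostgreSQL can't build an index concurrently inside a transaction, so both the DDL transaction and the migration lock are disabled.

```elixir
defmodule MyApp.Repo.Migrations.AddObanJobsWorkerStateIndex do
  use Ecto.Migration

  # Concurrent index creation can't run inside a transaction.
  @disable_ddl_transaction true
  @disable_migration_lock true

  def change do
    create_if_not_exists index(:oban_jobs, [:worker, :state, :id], concurrently: true)
  end
end
```

If your oban_jobs table lives in a non-default schema, the index call would also need a matching prefix option.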
Retaining Jobs Forever
To retain jobs in a queue, state, or for a particular worker forever (without using something like {:max_age, {999, :years}}), use :infinity as the length or duration:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  state_overrides: [
    cancelled: {:max_len, :infinity},
    discarded: {:max_age, :infinity}
  ]
}]
Keeping Up With Inserts
With the default settings the DynamicPruner will only delete 10,000 jobs each time it prunes. The limit exists to prevent connection timeouts and excessive table locks. A busy system can easily insert more than 10,000 jobs per minute during standard operation. If you find that jobs are accumulating despite active pruning you can override the limit.
Here we set the delete limit to 25,000 and give it 60 seconds to complete:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_len, 100_000},
  limit: 25_000,
  timeout: :timer.seconds(60)
}]
Deleting in PostgreSQL is very fast, and the 10k default is rather conservative. Feel free to increase the limit to a number that your system can handle.
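A quick back-of-the-envelope calculation can help pick a limit. The sketch below uses a hypothetical helper module and assumes that the limit caps the total number of jobs deleted per pruning run and that pruning runs once a minute (60 runs per hour); the insert rate is a made-up figure.

```elixir
defmodule PruneBudgetSketch do
  # Hypothetical helper, not part of Oban Pro.

  # Upper bound on deletions per hour for a per-run limit.
  def max_deleted_per_hour(limit, runs_per_hour), do: limit * runs_per_hour

  # Smallest per-run limit that keeps pace with a steady insert rate,
  # using integer division rounded up.
  def required_limit(inserts_per_hour, runs_per_hour) do
    div(inserts_per_hour + runs_per_hour - 1, runs_per_hour)
  end
end

PruneBudgetSketch.max_deleted_per_hour(10_000, 60)
PruneBudgetSketch.max_deleted_per_hour(25_000, 60)
PruneBudgetSketch.required_limit(1_000_000, 60)
```

Under these assumptions the default limit removes at most 600,000 jobs per hour and the 25,000 limit at most 1,500,000, while a system inserting one million jobs per hour needs a limit of at least 16,667 to keep up.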
Setting a Schedule
By default, pruning happens at the top of every minute based on the CRON schedule * * * * *. You're free to set any CRON schedule you prefer for greater control over when to prune. For example, to prune once an hour instead:
plugins: [{Oban.Pro.Plugins.DynamicPruner, schedule: "0 * * * *"}]
Or, to prune once a day at midnight in your local timezone:
plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  limit: 100_000,
  schedule: "0 0 * * *",
  timezone: "America/Chicago",
  timeout: :timer.minutes(1)
}]
Pruning less frequently can reduce load on your system, particularly if you're using multiple overrides. However, be sure to set a higher limit and timeout (as shown above) to compensate for more accumulated jobs.
Executing a Callback Before Delete
Sometimes jobs are a historic record of activity and it's desirable to operate on them before they're deleted. For example, you may want to copy some jobs into cold storage prior to deletion.
To accomplish this, specify a callback to execute before proceeding with the deletion:
defmodule DeleteHandler do
  def call(job_ids) do
    # Use the ids at this point, from within a transaction
  end
end

plugins: [{
  Oban.Pro.Plugins.DynamicPruner,
  mode: {:max_age, {7, :days}},
  before_delete: {DeleteHandler, :call, []}
}]
The callback receives a list of the ids for the jobs that are about to be deleted. The callback runs within the same transaction that's used for deletion, and you should keep it quick or move heavy processing to an async process. Note that because it runs in the same transaction as deletion, the jobs won't be available after the callback exits.
To pass in extra arguments as "configuration" you can provide args to the callback MFA:
defmodule DeleteHandler do
  import Ecto.Query

  def call(job_ids, storage_name) do
    jobs = MyApp.Repo.all(where(Oban.Job, [j], j.id in ^job_ids))

    Storage.call(storage_name, jobs)
  end
end

before_delete: {DeleteHandler, :call, [ColdStorage]}
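The advice to keep the callback quick can be combined with the fact that jobs disappear once it returns: load the rows inside the callback, then hand the slow work to another process. The following is a minimal sketch, not a recommended implementation. MyApp.TaskSupervisor is a hypothetical Task.Supervisor that you would start in your own supervision tree, Storage is the same placeholder module used above, and returning :ok assumes the plugin ignores the callback's return value.

```elixir
defmodule AsyncDeleteHandler do
  import Ecto.Query

  def call(job_ids, storage_name) do
    # Load the rows now, while they still exist inside the transaction.
    jobs = MyApp.Repo.all(where(Oban.Job, [j], j.id in ^job_ids))

    # Hand the slow part to a supervised task so the delete isn't held up.
    # MyApp.TaskSupervisor is hypothetical and must be started separately.
    {:ok, _pid} =
      Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
        Storage.call(storage_name, jobs)
      end)

    # Assumption: the return value of the callback is not inspected.
    :ok
  end
end
```

The trade-off is that the task runs outside the transaction: if the archive step crashes the jobs are already gone, and if the deletion rolls back the copies may exist anyway.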
Implementation Notes
Some additional notes about pruning in general and nuances of the DynamicPruner plugin:
- Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
- Pruning is only applied to jobs that are completed, cancelled or discarded (has reached the maximum number of retries or has been manually killed). It'll never delete a new, scheduled, or retryable job.
- Only a single node will prune at any given time, which prevents potential deadlocks between transactions.
Instrumenting with Telemetry
The DynamicPruner plugin adds the following metadata to the [:oban, :plugin, :stop] event:
- :pruned_jobs - the jobs that were deleted from the database
Note: jobs only include id, queue, and state fields.
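As a rough illustration, a handler for that event could count pruned jobs per queue. The module name and handler id below are hypothetical. The sketch assumes the event metadata also carries a :plugin key identifying the emitting plugin, as Oban's plugin events generally do, and relies only on the queue field mentioned in the note above.

```elixir
defmodule PrunerTelemetrySketch do
  require Logger

  # Attach once at application start. The handler id is an arbitrary name.
  def attach do
    :telemetry.attach(
      "dynamic-pruner-logger",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Matches only events emitted by the DynamicPruner (assumed :plugin key).
  def handle_event(
        [:oban, :plugin, :stop],
        _measurements,
        %{plugin: Oban.Pro.Plugins.DynamicPruner, pruned_jobs: jobs},
        _config
      ) do
    counts = Enum.frequencies_by(jobs, & &1.queue)

    Logger.info("pruned #{length(jobs)} jobs: #{inspect(counts)}")

    counts
  end

  # Ignore stop events from other plugins.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
```

The handler returns the per-queue counts only to make it easy to exercise directly; telemetry itself discards the return value.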
Summary
Types
max_age()
@type max_age() :: pos_integer() | {pos_integer(), time_unit()}
max_len()
@type max_len() :: pos_integer()
mode()
option()
@type option() :: {:conf, Oban.Config.t()} | {:schedule, String.t()} | {:name, Oban.name()}
override()
@type override() :: state_override() | queue_override() | worker_override()
queue_override()
state_override()
@type state_override() :: {:completed | :cancelled | :discarded, mode()}
time_unit()
@type time_unit() ::
  :second
  | :seconds
  | :minute
  | :minutes
  | :hour
  | :hours
  | :day
  | :days
  | :week
  | :weeks
  | :month
  | :months