Oban.Pro.Plugins.DynamicPrioritizer (Oban Pro v1.4.10)

The DynamicPrioritizer plugin automatically adjusts job priorities to ensure all jobs are eventually processed.

Using mixed priorities in a queue causes certain jobs to execute before others. For example, a queue that processes jobs from various customers may prioritize customers that are in a higher tier or plan. All high priority (0) jobs are guaranteed to run before any with lower priority (1..9), which is wonderful for the higher tier customers but can starve everyone else. When there is a constant flow of high priority jobs, the lower priority jobs never get the chance to run.

Using the Plugin

To use the DynamicPrioritizer plugin add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicPrioritizer]

Without any additional options, the plugin automatically increases the priority of any jobs that have been available for 5 minutes or more. To reprioritize after less time waiting, configure the :after time:

plugins: [{Oban.Pro.Plugins.DynamicPrioritizer, after: :timer.minutes(2)}]

Now lower priority jobs are bumped after 2 minutes of waiting, and every minute after that. To lower the reprioritization frequency, change the :interval along with the :after time:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  after: :timer.minutes(10),
  interval: :timer.minutes(5)
}]

Here we've specified that job priority will increase every 5 minutes after the first 10 minutes of waiting.
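
The timing described above can be sketched as a small calculation. This is an idealized model, not the plugin's implementation: the PrioritySketch module and its functions are hypothetical, it assumes the first bump lands exactly when the :after threshold is reached, and it assumes each bump moves a job one step closer to priority 0. Real timing also depends on when the plugin's interval happens to fire.

```elixir
defmodule PrioritySketch do
  # Hypothetical helper, not part of Oban Pro.
  # Counts how many bumps a job would have received after waiting `waited_ms`.

  # An :after of :infinity disables reprioritization entirely.
  def bumps(_waited_ms, :infinity, _interval_ms), do: 0

  # No bumps until the job has waited at least `after_ms`.
  def bumps(waited_ms, after_ms, _interval_ms) when waited_ms < after_ms, do: 0

  # One bump at the threshold, then one more per full interval after it.
  def bumps(waited_ms, after_ms, interval_ms) do
    1 + div(waited_ms - after_ms, interval_ms)
  end

  # Assumption: each bump raises priority by one step, stopping at 0 (highest).
  def effective_priority(priority, bumps), do: max(priority - bumps, 0)
end

after_ms = :timer.minutes(10)
interval_ms = :timer.minutes(5)

for waited <- [9, 10, 22] do
  n = PrioritySketch.bumps(:timer.minutes(waited), after_ms, interval_ms)
  priority = PrioritySketch.effective_priority(3, n)
  IO.puts("waited #{waited} min: #{n} bump(s), priority 3 -> #{priority}")
end
```

Under these assumptions, a job that has waited less than 10 minutes is untouched, and a longer :interval simply spaces the later bumps further apart.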

Providing Overrides

The :after option applies to jobs for any worker across all queues. The DynamicPrioritizer plugin also allows you to specify per-queue and per-worker overrides that fine-tune reprioritization.

Let's configure reprioritization for the analysis queue so that it nudges jobs after only 1 minute:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  queue_overrides: [analysis: :timer.minutes(1)]
}]

We can also effectively disable reprioritization for all other queues by setting :after to :infinity:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  after: :infinity,
  queue_overrides: [analysis: :timer.minutes(1)]
}]

If per-queue overrides aren't enough, we can override on a per-worker basis instead:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  interval: :timer.seconds(15),
  worker_overrides: [
    "MyApp.HighSLAWorker": :timer.seconds(30),
    "MyApp.LowSLAWorker": :timer.minutes(10)
  ]
}]

Naturally, you can mix and match overrides to finely control reprioritization:

plugins: [{
  Oban.Pro.Plugins.DynamicPrioritizer,
  interval: :timer.minutes(2),
  after: :timer.minutes(5),
  queue_overrides: [media: :timer.minutes(10)],
  worker_overrides: ["MyApp.HighSLAWorker": :timer.seconds(30)]
}]

Instrumenting with Telemetry

The DynamicPrioritizer plugin adds the following metadata to the [:oban, :plugin, :stop] event:

  • :reprioritized_count — the number of jobs reprioritized
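
A minimal sketch of a handler that reads this metadata is shown here. The MyApp.PrioritizerTelemetry module name and the handler id are hypothetical. It assumes the :telemetry library is available (it ships as an Oban dependency) and that the event metadata carries a :plugin key identifying the emitting plugin, as Oban's plugin events generally do.

```elixir
defmodule MyApp.PrioritizerTelemetry do
  # Hypothetical module: logs how many jobs each reprioritization pass touched.
  require Logger

  # Call once at application start, e.g. from Application.start/2.
  def attach do
    :telemetry.attach(
      "my-app-dynamic-prioritizer",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Only react to stop events emitted by the DynamicPrioritizer plugin.
  def handle_event(
        [:oban, :plugin, :stop],
        _measurements,
        %{plugin: Oban.Pro.Plugins.DynamicPrioritizer} = meta,
        _config
      ) do
    # Fall back to 0 in case the key is absent from the metadata.
    count = Map.get(meta, :reprioritized_count, 0)
    Logger.info("DynamicPrioritizer reprioritized #{count} job(s)")
  end

  # Ignore stop events from every other plugin.
  def handle_event(_event, _measurements, _meta, _config), do: :ok
end
```

Matching on the :plugin key matters because every Oban plugin shares the same [:oban, :plugin, :stop] event name.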



@type option() ::
  {:conf, Oban.Config.t()}
  | {:after, timeout()}
  | {:interval, pos_integer()}
  | {:name, Oban.name()}
  | {:queue_overrides, [{atom() | String.t(), timeout()}]}
  | {:worker_overrides, [{atom() | String.t(), timeout()}]}