Dynamic Queues Plugin

The DynamicQueue plugin extends Oban's basic queue management by persisting changes across restarts, globally, across all connected nodes. It also boasts a declarative syntax for specifying which nodes a queue will run on. DynamicQueues are ideal for applications that dynamically start, stop, or modify queues at runtime.

installation

Installation

Before running the DynamicQueues plugin you must run a migration to add the oban_queues table to your database.

mix ecto.gen.migration add_oban_queues

Open the generated migration in your editor and call the change function on Oban.Pro.Migrations.DynamicQueues:

defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicQueues
end

As with the base Oban tables you can optionally provide a prefix to "namespace" the table within your database. Here we specify a "private" prefix:

defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicQueues.change("private")
end

Run the migration to create the table:

mix ecto.migrate

Now you can use the DynamicQueues plugin and start scheduling periodic jobs!

using-and-configuring

Using and Configuring

To begin using DynamicQueues, add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicQueues]
  ...

By itself, without providing a list of queues, the plugin doesn't have anything to run. The syntax for specifying dynamic queues is identical to Oban's built-in :queues option, which means you can copy them over:

plugins: [{
  Oban.Pro.Plugins.DynamicQueues,
  queues: [
    default: 20,
    mailers: [global_limit: 20],
    events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
  ]
}]

⚠️ Be sure to either omit any top level queues configuration or explicitly set queues: [] to prevent a conflict between basic and dynamic queues.

Now, when DynamicQueues initializes, it will persist all of the queues to the database and start supervising them. The queues syntax is nearly identical to Oban's standard queues, with an important enhancement we'll look at shortly.

Each of the persisted queues are referenced globally, by all other connected Oban instances that are running the DynamicQueues plugin. Changing the queue's name, pausing, scaling, or changing any other options will automatically update queues across all nodes—and persist across restarts.

Persisted queues are referenced by name, so you can tweak a queue's options by changing the definition within your config. For example, to bump the mailer's global limit up to 30:

queues: [
  mailers: [global_limit: 30],
  ...
]

That isn't especially interesting—after all, that's exactly how regular queues work! Dynamic queues start to shine when you insert, update, or delete them dynamically, either through Oban Web or your own application code. But first, let's look at how to limit where dynamic queues run.

limiting-where-queues-run

Limiting Where Queues Run

Dynamic queues can be configured to run on a subset of available nodes. This is especially useful when wish to restrict resource-intensive queues to only dedicated nodes. Restriction is configured through the only option, which you use like this:

queues: [
  basic: [local_limit: 10, only: {:node, :=~, "web|worker"}],
  audio: [local_limit: 5, only: {:node, "worker.1"}],
  video: [local_limit: 5, only: {:node, "worker.2"}],
  learn: [local_limit: 5, only: {:sys_env, "EXLA", "CUDA"}],
  store: [local_limit: 1, only: {:sys_env, "WAREHOUSE", true}]
]

In this example we've defined five queues, with the following restrictions:

  • basic — will run on a node named web or worker
  • audio — will only run on a node named worker.1
  • video — will only run on a node named worker.2
  • learn — will run wherever EXLA=CUDA is an environment variable
  • store — will run wherever WAREHOUSE=true is an environment variable

Here are the various match modes, operators, and allowed patterns:

Modes

  • :node — matches the node name as set in your Oban config. By default, node is the node's id in a cluster, the hostname outside a cluster, or a DYNO variable on Heroku.
  • :sys_env — matches a single system environment variable as retrieved by System.get_env/1

Operators

  • :== — compares the pattern and runtime value for equality as strings. This is the default operator if nothing is specified.
  • :!= — compares the pattern and runtime value for inequality as strings
  • :=~ — treats the pattern as a regex and matches it against a runtime value

Patterns

  • boolean — either true or false, which is stringified before comparison
  • string — either a literal pattern or a regular expression, depending on the supplied operator

📚 For types look at the only section in Typespecs below

deleting-persisted-queues

Deleting Persisted Queues

It is possible to delete a persisted queue during initialization by passing the :delete option:

queues: [
  some_old_queue: [delete: true],
  ...
]

Multiple queues can be deleted simultaneously, if necessary. Deleting queues is also idempotent; nothing will happen if a matching queue can't be found.

In the next section we'll look at how to list, insert, update and delete queues dynamically at runtime.

typespecs

Typespecs

📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.

# Partitions
@type period :: pos_integer() | {:seconds | :minutes | :hours | :days, pos_integer()}
@type partition :: [fields: [:worker | :args], keys: [atom()]]

# Only
@type operator :: :== | :!= | :=~
@type pattern :: boolean() | String.t()
@type sys_key :: String.t()

@type only ::
        {:node, pattern()}
        | {:node, operator(), pattern()}
        | {:sys_env, sys_key(), pattern()}
        | {:sys_env, sys_key(), operator(), pattern()}

# General
@type queue_name :: atom()
@type queue_opts ::
        {:local_limit, pos_integer()}
        | {:global_limit, pos_integer()}
        | {:only, only()}
        | {:paused, boolean()}
        | {:rate_limit, [allowed: pos_integer(), period: period(), partition: partition()]}

@type queue_input :: [{queue_name(), pos_integer() | [queue_opts()]}]

runtime-updates

Runtime Updates

Dynamic queues are persisted to the database, making it easy to manipulate them directly through CRUD operations, or indirectly with Oban's queue operations, i.e. pause_queue/2, scale_queue/2.

In this section we'll walk through each of the available functions and look at some examples.

listing-queues

Listing Queues

@spec all() :: [Ecto.Schema.t()]

Use all/0 to retrieve all persisted queues:

queues = DynamicQueues.all()

This returns a list of Oban.Pro.Queue schemas with persisted attributes. Because it is an Ecto schema you're free to compose queries that will filter down the queues. For example, to find all queues named with an "account" prefix:

import Ecto.Query, only: [where: 3]

Oban.Pro.Queue
|> where([c], ilike(c.name, "account_%"))
|> MyApp.Repo.all()

While it's possible to modify queue's returned from all/0, it is recommended that you use update/2 to ensure options are cast and validated correctly.

inserting-queues

Inserting Queues

@spec insert([queue_input()]) :: {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}

The insert/1 function takes a list of queue inputs, exactly like the :queues option you'd pass as configuration:

DynamicQueues.insert(
  basic: 10,
  audio: [global_limit: 10],
  video: [global_limit: 10],
  learn: [local_limit: 5, only: {:node, :=~, "learn"}]
)

Note that insert/1 acts like an upsert, making it possible to modify queues if the name matches. Still, it is better to use update/2 to make targeted updates.

updating-queues

Updating Queues

@spec update(queue_name(), queue_opts()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

Use update/2 to modify a single queue by name. Any option available when inserting queues can be updated. For example, the following call demonstrates every possible option:

DynamicQueues.update(
  :video,
  local_limit: 5,
  global_limit: 20,
  rate_limit: [allowed: 10, period: 30, partition: [fields: [:worker]]],
  only: {:node, :=~, "media"},
  paused: false
)

Updating a single option won't remove other persisted options. If you'd like to clear an uption you must set them to nil:

DynamicQueues.update(:video, global_limit: nil)

Since update/2 operates on a single queue, it is possible to rename a queue without doing a delete/insert dance:

DynamicQueues.update(:video, name: :media)

deleting-queues

Deleting Queues

@spec delete(queue_name()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

The delete/1 function references queues by name. You can use it to delete queues at runtime, rather than using the :delete option into the queues list in your configuration.

{:ok, _} = DynamicQueues.delete(:audio)

enabling-polling-mode

Enabling Polling Mode

In environments with restricted connectivity (where PubSub doesn't work) you can still use DynamicQueues at runtime through polling mode. The polling interval is entirely up to you, as it's disabled by default.

config :my_app, Oban,
  plugins: [{Oban.Pro.Plugins.DynamicQueues, interval: :timer.minutes(1)}]

With the interval above each DynamicQueues instance will wake up every minute, check the database for changes, and start new queues.

isolation-and-namespacing

Isolation and Namespacing

All DynamicQueues functions have an alternate clause that accepts an Oban instance name for the first argument. This matches base Oban functions such as Oban.pause_queue/2, which allow you to seamlessly work with multiple Oban instances and across multiple database prefixes. For example, you can use all/1 to list all queues for the instance named ObanPrivate:

queues = DynamicQueues.all(ObanPrivate)

Likewise, to insert a new queue using the configuration associated with the ObanPrivate instance:

DynamicQueues.insert(ObanPrivate, private: limit: 10)