Dynamic Queues Plugin
The DynamicQueue
plugin extends Oban's basic queue management by persisting
changes across restarts, globally, across all connected nodes. It also boasts a
declarative syntax for specifying which nodes a queue will run on.
DynamicQueues
are ideal for applications that dynamically start, stop, or
modify queues at runtime.
installation
Installation
Before running the DynamicQueues
plugin you must run a migration to add the
oban_queues
table to your database.
mix ecto.gen.migration add_oban_queues
Open the generated migration in your editor and call the change
function on
Oban.Pro.Migrations.DynamicQueues
:
defmodule MyApp.Repo.Migrations.AddObanQueues do
use Ecto.Migration
defdelegate change, to: Oban.Pro.Migrations.DynamicQueues
end
As with the base Oban tables you can optionally provide a prefix
to
"namespace" the table within your database. Here we specify a "private"
prefix:
defmodule MyApp.Repo.Migrations.AddObanQueues do
use Ecto.Migration
def change, do: Oban.Pro.Migrations.DynamicQueues.change("private")
end
Run the migration to create the table:
mix ecto.migrate
Now you can use the DynamicQueues
plugin and start scheduling periodic jobs!
using-and-configuring
Using and Configuring
To begin using DynamicQueues
, add the module to your list of Oban plugins in
config.exs
:
config :my_app, Oban,
plugins: [Oban.Pro.Plugins.DynamicQueues]
...
By itself, without providing a list of queues, the plugin doesn't have anything
to run. The syntax for specifying dynamic queues is identical to Oban's built-in
:queues
option, which means you can copy them over:
plugins: [{
Oban.Pro.Plugins.DynamicQueues,
queues: [
default: 20,
mailers: [global_limit: 20],
events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
]
}]
⚠️ Be sure to set queues: []
or queues: false
at this point, to prevent
a conflict between basic and dynamic queues.
Now, when DynamicQueues
initializes, it will persist all of the queues to the
database and start supervising them. The queues
syntax is nearly identical
to Oban's standard queues, with an important enhancement we'll look at shortly.
Each of the persisted queues are referenced globally, by all other connected
Oban instances that are running the DynamicQueues
plugin. Changing the queue's
name, pausing, scaling, or changing any other options will automatically update
queues across all nodes—and persist across restarts.
Persisted queues are referenced by name, so you can tweak a queue's options by
changing the definition within your config. For example, to bump the mailer's
global limit up to 30
:
queues: [
mailers: [global_limit: 30],
...
]
That isn't especially interesting—after all, that's exactly how regular queues work! Dynamic queues start to shine when you insert, update, or delete them dynamically, either through Oban Web or your own application code. But first, let's look at how to limit where dynamic queues run.
limiting-where-queues-run
Limiting Where Queues Run
Dynamic queues can be configured to run on a subset of available nodes. This is
especially useful when wish to restrict resource-intensive queues to only
dedicated nodes. Restriction is configured through the only
option, which you
use like this:
queues: [
basic: [local_limit: 10, only: {:node, :=~, "web|worker"}],
audio: [local_limit: 5, only: {:node, "worker.1"}],
video: [local_limit: 5, only: {:node, "worker.2"}],
learn: [local_limit: 5, only: {:sys_env, "EXLA", "CUDA"}],
store: [local_limit: 1, only: {:sys_env, "WAREHOUSE", true}]
]
In this example we've defined five queues, with the following restrictions:
basic
— will run on a node namedweb
orworker
audio
— will only run on a node namedworker.1
video
— will only run on a node namedworker.2
learn
— will run whereverEXLA=CUDA
is an environment variablestore
— will run whereverWAREHOUSE=true
is an environment variable
Here are the various match modes, operators, and allowed patterns:
Modes
:node
— matches the node name as set in your Oban config. By default,node
is the node's id in a cluster, the hostname outside a cluster, or aDYNO
variable on Heroku.:sys_env
— matches a single system environment variable as retrieved bySystem.get_env/1
Operators
:==
— compares the pattern and runtime value for equality as strings. This is the default operator if nothing is specified.:!=
— compares the pattern and runtime value for inequality as strings:=~
— treats the pattern as a regex and matches it against a runtime value
Patterns
boolean
— eithertrue
orfalse
, which is stringified before comparisonstring
— either a literal pattern or a regular expression, depending on the supplied operator
📚 For types look at the only
section in Typespecs below
deleting-persisted-queues
Deleting Persisted Queues
It is possible to delete a persisted queue during initialization by passing the
:delete
option:
queues: [
some_old_queue: [delete: true],
...
]
Multiple queues can be deleted simultaneously, if necessary. Deleting queues is also idempotent; nothing will happen if a matching queue can't be found.
In the next section we'll look at how to list, insert, update and delete queues dynamically at runtime.
typespecs
Typespecs
📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.
# Partitions
@type period :: pos_integer() | {:seconds | :minutes | :hours | :days, pos_integer()}
@type partition :: [fields: [:worker | :args], keys: [atom()]]
# Only
@type operator :: :== | :!= | :=~
@type pattern :: boolean() | String.t()
@type sys_key :: String.t()
@type only ::
{:node, pattern()}
| {:node, operator(), pattern()}
| {:sys_env, sys_key(), pattern()}
| {:sys_env, sys_key(), operator(), pattern()}
# General
@type queue_name :: atom()
@type queue_opts ::
{:local_limit, pos_integer()}
| {:global_limit, pos_integer()}
| {:only, only()}
| {:paused, boolean()}
| {:rate_limit, [allowed: pos_integer(), period: period(), partition: partition()]}
@type queue_input :: [{queue_name(), pos_integer() | [queue_opts()]}]
runtime-updates
Runtime Updates
Dynamic queues are persisted to the database, making it easy to manipulate them
directly through CRUD operations, or indirectly with Oban's queue operations,
i.e. pause_queue/2
, scale_queue/2
.
In this section we'll walk through each of the available functions and look at some examples.
listing-queues
Listing Queues
@spec all() :: [Ecto.Schema.t()]
Use all/0
to retrieve all persisted queues:
queues = DynamicQueues.all()
This returns a list of Oban.Pro.Queue
schemas with persisted attributes.
Because it is an Ecto schema you're free to compose queries that will filter
down the queues. For example, to find all queues named with an "account" prefix:
import Ecto.Query, only: [where: 3]
Oban.Pro.Queue
|> where([c], ilike(c.name, "account_%"))
|> MyApp.Repo.all()
While it's possible to modify queue's returned from all/0
, it is recommended
that you use update/2
to ensure options are cast and validated correctly.
inserting-queues
Inserting Queues
@spec insert([queue_input()]) :: {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}
The insert/1
function takes a list of queue inputs, exactly like the :queues
option you'd pass as configuration:
DynamicQueues.insert(
basic: 10,
audio: [global_limit: 10],
video: [global_limit: 10],
learn: [local_limit: 5, only: {:node, :=~, "learn"}]
)
Note that insert/1
acts like an upsert, making it possible to modify queues if
the name matches. Still, it is better to use update/2
to make targeted
updates.
updating-queues
Updating Queues
@spec update(queue_name(), queue_opts()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
Use update/2
to modify a single queue by name. Any option available when
inserting queues can be updated. For example, the following call demonstrates
every possible option:
DynamicQueues.update(
:video,
local_limit: 5,
global_limit: 20,
rate_limit: [allowed: 10, period: 30, partition: [fields: [:worker]]],
only: {:node, :=~, "media"},
paused: false
)
Updating a single option won't remove other persisted options. If you'd like to
clear an uption you must set them to nil
:
DynamicQueues.update(:video, global_limit: nil)
Since update/2
operates on a single queue, it is possible to rename a queue
without doing a delete
/insert
dance:
DynamicQueues.update(:video, name: :media)
deleting-queues
Deleting Queues
@spec delete(queue_name()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
The delete/1
function references queues by name. You can use it to delete
queues at runtime, rather than using the :delete
option into the queues
list
in your configuration.
{:ok, _} = DynamicQueues.delete(:audio)
enabling-polling-mode
Enabling Polling Mode
In environments with restricted connectivity (where PubSub doesn't work) you can still use DynamicQueues at runtime through polling mode. The polling interval is entirely up to you, as it's disabled by default.
config :my_app, Oban,
plugins: [{Oban.Pro.Plugins.DynamicQueues, interval: :timer.minutes(1)}]
With the interval above each DynamicQueues instance will wake up every minute, check the database for changes, and start new queues.
isolation-and-namespacing
Isolation and Namespacing
All DynamicQueues functions have an alternate clause that accepts an Oban
instance name for the first argument. This matches base Oban
functions such as
Oban.pause_queue/2
, which allow you to seamlessly work with multiple Oban
instances and across multiple database prefixes. For example, you can use
all/1
to list all queues for the instance named ObanPrivate
:
queues = DynamicQueues.all(ObanPrivate)
Likewise, to insert a new queue using the configuration associated with the
ObanPrivate
instance:
DynamicQueues.insert(ObanPrivate, private: limit: 10)