Oban.Pro.Plugins.DynamicQueues (Oban Pro v1.6.0-rc.1)
The DynamicQueue
plugin extends Oban's basic queue management by persisting queue changes
across restarts, globally, across all connected nodes. It also boasts a declarative syntax for
specifying which nodes a queue will run on.
DynamicQueues
are ideal for applications that dynamically start, stop, or modify queues at
runtime.
Using and Configuring
To begin using DynamicQueues
, add the module to your list of Oban plugins in config.exs
:
config :my_app, Oban,
plugins: [Oban.Pro.Plugins.DynamicQueues]
...
Without providing a list of queues the plugin doesn't have anything to run. The syntax for
specifying dynamic queues is identical to Oban's built-in :queues
option, which means you can
copy them over:
- queues: [
- default: 20,
- mailers: [global_limit: 20],
- events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
- ]
+ plugins: [{
+ Oban.Pro.Plugins.DynamicQueues,
+ queues: [
+ default: 20,
+ mailers: [global_limit: 20],
+ events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
+ ]
+ }]
Now, when DynamicQueues
initializes, it will persist all of the queues to the database and
start supervising them. The queues
syntax is nearly identical to Oban's standard queues,
with an important enhancement we'll look at shortly.
Each of the persisted queues are referenced globally, by all other connected Oban instances that
are running the DynamicQueues
plugin. Changing the queue's name, pausing, scaling, or changing
any other options will automatically update queues across all nodes—and persist across restarts.
Persisted queues are referenced by name, so you can tweak a queue's options by changing the
definition within your config. For example, to bump the mailer's global limit up to 30
:
queues: [
mailers: [global_limit: 30],
...
]
That isn't especially interesting—after all, that's exactly how regular queues work! Dynamic queues start to shine when you insert, update, or delete them dynamically, either through Oban Web or your own application code.
Synchronizing Queues
The :sync_mode
option controls how DynamicQueues handles queue synchronization during startup.
There are two available modes:
:manual
- Only deletes queues explicitly marked withdelete: true
and inserts/updates the remaining queues. This is the default.:automatic
- Automatically deletes any queues that aren't defined in the configuration and inserts/updates the remaining queues.
Here's how to configure each mode:
# Manual mode (default)
config :my_app, Oban, plugins: [{DynamicQueues, sync_mode: :manual, queues: [...]}]
# Automatic mode
config :my_app, Oban, plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]
In manual mode, you must explicitly mark queues for deletion:
queues: [
old_queue: [delete: true],
default: 20,
mailers: [limit: 10]
]
In automatic mode, any queue that exists in the database but isn't defined in the configuration will be automatically deleted during startup. This is useful when you want to ensure your runtime queue configuration exactly matches what's defined in your application config.
Choosing a Sync Mode
Use :manual
mode when you want fine-grained control over queue deletion and want to preserve
dynamically created queues across restarts. Use :automatic
mode when you want to ensure your
queue configuration is the single source of truth and automatically clean up old queues.
In either mode, changes to queues are persisted across restarts until the configuration is
changed. For example, if you change the global_limit
of a queue through the Web dashboard,
that change will persist across restarts until you change the global_limit
in your config.
Limiting Where Queues Run
Dynamic queues can be configured to run on a subset of available nodes. This is especially
useful when wish to restrict resource-intensive queues to only dedicated nodes. Restriction is
configured through the only
option, which you use like this:
queues: [
basic: [local_limit: 10, only: {:node, :=~, "web|worker"}],
audio: [local_limit: 5, only: {:node, "worker.1"}],
video: [local_limit: 5, only: {:node, "worker.2"}],
learn: [local_limit: 5, only: {:sys_env, "EXLA", "CUDA"}],
store: [local_limit: 1, only: {:sys_env, "WAREHOUSE", true}]
]
In this example we've defined five queues, with the following restrictions:
basic
— will run on a node namedweb
orworker
audio
— will only run on a node namedworker.1
video
— will only run on a node namedworker.2
learn
— will run whereverEXLA=CUDA
is an environment variablestore
— will run whereverWAREHOUSE=true
is an environment variable
Here are the various match modes, operators, and allowed patterns:
Modes
:node
— matches the node name as set in your Oban config. By default,node
is the node's id in a cluster, the hostname outside a cluster, or aDYNO
variable on Heroku.:sys_env
— matches a single system environment variable as retrieved bySystem.get_env/1
Operators
:==
— compares the pattern and runtime value for equality as strings. This is the default operator if nothing is specified.:!=
— compares the pattern and runtime value for inequality as strings:=~
— treats the pattern as a regex and matches it against a runtime value
Patterns
boolean
— eithertrue
orfalse
, which is stringified before comparisonstring
— either a literal pattern or a regular expression, depending on the supplied operator
Runtime Updates
Dynamic queues are persisted to the database, making it easy to manipulate them directly through
CRUD operations, or indirectly with Oban's queue operations, i.e. pause_queue/2
,
scale_queue/2
.
Explicit queue options will be overwritten on restart, while omitted fields are retained. For example, consider the following dynamic queue entry:
queues: [
default: [limit: 10],
...
]
Scaling that default
queue up or down at runtime wouldn't persist across a restart, because
the definition will overwrite the limit
. However, pausing the queue or changing the
global_limit
would persist because they aren't included in the queue definition.
# pause, global_limit, etc. will persist, but limit won't
default: [limit: 10]
# pause will persist, but limit and global_limit won't
default: [limit: 10, global_limit: 20]
# neither limits nor pausing will persist
default: [limit: 10, global_limit: 20, paused: true]
See function documentation for all/0
, insert/1
, update/2
, and delete/1
for more
information about runtime updates.
Enabling Polling Mode
In environments with restricted connectivity (where PubSub doesn't work) you can still use DynamicQueues at runtime through polling mode. The polling interval is entirely up to you, as it's disabled by default.
config :my_app, Oban,
plugins: [{Oban.Pro.Plugins.DynamicQueues, interval: :timer.minutes(1)}]
With the interval above each DynamicQueues instance will wake up every minute, check the database for changes, and start new queues.
Isolation and Namespacing
All DynamicQueues functions have an alternate clause that accepts an Oban instance name for the
first argument. This matches base Oban
functions such as Oban.pause_queue/2
, which allow you
to seamlessly work with multiple Oban instances and across multiple database prefixes. For
example, you can use all/1
to list all queues for the instance named ObanPrivate
:
queues = DynamicQueues.all(ObanPrivate)
Likewise, to insert a new queue using the configuration associated with the ObanPrivate
instance:
DynamicQueues.insert(ObanPrivate, private: limit: 10)
Summary
Functions
Retrieve all persisted queues.
Returns a specification to start this module under a supervisor.
Delete a queue by name at runtime, rather than using the :delete
option into the queues
list
in your configuration.
Retrieve a single persisted queue.
Persist a list of queue inputs, exactly like the :queues
option passed as configuration.
Modify a single queue's options.
Types
@type oban_name() :: term()
@type operator() :: :== | :!= | :=~
@type partition() :: Oban.Pro.Engines.Smart.partition()
@type period() :: Oban.Pro.Engines.Smart.period()
@type queue_input() :: [{queue_name(), pos_integer() | queue_opts() | [queue_opts()]}]
@type queue_opts() :: {:local_limit, pos_integer()} | {:global_limit, Oban.Pro.Engines.Smart.global_limit()} | {:only, only()} | {:paused, boolean()} | {:rate_limit, Oban.Pro.Engines.Smart.rate_limit()}
@type sync_mode() :: :manual | :automatic
@type sys_key() :: String.t()
Functions
@spec all(oban_name()) :: [Ecto.Schema.t()]
Retrieve all persisted queues.
While it's possible to modify queue's returned from all/0
, it is recommended that you use
update/2
to ensure options are cast and validated correctly.
Examples
Retrieve a list of all queue schemas with persisted attributes:
DynamicQueues.all()
Returns a specification to start this module under a supervisor.
See Supervisor
.
@spec delete(oban_name(), queue_name()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
Delete a queue by name at runtime, rather than using the :delete
option into the queues
list
in your configuration.
Examples
Delete ethe "audio" queue:
{:ok, _} = DynamicQueues.delete(:audio)
@spec get(oban_name(), queue_name()) :: nil | Ecto.Schema.t()
Retrieve a single persisted queue.
Examples
Retrieve the default queue's schema with persisted attributes:
DynamicQueues.get(:default)
@spec insert(oban_name(), queue_input()) :: {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}
Persist a list of queue inputs, exactly like the :queues
option passed as configuration.
Note that insert/1
acts like an upsert, making it possible to modify queues if the name
matches. Still, it is better to use update/2
to make targeted updates.
Examples
Insert a variety of queues with standard and advanced options:
DynamicQueues.insert(
basic: 10,
audio: [global_limit: 10],
video: [global_limit: 10],
learn: [local_limit: 5, only: {:node, :=~, "learn"}]
)
@spec update(oban_name(), queue_name(), queue_opts()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
Modify a single queue's options.
Every option available when inserting queues can be updated.
Examples
The following call demonstrates updating every possible option:
DynamicQueues.update(
:video,
local_limit: 5,
global_limit: 20,
rate_limit: [allowed: 10, period: 30, partition: :worker]],
only: {:node, :=~, "media"},
paused: false
)
Updating a single option won't remove other persisted options. If you'd like to
clear an uption you must set them to nil
:
DynamicQueues.update(:video, global_limit: nil)
Since update/2
operates on a single queue, it is possible to rename a queue
without doing a delete
/insert
dance:
DynamicQueues.update(:video, name: :media)