Oban.Pro.Plugins.DynamicQueues (Oban Pro v1.1.1)
The DynamicQueues plugin extends Oban's basic queue management by persisting queue changes across restarts, globally, across all connected nodes. It also boasts a declarative syntax for specifying which nodes a queue will run on.
DynamicQueues are ideal for applications that dynamically start, stop, or modify queues at runtime.
Installation
Before running the DynamicQueues plugin, you must run a migration to add the oban_queues table to your database.
mix ecto.gen.migration add_oban_queues
Open the generated migration in your editor and delegate to the dynamic queues migration:
defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicQueues
end
As with the base Oban tables, you can optionally provide a prefix to "namespace" the table within your database. Here we specify a "private" prefix:
defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicQueues.change(prefix: "private")
end
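When the table lives under a prefix, the Oban instance presumably needs the same prefix so the plugin looks for oban_queues in the right place. A minimal sketch, assuming Oban's standard `prefix` option; `MyApp.Repo` and `:my_app` are placeholder names:

```elixir
# config/config.exs (sketch; app and repo names are placeholders)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # Should match the prefix passed to the DynamicQueues migration
  prefix: "private",
  plugins: [Oban.Pro.Plugins.DynamicQueues]
```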
Run the migration to create the table:
mix ecto.migrate
Now you can use the DynamicQueues plugin and start managing queues dynamically!
Using and Configuring
To begin using DynamicQueues, add the module to your list of Oban plugins in config.exs:
config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicQueues]
  ...
Without providing a list of queues the plugin doesn't have anything to run. The syntax for specifying dynamic queues is identical to Oban's built-in :queues option, which means you can copy them over:
plugins: [{
  Oban.Pro.Plugins.DynamicQueues,
  queues: [
    default: 20,
    mailers: [global_limit: 20],
    events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
  ]
}]
Prevent Queue Conflicts
Be sure to omit any top-level queues configuration to prevent a conflict between basic and dynamic queues.
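For instance, a config that moves every queue under the plugin might look like the sketch below. The app name, repo module, and queue values are placeholders, and the commented-out line marks the kind of top-level key to remove:

```elixir
# config/config.exs (sketch; names are placeholders)
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # queues: [default: 20, mailers: 20],  # remove: conflicts with the plugin's queues
  plugins: [
    {Oban.Pro.Plugins.DynamicQueues,
     queues: [
       default: 20,
       mailers: [global_limit: 20]
     ]}
  ]
```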
Now, when DynamicQueues initializes, it will persist all of the queues to the database and start supervising them. The queues syntax is nearly identical to Oban's standard queues, with an important enhancement we'll look at shortly.
Each of the persisted queues is referenced globally, by all other connected Oban instances that are running the DynamicQueues plugin. Changing the queue's name, pausing, scaling, or changing any other options will automatically update queues across all nodes, and the changes persist across restarts.
Persisted queues are referenced by name, so you can tweak a queue's options by changing the definition within your config. For example, to bump the mailer's global limit up to 30:
queues: [
  mailers: [global_limit: 30],
  ...
]
That isn't especially interesting—after all, that's exactly how regular queues work! Dynamic queues start to shine when you insert, update, or delete them dynamically, either through Oban Web or your own application code. But first, let's look at how to limit where dynamic queues run.
Limiting Where Queues Run
Dynamic queues can be configured to run on a subset of available nodes. This is especially useful when you wish to restrict resource-intensive queues to dedicated nodes. Restriction is configured through the only option, which you use like this:
queues: [
  basic: [local_limit: 10, only: {:node, :=~, "web|worker"}],
  audio: [local_limit: 5, only: {:node, "worker.1"}],
  video: [local_limit: 5, only: {:node, "worker.2"}],
  learn: [local_limit: 5, only: {:sys_env, "EXLA", "CUDA"}],
  store: [local_limit: 1, only: {:sys_env, "WAREHOUSE", true}]
]
In this example we've defined five queues, with the following restrictions:
basic - will run on a node named web or worker
audio - will only run on a node named worker.1
video - will only run on a node named worker.2
learn - will run wherever EXLA=CUDA is an environment variable
store - will run wherever WAREHOUSE=true is an environment variable
Here are the various match modes, operators, and allowed patterns:
Modes
:node - matches the node name as set in your Oban config. By default, node is the node's id in a cluster, the hostname outside a cluster, or a DYNO variable on Heroku.
:sys_env - matches a single system environment variable as retrieved by System.get_env/1
Operators
:== - compares the pattern and runtime value for equality as strings. This is the default operator if nothing is specified.
:!= - compares the pattern and runtime value for inequality as strings
:=~ - treats the pattern as a regex and matches it against a runtime value
Patterns
boolean - either true or false, which is stringified before comparison
string - either a literal pattern or a regular expression, depending on the supplied operator
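To make the modes, operators, and patterns above concrete, here is a small stand-alone sketch of how an only tuple could be evaluated. The `OnlySketch` module and `runs_here?/2` function are hypothetical and are not Oban Pro's implementation; treating an unset environment variable as an empty string is also an assumption. Only the tuple shapes shown in the example above are handled.

```elixir
defmodule OnlySketch do
  @moduledoc """
  Hypothetical illustration of evaluating an `:only` tuple.
  This is not Oban Pro's implementation.
  """

  # {:node, pattern} falls back to the default :== operator.
  def runs_here?({:node, pattern}, node_name),
    do: compare(:==, pattern, node_name)

  def runs_here?({:node, operator, pattern}, node_name),
    do: compare(operator, pattern, node_name)

  # {:sys_env, key, pattern} compares against System.get_env/1.
  # An unset variable is treated as "" here (an assumption).
  def runs_here?({:sys_env, key, pattern}, _node_name),
    do: compare(:==, pattern, System.get_env(key) || "")

  # Patterns are stringified first, so `true` is compared as "true".
  defp compare(:==, pattern, value), do: to_string(pattern) == value
  defp compare(:!=, pattern, value), do: to_string(pattern) != value

  defp compare(:=~, pattern, value),
    do: Regex.match?(Regex.compile!(to_string(pattern)), value)
end

OnlySketch.runs_here?({:node, :=~, "web|worker"}, "worker.1")
# => true
OnlySketch.runs_here?({:node, "worker.1"}, "worker.2")
# => false
```

The second argument stands in for the node name from the Oban config, which keeps the sketch free of any running Oban instance.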
Deleting Persisted Queues
It is possible to delete a persisted queue during initialization by passing the :delete option:
queues: [
  some_old_queue: [delete: true],
  ...
]
Multiple queues can be deleted simultaneously, if necessary. Deleting queues is also idempotent; nothing will happen if a matching queue can't be found.
In the next section we'll look at how to list, insert, update and delete queues dynamically at runtime.
Runtime Updates
Dynamic queues are persisted to the database, making it easy to manipulate them directly through CRUD operations, or indirectly with Oban's queue operations, such as pause_queue/2 and scale_queue/2.
Explicit queue options will be overwritten on restart, while omitted fields are retained. For example, consider the following dynamic queue entry:
queues: [
default: [limit: 10],
...
]
Scaling that default queue up or down at runtime wouldn't persist across a restart, because the definition will overwrite the limit. However, pausing the queue or changing the global_limit would persist because they aren't included in the queue definition.
# pause, global_limit, etc. will persist, but limit won't
default: [limit: 10]
# pause will persist, but limit and global_limit won't
default: [limit: 10, global_limit: 20]
# neither limits nor pausing will persist
default: [limit: 10, global_limit: 20, paused: true]
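As a rough illustration of that rule, the sketch below uses Oban's standard queue operations against the default Oban instance, assuming the config still declares `default: [limit: 10]`. The comments restate what the rules above imply after a restart; they are not captured output.

```elixir
# Assumes the default Oban instance is running with the DynamicQueues plugin
# and the config contains `default: [limit: 10]`.

# Scale at runtime. The new limit takes effect now, but the config definition
# sets `limit`, so it is overwritten back to 10 on the next restart.
:ok = Oban.scale_queue(queue: :default, limit: 50)

# Pause at runtime. `paused` isn't part of the config definition, so the
# queue should remain paused after a restart.
:ok = Oban.pause_queue(queue: :default)

# Resume explicitly when ready.
:ok = Oban.resume_queue(queue: :default)
```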
See function documentation for all/0, insert/1, update/2, and delete/1 for more information about runtime updates.
Enabling Polling Mode
In environments with restricted connectivity (where PubSub doesn't work) you can still use DynamicQueues at runtime through polling mode. The polling interval is entirely up to you, as it's disabled by default.
config :my_app, Oban,
  plugins: [{Oban.Pro.Plugins.DynamicQueues, interval: :timer.minutes(1)}]
With the interval above each DynamicQueues instance will wake up every minute, check the database for changes, and start new queues.
Isolation and Namespacing
All DynamicQueues functions have an alternate clause that accepts an Oban instance name for the first argument. This matches base Oban functions such as Oban.pause_queue/2, which allow you to seamlessly work with multiple Oban instances and across multiple database prefixes. For example, you can use all/1 to list all queues for the instance named ObanPrivate:
queues = DynamicQueues.all(ObanPrivate)
Likewise, to insert a new queue using the configuration associated with the ObanPrivate instance:
DynamicQueues.insert(ObanPrivate, private: [limit: 10])
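The same pattern should carry over to the other functions, following the documented specs for update and delete with a leading instance name. A brief sketch, assuming the ObanPrivate instance is running and a queue named `:private` has already been persisted:

```elixir
alias Oban.Pro.Plugins.DynamicQueues

# ObanPrivate is the instance name; :private is the queue to change.
{:ok, _queue} = DynamicQueues.update(ObanPrivate, :private, local_limit: 20)

# Remove the queue from that instance once it is no longer needed.
{:ok, _queue} = DynamicQueues.delete(ObanPrivate, :private)
```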
Types
oban_name()
@type oban_name() :: term()
only()
operator()
@type operator() :: :== | :!= | :=~
partition()
@type partition() :: Oban.Pro.Engines.Smart.partition()
pattern()
period()
@type period() :: Oban.Pro.Engines.Smart.period()
queue_input()
@type queue_input() :: [{queue_name(), pos_integer() | queue_opts()}]
queue_name()
@type queue_name() :: atom()
queue_opts()
@type queue_opts() :: {:local_limit, pos_integer()} | {:global_limit, Oban.Pro.Engines.Smart.global_limit()} | {:only, only()} | {:paused, boolean()} | {:rate_limit, Oban.Pro.Engines.Smart.rate_limit()}
sys_key()
@type sys_key() :: String.t()
Functions
all(oban_name \\ Oban)
@spec all(oban_name()) :: [Ecto.Schema.t()]
Retrieve all persisted queues.
While it's possible to modify queues returned from all/0, it is recommended that you use update/2 to ensure options are cast and validated correctly.
Examples
Retrieve a list of all queue schemas with persisted attributes:
DynamicQueues.all()
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
delete(oban_name \\ Oban, name)
@spec delete(oban_name(), queue_name()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
Delete a queue by name at runtime, rather than using the :delete option in the queues list in your configuration.
Examples
Delete the "audio" queue:
{:ok, _} = DynamicQueues.delete(:audio)
insert(oban_name \\ Oban, entries)
@spec insert(oban_name(), queue_input()) :: {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}
Persist a list of queue inputs, exactly like the :queues option passed as configuration.
Note that insert/1 acts like an upsert, making it possible to modify queues if the name matches. Still, it is better to use update/2 to make targeted updates.
Examples
Insert a variety of queues with standard and advanced options:
DynamicQueues.insert(
  basic: 10,
  audio: [global_limit: 10],
  video: [global_limit: 10],
  learn: [local_limit: 5, only: {:node, :=~, "learn"}]
)
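Because the spec returns either an ok tuple with a list of schemas or an error tuple with a changeset, callers will usually want to handle both outcomes. A minimal sketch, assuming the default Oban instance is running; the messages printed are illustrative only:

```elixir
alias Oban.Pro.Plugins.DynamicQueues

case DynamicQueues.insert(audio: [global_limit: 10]) do
  {:ok, queues} ->
    # `queues` is the list of persisted queue schemas
    IO.puts("persisted #{length(queues)} queue(s)")

  {:error, %Ecto.Changeset{} = changeset} ->
    # Validation failed; the changeset's errors describe why
    IO.inspect(changeset.errors, label: "invalid queue options")
end
```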
update(oban_name \\ Oban, name, opts)
@spec update(oban_name(), queue_name(), queue_opts()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}
Modify a single queue's options.
Every option available when inserting queues can be updated.
Examples
The following call demonstrates updating every possible option:
DynamicQueues.update(
  :video,
  local_limit: 5,
  global_limit: 20,
  rate_limit: [allowed: 10, period: 30, partition: [fields: [:worker]]],
  only: {:node, :=~, "media"},
  paused: false
)
Updating a single option won't remove other persisted options. If you'd like to clear an option you must set it to nil:
DynamicQueues.update(:video, global_limit: nil)
Since update/2 operates on a single queue, it is possible to rename a queue without doing a delete/insert dance:
DynamicQueues.update(:video, name: :media)