Oban.Pro.Plugins.DynamicQueues (Oban Pro v1.3.1)

The DynamicQueues plugin extends Oban's basic queue management by persisting queue changes across restarts, globally, across all connected nodes. It also boasts a declarative syntax for specifying which nodes a queue will run on.

DynamicQueues are ideal for applications that dynamically start, stop, or modify queues at runtime.

Installation

Before running the DynamicQueues plugin, you must run a migration to add the oban_queues table to your database.

mix ecto.gen.migration add_oban_queues

Open the generated migration in your editor and delegate to the dynamic queues migration:

defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicQueues
end

As with the base Oban tables, you can optionally provide a prefix to "namespace" the table within your database. Here we specify a "private" prefix:

defmodule MyApp.Repo.Migrations.AddObanQueues do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicQueues.change(prefix: "private")
end

Run the migration to create the table:

mix ecto.migrate

Now you can use the DynamicQueues plugin and start managing queues dynamically!

Using and Configuring

To begin using DynamicQueues, add the module to your list of Oban plugins in config.exs:

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicQueues]
  ...

Without providing a list of queues the plugin doesn't have anything to run. The syntax for specifying dynamic queues is identical to Oban's built-in :queues option, which means you can copy them over:

plugins: [{
  Oban.Pro.Plugins.DynamicQueues,
  queues: [
    default: 20,
    mailers: [global_limit: 20],
    events: [local_limit: 30, rate_limit: [allowed: 100, period: 60]]
  ]
}]

Prevent Queue Conflicts

Be sure to omit any top-level queues configuration to prevent a conflict between basic and dynamic queues.
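As a rough sketch of a conflict-free setup, the config below declares every queue through the plugin and leaves out the top-level queues key entirely. The repo name MyApp.Repo and the engine line are assumptions: the queue option types reference Oban.Pro.Engines.Smart, so this sketch presumes that engine is in use for global limits.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  # Assumed repo module; substitute your application's repo.
  repo: MyApp.Repo,
  # Assumption: global limits rely on the Smart engine.
  engine: Oban.Pro.Engines.Smart,
  # Note that there is no top-level `queues:` key here. All queues are
  # declared inside the DynamicQueues plugin options instead.
  plugins: [
    {Oban.Pro.Plugins.DynamicQueues,
     queues: [
       default: 20,
       mailers: [global_limit: 20]
     ]}
  ]
```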

Now, when DynamicQueues initializes, it will persist all of the queues to the database and start supervising them. The queues syntax is nearly identical to Oban's standard queues, with an important enhancement we'll look at shortly.

Each persisted queue is referenced globally by all other connected Oban instances that are running the DynamicQueues plugin. Changing a queue's name, pausing, scaling, or changing any other option automatically updates the queue across all nodes, and the change persists across restarts.

Persisted queues are referenced by name, so you can tweak a queue's options by changing the definition within your config. For example, to bump the mailer's global limit up to 30:

queues: [
  mailers: [global_limit: 30],
  ...
]

That isn't especially interesting—after all, that's exactly how regular queues work! Dynamic queues start to shine when you insert, update, or delete them dynamically, either through Oban Web or your own application code. But first, let's look at how to limit where dynamic queues run.

Limiting Where Queues Run

Dynamic queues can be configured to run on a subset of available nodes. This is especially useful when you wish to restrict resource-intensive queues to dedicated nodes. Restriction is configured through the only option, which you use like this:

queues: [
  basic: [local_limit: 10, only: {:node, :=~, "web|worker"}],
  audio: [local_limit: 5, only: {:node, "worker.1"}],
  video: [local_limit: 5, only: {:node, "worker.2"}],
  learn: [local_limit: 5, only: {:sys_env, "EXLA", "CUDA"}],
  store: [local_limit: 1, only: {:sys_env, "WAREHOUSE", true}]
]

In this example we've defined five queues, with the following restrictions:

  • basic — will run on a node named web or worker
  • audio — will only run on a node named worker.1
  • video — will only run on a node named worker.2
  • learn — will run wherever EXLA=CUDA is an environment variable
  • store — will run wherever WAREHOUSE=true is an environment variable

Here are the various match modes, operators, and allowed patterns:

Modes

  • :node — matches the node name as set in your Oban config. By default, node is the node's id in a cluster, the hostname outside a cluster, or a DYNO variable on Heroku.
  • :sys_env — matches a single system environment variable as retrieved by System.get_env/1

Operators

  • :== — compares the pattern and runtime value for equality as strings. This is the default operator if nothing is specified.
  • :!= — compares the pattern and runtime value for inequality as strings
  • :=~ — treats the pattern as a regex and matches it against a runtime value

Patterns

  • boolean — either true or false, which is stringified before comparison
  • string — either a literal pattern or a regular expression, depending on the supplied operator
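To make the modes, operators, and patterns above concrete, here is a minimal sketch of how an only tuple could be evaluated. The module OnlySketch and its allowed?/3 function are hypothetical and are not part of Oban Pro. The node name and environment are passed in as arguments so the logic can be tried in isolation, and treating an unset variable as an empty string is an assumption of this sketch.

```elixir
defmodule OnlySketch do
  # Hypothetical illustration only; this is not Oban Pro's implementation.

  # `env` is a map of environment variables, defaulting to the real ones.
  def allowed?(only, node_name, env \\ System.get_env())

  # The short forms fall back to the default :== operator.
  def allowed?({:node, pattern}, node_name, env),
    do: allowed?({:node, :==, pattern}, node_name, env)

  def allowed?({:sys_env, key, pattern}, node_name, env),
    do: allowed?({:sys_env, key, :==, pattern}, node_name, env)

  def allowed?({:node, operator, pattern}, node_name, _env),
    do: compare(operator, pattern, node_name)

  def allowed?({:sys_env, key, operator, pattern}, _node_name, env),
    do: compare(operator, pattern, Map.get(env, key))

  # Both sides are stringified first, so `true` becomes "true" and an
  # unset variable (nil) becomes "" (an assumption of this sketch).
  defp compare(:==, pattern, value), do: to_string(pattern) == to_string(value)
  defp compare(:!=, pattern, value), do: to_string(pattern) != to_string(value)

  # The pattern is compiled as a regex and matched against the value.
  defp compare(:=~, pattern, value),
    do: to_string(value) =~ Regex.compile!(to_string(pattern))
end

# Usage with an explicit node name and environment map:
OnlySketch.allowed?({:node, :=~, "web|worker"}, "worker.1", %{})
OnlySketch.allowed?({:sys_env, "WAREHOUSE", true}, "web.1", %{"WAREHOUSE" => "true"})
```

Passing the environment as a map keeps the sketch free of side effects; a real check would read the configured Oban node name and the system environment.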

Deleting Persisted Queues

It is possible to delete a persisted queue during initialization by passing the :delete option:

queues: [
  some_old_queue: [delete: true],
  ...
]

Multiple queues can be deleted simultaneously, if necessary. Deleting queues is also idempotent; nothing will happen if a matching queue can't be found.

In the next section we'll look at how to list, insert, update and delete queues dynamically at runtime.

Runtime Updates

Dynamic queues are persisted to the database, making it easy to manipulate them directly through CRUD operations, or indirectly with Oban's queue operations, e.g. pause_queue/2 and scale_queue/2.
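The indirect route might look like the following sketch, which uses Oban's standard queue functions. It assumes the default Oban instance and a dynamic queue named :default; the limit value is arbitrary.

```elixir
# Pause the :default queue. With DynamicQueues running, the paused state
# is persisted along with the queue.
:ok = Oban.pause_queue(queue: :default)

# Change the queue's limit at runtime.
:ok = Oban.scale_queue(queue: :default, limit: 20)

# Resume processing again later.
:ok = Oban.resume_queue(queue: :default)
```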

Explicit queue options will be overwritten on restart, while omitted fields are retained. For example, consider the following dynamic queue entry:

queues: [
  default: [limit: 10],
  ...
]

Scaling that default queue up or down at runtime wouldn't persist across a restart, because the definition will overwrite the limit. However, pausing the queue or changing the global_limit would persist because they aren't included in the queue definition.

# pause, global_limit, etc. will persist, but limit won't
default: [limit: 10]

# pause will persist, but limit and global_limit won't
default: [limit: 10, global_limit: 20]

# neither limits nor pausing will persist
default: [limit: 10, global_limit: 20, paused: true]
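The overwrite rule behaves like a keyword merge in which configured options win and everything else is kept. The sketch below models that idea with plain keyword lists; MergeSketch and on_restart/2 are hypothetical names used for illustration and say nothing about how the plugin stores or merges options internally.

```elixir
defmodule MergeSketch do
  # Hypothetical model of the restart behavior: options named in the
  # config replace persisted values, while persisted options that the
  # config omits are left untouched.
  def on_restart(persisted, configured), do: Keyword.merge(persisted, configured)
end

# State persisted after runtime changes: scaled up, globally limited, paused.
persisted = [limit: 25, global_limit: 40, paused: true]

# The config only names :limit, so only :limit is reset on restart.
merged = MergeSketch.on_restart(persisted, limit: 10)

Keyword.fetch!(merged, :limit)        # back to the configured 10
Keyword.fetch!(merged, :global_limit) # still the runtime value, 40
Keyword.fetch!(merged, :paused)       # still paused
```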

See function documentation for all/0, insert/1, update/2, and delete/1 for more information about runtime updates.

Enabling Polling Mode

In environments with restricted connectivity (where PubSub doesn't work) you can still use DynamicQueues at runtime through polling mode. The polling interval is entirely up to you, as it's disabled by default.

config :my_app, Oban,
  plugins: [{Oban.Pro.Plugins.DynamicQueues, interval: :timer.minutes(1)}]

With the interval above each DynamicQueues instance will wake up every minute, check the database for changes, and start new queues.
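The interval option sits alongside queues in the same plugin options, so a polling setup with predefined queues could be sketched as follows. The queue names and limits are placeholders.

```elixir
config :my_app, Oban,
  plugins: [
    {Oban.Pro.Plugins.DynamicQueues,
     # Check the database for queue changes once a minute.
     interval: :timer.minutes(1),
     queues: [
       default: 20,
       mailers: [global_limit: 20]
     ]}
  ]
```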

Isolation and Namespacing

All DynamicQueues functions have an alternate clause that accepts an Oban instance name for the first argument. This matches base Oban functions such as Oban.pause_queue/2, which allow you to seamlessly work with multiple Oban instances and across multiple database prefixes. For example, you can use all/1 to list all queues for the instance named ObanPrivate:

queues = DynamicQueues.all(ObanPrivate)

Likewise, to insert a new queue using the configuration associated with the ObanPrivate instance:

DynamicQueues.insert(ObanPrivate, private: [limit: 10])
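The same pattern should carry over to the other functions, since each accepts the instance name as its first argument. A brief sketch, assuming an instance named ObanPrivate is running with a persisted :private queue:

```elixir
alias Oban.Pro.Plugins.DynamicQueues

# The instance name comes first, followed by the usual arguments.
{:ok, _queue} = DynamicQueues.update(ObanPrivate, :private, local_limit: 20)

# Remove the queue from the ObanPrivate instance only.
{:ok, _queue} = DynamicQueues.delete(ObanPrivate, :private)
```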

Summary

Functions

all(oban_name \\ Oban)

Retrieve all persisted queues.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

delete(oban_name \\ Oban, name)

Delete a queue by name at runtime, rather than using the :delete option in the queues list in your configuration.

insert(oban_name \\ Oban, entries)

Persist a list of queue inputs, exactly like the :queues option passed as configuration.

update(oban_name \\ Oban, name, opts)

Modify a single queue's options.

Types

@type oban_name() :: term()
@type only() ::
  {:node, pattern()}
  | {:node, operator(), pattern()}
  | {:sys_env, sys_key(), pattern()}
  | {:sys_env, sys_key(), operator(), pattern()}
@type operator() :: :== | :!= | :=~
@type partition() :: Oban.Pro.Engines.Smart.partition()
@type pattern() :: boolean() | String.t()
@type period() :: Oban.Pro.Engines.Smart.period()
@type queue_input() :: [{queue_name(), pos_integer() | queue_opts() | [queue_opts()]}]
@type queue_name() :: atom() | binary()
@type queue_opts() ::
  {:local_limit, pos_integer()}
  | {:global_limit, Oban.Pro.Engines.Smart.global_limit()}
  | {:only, only()}
  | {:paused, boolean()}
  | {:rate_limit, Oban.Pro.Engines.Smart.rate_limit()}
@type sys_key() :: String.t()

Functions

all(oban_name \\ Oban)

@spec all(oban_name()) :: [Ecto.Schema.t()]

Retrieve all persisted queues.

While it's possible to modify queues returned from all/0, it is recommended that you use update/2 to ensure options are cast and validated correctly.

Examples

Retrieve a list of all queue schemas with persisted attributes:

DynamicQueues.all()

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

delete(oban_name \\ Oban, name)

@spec delete(oban_name(), queue_name()) ::
  {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

Delete a queue by name at runtime, rather than using the :delete option in the queues list in your configuration.

Examples

Delete the "audio" queue:

{:ok, _} = DynamicQueues.delete(:audio)

insert(oban_name \\ Oban, entries)

@spec insert(oban_name(), queue_input()) ::
  {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}

Persist a list of queue inputs, exactly like the :queues option passed as configuration.

Note that insert/1 acts like an upsert, making it possible to modify queues if the name matches. Still, it is better to use update/2 to make targeted updates.

Examples

Insert a variety of queues with standard and advanced options:

DynamicQueues.insert(
  basic: 10,
  audio: [global_limit: 10],
  video: [global_limit: 10],
  learn: [local_limit: 5, only: {:node, :=~, "learn"}]
)

update(oban_name \\ Oban, name, opts)

@spec update(oban_name(), queue_name(), queue_opts()) ::
  {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t()}

Modify a single queue's options.

Every option available when inserting queues can be updated.

Examples

The following call demonstrates updating every possible option:

DynamicQueues.update(
  :video,
  local_limit: 5,
  global_limit: 20,
  rate_limit: [allowed: 10, period: 30, partition: [fields: [:worker]]],
  only: {:node, :=~, "media"},
  paused: false
)

Updating a single option won't remove other persisted options. If you'd like to clear an option you must set it to nil:

DynamicQueues.update(:video, global_limit: nil)

Since update/2 operates on a single queue, it is possible to rename a queue without doing a delete/insert dance:

DynamicQueues.update(:video, name: :media)
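Because update/2 returns either {:ok, queue} or {:error, changeset}, calling code may want to handle both outcomes. The wrapper below is a sketch: MyApp.QueueAdmin and set_local_limit/2 are hypothetical names, and logging the changeset errors is just one reasonable way to surface a rejected update.

```elixir
defmodule MyApp.QueueAdmin do
  # Hypothetical helper module, not part of Oban Pro.
  require Logger

  alias Oban.Pro.Plugins.DynamicQueues

  # Change a queue's local limit and report validation failures.
  def set_local_limit(queue, limit) do
    case DynamicQueues.update(queue, local_limit: limit) do
      {:ok, _queue} ->
        :ok

      {:error, changeset} ->
        # The changeset carries the validation errors for the rejected options.
        Logger.warning("queue update rejected: #{inspect(changeset.errors)}")
        {:error, changeset}
    end
  end
end
```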