Oban.Pro.Engines.Smart (Oban Pro v1.4.11)
The Smart
engine provides advanced features, enhanced observability, and provides the
foundation for accurate pruning with Oban.Pro.Plugins.DynamicPruner
and persistence for
Oban.Pro.Plugins.DynamicQueues
.
As an Oban.Engine
, it is responsible for all non-plugin database interaction, from inserting
through executing jobs. Major features include:
- Global Concurrency — limits the number of concurrent jobs that run across all nodes
- Rate Limiting — control the number of jobs that execute within a window of time
- Queue Partitioning — segment a queue so concurrency or rate limits apply separately to each partition
- Async Tracking — bundle job acks (completed, cancelled, etc.) to minimize transactions and reduce load on the database
- Accurate Snooze — differentiate between attempts with errors and intentional snoozes
- Unique Bulk Inserts — respect unique constraints during bulk
inserts via
Oban.insert_all/2
with automatic batching to insert any amount of jobs without hitting database limits
Installation
See the Smart Engine section in the adoption guide to get started. The Producer Migrations section contains additional details for more complex setups with multiple instances or prefixes.
Global Concurrency
Global concurrency limits the number of concurrent jobs that run across all nodes.
Typically the global concurrency limit is local_limit * num_nodes
. For example, with three
nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit
is present,
and the local_limit
is omitted, then the local_limit
falls back to the global_limit
.
The only way to guarantee that all connected nodes will run exactly one job concurrently is to
set global_limit: 1
.
Here are some examples:
# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]
# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]
# Execute at most 1 job concurrently
my_queue: [global_limit: 1]
Rate Limiting
Rate limiting controls the number of jobs that execute within a period of time.
Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). The limiter uses a sliding window over the configured period to accurately approximate a limit.
Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.
Without a modifier, the rate_limit
period is defined in seconds. However, you can provide a
:second
, :minute
, :hour
or :day
modifier to use more intuitive values.
period: 30
— 30 secondsperiod: {1, :minute}
— 60 secondsperiod: {2, :minutes}
— 120 secondsperiod: {1, :hour}
— 3,600 secondsperiod: {1, :day}
—86,400 seconds
Here are a few examples:
# Execute at most 1 job per second, the lowest possible limit
my_queue: [rate_limit: [allowed: 1, period: 1]]
# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]
# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]
# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]
Understanding Concurrency Limits
The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a
local_limit
of 10 and aglobal_limit
of 20, a single node will run 10 jobs concurrently. If that same queue had arate_limit
that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
Queue Partitioning
In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.
Partitions are specified with fields
and keys
, where keys
is optional but highly
recommended if you've included :args
. Aside from making partitions easier to reason about,
partitioning by keys minimizes the amount of data a queue needs to track and simplifies
job-fetching queries.
Configuring Partitions
The partition syntax is identical for global and rate limits (note that you can partition by global or rate, but not both.)
Here are a few examples of viable partitioning schemes:
# Partition by worker alone
partition: :worker
# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]
# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]
Remember, take care to minimize partition cardinality by using a few keys
whenever possible.
Partitioning based on every permutation of your args
makes concurrency or rate limits hard to
reason about and can negatively impact queue performance.
Global Partitioning
Global partitioning changes global concurency behavior. Rather than applying a fixed number for the queue, it applies to every partition within the queue.
Consider the following example:
local_limit: 10, global_limit: [allowed: 1, partition: :worker]
The queue is configured to run one job per-worker across every node, but only 10 concurrently on a
single node. That is in contrast to the standard behaviour of global_limit
, which would override
the local_limit
and only allow 1 concurrent job across every node.
Alternatively, you could partition by a single key:
local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]
That configures the queue to run one job concurrently across the entire cluster per tenant_id
.
Rate Limit Partitioning
Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.
For example, to allow one job per-worker, every five seconds, across every instance of the alpha
queue in your cluster:
local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: :worker]
Async Tracking
The results of job execution, e.g. completed
, cancelled
, etc., are bundled together into a
single transaction to minimize load on an app's Ecto pool and the database.
Bundling updates and reporting them asynchronously dramatically reduces the number of transactions per second. However, async bundling introduces a slight lag (up to 5ms) between job execution finishing and recording the outcome in the database.
Async tracking can be disabled for specific queues with the ack_async
option:
queues: [
standard: 30,
critical: [ack_async: false, local_limit: 10]
]
Accurate Snooze
Unlike the Basic
engine which increments attempts
and max_attempts
, the Smart engine rolls
back the attempt
on snooze. This approach preserves the original max_attempts
and records a
snoozed
count in meta
. As a result, it's simple to differentiate between "real" attempts and
snoozes, and backoff calculation remains accurate regardless of snoozing.
The following process/1
function demonstrates checking a job's meta
for a snoozed
count:
def process(job) do
case job.meta do
%{"orig_scheduled_at" => unix_microseconds, "snoozed" => snoozed} ->
IO.inspect({snoozed, unix_microseconds}, label: "Times snoozed since")
_ ->
# This job has never snoozed before
end
end
Unique Bulk Inserts
Where the Basic
engine requires you to insert unique jobs individually, the Smart
engine adds
unique job support to Oban.insert_all/2
. No additional configuration is necessary—simply use
insert_all
instead for unique jobs.
Oban.insert_all(lots_of_unique_jobs)
Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits (PostgreSQL's binary protocol has a limit of 65,535 parameters that may be sent in a single call. That presents an upper limit on the number of rows that may be inserted at one time.)
list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()
The default batch size for unique jobs is 250
, and 1_000
for non-unique jobs. Regardless, you
can override with batch_size
:
Oban.insert_all(lots_of_jobs, batch_size: 1500)
It's also possible to set a custom timeout for batch inserts:
Oban.insert_all(lots_of_jobs, timeout: :infinity)
A single batch of jobs is inserted without a transaction. Above that, each batch of jobs is
inserted in a single transaction, unless there are 10k total unique jobs to insert. After that
threshold each batch is committed in a separate transaction to prevent memory errors. It's
possible to control the transaction threshold with xact_limit
if you happen to have a tuned
database. For example, to set the limit at 20k jobs:
Oban.insert_all(lots_of_jobs, xact_limit: 20_000)
Producer Migrations
For multiple Oban instances you'll need to configure each one to use the Smart
engine,
otherwise they'll default to the Basic
engine.
If you use prefixes, or have multiple instances with different prefixes, you can specify the prefix and create multiple tables in one migration:
use Ecto.Migration
def change do
Oban.Pro.Migrations.Producers.change()
Oban.Pro.Migrations.Producers.change(prefix: "special")
Oban.Pro.Migrations.Producers.change(prefix: "private")
end
The Producers
migration also exposes up/0
and down/0
functions if change/0
doesn't fit
your usecase.
Summary
Types
global_limit()
@type global_limit() :: pos_integer() | [allowed: pos_integer(), partition: partition()]
local_limit()
@type local_limit() :: pos_integer()
partition()
period()
@type period() :: pos_integer() | {pos_integer(), unit()}
rate_limit()
@type rate_limit() :: [ allowed: pos_integer(), period: period(), partition: partition() ]
unit()
@type unit() ::
:second | :seconds | :minute | :minutes | :hour | :hours | :day | :days