Oban.Pro.Engines.Smart (Oban Pro v1.5.0-rc.5)
The Smart engine provides advanced concurrency features, enhanced observability, lighter-weight bulk processing, and a foundation for accurate job lifecycle management. As an Oban.Engine, it is responsible for all non-plugin database interaction, from inserting through executing jobs.
Major features include:
- Global Concurrency — limit the number of concurrent jobs that run across all nodes
- Rate Limiting — limit the number of jobs that execute within a window of time
- Queue Partitioning — segment a queue so concurrency or rate limits apply separately to each partition
- Async Tracking — bundle job acks (completed, cancelled, etc.) to minimize transactions and reduce load on the database
- Enhanced Unique — enforce job uniqueness with a custom index to accelerate inserting unique jobs safely between processes and nodes
- Bulk Inserts — automatically batch inserts to prevent hitting database limits, reduce data sent over the wire, and respect unique options when using Oban.insert_all/2
- Accurate Snooze — differentiate between attempts with errors and intentional snoozes
Installation
See the Smart Engine section in the adoption guide to get started.
Global Concurrency
Global concurrency limits the number of concurrent jobs that run across all nodes.
Typically the global concurrency limit is local_limit * num_nodes. For example, with three nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present and the local_limit is omitted, then the local_limit falls back to the global_limit.
The only way to guarantee that at most one job runs concurrently across all connected nodes is to set global_limit: 1.
Here are some examples:
# Execute 10 jobs concurrently across all nodes, with up to 10 on a single node
my_queue: [global_limit: 10]
# Execute 10 jobs concurrently, but only 3 jobs on a single node
my_queue: [local_limit: 3, global_limit: 10]
# Execute at most 1 job concurrently
my_queue: [global_limit: 1]
Rate Limiting
Rate limiting controls the number of jobs that execute within a period of time.
Rate limiting uses counts for the same queue from all other nodes in the cluster (with or without Distributed Erlang). The limiter uses a sliding window over the configured period to accurately approximate a limit.
Every job execution counts toward the rate limit, regardless of whether the job completes, errors, snoozes, etc.
Without a modifier, the rate_limit period is defined in seconds. However, you can provide a :second, :minute, :hour, or :day modifier to use more intuitive values.
- period: 30 — 30 seconds
- period: {1, :minute} — 60 seconds
- period: {2, :minutes} — 120 seconds
- period: {1, :hour} — 3,600 seconds
- period: {1, :day} — 86,400 seconds
Here are a few examples:
# Execute at most 1 job per second, the lowest possible limit
my_queue: [rate_limit: [allowed: 1, period: 1]]
# Execute at most 10 jobs per 30 seconds
my_queue: [rate_limit: [allowed: 10, period: 30]]
# Execute at most 10 jobs per minute
my_queue: [rate_limit: [allowed: 10, period: {1, :minute}]]
# Execute at most 1000 jobs per hour
my_queue: [rate_limit: [allowed: 1000, period: {1, :hour}]]
Understanding Concurrency Limits
The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a local_limit of 10 and a global_limit of 20, a single node will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
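To make the interplay concrete, here is a sketch of a queue combining all three limits (the app and queue names are illustrative). The lowest limit wins:

```elixir
# Hypothetical configuration combining all three limits. Effective
# throughput is capped by the lowest one: at most 5 jobs per minute,
# even though the local and global concurrency limits would allow more.
config :my_app, Oban,
  engine: Oban.Pro.Engines.Smart,
  queues: [
    my_queue: [
      local_limit: 10,
      global_limit: 20,
      rate_limit: [allowed: 5, period: {1, :minute}]
    ]
  ]
```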
Queue Partitioning
In addition to global and rate limits at the queue level, you can partition a queue so that it's treated as multiple queues where concurrency or rate limits apply separately to each partition.
Partitions are specified with fields and keys, where keys is optional but highly recommended if you've included :args. Aside from making partitions easier to reason about, partitioning by keys minimizes the amount of data a queue needs to track and simplifies job-fetching queries.
Configuring Partitions
The partition syntax is identical for global and rate limits (note that you can partition by the global limit or the rate limit, but not both).
Here are a few examples of viable partitioning schemes:
# Partition by worker alone
partition: :worker
# Partition by the `id` and `account_id` from args, ignoring the worker
partition: [args: [:id, :account_id]]
# Partition by worker and the `account_id` key from args
partition: [:worker, args: :account_id]
Remember, take care to minimize partition cardinality by using a few keys whenever possible. Partitioning based on every permutation of your args makes concurrency or rate limits hard to reason about and can negatively impact queue performance.
Global Partitioning
Global partitioning changes global concurrency behaviour. Rather than applying a fixed limit to the queue as a whole, the limit applies to every partition within the queue.
Consider the following example:
local_limit: 10, global_limit: [allowed: 1, partition: :worker]
The queue is configured to run one job per worker across every node, but only 10 concurrently on a single node. That is in contrast to the standard behaviour of global_limit, which would override the local_limit and only allow 1 concurrent job across every node.
Alternatively, you could partition by a single key:
local_limit: 10, global_limit: [allowed: 1, partition: [args: :tenant_id]]
That configures the queue to run one job concurrently across the entire cluster per tenant_id.
Rate Limit Partitioning
Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.
For example, to allow one job per worker, every five seconds, across every instance of the alpha queue in your cluster:
local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: :worker]
Async Tracking
The results of job execution, e.g. completed, cancelled, etc., are bundled together into a single transaction to minimize load on an app's Ecto pool and the database.
Bundling updates and reporting them asynchronously dramatically reduces the number of transactions per second. However, async bundling introduces a slight lag (up to 5ms) between job execution finishing and recording the outcome in the database.
Async tracking can be disabled for specific queues with the ack_async option:
queues: [
  standard: 30,
  critical: [ack_async: false, local_limit: 10]
]
Enhanced Unique
The Smart engine uses an alternative mechanism for unique jobs that's designed for speed, correctness, scalability, and simplicity. Uniqueness is enforced through a unique index that makes insertion entirely safe between processes and nodes, without the use of advisory locks or multiple queries.
Unlike standard uniqueness, which is only checked as jobs are inserted, the index-backed version applies for the job's entire lifetime. That prevents race conditions where a job changes states and inadvertently causes a conflict.
When conflicts are detected, the conflicted job, i.e. the one already in the database, is annotated with uniq_conflict: true.
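As a sketch of how this surfaces in practice, a worker could declare uniqueness and later inspect the conflict annotation (the module name is hypothetical; the unique options shown are standard Oban settings):

```elixir
defmodule MyApp.UniqueWorker do
  # Hypothetical worker enforcing uniqueness on worker + args for 60 seconds.
  use Oban.Worker, unique: [period: 60, fields: [:worker, :args]]

  @impl Oban.Worker
  def perform(%Oban.Job{meta: meta}) do
    # With the Smart engine, a job that absorbed a duplicate insert
    # carries "uniq_conflict" => true in its meta.
    if meta["uniq_conflict"], do: IO.puts("a duplicate insert was coalesced")

    :ok
  end
end
```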
Partitioned Tables
Partitioned tables don't allow arbitrary unique indexes. That makes the enhanced unique mode incompatible with DynamicPartitioner, and the Smart engine falls back to the legacy mode instead.
Bulk Inserts
Where the Basic engine requires you to insert unique jobs individually, the Smart engine adds unique job support to Oban.insert_all/2. No additional configuration is necessary; simply use insert_all for unique jobs.
Oban.insert_all(lots_of_unique_jobs)
Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits (PostgreSQL's binary protocol has a limit of 65,535 parameters that may be sent in a single call. That presents an upper limit on the number of rows that may be inserted at one time.)
list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()
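As a rough arithmetic sketch of why batching is needed (the per-row parameter count is an assumption, not from the source): with 65,535 total bind parameters and roughly a dozen parameters per job row, a single INSERT tops out at a few thousand rows.

```elixir
# Illustrative arithmetic only: assume ~12 bound parameters per job row.
max_params = 65_535
params_per_row = 12
div(max_params, params_per_row)
#=> 5461
```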
The default batch size is 250 for unique jobs and 1_000 for non-unique jobs. Either way, you can override it with batch_size:
Oban.insert_all(lots_of_jobs, batch_size: 1500)
It's also possible to set a custom timeout for batch inserts:
Oban.insert_all(lots_of_jobs, timeout: :infinity)
Accurate Snooze
Unlike the Basic engine, which increments both attempt and max_attempts, the Smart engine rolls back the attempt on snooze. This approach preserves the original max_attempts and records a snoozed count in meta. As a result, it's simple to differentiate between "real" attempts and snoozes, and backoff calculation remains accurate regardless of snoozing.
The following process/1 function demonstrates checking a job's meta for a snoozed count:
def process(job) do
  case job.meta do
    %{"orig_scheduled_at" => unix_microseconds, "snoozed" => snoozed} ->
      IO.inspect({snoozed, unix_microseconds}, label: "Times snoozed since")

    _ ->
      # This job has never snoozed before
      :ok
  end
end
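For context, snoozing itself is triggered by returning a snooze tuple from the worker callback; a minimal sketch (the module and the readiness check are hypothetical):

```elixir
defmodule MyApp.PollingWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(%Oban.Job{args: %{"resource_id" => id}}) do
    # Hypothetical readiness check. Returning {:snooze, seconds} reschedules
    # the job; under the Smart engine the attempt is rolled back and the
    # "snoozed" count in meta is incremented instead.
    if MyApp.Resources.ready?(id) do
      :ok
    else
      {:snooze, 30}
    end
  end
end
```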
Summary
Types
global_limit()
@type global_limit() :: pos_integer() | [allowed: pos_integer(), partition: partition()]
local_limit()
@type local_limit() :: pos_integer()
partition()
period()
@type period() :: pos_integer() | {pos_integer(), unit()}
rate_limit()
@type rate_limit() :: [allowed: pos_integer(), period: period(), partition: partition()]
unit()
@type unit() :: :second | :seconds | :minute | :minutes | :hour | :hours | :day | :days