Smart Engine

The SmartEngine is an alternative to Oban's built-in Basic engine that enables advanced features such as:

  • Global concurrency across nodes
  • Rate limiting across nodes
  • Partitioning by worker, args, or specific fields
  • Unique bulk inserts
  • Batched bulk inserts
  • Resiliency against database connection errors
  • Enhanced observability

Installation

The SmartEngine relies on centralized producer records in an oban_producers table to coordinate between nodes with minimal load on the database. To start, generate a migration that creates the oban_producers table:

$ mix ecto.gen.migration add_oban_producers

Within the migration module:

use Ecto.Migration

defdelegate change, to: Oban.Pro.Migrations.Producers

If you have multiple Oban instances or use prefixes, you can specify the prefix and create multiple tables in one migration:

use Ecto.Migration

def change do
  Oban.Pro.Migrations.Producers.change()
  Oban.Pro.Migrations.Producers.change(prefix: "special")
  Oban.Pro.Migrations.Producers.change(prefix: "private")
end

The Producers migration also exposes up/0 and down/0 functions if change/0 doesn't fit your use case.
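As a minimal sketch, a migration built on up/0 and down/0 might look like the following. The module name MyApp.Repo.Migrations.AddObanProducers is hypothetical; use whatever name the generator produced for you.

```elixir
defmodule MyApp.Repo.Migrations.AddObanProducers do
  use Ecto.Migration

  # Delegate each direction explicitly instead of relying on change/0.
  def up, do: Oban.Pro.Migrations.Producers.up()
  def down, do: Oban.Pro.Migrations.Producers.down()
end
```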

Next, update your config to use the SmartEngine:

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  ...

If you have multiple Oban instances, you'll need to configure each one to use the SmartEngine; otherwise they'll default to the Basic engine.

Unique Bulk Insert

Where the Basic engine requires you to insert unique jobs individually, the SmartEngine adds unique job support to Oban.insert_all/2. No additional configuration is necessary: pass your unique jobs to insert_all instead of inserting them one at a time.

Oban.insert_all(lots_of_unique_jobs)

Bulk insert also features automatic batching to support inserting an arbitrary number of jobs without hitting database limits. PostgreSQL's binary protocol allows at most 65,535 parameters in a single call, which caps the number of rows that can be inserted at one time.

list_of_args
|> Enum.map(&MyApp.MyWorker.new/1)
|> Oban.insert_all()

The default batch size is 1000, but you can override it with batch_size:

Oban.insert_all(lots_of_jobs, batch_size: 1500)
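To make the parameter limit concrete, the sketch below works out how many rows fit in one statement and how a list of jobs splits into batches. It is an illustration in plain Elixir, not Oban's implementation: the BatchSketch module is hypothetical, and the figure of 20 parameters per row is an assumption chosen only for the arithmetic.

```elixir
defmodule BatchSketch do
  # PostgreSQL's per-call parameter limit.
  @max_params 65_535

  # Upper bound on rows per statement when each row binds `params_per_row` values.
  def max_rows(params_per_row) when is_integer(params_per_row) and params_per_row > 0 do
    div(@max_params, params_per_row)
  end

  # Split a list into batches; 1_000 mirrors the documented default batch size.
  def batches(items, batch_size \\ 1_000) do
    Enum.chunk_every(items, batch_size)
  end
end

# Assuming 20 bound parameters per row (a made-up figure for illustration):
BatchSketch.max_rows(20)
# => 3276

1..2_500 |> Enum.to_list() |> BatchSketch.batches() |> Enum.map(&length/1)
# => [1000, 1000, 500]
```

Whatever the real per-row parameter count is, a batch size must stay below the corresponding bound for a single insert statement to succeed.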

It's also possible to set a custom timeout for batch inserts:

Oban.insert_all(lots_of_jobs, timeout: :infinity)

All batches are inserted in a single transaction, unless there are 10k total unique jobs to insert. Past that threshold, each batch is committed in a separate transaction to prevent memory errors. You can control the transaction threshold with xact_limit if you happen to have a tuned database. For example, to set the limit at 20k jobs:

Oban.insert_all(lots_of_jobs, xact_limit: 20_000)

Configuring Queues

With the SmartEngine in place, you can define global concurrency limits and rate limits at the queue level. The engine supports the basic queue: limit format as well as the expanded format.

Let's look at a few examples:

queues: [
  alpha: 1,
  gamma: [global_limit: 1],
  delta: [local_limit: 2, global_limit: 5],
  kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
  omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
]

  • local_limit — This limits the number of concurrent jobs that run within a single node. Typically the global concurrency limit is local_limit * num_nodes. For example, with three nodes and a local limit of 10, you'll have a global limit of 30. If a global_limit is present, and the local_limit is omitted, then the local_limit falls back to the global_limit.

  • global_limit — This limits the number of concurrent jobs that run across all nodes. The queue will process at most the local limit or global limit, whichever is lower. The only way to guarantee that all connected nodes will run exactly one job concurrently is to set global_limit: 1.

  • rate_limit — This limits the number of jobs that are executed, regardless of the result, within a sliding window of time, across all nodes. Rate limiting uses counts from all other producer records for the same queue in the cluster. The limiter uses a sliding window over the configured period to accurately approximate a limit.

Understanding Concurrency Limits

The local, global, or rate limit with the lowest value determines how many jobs are executed concurrently. For example, with a local_limit of 10 and a global_limit of 20, a single node will run 10 jobs concurrently. If that same queue had a rate_limit that allowed 5 jobs within a period, then a single node is limited to 5 jobs.
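The rules above can be sketched in a few lines of plain Elixir. This helper is not part of Oban: the LimitSketch module and its function names are hypothetical, and it only models the behavior as described in this section.

```elixir
defmodule LimitSketch do
  # Typical cluster-wide concurrency when only a local limit is set.
  def cluster_capacity(local_limit, num_nodes), do: local_limit * num_nodes

  # Concurrency for a single node: the lowest of the configured limits.
  # When local_limit is omitted it falls back to global_limit.
  # At least one limit must be present, otherwise Enum.min/1 raises.
  def node_concurrency(opts) do
    global = Keyword.get(opts, :global_limit)
    local = Keyword.get(opts, :local_limit, global)
    rate_allowed = get_in(opts, [:rate_limit, :allowed])

    [local, global, rate_allowed]
    |> Enum.reject(&is_nil/1)
    |> Enum.min()
  end
end

LimitSketch.cluster_capacity(10, 3)
# => 30

LimitSketch.node_concurrency(local_limit: 10, global_limit: 20)
# => 10

LimitSketch.node_concurrency(local_limit: 10, global_limit: 20, rate_limit: [allowed: 5, period: 60])
# => 5
```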

Rate Limit Periods

Without a modifier, the rate_limit period is defined in seconds. However, you can provide a :second, :minute, :hour or :day modifier to use more intuitive values. Here are a few examples:

  • period: 30 — 30 seconds
  • period: {1, :minute} — 60 seconds
  • period: {2, :minutes} — 120 seconds
  • period: {1, :hour} — 3,600 seconds
  • period: {1, :day} — 86,400 seconds
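The conversions in the list above can be reproduced with a small helper. This is a sketch for checking your own values, not Oban's code: the PeriodSketch module is hypothetical, and accepting a plural form for every unit is an assumption extrapolated from the {2, :minutes} example.

```elixir
defmodule PeriodSketch do
  # A bare integer is already expressed in seconds.
  def to_seconds(seconds) when is_integer(seconds) and seconds > 0, do: seconds

  # A {value, unit} tuple is scaled by the number of seconds in the unit.
  def to_seconds({value, unit}) when is_integer(value) and value > 0 do
    value * unit_seconds(unit)
  end

  defp unit_seconds(unit) when unit in [:second, :seconds], do: 1
  defp unit_seconds(unit) when unit in [:minute, :minutes], do: 60
  defp unit_seconds(unit) when unit in [:hour, :hours], do: 3_600
  defp unit_seconds(unit) when unit in [:day, :days], do: 86_400
end

PeriodSketch.to_seconds({2, :minutes})
# => 120
```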

Partitioning

In addition to global and rate limits at the queue level, you can partition limits by worker, args, or specific keys within a queue.

Partitions are specified with fields and keys, where keys is optional but highly recommended if you've included :args. Aside from making partitions easier to reason about, partitioning by keys minimizes the amount of data a queue needs to track and simplifies job-fetching queries.

Configuring Partitions

@type partition :: [fields: [:args | :worker], keys: [atom()]]

The partition syntax is identical for global and rate limits. Note that you can partition by the global limit or by the rate limit, but not both.

Here are a few examples of viable partitioning schemes:

# Partition by worker alone
partition: [fields: [:worker]]

# Partition by both worker and all args
partition: [fields: [:worker, :args]]

# Partition by the `id` and `account_id` keys
partition: [fields: [:args], keys: [:id, :account_id]]

# Partition by worker and the `account_id` key
partition: [fields: [:worker, :args], keys: [:account_id]]

Remember, take care to minimize partition cardinality by using keys whenever possible.
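To see why keys reduce cardinality, consider a rough model of how a partition might be derived from a job. This is a conceptual sketch only and says nothing about how the SmartEngine tracks partitions internally: the PartitionSketch module, the plain-map job, and the worker name are hypothetical, and args are assumed to have string keys.

```elixir
defmodule PartitionSketch do
  # Build a value that identifies a job's partition.
  # `job` is a plain map with :worker and :args, standing in for a real job struct.
  def partition_key(job, opts) do
    fields = Keyword.fetch!(opts, :fields)
    # Args keys are assumed to be strings here, so convert the configured atoms.
    keys = opts |> Keyword.get(:keys, []) |> Enum.map(&to_string/1)

    Enum.map(fields, fn
      :worker -> {:worker, job.worker}
      # Without keys, the entire args map is part of the partition.
      :args when keys == [] -> {:args, job.args}
      # With keys, only the selected args contribute, which keeps cardinality low.
      :args -> {:args, Map.take(job.args, keys)}
    end)
  end
end

job = %{worker: "MyApp.SyncWorker", args: %{"account_id" => 1, "id" => 2, "payload" => "..."}}

PartitionSketch.partition_key(job, fields: [:worker, :args], keys: [:account_id])
# => [worker: "MyApp.SyncWorker", args: %{"account_id" => 1}]
```

In this model, partitioning on all args would create a separate partition for every distinct payload, while partitioning on account_id creates one per account.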

Global Partitioning

@type global_limit :: pos_integer() | [allowed: pos_integer(), partition: partition()]

Global partitioning changes global concurrency behavior. Rather than applying a fixed limit to the queue as a whole, the limit applies to each partition within the queue.

Consider the following example:

local_limit: 10, global_limit: [allowed: 1, partition: [fields: [:worker]]]

The queue is configured to run one job per worker across every node, but only 10 concurrently on a single node. That is in contrast to the standard behavior of global_limit, which would override the local_limit and only allow 1 concurrent job across every node.
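Placed in a full configuration, the same options might look like the sketch below. The queue name exports is hypothetical, and other Oban options your application needs are left out.

```elixir
import Config

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    # At most one running job per worker across the cluster, up to 10 jobs per node.
    exports: [
      local_limit: 10,
      global_limit: [allowed: 1, partition: [fields: [:worker]]]
    ]
  ]
```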

Rate Limit Partitioning

@type rate_limit :: [allowed: pos_integer(), period: period(), partition: partition()]

Rate limit partitions operate similarly to global partitions. Rather than limiting all jobs within the queue, they limit each partition within the queue.

For example, to allow one job per worker, every five seconds, across every instance of the alpha queue in your cluster:

local_limit: 10, rate_limit: [allowed: 1, period: 5, partition: [fields: [:worker]]]
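Rate limit partitions can also use keys. As a sketch built from the partition syntax described earlier, the configuration below limits each account_id to 10 jobs per minute. The allowed value, the period, and the account_id key are illustrative assumptions, and other Oban options your application needs are left out.

```elixir
import Config

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    # Each distinct account_id may run at most 10 jobs per minute across the cluster.
    alpha: [
      local_limit: 10,
      rate_limit: [
        allowed: 10,
        period: {1, :minute},
        partition: [fields: [:args], keys: [:account_id]]
      ]
    ]
  ]
```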