Partitioned Rate Limiting
Driven by popular demand, v0.9 brings partitions to the Smart Engine’s rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues.
rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]
Alternatively, you can partition by the job’s account_id field:
rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]
Naturally, a combination of worker, args, and any number of keys works.
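As a rough sketch of the combined form, the keyword list below partitions by both worker and the account_id key within args. The queue name default and the local_limit value are placeholders chosen for illustration; only the rate_limit options are taken from the examples above.

```elixir
# Rate limit partitioned by worker AND by the account_id key within args.
rate_limit = [
  allowed: 100,
  period: 5,
  partition: [fields: [:worker, :args], keys: [:account_id]]
]

# Hypothetical queue definition: the `default` name and `local_limit: 10`
# are illustrative assumptions, not values from these release notes.
queues = [default: [local_limit: 10, rate_limit: rate_limit]]

# This list would be passed as the `queues` option in your Oban config,
# with the SmartEngine configured as the engine.
IO.inspect(queues, label: "queues")
```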
Check out the SmartEngine Guide for options and details.
Batch Worker Improvements
Batches are the oldest worker in Pro, and as such, they existed prior to the meta field. Finally, that difference is rectified. Building batches on meta enables a handful of ergonomic improvements and new functionality:
- Forwarding args through the batch_callback_args option
- Heterogeneous batches with an alternative callback module through the batch_callback_worker option
- Fetch all jobs in a batch with stream_batch_jobs/2 for map/reduce processing
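To make the map/reduce idea concrete, here is a minimal sketch of the kind of aggregation a batch callback might run over the jobs in a batch. It assumes stream_batch_jobs/2 yields job structs with an args map. Because that requires a running Oban Pro instance, a plain list of maps stands in for the stream here, and the BatchReduceSketch module and the "account_id" and "count" arg names are hypothetical.

```elixir
defmodule BatchReduceSketch do
  # Map each job to an {account_id, count} pair, then reduce the pairs into
  # per-account totals. Works on any enumerable of maps or structs that
  # have an :args field.
  def totals_by_account(jobs) do
    jobs
    |> Stream.map(fn %{args: args} -> {args["account_id"], args["count"]} end)
    |> Enum.reduce(%{}, fn {account_id, count}, acc ->
      Map.update(acc, account_id, count, &(&1 + count))
    end)
  end
end

# Stand-in for the jobs streamed from a batch (hypothetical data).
jobs = [
  %{args: %{"account_id" => 1, "count" => 2}},
  %{args: %{"account_id" => 2, "count" => 5}},
  %{args: %{"account_id" => 1, "count" => 3}}
]

IO.inspect(BatchReduceSketch.totals_by_account(jobs))
```

In a real callback the list would be replaced by the stream itself, most likely consumed inside a database transaction.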
IMPORTANT: For in-flight batch callback jobs to run after an upgrade you’ll need to migrate batch_id and callback into meta. Run the following SQL in a migration immediately prior to, or following, the upgrade:
UPDATE oban_jobs
SET meta = jsonb_set_lax(
  jsonb_set(meta, '{batch_id}', args->'batch_id'),
  '{callback}',
  args->'callback',
  true,
  'return_target'
)
WHERE state NOT IN ('cancelled', 'completed', 'discarded')
  AND args ? 'batch_id';
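For readers less familiar with jsonb_set_lax, the effect of the statement on a single row can be modeled in plain Elixir. This is only an illustration of what the SQL does, not code from Oban Pro: the MetaMigrationSketch module is hypothetical, jobs are represented as plain maps, and the sample values are made up.

```elixir
defmodule MetaMigrationSketch do
  # Finished jobs are skipped, mirroring the WHERE clause's state filter.
  @finished ~w(cancelled completed discarded)

  def migrate(%{state: state, args: %{"batch_id" => batch_id} = args, meta: meta} = job)
      when state not in @finished do
    # jsonb_set: always copy batch_id from args into meta.
    meta = Map.put(meta, "batch_id", batch_id)

    # jsonb_set_lax with 'return_target': copy callback when args has that
    # key, otherwise return meta unchanged.
    meta =
      case Map.fetch(args, "callback") do
        {:ok, callback} -> Map.put(meta, "callback", callback)
        :error -> meta
      end

    %{job | meta: meta}
  end

  # Rows without a batch_id in args, or in a finished state, are untouched.
  def migrate(job), do: job
end

# Hypothetical in-flight callback job.
job = %{state: "available", args: %{"batch_id" => "abc", "callback" => "completed"}, meta: %{}}

IO.inspect(MetaMigrationSketch.migrate(job).meta)
```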
Workflow Worker Improvements
Workflows got a little ergonomic love, too. Now you can dynamically extend workflows at runtime with the new append_workflow function:
def process(%Job{} = job) do
  jobs =
    job
    |> append_workflow(check_deps: false)
    |> add(:d, WorkerD.new(%{}), deps: [:a])
    |> add(:e, WorkerE.new(%{}), deps: [:b])
    |> add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

  {:ok, jobs}
end
Note the use of check_deps: false to prevent dependency validation. To be safe and check jobs while appending, we’ll use the new stream_workflow_jobs/1 function to load all of the previous jobs and feed them in:
def process(%Job{} = job) do
  {:ok, jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.to_list()
    end)

  jobs
  |> append_workflow()
  |> add(:d, WorkerD.new(%{}), deps: [:a])
  |> add(:e, WorkerE.new(%{}), deps: [:b])
  |> add(:f, WorkerF.new(%{}), deps: [:c])
  |> Oban.insert_all()

  :ok
end
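Why the first example needs check_deps: false can be illustrated with a small sketch. The module below is a hypothetical stand-in for dependency validation, not Oban Pro's implementation. It reports every dependency that names a job the caller doesn't know about, which is presumably the situation when appending from a single job without the previous jobs loaded.

```elixir
defmodule DepsCheckSketch do
  # Returns {job_name, missing_dep} pairs for every dependency that refers to
  # a name found neither among the known jobs nor among the additions.
  def missing_deps(known_names, additions) do
    known = MapSet.new(known_names ++ Keyword.keys(additions))

    for {name, deps} <- additions,
        dep <- deps,
        not MapSet.member?(known, dep),
        do: {name, dep}
  end
end

additions = [d: [:a], e: [:b], f: [:c]]

# No previous jobs loaded: :a, :b, and :c are unknown, so every dependency
# is reported as missing.
IO.inspect(DepsCheckSketch.missing_deps([], additions))

# All previous jobs loaded: every dependency resolves and nothing is reported.
IO.inspect(DepsCheckSketch.missing_deps([:a, :b, :c], additions))
```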
Changes
- [Oban] Require Oban ~> v2.9 to support the new cancel_all_jobs engine callback.
- [Oban.Pro.Queue.SmartEngine] Make operations like refresh more efficient (less data transport) and more failure tolerant to prevent producer crashes.