The first Oban Pro v1.6 release candidate is out! It introduces major workflow improvements such as sub-workflows and context sharing, an overhauled queue partitioning system with dramatically better performance, and a variety of usability improvements.
⚠️ This release includes a required (and extremely helpful) migration. Follow the upgrade guide closely!
First up, our favorite addition...
🗂️ Sub-Workflows
Workflows gain powerful new capabilities for organizing complex job relationships with two major enhancements: `add_workflow/4` and `add_many/4`.
Use `add_workflow/4` to nest entire workflows within others to create hierarchical job dependencies. This makes complex workflows more maintainable by grouping related jobs together:
```elixir
extr_flow =
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{}), deps: :extract)

# Add sub-workflow as a dependency
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add(:finalize, WorkerC.new(%{}), deps: :extract)
```
Workflows can depend on other workflows, and downstream deps will wait until the sub-workflow completes before executing.
Need to run similar jobs in parallel? Use `add_many/4` to add multiple jobs with a single dependency name:
```elixir
# Add multiple email jobs that can run in parallel
email_jobs = Enum.map(users, &EmailWorker.new(%{user_id: &1.id}))

workflow =
  Workflow.new()
  |> Workflow.add_many(:emails, email_jobs)
  |> Workflow.add(:report, ReportWorker.new(%{}), deps: :emails)
```
The `add_many/4` step creates a sub-workflow from either a list or a map (a sketch of the map form follows below), and the full recorded results can be extracted with a single call:
```elixir
def process(job) do
  map_of_results = Workflow.all_recorded(job, with_subs: true)
end
```
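Since `add_many/4` also accepts a map, each sub-job can be named individually. A minimal sketch, assuming the map keys become the sub-job names (the `:welcome` and `:digest` keys here are illustrative, not from the docs):

```elixir
# Assumed map form: keys name each sub-job within the :emails sub-workflow
email_jobs = %{
  welcome: EmailWorker.new(%{user_id: 1, type: "welcome"}),
  digest: EmailWorker.new(%{user_id: 2, type: "digest"})
}

Workflow.new()
|> Workflow.add_many(:emails, email_jobs)
|> Workflow.add(:report, ReportWorker.new(%{}), deps: :emails)
```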
🖼️ Context and Workflow Status
Workflows that rely on common data can now share it without duplicating arguments, using `put_context/3`:
```elixir
workflow =
  Workflow.new()
  |> Workflow.put_context(%{user_id: 123, app_version: "1.2.3"})
  |> Workflow.add(:job_a, WorkerA.new(%{}))
  |> Workflow.add(:job_b, WorkerB.new(%{}))

# Later in a worker:
def process(job) do
  context = Workflow.get_context(job)

  # Use context map...
end
```
It's now easier to check workflow progress with `status/1`, which provides execution stats:
```elixir
%{
  total: 5,
  state: "executing",
  counts: %{completed: 3, executing: 2},
  duration: 15_620
} = Workflow.status(workflow_id)
```
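As a quick sketch of how that might be used, branching on the overall state (the `"completed"` state value and millisecond duration units are assumptions inferred from the stats above, not confirmed API details):

```elixir
# A sketch: gate follow-up work on overall workflow state. The "completed"
# value and millisecond duration units are assumptions.
case Workflow.status(workflow_id) do
  %{state: "completed", duration: duration} ->
    IO.puts("Workflow finished in #{duration}ms")

  %{state: state, counts: counts} ->
    IO.puts("Workflow #{state}: #{inspect(counts)}")
end
```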
🧰 Queue Partitioning Overhaul
Queue partitioning is completely redesigned for dramatic performance improvements. Jobs are now assigned partition keys on insert rather than at runtime, enabling more efficient querying and eliminating head-of-line blocking when one partition has many jobs.
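For reference, here's a minimal sketch of a partitioned queue in config, mirroring the `partition` option format from the burst example later in this post (the queue name, limits, and `:tenant_id` key are illustrative):

```elixir
# Illustrative config: a queue partitioned by the :tenant_id job argument.
# Partition keys are now computed when jobs are inserted, not at runtime.
config :my_app, Oban,
  queues: [
    imports: [
      local_limit: 20,
      global_limit: [allowed: 5, partition: [args: :tenant_id]]
    ]
  ]
```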
The new design has none of the issues of the previous solution:
- Job processing is completely fair: jobs from a single partition can't block processing of other partitions after a bulk insert, so no priority or scheduling workarounds are necessary.
- Querying in partitioned queues relies on a single, partial index.
- Partitioning uses a single, optimized query without any unions or dynamic sections, which allows Ecto to prepare and cache a single plan for faster fetching and lower memory usage.
In a benchmark with 10k jobs in each of 20 partitions (200k jobs total), processing took 17s in v1.6, down from 360s in v1.5 (roughly 20x faster), with far less load on the database.
🧨 Global Burst Mode
Global partitioning gained an advanced feature called "burst mode" that allows you to maximize throughput by temporarily exceeding per-partition global limits when there are available resources.
Each global partition is typically restricted to the configured `allowed` value. However, with burst mode enabled, the system can intelligently allocate more jobs to each active partition, potentially exceeding the per-partition limit while still respecting the overall queue concurrency.
This is particularly useful when:
- You have many potential partitions but only a few are active at any given time
- You want to maximize throughput while maintaining some level of fairness between partitions
- You need to ensure your queues aren't sitting idle when capacity is available
Here's an example of a queue that will run 5 jobs from a single partition concurrently under load, but can burst up to 100 jobs for a single partition when there is available capacity:
```elixir
config :my_app, Oban,
  queues: [
    exports: [
      local_limit: 100,
      global_limit: [allowed: 5, burst: true, partition: [args: :tenant_id]]
    ]
  ]
```
🍋 Preserve DynamicQueues Updates
DynamicQueues now preserves queue changes made at runtime across application restarts. This brings two key improvements:
- Runtime changes to queues (via Web or CLI) persist until explicitly changed in configuration
- A new `:automatic` sync mode that can manage queue deletions based on configuration
```elixir
# Automatic mode - deletes queues missing from the configuration
config :my_app, Oban,
  plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]
```
In automatic mode, any queue that exists in the database but isn't defined in the configuration will be automatically deleted during startup. This is useful when you want to ensure your runtime queue configuration exactly matches what's defined in your application config.
Now when you pause a queue through the dashboard or change its limits via API, those changes will persist across application restarts until you explicitly update those options in your configuration.
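For instance, a pause applied at runtime now sticks until you resume the queue or change your config. A minimal sketch using Oban's standard queue controls (reusing the `:exports` queue name from the example above):

```elixir
# Pause the queue at runtime (e.g. from a remote IEx session); with
# DynamicQueues, the paused state now persists across restarts
Oban.pause_queue(queue: :exports)

# Resume it later; until then the queue stays paused, even after a deploy
Oban.resume_queue(queue: :exports)
```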
🎨 Decorated Job Enhancements
Decorated jobs gain a few new capabilities. You can now use `current_job/0` to access the underlying job struct from within decorated functions, making it easier to work with job context or pass job details to other functions. Additionally, you can mark any decorated job as recorded at runtime with the `recorded` option, enabling workflow composition and return value access without separate modules.
```elixir
defmodule Business do
  use Oban.Pro.Decorator

  @job queue: :default, recorded: true
  def process_account(account_id) do
    job = current_job()

    IO.puts("Processing account #{account_id} with job #{job.id}")

    {:ok, %{processed: true}}
  end
end
```
Take a look through the full changelog, or check out the v1.6 Upgrade Guide for complete upgrade steps and migration caveats.
As usual, if you have any questions or comments, ask in the Elixir Forum or the #oban channel on Elixir Slack. For future announcements and insight into what we're working on next, subscribe to our newsletter.