← All Articles

Oban Pro v1.6 RC Released

Oban Pro v1.6 RC Released

The first Oban Pro v1.6 release candidate is out! It introduces major workflow improvements like sub-workflows and context sharing, along with overhauled queue partitioning for better performance, and various usability improvements.

⚠️ This release includes a required (and extremely helpful) migration. Follow the upgrade guide closely!

First up, our favorite addition...

🗂️ Sub-Workflows

Workflows gain powerful new capabilities for organizing complex job relationships with two major enhancements: add_workflow/4 and add_many/4.

Use add_workflow/4 to nest entire workflows within others to create hierarchical job dependencies. This makes complex workflows more maintainable by grouping related jobs together:

extr_flow = 
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{}), deps: :extract)

# Add sub-workflow as a dependency
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add(:finalize, WorkerC.new(%{}), deps: :extract)

Workflows can depend on other workflows, and downstream deps will wait until the sub-workflow completes before executing.

Need to run similar jobs in parallel? Use add_many/4 to add multiple jobs with a single dependency name:

# Add multiple email jobs that can run in parallel
email_jobs = Enum.map(users, &EmailWorker.new(%{user_id: &1.id}))

workflow =
  Workflow.new()
  |> Workflow.add_many(:emails, email_jobs)
  |> Workflow.add(:report, ReportWorker.new(), deps: :emails)

The add_many/4 step creates a sub workflow from either a list or a map, and the full recorded results can be extracted with a single call:

def process(job) do
  map_of_results = Workflow.all_recorded(job, with_subs: true)
end

🖼️ Context and Workflow Status

Workflows that rely on common data can now share data without duplicating arguments using put_context/3:

workflow = 
  Workflow.new()
  |> Workflow.put_context(%{user_id: 123, app_version: "1.2.3"})
  |> Workflow.add(:job_a, WorkerA.new(%{}))
  |> Workflow.add(:job_b, WorkerB.new(%{}))

# Later in a worker:
def process(job) do
  context = Workflow.get_context(job)
  # Use context map...
end

It's now easier to check workflow progress with status/1, which provides execution stats:

%{
  total: 5,
  state: "executing", 
  counts: %{completed: 3, executing: 2},
  duration: 15_620
} = Workflow.status(workflow_id)

🧰 Queue Partitioning Overhaul

Queue partitioning is completely redesigned for dramatic performance improvements. Jobs are now assigned partition keys on insert rather than at runtime, enabling more efficient querying and eliminating head-of-line blocking when one partition has many jobs.

The new design has none of the issues of the previous solution:

  • Job processing is completely fair. Jobs from a single partition can't block processing of other partitions after bulk insert. No priority or scheduling workarounds are necessary.
  • Querying in partitioned queues relies on a single, partial index
  • Partitioning uses a single, optimized query without any unions or dynamic sections. That allows ecto to prepare and cache a single plan for faster fetching and less memory usage.

In a benchmark of 10k jobs spread across 20 partitions (200k jobs), processing took 17s in v1.6, down from 360s in v1.5 (20x faster) with far less load on the database.

🧨 Global Burst Mode

Global partitioning gained an advanced feature called "burst mode" that allows you to maximize throughput by temporarily exceeding per-partition global limits when there are available resources.

Each global partition is typically restricted to the configured allowed value. However, with burst mode enabled, the system can intelligently allocate more jobs to each active partition, potentially exceeding the per-partition limit while still respecting the overall queue concurrency.

This is particularly useful when:

  1. You have many potential partitions but only a few are active at any given time
  2. You want to maximize throughput while maintaining some level of fairness between partitions
  3. You need to ensure your queues aren't sitting idle when capacity is available

Here's an example of a queue that will 5 jobs from a single partition concurrently under load, but can burst up to 100 for a single partition when there is available capacity:

config :my_app, Oban,
  queues: [
    exports: [
      local_limit: 100,
      global_limit: [allowed: 5, burst: true, partition: [args: :tenant_id]]
    ]
  ]

🍋 Preserve DynamicQueues Updates

DynamicQueues now preserves queue changes made at runtime across application restarts. This brings two key improvements:

  1. Runtime changes to queues (via Web or CLI) persist until explicitly changed in configuration
  2. A new :automatic sync mode that can manage queue deletions based on configuration
# Automatic mode - Deletes queues missing from configuration
config :my_app, Oban,
  plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]

In automatic mode, any queue that exists in the database but isn't defined in the configuration will be automatically deleted during startup. This is useful when you want to ensure your runtime queue configuration exactly matches what's defined in your application config.

Now when you pause a queue through the dashboard or change its limits via API, those changes will persist across application restarts until you explicitly update those options in your configuration.

🎨 Decorated Job Enhancements

Decorated jobs gain a few new capabilities. You can now use current_job/0 to access the underlying job struct from within decorated functions, making it easier to work with job context or pass job details to other functions. Additionally, you can mark any decorated job as recorded at runtime with the recorded option, enabling workflow composition and return value access without separate modules.

defmodule Business do
  use Oban.Pro.Decorator

  @job queue: :default, recorded: true
  def process_account(account_id) do
    job = current_job()

    IO.puts("Processing account #{account_id} with job #{job.id}")

    {:ok, %{processed: true}}
  end
end

Take a look through the complete changelog, or check out the v1.6 Upgrade Guide for complete upgrade steps and migration caveats.


As usual, if you have any questions or comments, ask in the Elixir Forum or the #oban channel on Elixir Slack. For future announcements and insight into what we're working on next, subscribe to our newsletter.