Oban Releases

Pro v1.6.0-rc.0

This release enhances workflows with sub-workflows and context sharing, overhauls queue partitioning for better performance, improves dynamic plugins, and adds various usability improvements.

See the v1.6 Upgrade Guide for complete upgrade steps and migration caveats.

🗂️ Sub-Workflows

Workflows gain powerful new capabilities for organizing complex job relationships with two major enhancements: add_workflow/4 and add_many/4.

Use add_workflow/4 to nest entire workflows within others to create hierarchical job dependencies. This makes complex workflows more maintainable by grouping related jobs together:

extr_flow = 
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{}), deps: :extract)

# Add sub-workflow as a dependency
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add(:finalize, WorkerC.new(%{}), deps: :extract)

Workflows can depend on other workflows, and downstream deps will wait until the sub-workflow completes before executing.

Need to run similar jobs in parallel? Use add_many/4 to add multiple jobs with a single dependency name:

# Add multiple email jobs that can run in parallel
email_jobs = Enum.map(users, &EmailWorker.new(%{user_id: &1.id}))

workflow =
  Workflow.new()
  |> Workflow.add_many(:emails, email_jobs)
  |> Workflow.add(:report, ReportWorker.new(), deps: :emails)

The add_many/4 helper creates a sub-workflow from either a list or a map, and the full recorded results can be extracted with a single call:

def process(job) do
  map_of_results = Workflow.all_recorded(job, with_subs: true)
end

🖼️ Context and Workflow Status

Jobs in a workflow that rely on common data can now share it without duplicating arguments, using put_context/3:

workflow = 
  Workflow.new()
  |> Workflow.put_context(%{user_id: 123, app_version: "1.2.3"})
  |> Workflow.add(:job_a, WorkerA.new(%{}))
  |> Workflow.add(:job_b, WorkerB.new(%{}))

# Later in a worker:
def process(job) do
  context = Workflow.get_context(job)
  # Use context map...
end

It's now easier to check workflow progress with status/1, which provides execution stats:

%{
  total: 5,
  state: "executing", 
  counts: %{completed: 3, executing: 2},
  duration: 15_620
} = Workflow.status(workflow_id)

🧰 Queue Partitioning Overhaul

Queue partitioning is completely redesigned for dramatic performance improvements. Jobs are now assigned partition keys on insert rather than at runtime, enabling more efficient querying and eliminating head-of-line blocking when one partition has many jobs.

The new design has none of the issues of the previous solution:

  • Job processing is completely fair. Jobs from a single partition can't block processing of other partitions after bulk insert. No priority or scheduling workarounds are necessary.
  • Querying in partitioned queues relies on a single, partial index.
  • Partitioning uses a single, optimized query without any unions or dynamic sections. That allows Ecto to prepare and cache a single plan for faster fetching and less memory usage.

In a benchmark of 10k jobs in each of 20 partitions (200k jobs total), processing took 17s in v1.6, down from 360s in v1.5 (20x faster), with far less load on the database.

🧨 Global Burst Mode

Global partitioning gains an advanced feature called "burst mode" that lets you maximize throughput by temporarily exceeding per-partition global limits when resources are available.

Each global partition is typically restricted to the configured allowed value. However, with burst mode enabled, the system can intelligently allocate more jobs to each active partition, potentially exceeding the per-partition limit while still respecting the overall queue concurrency.

This is particularly useful when:

  1. You have many potential partitions but only a few are active at any given time
  2. You want to maximize throughput while maintaining some level of fairness between partitions
  3. You need to ensure your queues aren't sitting idle when capacity is available

Here's an example of a queue that will run 5 jobs from a single partition concurrently under load, but can burst up to 100 jobs for a single partition when there is available capacity:

queues: [
  exports: [
    global_limit: [
      allowed: 5,
      burst: true,
      partition: [args: :tenant_id]
    ],
    local_limit: 100
  ]
]

🍋 Preserve DynamicQueues Updates

DynamicQueues now preserves queue changes made at runtime across application restarts. This brings two key improvements:

  1. Runtime changes to queues (via Web or CLI) persist until explicitly changed in configuration
  2. A new :automatic sync mode that can manage queue deletions based on configuration

# Automatic mode - Deletes queues missing from configuration
config :my_app, Oban,
  plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]

In automatic mode, any queue that exists in the database but isn't defined in the configuration will be automatically deleted during startup. This is useful when you want to ensure your runtime queue configuration exactly matches what's defined in your application config.

Now when you pause a queue through the dashboard or change its limits via API, those changes will persist across application restarts until you explicitly update those options in your configuration.
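
For instance, runtime changes made through the standard queue operations are retained (the queue name below is illustrative):

# Pause a queue and raise its concurrency at runtime. With DynamicQueues
# running, these changes persist across restarts until the configuration
# itself changes.
Oban.pause_queue(queue: :exports)
Oban.scale_queue(queue: :exports, limit: 20)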

🎨 Decorated Job Enhancements

Decorated jobs gain a few new capabilities. You can now use current_job/0 to access the underlying job struct from within decorated functions, making it easier to work with job context or pass job details to other functions. Additionally, you can mark any decorated job as recorded at runtime with the recorded option, enabling workflow composition and return value access without separate modules.

defmodule Business do
  use Oban.Pro.Decorator

  @job queue: :default, recorded: true
  def process_account(account_id) do
    job = current_job()

    IO.puts("Processing account #{account_id} with job #{job.id}")

    {:ok, %{processed: true}}
  end
end

Enhancements

  • [Workflow] Add add_workflow/4 for creating nested sub-workflows.

    Sub-workflows simplify organizing and managing complex job dependencies by grouping related jobs:

    Workflow.new()
    |> Workflow.add_workflow(:sub, MyApp.SubWorkflow.new())
    |> Workflow.add(:final, FinalWorker.new(%{}), deps: :sub)
    

    Downstream dependencies can reference the sub-workflow as a whole, including any jobs that may be dynamically added during runtime.

  • [Workflow] Add add_many/4 function for creating fan-out sub-workflows.

    This helper enables adding multiple jobs to a workflow with a single name:

    email_jobs = Enum.map(users, &EmailWorker.new(%{user_id: &1.id}))
    
    workflow =
      Workflow.new()
      |> Workflow.add_many(:emails, email_jobs)
      |> Workflow.add(:report, ReportWorker.new(), deps: :emails)
    
  • [Workflow] Add sub-workflow metadata and query support.

    Support retrieving sub-workflow jobs with the new with_subs option. This makes it possible to fetch all sub-workflow jobs, including all recordings, with a single call.

    Workflow.all_recorded(job, with_subs: true)
    
  • [Workflow] Add put_context/3 for sharing values between workflow jobs.

    The helper inserts a completed job with a recorded value for all other jobs to fetch. This simplifies context sharing and eliminates the need to add the same values as args to all jobs in a workflow.

    workflow = Workflow.put_context(workflow, %{id: 123, name: "Alice"})
    
    def process(job) do
      %{id: id, name: name} = Workflow.get_recorded(job, :context)
    end
    
  • [Workflow] Add status/1 helper for getting workflow execution information.

    The new helper simplifies gathering runtime information about a workflow, including the name, total jobs, overall state of the workflow, elapsed duration, state counts, and timestamps.

    %{total: 5, counts: %{completed: 5}} = Workflow.status(workflow.id)
    
  • [Workflow] Add retry_jobs/2 for retrying workflow jobs.

    The new helper function will retry jobs in the workflow and hold jobs with dependencies accordingly.

    Workflow.retry_jobs(job)
    
  • [Workflow] Optimize workflow indexes and eliminate containment queries.

    A new partial workflow index checks held jobs, workflow ID, and sub-workflow ID at once, eliminating the need for a GIN index on meta and args.

  • [Smart] Add burst mode for global partitioned queues.

    Allows partitioned queues to exceed per-partition limits when capacity is available, while still respecting the overall concurrency limit:

    config :my_app, Oban,
      queues: [
        exports: [
          global_limit: [
            allowed: 5,
            burst: true,
            partition: [args: :tenant_id]
          ],
          local_limit: 100
        ]
      ]
    
  • [Smart] Allow using :meta in partition configuration.

    Now queue partitioning can use values from the job's metadata, including composition values such as workflow_id.

    config :my_app, Oban,
      queues: [media: [global_limit: [allowed: 1, partition: [meta: :workflow_id]]]]
    
  • [Smart] Overhaul queue partitioning for significant performance improvements.

    Adds a generated partition_key column with a partial index to oban_jobs. Jobs are pre-partitioned on insert based on queue configuration, which simplifies partition queries and provides ~20x faster performance.

  • [Smart] Centralize producer refresh and cleanup.

    Producer refreshing and cleanup is now centralized to reduce database load. Rather than one transaction per queue per node every 30 seconds, there is now one transaction per node every 30 seconds. This offers significant query savings in systems running numerous queues.

  • [Smart] Provide alternate map hashing to avoid collision.

    The use of phash2/1 could cause collisions between values such as UUIDs, which led to false positives for unique checks. This adds an alternate hashing mechanism that avoids such collisions, but it is opt-in for backward compatibility.

    To switch, set the following compile-time config:

    config :oban_pro, Oban.Pro.Utils, safe_hash: true
    
  • [Decorator] Add current_job/0 to decorated modules.

    The current_job/0 function allows decorated jobs to access the underlying job for inspection, or to pass as context to other functions such as Backoff or Workflow helpers.
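
    A minimal sketch (the function and argument names are illustrative, and it assumes the decorated job runs inside a workflow that set a context):

    @job queue: :default
    def sync_account(account_id) do
      # Access the underlying Oban.Job struct to pass to Workflow helpers
      job = current_job()
      context = Workflow.get_context(job)

      {:ok, Map.put(context, :account_id, account_id)}
    end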

  • [Decorator] Support recording as a runtime option.

    Any job may now be marked as recorded at runtime, including decorated jobs:

    @job queue: :processing, recorded: true
    def process_account(account_id) do
      # ...
    end
    
  • [DynamicPrioritizer] Expand options for finer control.

    New :limit and :max_priority options add control over how many jobs are prioritized and their maximum priority level.
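
    A sketch of the new options (the values shown are illustrative):

    config :my_app, Oban,
      plugins: [{Oban.Pro.Plugins.DynamicPrioritizer, limit: 500, max_priority: 2}]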

  • [DynamicQueues] Preserve runtime updates without configuration changes.

    Add automatic sync_mode to insert/update/delete queues based on configuration and prevent overwriting runtime updates to queues unless the configuration changes.
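
    Automatic syncing is enabled through the plugin options, mirroring the example shown earlier:

    config :my_app, Oban,
      plugins: [{DynamicQueues, sync_mode: :automatic, queues: [...]}]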

  • [DynamicCron] Use optimized cron expression calculation.

    The new last_at/2 and next_at/2 cron expression functions are vastly faster and more efficient than the previous implementation. This improves cron job insertion performance in guaranteed mode.

Bug Fixes

  • [Migration] Drop tables entirely during initial down migration.

    The presence of a uniq_key column prevents dropping the partitioned table after a rollback. The specific sequence of migrating and backfilling is unlikely to be used in the real world.

Pro v1.6.0-rc.1

Bug Fixes

  • [Smart] Safely accept legacy rate limit windows.

    The legacy windows format is a list of maps, not a single map. This adds a custom type to translate legacy values, and correctly handles those values in the limiter itself to prevent crashes.

  • [Smart] Safely handle legacy global tracking data.

    During the initial upgrade, legacy producers will have a different shape of tracked data for partitioned and non-partitioned queues. This adds a translation step to prevent crashing new producers.