Composition

Oban Pro provides powerful worker composition tools to orchestrate complex job patterns with ease. Each composition tool addresses specific coordination scenarios, helping you design jobs that can scale reliably across nodes while maintaining precise execution patterns.

This guide explores each of Pro's worker composition tools and demonstrates how they solve real-world challenges:

  • Batch—Process related jobs while tracking overall progress across all nodes and executing optional callbacks.
  • Chain—Process jobs in strict sequential order regardless of scheduling, retries, and queue concurrency.
  • Chunk—Process multiple jobs at once with robust error handling semantics and optional partitioning.
  • Workflow—Compose workers together with arbitrary dependencies between jobs, allowing sequential, fan-out and fan-in execution.

Batch

Batches link the execution of related jobs in a group and run optional callbacks after all jobs are processed. This allows your application to coordinate many jobs in parallel and respond when they complete or fail.

When to Use Batches

Use a batch when you need to:

  • Run many similar operations in parallel
  • Track overall progress of the operations
  • Execute code after all operations have completed
  • Respond to specific events like failures or cancellations

Batches allow applications to track the execution of related jobs running in parallel. A batch can define optional callbacks, each executed as a separate job when its condition is met:

  • batch_attempted/1 - All jobs in batch executed at least once
  • batch_cancelled/1 - Any job in the batch is cancelled
  • batch_completed/1 - All jobs in the batch complete successfully
  • batch_discarded/1 - Any job in the batch is discarded
  • batch_exhausted/1 - All jobs are either completed, cancelled, or discarded
  • batch_retryable/1 - Any job in the batch enters retryable state
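Several of these conditions can hold at once. The hypothetical module below makes that overlap concrete by listing which callbacks match a single snapshot of job states in a batch. It is a sketch, not Oban Pro's implementation, and it doesn't use Oban's API. One assumption is labeled in the code: a job counts as "attempted" once it is completed, retryable, cancelled, or discarded. Real batches fire callbacks as jobs change state, while this only evaluates one snapshot.

```elixir
defmodule BatchCallbackSketch do
  @moduledoc false
  # Hypothetical model, not Oban Pro's implementation: given a snapshot of
  # job states in a batch, list the callbacks whose conditions are met.
  # Assumption: a job counts as "attempted" once it reaches one of these states.
  @attempted_states [:completed, :retryable, :cancelled, :discarded]
  # States after which a job will never run again.
  @finished_states [:completed, :cancelled, :discarded]

  def triggered([_ | _] = states) do
    [
      batch_attempted: Enum.all?(states, &(&1 in @attempted_states)),
      batch_cancelled: :cancelled in states,
      batch_completed: Enum.all?(states, &(&1 == :completed)),
      batch_discarded: :discarded in states,
      batch_exhausted: Enum.all?(states, &(&1 in @finished_states)),
      batch_retryable: :retryable in states
    ]
    |> Enum.filter(fn {_callback, met?} -> met? end)
    |> Enum.map(fn {callback, _met?} -> callback end)
  end
end
```

For example, `BatchCallbackSketch.triggered([:completed, :discarded])` returns `[:batch_attempted, :batch_discarded, :batch_exhausted]`: a single discarded job rules out `batch_completed/1` but still lets the batch become exhausted.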

Batches are ideal for operations where you need to parallelize many jobs and react to events like completion of the batch, individual failures, or retry exhaustion.

Batch Example: Image Processing Service

Imagine a service that thumbnails images and then archives them on demand. While you could do all of the thumbnailing and archiving in a single job, it wouldn't scale horizontally across nodes and it'd lose all progress when the node restarts. Instead, you can model processing as a batch where each job thumbnails a single image and a callback generates the final archive.

Start by defining a batch worker with two callbacks: one for when the entire batch completes, and one for when any job is discarded after exhausting its retries, meaning the batch can never complete:

defmodule MyApp.ImageProcessor do
  use Oban.Pro.Worker, queue: :media

  @behaviour Oban.Pro.Batch

  alias MyApp.{Media, Notifications}

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"image_id" => id, "operations" => operations}}) do
    with {:ok, image} <- Media.get_image(id) do
      Media.process(image, operations)
    end
  end

  # Callback jobs carry the batch_id in meta, while values passed through
  # callback_opts: [args: ...] arrive in the callback job's args.
  @impl Oban.Pro.Batch
  def batch_completed(%Job{args: %{"account_id" => account_id}, meta: %{"batch_id" => batch_id}}) do
    with {:ok, images} <- Media.get_batch_images(batch_id),
         {:ok, archive_path} <- Media.create_archive(images),
         {:ok, url} <- Media.store_archive(archive_path, account_id) do
      Notifications.image_processing_complete(account_id, url)
    end
  end

  @impl Oban.Pro.Batch
  def batch_discarded(%Job{args: %{"account_id" => account_id}, meta: %{"batch_id" => batch_id}}) do
    Notifications.processing_failed(account_id, batch_id)
  end
end

The process/1 function applies the requested operations to a single image. Once every job in the batch has succeeded, batch_completed/1 builds and stores the archive and notifies the account; if any job is discarded instead, batch_discarded/1 reports the failure.

Then, create the batch and insert all jobs:

alias Oban.Pro.Batch
alias MyApp.ImageProcessor

image_ids
|> Enum.map(&ImageProcessor.new(%{image_id: &1, operations: ~w(resize optimize watermark)}))
|> Batch.new(callback_opts: [args: %{account_id: current_user.id}])
|> Oban.insert_all()

This batch will process all images in parallel across all available nodes, triggering the appropriate callback when finished. The batch's progress persists across node restarts, making it resilient to failures.

Chain

Chains link jobs together to ensure they run in strict sequential order. Downstream jobs in the chain won't execute until the upstream job is completed, regardless of scheduling, retries, or queue concurrency.

When to Use Chains

Use chains when you need to:

  • Ensure jobs for a specific resource execute in order
  • Process events exactly in the order they arrived
  • Implement state transitions that must happen sequentially
  • Synchronize internal state with external systems via webhooks

By default, chains are sequenced by worker, but you can customize partitioning with the :by option. For example, to chain by worker and account_id in the job's arguments:

use Oban.Pro.Worker, chain: [by: [:worker, args: :account_id]]
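As a rough mental model of what `:by` does, the hypothetical helper below turns a job and a `by:` spec into a chain key; jobs that share a key belong to the same chain. The `chain_key/2` name and the plain-map job shape are assumptions for illustration, not Oban Pro's internals, and only the `:worker` and `args:` forms shown above are modeled.

```elixir
defmodule ChainKeySketch do
  @moduledoc false
  # Hypothetical: maps a job to a chain key from a `by:` spec such as
  # :worker, [args: :account_id], or [:worker, args: :account_id].
  # Oban Pro's real partitioning may differ; this only models the idea.
  def chain_key(job, by) do
    by
    |> List.wrap()
    |> Enum.map(fn
      :worker -> job.worker
      # Job args are stored as JSON, so keys come back as strings.
      {:args, key} -> Map.get(job.args, to_string(key))
    end)
  end
end
```

Grouping pending jobs with `Enum.group_by(jobs, &ChainKeySketch.chain_key(&1, by))` then yields one list per chain, e.g. one per worker and account.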

Chain Example: Payment Webhook Processing

For reliable payment processing, we must handle webhooks in exact order to maintain a consistent account balance:

defmodule MyApp.PaymentWebhookWorker do
  use Oban.Pro.Worker, queue: :webhooks, chain: [by: [args: :account_id]]

  alias MyApp.{Accounts, Payments, Transactions}

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"account_id" => account_id, "event" => event, "data" => data}}) do
    with {:ok, account} <- Accounts.get(account_id) do
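      case event do
        "payment.succeeded" -> Transactions.record_payment(account, data)
        "payment.refunded" -> Transactions.record_refund(account, data)
        "subscription.renewed" -> Payments.renew_subscription(account, data)
        # Acknowledge unhandled event types; without this clause they would
        # raise a CaseClauseError and hold up the account's chain while retrying.
        _other -> :ok
      end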
    end
  end
end

With this configuration, all webhook events for the same account will process in sequential order, regardless of how many jobs are in the queue or how many nodes are processing jobs.

If a webhook job fails and is retried, the chain ensures that no later events for that account are processed until the retrying job either succeeds or is discarded.
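The sequencing rule in the previous paragraph can be modeled with a small, hypothetical function that finds the job at the head of each chain, i.e. the only job allowed to run next. It is not Oban Pro's implementation: job ids stand in for insertion order, and, following the text above, only completed and discarded jobs release the chain (other states such as cancelled are deliberately not modeled).

```elixir
defmodule ChainOrderSketch do
  @moduledoc false
  # Hypothetical model of chain sequencing, not Oban Pro's implementation.
  # A job releases its chain once it has completed or been discarded.
  @released_states [:completed, :discarded]

  # Returns the ids of jobs at the head of each chain: the only jobs allowed
  # to run next, whether they are available or waiting on a retry.
  def next_in_line(jobs) do
    jobs
    |> Enum.group_by(& &1.chain_key)
    |> Enum.flat_map(fn {_key, chain_jobs} ->
      chain_jobs
      # Job ids stand in for insertion order.
      |> Enum.sort_by(& &1.id)
      |> Enum.drop_while(&(&1.state in @released_states))
      |> Enum.take(1)
    end)
    |> Enum.map(& &1.id)
    |> Enum.sort()
  end
end
```

For example, if one account's chain holds job 1 (completed), job 2 (retryable), and job 3 (available), only job 2 is next in line; job 3 waits even though it is available.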

Chunk

Chunk workers execute jobs together in groups, either when a certain number of jobs are available or after a timeout elapses. Processing many jobs in a single operation, such as one bulk insert instead of hundreds of individual inserts, can dramatically increase a single queue's throughput.

When to Use Chunks

Use chunks when you need to:

  • Process large volumes of jobs with maximum throughput
  • Group operations for efficiency (like database operations, API calls)
  • Deduplicate or aggregate data across multiple jobs
  • Implement ETL pipelines with high-volume data

Chunk Example: Database ETL Pipeline

A chunk is unique among Oban workers because it receives a list of jobs and operates on all of them at once. That lets a steady stream of small, individually enqueued events be handled as larger bulk operations.

For a high-throughput data import system, we can use a chunk worker to batch insert records:

defmodule MyApp.DataImporter do
  use Oban.Pro.Workers.Chunk,
    queue: :etl,
    size: 1000,
    timeout: 30_000,
    by: [args: :source]

  alias MyApp.{DataWarehouse, Transformers}

  @impl Oban.Pro.Worker
  def process(jobs) do
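    # With by: [args: :source], every chunk holds jobs from a single source,
    # so this normally produces exactly one group.
    grouped_jobs = Enum.group_by(jobs, &(&1.args["source"]))

    Enum.reduce_while(grouped_jobs, :ok, fn {source, source_jobs}, _acc ->
      # Pull each job's payload and prepare it for a single bulk insert
      records =
        source_jobs
        |> Enum.map(&(&1.args["data"]))
        |> Transformers.normalize_for_source(source)
        |> Transformers.deduplicate()
        |> Transformers.enrich()

      case DataWarehouse.bulk_insert(records) do
        {:ok, _} -> {:cont, :ok}
        # {:error, reason, jobs} errors the listed jobs so they are retried
        {:error, reason} -> {:halt, {:error, reason, source_jobs}}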
      end
    end)
  end
end

When used with partitioning (by: [args: :source]), chunks will group jobs by their source, allowing efficient batch processing while maintaining separation between different data sources.
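With `size: 1000` and `by: [args: :source]` as in DataImporter, each chunk holds at most 1,000 jobs from one source. The hypothetical sketch below models only that grouping step: partition jobs by a key, then split each partition into chunks of at most `size`. It is not Oban Pro's implementation, and it omits the timeout that, in the real worker, lets a partial chunk run when it doesn't fill up in time.

```elixir
defmodule ChunkSketch do
  @moduledoc false
  # Hypothetical model of partitioned chunking, not Oban Pro's implementation:
  # jobs are grouped by a partition key, then each group is split into chunks
  # of at most `size` jobs. Timeout-based flushing of partial chunks is omitted.
  def chunks(jobs, size, key_fun) when is_integer(size) and size > 0 do
    jobs
    # Enum.group_by/2 keeps the original job order within each group.
    |> Enum.group_by(key_fun)
    # Sort partitions by key so the output order is deterministic.
    |> Enum.sort_by(fn {key, _group} -> key end)
    |> Enum.flat_map(fn {_key, group} -> Enum.chunk_every(group, size) end)
  end
end
```

With four jobs from sources "crm", "erp", "crm", "crm" and a size of 2, the "crm" jobs form a full chunk of two plus a partial chunk of one, and the "erp" job forms its own chunk; sources are never mixed.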

Workflow

Workflows compose jobs together with arbitrary dependencies, allowing sequential, fan-out, and fan-in execution patterns. Using a directed acyclic graph (DAG) model, workflows ensure jobs execute in the prescribed order regardless of scheduling or retries.

When to Use Workflows

Use workflows when you need to:

  • Orchestrate complex multi-step processes
  • Define dependencies between different types of jobs
  • Mix sequential and parallel execution
  • Implement fan-out/fan-in patterns
  • Share results between dependent jobs

Workflow Example: Video Processing Pipeline

A video platform requires multiple processing steps after upload, some sequential and some that can run in parallel:

defmodule MyApp.Video.ProcessingOrchestrator do
  # A plain module that builds and inserts the workflow; each step is its own worker.

  alias MyApp.Video.{Transcode, Thumbnail, Analyze}
  alias MyApp.Video.{Transcribe, Classify, Notify}
  alias Oban.Pro.Workflow

  def invoke(video_id) do
    args = %{video_id: video_id}

    Workflow.new(workflow_name: "video-processing")
    |> Workflow.add(:transcode, Transcode.new(args))
    |> Workflow.add(:thumbnail, Thumbnail.new(args), deps: [:transcode])
    |> Workflow.add(:analyze, Analyze.new(args), deps: [:transcode])
    |> Workflow.add(:transcribe, Transcribe.new(args), deps: [:transcode])
    |> Workflow.add(:classify, Classify.new(args), deps: [:analyze, :transcribe])
    |> Workflow.add(:notify, Notify.new(args), deps: [:thumbnail, :classify])
    |> Oban.insert_all()
  end
end

This workflow ensures that:

  1. Transcoding happens first
  2. Thumbnail generation, analysis, and transcription can run in parallel once transcoding completes
  3. Classification starts only after both analysis and transcription complete
  4. Notification is sent only after thumbnail and classification finish
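The ordering guarantees above follow mechanically from the `deps:` lists. As a hypothetical illustration (not Oban Pro's scheduler), this helper groups steps into "waves" where every step depends only on steps from earlier waves, and raises on cycles or missing dependencies. The waves describe the earliest round in which each step could start; the real workflow starts a step as soon as its own dependencies complete, without waiting for the rest of its wave.

```elixir
defmodule WorkflowWaves do
  @moduledoc false
  # Hypothetical helper, not Oban Pro's scheduler: groups workflow steps into
  # waves, where each step depends only on steps from earlier waves.
  def waves(deps) when is_map(deps), do: do_waves(deps, MapSet.new(), [])

  defp do_waves(remaining, _done, acc) when map_size(remaining) == 0,
    do: Enum.reverse(acc)

  defp do_waves(remaining, done, acc) do
    # A step is ready once all of its dependencies are done.
    ready =
      remaining
      |> Enum.filter(fn {_step, step_deps} -> Enum.all?(step_deps, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {step, _step_deps} -> step end)
      |> Enum.sort()

    if ready == [] do
      raise ArgumentError,
            "dependency cycle or missing dependency among: #{inspect(Map.keys(remaining))}"
    end

    do_waves(Map.drop(remaining, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end

deps = %{
  transcode: [],
  thumbnail: [:transcode],
  analyze: [:transcode],
  transcribe: [:transcode],
  classify: [:analyze, :transcribe],
  notify: [:thumbnail, :classify]
}

WorkflowWaves.waves(deps)
# => [[:transcode], [:analyze, :thumbnail, :transcribe], [:classify], [:notify]]
```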

Kick off the workflow by passing a video_id to invoke/1:

MyApp.Video.ProcessingOrchestrator.invoke(video_id)

Choosing the Right Composition Tool

Each composition tool has different strengths:

  • Batch - For processing groups of similar jobs and responding to completion events
  • Chain - For ensuring jobs run in a strict sequential order
  • Chunk - For maximum throughput by processing multiple jobs in a single operation
  • Workflow - For complex orchestration with arbitrary dependencies between different jobs

Here's a decision guide:

If you need to...                                              Use
Run many similar jobs and execute code when they all complete  Batch
Ensure jobs for a specific resource run in exact order         Chain
Process thousands of jobs with maximum throughput              Chunk
Execute multiple jobs together as a collection                 Chunk
Create complex dependencies between different types of jobs    Workflow
Share results between dependent jobs                           Workflow
Have a mix of sequential and parallel execution                Workflow

In advanced scenarios, you might even combine these tools, for example by wrapping a Workflow in a Batch for callbacks.

With these powerful composition tools, Pro enables you to build robust, distributed job processing pipelines that can handle complex business requirements while maintaining reliability.