Composition
Oban Pro provides powerful worker composition tools to orchestrate complex job patterns with ease. Each composition tool addresses specific coordination scenarios, helping you design jobs that can scale reliably across nodes while maintaining precise execution patterns.
This guide explores each of Pro's worker composition tools and demonstrates how they solve real-world challenges:
- Batch—Process related jobs while tracking overall progress across all nodes and executing optional callbacks.
- Chain—Process jobs in strict sequential order regardless of scheduling, retries, and queue concurrency.
- Chunk—Process multiple jobs at once with robust error handling semantics and optional partitioning.
- Workflow—Compose workers together with arbitrary dependencies between jobs, allowing sequential, fan-out and fan-in execution.
Batch
Batches link the execution of related jobs in a group and run optional callbacks after all jobs are processed. This allows your application to coordinate many jobs in parallel and respond when they complete or fail.
When to Use Batches
Use a batch when you need to:
- Run many similar operations in parallel
- Track overall progress of the operations
- Execute code after all operations have completed
- Respond to specific events like failures or cancellations
Batches can define optional callbacks, each of which runs as a separate job when its condition is met:
- `batch_attempted/1`: All jobs in the batch have executed at least once
- `batch_cancelled/1`: Any job in the batch is cancelled
- `batch_completed/1`: All jobs in the batch complete successfully
- `batch_discarded/1`: Any job in the batch is discarded
- `batch_exhausted/1`: All jobs are either completed, cancelled, or discarded
- `batch_retryable/1`: Any job in the batch enters the retryable state
Batches are ideal for operations where you need to parallelize many jobs and react to events like completion of the batch, individual failures, or retry exhaustion.
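To make the callback conditions concrete, here is a small in-memory model that takes the states of a batch's jobs and lists which callbacks would match. It is a sketch under simplifying assumptions: `BatchModel` and `matching_callbacks/1` are hypothetical names, each job is a plain map with `:state` and `:attempt` keys, and real batches are tracked by Oban Pro in the database, not like this.

```elixir
defmodule BatchModel do
  # Hypothetical model for illustration only; not Oban Pro's implementation.
  @finished [:completed, :cancelled, :discarded]

  # Each job is a plain map such as %{state: :completed, attempt: 1}.
  # Returns the callbacks whose conditions hold, in a fixed order.
  def matching_callbacks(jobs) do
    [
      batch_attempted: Enum.all?(jobs, &(&1.attempt >= 1)),
      batch_cancelled: Enum.any?(jobs, &(&1.state == :cancelled)),
      batch_completed: Enum.all?(jobs, &(&1.state == :completed)),
      batch_discarded: Enum.any?(jobs, &(&1.state == :discarded)),
      batch_exhausted: Enum.all?(jobs, &(&1.state in @finished)),
      batch_retryable: Enum.any?(jobs, &(&1.state == :retryable))
    ]
    |> Enum.filter(fn {_callback, matched?} -> matched? end)
    |> Enum.map(fn {callback, _matched?} -> callback end)
  end
end
```

With one completed job and one discarded job, `batch_attempted/1`, `batch_discarded/1`, and `batch_exhausted/1` all match, while `batch_completed/1` does not.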
Batch Example: Image Processing Service
Imagine a service that thumbnails images and then archives them on demand. While you could do all of the thumbnailing and archiving in a single job, it wouldn't scale horizontally across nodes and it'd lose all progress when the node restarts. Instead, you can model processing as a batch where each job thumbnails a single image and a callback generates the final archive.
Start by defining a batch thumbnailer with callbacks for when the entire batch completes, or when any job exhausts its retries and is discarded, meaning the batch can never complete successfully:
```elixir
defmodule MyApp.ImageProcessor do
  use Oban.Pro.Worker, queue: :media

  @behaviour Oban.Pro.Batch

  alias MyApp.{Media, Notifications}

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"image_id" => id, "operations" => operations}}) do
    with {:ok, image} <- Media.get_image(id) do
      Media.process(image, operations)
    end
  end

  # Callback jobs get `account_id` from the `callback_opts` args and the batch id from meta
  @impl Oban.Pro.Batch
  def batch_completed(%Job{args: %{"account_id" => account_id}, meta: %{"batch_id" => batch_id}}) do
    with {:ok, images} <- Media.get_batch_images(batch_id),
         {:ok, archive_path} <- Media.create_archive(images),
         {:ok, url} <- Media.store_archive(archive_path, account_id) do
      Notifications.image_processing_complete(account_id, url)
    end
  end

  @impl Oban.Pro.Batch
  def batch_discarded(%Job{args: %{"account_id" => account_id}, meta: %{"batch_id" => batch_id}}) do
    Notifications.processing_failed(account_id, batch_id)
  end
end
```
The `process/1` function generates thumbnails for each image. The `batch_completed/1` callback builds and delivers the archive once every job has succeeded, while `batch_discarded/1` notifies the account as soon as any job is discarded.
Then, create the batch and insert all jobs:
```elixir
alias Oban.Pro.Batch
alias MyApp.ImageProcessor

image_ids
|> Enum.map(&ImageProcessor.new(%{image_id: &1, operations: ~w(resize optimize watermark)}))
|> Batch.new(callback_opts: [args: %{account_id: current_user.id}])
|> Oban.insert_all()
```
This batch will process all images in parallel across all available nodes, triggering the appropriate callback when finished. The batch's progress persists across node restarts, making it resilient to failures.
Chain
Chains link jobs together to ensure they run in strict sequential order. Downstream jobs in the chain won't execute until the upstream job is completed, regardless of scheduling, retries, or queue concurrency.
When to Use Chains
Use chains when you need to:
- Ensure jobs for a specific resource execute in order
- Process events exactly in the order they arrived
- Implement state transitions that must happen sequentially
- Synchronize internal state with external systems via webhooks
By default, chains are sequenced by `worker`, but you can customize partitioning with the `:by` option. For example, to chain by `worker` and `account_id` in the job's arguments:

```elixir
use Oban.Pro.Worker, chain: [by: [:worker, args: :account_id]]
```
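Conceptually, the `:by` option derives a key from each job, and jobs that share a key form one chain. The hypothetical `ChainKey` module below models that idea with plain maps standing in for jobs. It handles only the `:worker` and single-field `args:` forms shown above and is not how Oban Pro stores chains.

```elixir
defmodule ChainKey do
  # Hypothetical model of chain partitioning; jobs are plain maps with
  # :worker and string-keyed :args, like decoded Oban job args.
  def key(job, by) do
    Enum.map(by, fn
      :worker -> job.worker
      {:args, field} -> Map.get(job.args, Atom.to_string(field))
    end)
  end

  # Jobs sharing a key form one chain; Enum.group_by keeps insertion
  # order within each chain.
  def chains(jobs, by), do: Enum.group_by(jobs, &key(&1, by))
end
```

Jobs for account 1 and account 2 end up in separate chains, so a slow job for one account never holds up the other.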
Chain Example: Payment Webhook Processing
For reliable payment processing, we must handle webhooks in exact order to maintain a consistent account balance:
```elixir
defmodule MyApp.PaymentWebhookWorker do
  use Oban.Pro.Worker, queue: :webhooks, chain: [by: [args: :account_id]]

  alias MyApp.{Accounts, Payments, Transactions}

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"account_id" => account_id, "event" => event, "data" => data}}) do
    with {:ok, account} <- Accounts.get(account_id) do
      case event do
        "payment.succeeded" -> Transactions.record_payment(account, data)
        "payment.refunded" -> Transactions.record_refund(account, data)
        "subscription.renewed" -> Payments.renew_subscription(account, data)
      end
    end
  end
end
```
With this configuration, all webhook events for the same account will process in sequential order, regardless of how many jobs are in the queue or how many nodes are processing jobs.
If a payment fails and retries, the chain ensures that no subsequent payments for that account will process until the retrying payment either succeeds or is discarded.
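The blocking rule can be modeled as "only the first unfinished job in a chain may run". This sketch follows the text in treating a completed or discarded job as releasing the chain. `ChainGate` and `next_runnable/1` are hypothetical names, and every other state (including cancelled) is simply treated as unfinished here.

```elixir
defmodule ChainGate do
  # Hypothetical model: per the text, completed or discarded jobs release the chain.
  @released [:completed, :discarded]

  # `chain` lists jobs in insertion order, e.g. [%{id: 1, state: :completed}, ...].
  # Returns the only job allowed to run next, or nil when every job has released.
  def next_runnable(chain) do
    Enum.find(chain, fn job -> job.state not in @released end)
  end
end
```

While payment 2 is retrying, payment 3 stays blocked; once payment 2 is discarded, payment 3 becomes the next runnable job.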
Chunk
Chunk workers execute jobs together in groups, either when a certain number of jobs are available or after a timeout elapses. Handling many jobs in a single operation, such as one bulk insert instead of thousands of individual ones, can dramatically increase a single queue's throughput.
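The two triggers can be expressed as a single predicate. This is a hypothetical model, not Oban Pro's scheduler: `ChunkTrigger` is an invented name, and `elapsed_ms` is assumed to be how long the oldest pending job has been waiting. A chunk runs once `size` jobs are pending, or once at least one job has waited `timeout_ms`; any surplus beyond `size` waits for the next chunk.

```elixir
defmodule ChunkTrigger do
  # Hypothetical model of the chunk triggers; not Oban Pro's implementation.

  # Run when `size` jobs are pending, or when at least one job has waited
  # `timeout_ms` (elapsed_ms is assumed to be measured from the oldest pending job).
  def ready?(pending_count, elapsed_ms, size, timeout_ms) do
    pending_count >= size or (pending_count > 0 and elapsed_ms >= timeout_ms)
  end

  # Take at most `size` jobs for one execution; the rest wait for the next chunk.
  def take(pending, size), do: Enum.split(pending, size)
end
```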
When to Use Chunks
Use chunks when you need to:
- Process large volumes of jobs with maximum throughput
- Group operations for efficiency (like database operations, API calls)
- Deduplicate or aggregate data across multiple jobs
- Implement ETL pipelines with high-volume data
Chunk Example: Database ETL Pipeline
A chunk is unique among Oban workers because it receives a list of jobs and operates on all of them at once. That enables bulk operations over large amounts of data that arrives as a steady stream of individual events.
For a high-throughput data import system, we can use a chunk worker to batch insert records:
```elixir
defmodule MyApp.DataImporter do
  use Oban.Pro.Workers.Chunk,
    queue: :etl,
    size: 1000,
    timeout: 30_000,
    by: [args: :source]

  alias MyApp.{DataWarehouse, Transformers}

  @impl Oban.Pro.Worker
  def process(jobs) do
    # With `by: [args: :source]` each chunk should already hold a single
    # source, so this grouping is a defensive measure
    grouped_jobs = Enum.group_by(jobs, & &1.args["source"])

    Enum.reduce_while(grouped_jobs, :ok, fn {source, source_jobs}, _acc ->
      records =
        source_jobs
        |> Enum.map(& &1.args["data"])
        |> Transformers.normalize_for_source(source)
        |> Transformers.deduplicate()
        |> Transformers.enrich()

      case DataWarehouse.bulk_insert(records) do
        {:ok, _} -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, reason, source_jobs}}
      end
    end)
  end
end
```
When used with partitioning (`by: [args: :source]`), chunks group jobs by their source, allowing efficient batch processing while keeping different data sources separate.
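Partitioning can be pictured as grouping pending jobs by source first and only then splitting each group into chunks of at most `size` jobs, so no chunk ever mixes sources. `ChunkPartition.chunks/2` is a hypothetical helper operating on plain maps; it sketches the idea rather than Pro's internals.

```elixir
defmodule ChunkPartition do
  # Hypothetical sketch: jobs are plain maps with string-keyed args.
  def chunks(jobs, size) do
    jobs
    # Partition by source first so that no chunk mixes sources...
    |> Enum.group_by(& &1.args["source"])
    # ...sorting partitions by source only to keep the output deterministic...
    |> Enum.sort_by(fn {source, _jobs} -> source end)
    # ...then split each partition into chunks of at most `size` jobs.
    |> Enum.flat_map(fn {_source, source_jobs} -> Enum.chunk_every(source_jobs, size) end)
  end
end
```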
Workflow
Workflows compose jobs together with arbitrary dependencies, allowing sequential, fan-out, and fan-in execution patterns. Using a directed acyclic graph (DAG) model, workflows ensure jobs execute in the prescribed order regardless of scheduling or retries.
When to Use Workflows
Use workflows when you need to:
- Orchestrate complex multi-step processes
- Define dependencies between different types of jobs
- Mix sequential and parallel execution
- Implement fan-out/fan-in patterns
- Share results between dependent jobs
Workflow Example: Video Processing Pipeline
A video platform requires multiple processing steps after upload, some sequential and some that can run in parallel:
```elixir
defmodule MyApp.Video.ProcessingOrchestrator do
  # A plain module: it only builds and inserts the workflow, and each step is its own worker
  alias MyApp.Video.{Transcode, Thumbnail, Analyze}
  alias MyApp.Video.{Transcribe, Classify, Notify}
  alias Oban.Pro.Workflow

  def invoke(video_id) do
    args = %{video_id: video_id}

    Workflow.new(workflow_name: "video-processing")
    |> Workflow.add(:transcode, Transcode.new(args))
    |> Workflow.add(:thumbnail, Thumbnail.new(args), deps: [:transcode])
    |> Workflow.add(:analyze, Analyze.new(args), deps: [:transcode])
    |> Workflow.add(:transcribe, Transcribe.new(args), deps: [:transcode])
    |> Workflow.add(:classify, Classify.new(args), deps: [:analyze, :transcribe])
    |> Workflow.add(:notify, Notify.new(args), deps: [:thumbnail, :classify])
    |> Oban.insert_all()
  end
end
```
This workflow ensures that:
- Transcoding happens first
- Thumbnail generation, analysis, and transcription run in parallel once transcoding completes
- Classification starts only after both analysis and transcription complete
- Notification is sent only after thumbnail and classification finish
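These ordering guarantees follow directly from the dependency graph. The hypothetical `WorkflowStages` module below groups the same steps into stages, where each stage depends only on earlier ones, and raises if the graph has a cycle or references an unknown step. Stages are only a way to visualize the DAG: each step's readiness depends on its own dependencies, so `:thumbnail`, for example, does not wait for `:analyze`.

```elixir
defmodule WorkflowStages do
  # Hypothetical helper for visualizing a DAG; not part of Oban Pro.

  # deps: %{step => [dependency, ...]}
  def stages(deps), do: do_stages(deps, MapSet.new(), [])

  defp do_stages(deps, _done, acc) when map_size(deps) == 0, do: Enum.reverse(acc)

  defp do_stages(deps, done, acc) do
    # A step is ready once all of its dependencies are in earlier stages
    ready =
      deps
      |> Enum.filter(fn {_step, needs} -> Enum.all?(needs, &MapSet.member?(done, &1)) end)
      |> Enum.map(fn {step, _needs} -> step end)
      |> Enum.sort()

    # Nothing ready but steps remain: a cycle or a dependency on a missing step
    if ready == [], do: raise(ArgumentError, "dependency cycle or missing step")

    do_stages(Map.drop(deps, ready), MapSet.union(done, MapSet.new(ready)), [ready | acc])
  end
end
```

For the video pipeline, this yields `[[:transcode], [:analyze, :thumbnail, :transcribe], [:classify], [:notify]]`.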
We kick it all off by passing a `video_id`:

```elixir
MyApp.Video.ProcessingOrchestrator.invoke(video_id)
```
Choosing the Right Composition Tool
Each composition tool has different strengths:
- Batch - For processing groups of similar jobs and responding to completion events
- Chain - For ensuring jobs run in a strict sequential order
- Chunk - For maximum throughput by processing multiple jobs in a single operation
- Workflow - For complex orchestration with arbitrary dependencies between different jobs
Here's a decision guide:
If you need to... | Use |
---|---|
Run many similar jobs and execute code when they all complete | Batch |
Ensure jobs for a specific resource run in exact order | Chain |
Process thousands of jobs with maximum throughput | Chunk |
Execute multiple jobs together as a collection | Chunk |
Create complex dependencies between different types of jobs | Workflow |
Share results between dependent jobs | Workflow |
Have a mix of sequential and parallel execution | Workflow |
In advanced scenarios, you might even combine these tools - for example, wrapping a Workflow in a Batch for callbacks.
With these powerful composition tools, Pro enables you to build robust, distributed job processing pipelines that can handle complex business requirements while maintaining reliability.