Oban.Pro.Workers.Chain (Oban Pro v1.4.11)
Chain workers link jobs together to ensure they run in a strict sequential order. Downstream jobs won't execute until the upstream job is completed, cancelled, or discarded. Behaviour in the event of cancellation or discards is customizable to allow for uninterrupted processing, holding for outside intervention, or cascading cancellation.
Jobs in a chain only run after the previous job finishes, regardless of snoozing or retries: a snoozed or retrying upstream job keeps the rest of the chain waiting.
Usage
Chains are appropriate in situations where jobs are used to synchronize internal state with outside state via events. For example, imagine a system that relies on webhooks from a payment processor to track account balance.
```elixir
defmodule MyApp.WebhookWorker do
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: :worker

  @impl true
  def process(%Job{args: %{"account_id" => account_id, "event" => event}}) do
    account_id
    |> MyApp.Account.find()
    |> MyApp.Account.handle_webhook_event(event)
  end
end
```
It's essential that jobs for an account are processed in order, while jobs from separate accounts can run concurrently. Modify the :by option to partition by worker and :account_id:
```elixir
defmodule MyApp.WebhookWorker do
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: [:worker, args: :account_id]

  # ...
end
```
Now webhooks for each account are guaranteed to run in order regardless of queue concurrency or errors. See Chain Partitioning below for more partitioning examples.
Chain Partitioning
By default, chains are sequenced by :worker, which means any job with the same worker forms a chain. This approach may not always be suitable. For instance, you may want to link jobs based on a field like :account_id instead of just the worker. In such cases, use the :by option to customize how chains are partitioned.
Here are a few examples of using :by to achieve fine-grained control over chain partitioning:

```elixir
# Explicitly chain by :worker
use Oban.Pro.Workers.Chain, by: :worker

# Chain by a single args key without considering the worker
use Oban.Pro.Workers.Chain, by: [args: :account_id]

# Chain by multiple args keys without considering the worker
use Oban.Pro.Workers.Chain, by: [args: [:account_id, :cohort]]

# Chain by worker and a single args key
use Oban.Pro.Workers.Chain, by: [:worker, args: :account_id]

# Chain by worker and multiple args keys
use Oban.Pro.Workers.Chain, by: [:worker, args: [:account_id, :cohort]]

# Chain by a single meta key
use Oban.Pro.Workers.Chain, by: [meta: :source_id]
```
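To make the grouping concrete, here is a minimal, hypothetical model of how a :by spec could map a job to a chain key: jobs with equal keys share a chain, and jobs with different keys belong to separate chains. ChainKeySketch and chain_key/2 are illustrative names, and jobs are modeled as plain maps whose args and meta have string keys (as they do once stored as JSON). This is a sketch of the idea, not Oban Pro's implementation.

```elixir
defmodule ChainKeySketch do
  @moduledoc false
  # Hypothetical model of :by partitioning; not Oban Pro's implementation.

  # A bare spec such as `:worker` behaves like a one-element list.
  def chain_key(by, job) when not is_list(by), do: chain_key([by], job)

  def chain_key(by, %{worker: worker, args: args, meta: meta}) do
    Enum.map(by, fn
      :worker -> {:worker, worker}
      {:args, keys} -> {:args, values(args, keys)}
      {:meta, keys} -> {:meta, values(meta, keys)}
    end)
  end

  # Stored args and meta have string keys, so look up atom keys as strings.
  defp values(map, keys) do
    keys
    |> List.wrap()
    |> Enum.map(&Map.get(map, to_string(&1)))
  end
end
```

With by: :worker, two webhook jobs for different accounts produce the same key and share one chain; adding args: :account_id gives them different keys, so they form separate chains that can run concurrently.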
Handling Cancelled/Discarded
The way a chain behaves when jobs are cancelled or discarded is customizable with the :on_discarded and :on_cancelled options.
There are three strategies for handling upstream discards and cancellations:

- :ignore: keep processing jobs in the chain as if upstream cancelled or discarded jobs completed successfully. This is the default behaviour.
- :hold: stop processing any jobs in the chain until the cancelled or discarded job is completed or eventually deleted.
- :halt: cancel jobs in the chain when an upstream job is cancelled or discarded. Cancelling cascades until jobs are retried via manual intervention, e.g. with the Web dashboard or Oban.retry_all_jobs/1.

Because halting cancels the downstream job, the job after it sees a cancelled upstream rather than a discarded one. You must use on_discarded: :halt along with on_cancelled: :halt to fully stop a chain.
Here's an example of a chain that halts on discarded and holds on cancelled:

```elixir
use Oban.Pro.Workers.Chain, on_discarded: :halt, on_cancelled: :hold
```
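The interplay between the two options is easiest to see in a small model. The sketch below is hypothetical (ChainStrategySketch, decide/2, and cascade/3 are illustrative names, not Oban Pro APIs) and only encodes the rules described above: :ignore lets the next job run, :hold snoozes it, and :halt cancels it, so the job after that sees a cancelled upstream.

```elixir
defmodule ChainStrategySketch do
  @moduledoc false
  # Hypothetical model of the chain handling rules; not Oban Pro's implementation.

  # What a job does, given its upstream job's state and the chain options.
  def decide(upstream_state, opts) do
    case upstream_state do
      "completed" -> :run
      "cancelled" -> action(Keyword.get(opts, :on_cancelled, :ignore))
      "discarded" -> action(Keyword.get(opts, :on_discarded, :ignore))
      # Executing, scheduled (snoozed), or retryable upstream: keep waiting.
      _other -> :wait
    end
  end

  # Outcomes for `count` downstream jobs when the chain head ended in `head_state`.
  def cascade(head_state, count, opts) do
    {outcomes, _last_state} =
      Enum.map_reduce(1..count, head_state, fn _position, upstream ->
        outcome = decide(upstream, opts)
        {outcome, state_after(outcome)}
      end)

    outcomes
  end

  defp action(:ignore), do: :run
  defp action(:hold), do: :snooze
  defp action(:halt), do: :cancel

  # The state each job is left in, as seen by the next job (assumes runs succeed).
  defp state_after(:run), do: "completed"
  defp state_after(:cancel), do: "cancelled"
  defp state_after(:snooze), do: "scheduled"
  defp state_after(:wait), do: "scheduled"
end
```

In this model, cascade("discarded", 3, on_discarded: :halt) cancels only the first downstream job, because the next one falls back to the default on_cancelled: :ignore; passing both options as :halt cancels every job in the chain.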
Holding a job snoozes it briefly while it awaits manual intervention. The default snooze period is 60s, and you can customize it through the :hold_snooze option. Here we're bumping the snooze up to 600s (10 minutes):

```elixir
use Oban.Pro.Workers.Chain, on_cancelled: :hold, hold_snooze: 600
```
Customizing Chains
Chains use conservative defaults for safe, and relatively quick, dependency resolution. You can customize waiting times and retry intensity by providing a few top-level options:
- :wait_sleep: the number of milliseconds to wait between checks. This value, combined with :wait_retry, dictates how long a job waits before snoozing. Defaults to 1000ms.
- :wait_retry: the number of times to retry a check, sleeping between attempts. This value, combined with :wait_sleep, dictates how long a job waits before snoozing. Defaults to 10.
- :wait_snooze: the number of seconds a job will snooze after waiting on upstream dependencies that are still executing. If upstream dependencies are scheduled or retryable, the job instead snoozes until the latest dependency's scheduled_at timestamp. Defaults to 5.
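As a rough guide to how these options combine, the sketch below treats each of the :wait_retry checks as costing one :wait_sleep interval, then adds the :wait_snooze period for upstream jobs that are still executing. This is an assumed approximation of the arithmetic, not Oban Pro's exact timing, and ChainWaitSketch with its functions are hypothetical names.

```elixir
defmodule ChainWaitSketch do
  @moduledoc false
  # Rough, hypothetical model of the wait options; not Oban Pro's implementation.

  @defaults [wait_sleep: 1_000, wait_retry: 10, wait_snooze: 5]

  # Approximate upper bound (ms) on how long a job blocks before snoozing.
  def blocking_ms(opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    opts[:wait_sleep] * opts[:wait_retry]
  end

  # Approximate delay (ms) until the job checks again, when upstream jobs are
  # still executing (not scheduled or retryable).
  def recheck_after_ms(opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    blocking_ms(opts) + opts[:wait_snooze] * 1_000
  end
end
```

Under this approximation the defaults block a job for about 10 seconds before it snoozes for 5 more. Setting wait_sleep: 500, wait_retry: 20 keeps the same 10 second budget while checking twice as often.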
Optimizing Chains
Running jobs from the same chain concurrently is inefficient: only the head of the chain can actually execute, so the other jobs occupy queue slots while they wait. To keep throughput high while still allowing concurrent execution across chains, partition the queue using the same options as the chain.
For example, here's how to configure a queue for a chain that's partitioned by :worker and :user_id, allowing 20 concurrent jobs per node but only one job at a time for each partition across all nodes:
```elixir
# In the worker
use Oban.Pro.Workers.Chain, by: [:worker, args: :user_id]

# In the Oban configuration
queues: [
  chain_queue: [limit: 20, global_limit: [allowed: 1, partition: [:worker, args: :user_id]]]
]
```
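To see why matching the queue partition to the chain helps, consider a simplified, hypothetical model: a node may start up to limit jobs, but at most allowed running jobs per partition. PartitionLimitSketch and runnable/4 are illustrative names; the real limits are enforced by Oban Pro across nodes, which this single-process sketch does not attempt.

```elixir
defmodule PartitionLimitSketch do
  @moduledoc false
  # Hypothetical model of a partitioned queue limit; not Oban Pro's implementation.

  # Picks which pending jobs may start, in order, given a per-node `limit` and
  # at most `allowed` running jobs per partition (as computed by `key_fun`).
  def runnable(pending, key_fun, limit, allowed \\ 1) do
    {picked, _counts} =
      Enum.reduce(pending, {[], %{}}, fn job, {picked, counts} ->
        key = key_fun.(job)
        used = Map.get(counts, key, 0)

        if length(picked) < limit and used < allowed do
          {[job | picked], Map.put(counts, key, used + 1)}
        else
          {picked, counts}
        end
      end)

    Enum.reverse(picked)
  end
end
```

Given pending jobs for users 1, 1, 2, and 3, the model starts one job each for users 1, 2, and 3. The second job for user 1 is held back instead of taking a slot only to wait on its upstream job.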
Summary
Types
chain_by()
@type chain_by() :: :worker | {:args, key_or_keys()} | {:meta, key_or_keys()} | [:worker | {:args, key_or_keys()} | {:meta, key_or_keys()}]
key_or_keys()
on_action()
@type on_action() :: :halt | :hold | :ignore
options()
@type options() :: [ by: chain_by(), hold_snooze: pos_integer(), on_cancelled: on_action(), on_discarded: on_action(), wait_retry: pos_integer(), wait_sleep: timeout(), wait_snooze: pos_integer() ]