Oban.Pro.Workers.Chain (Oban Pro v1.4.11)

Chain workers link jobs together so they run in strict sequential order. A downstream job won't execute until its upstream job is completed, cancelled, or discarded. What happens after a cancellation or discard is customizable: the chain can keep processing uninterrupted, hold for outside intervention, or cascade the cancellation.

A job in a chain only runs after the previous job has finished, so an upstream job that snoozes or retries holds up every job behind it.

Usage

Chains suit situations where jobs synchronize internal state with outside state via events. For example, imagine a system that relies on webhooks from a payment processor to track account balances.

defmodule MyApp.WebhookWorker do
  # by: :worker places every job for this worker in a single chain
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: :worker

  @impl true
  def process(%Job{args: %{"account_id" => account_id, "event" => event}}) do
    account_id
    |> MyApp.Account.find()
    |> MyApp.Account.handle_webhook_event(event)
  end
end

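With by: :worker, every webhook job belongs to one chain, so events for unrelated accounts wait on one another. It's essential that jobs for an account are processed in order, but jobs from separate accounts can safely run concurrently. Modify the :by option to partition by both worker and :account_id: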

defmodule MyApp.WebhookWorker do
  # Partition by worker and the "account_id" arg: one chain per account
  use Oban.Pro.Workers.Chain, queue: :webhooks, by: [:worker, args: :account_id]

  ...
end

Now webhooks for each account are guaranteed to run in order, regardless of queue concurrency or errors. See Chain Partitioning for more partitioning examples.

Chain Partitioning

By default, chains are partitioned by worker, meaning every job for the same worker belongs to one chain. That isn't always suitable; for instance, you may want to link jobs by an args field such as :account_id rather than by worker alone. In such cases, use the :by option to customize how chains are partitioned.

Here are a few examples of using :by to achieve fine-grained control over chain partitioning:

# Explicitly chain by :worker
use Oban.Pro.Workers.Chain, by: :worker

# Chain by a single args key without considering the worker
use Oban.Pro.Workers.Chain, by: [args: :account_id]

# Chain by multiple args keys without considering the worker
use Oban.Pro.Workers.Chain, by: [args: [:account_id, :cohort]]

# Chain by worker and a single args key
use Oban.Pro.Workers.Chain, by: [:worker, args: :account_id]

# Chain by worker and multiple args keys
use Oban.Pro.Workers.Chain, by: [:worker, args: [:account_id, :cohort]]

# Chain by a single meta key
use Oban.Pro.Workers.Chain, by: [meta: :source_id]
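To make the :by semantics concrete, here is a minimal sketch of how a partition key could be derived from a :by spec and a job's worker, args, and meta. The module ChainPartitionSketch and its partition_key/2 function are hypothetical illustrations, not Oban Pro's implementation; the sketch assumes a job is a plain map whose args and meta use string keys, as they do once a job has been stored. In this model, two jobs share a chain exactly when their keys are equal.

```elixir
defmodule ChainPartitionSketch do
  @moduledoc """
  Hypothetical illustration of chain partitioning; not Oban Pro's internals.
  A job is a plain map with :worker, :args, and :meta keys, where args and
  meta use string keys as they do once a job is stored.
  """

  # Build a comparable key from a :by spec; jobs with equal keys share a chain
  def partition_key(by, job) do
    by
    # :worker becomes [:worker]; keyword specs such as [args: :id] pass through
    |> List.wrap()
    |> Enum.map(&key_part(&1, job))
  end

  defp key_part(:worker, job), do: {:worker, job.worker}

  defp key_part({field, keys}, job) when field in [:args, :meta] do
    source = Map.fetch!(job, field)
    # Accept a single key or a list of keys; missing keys become nil
    values = keys |> List.wrap() |> Enum.map(&Map.get(source, to_string(&1)))
    {field, values}
  end
end
```

For example, partitioning a webhook job by worker and :account_id:

```elixir
job = %{worker: "MyApp.WebhookWorker", args: %{"account_id" => 1, "event" => "paid"}, meta: %{}}
ChainPartitionSketch.partition_key([:worker, args: :account_id], job)
# → [worker: "MyApp.WebhookWorker", args: [1]]
```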

Handling Cancelled/Discarded

The way a chain behaves when jobs are cancelled or discarded is customizable with the :on_discarded and :on_cancelled options.

There are three strategies for handling upstream discards and cancellations:

  • :ignore — keep processing jobs in the chain as if upstream cancelled or discarded jobs completed successfully. This is the default behaviour.

  • :hold — stop processing any jobs in the chain until the cancelled or discarded job is completed or eventually deleted.

  • :halt — cancel jobs in the chain when an upstream job is cancelled or discarded. Cancelling cascades until jobs are retried via manual intervention, e.g. with the Web dashboard or Oban.retry_all_jobs/1.

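    Because :halt works by cancelling downstream jobs, those cancellations are in turn handled by the :on_cancelled strategy. To fully stop a chain after a discard, you must combine on_discarded: :halt with on_cancelled: :halt.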

Here's an example of a chain that halts on discarded and holds on cancelled:

use Oban.Pro.Workers.Chain, on_discarded: :halt, on_cancelled: :hold

Holding a job snoozes it briefly while it awaits manual intervention. The default snooze period is 60s, and you can customize it through the :hold_snooze option. Here we're bumping snooze up to 600s (10 minutes):

use Oban.Pro.Workers.Chain, on_cancelled: :hold, hold_snooze: 600
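The three strategies can be modeled as a small decision function. This is a hypothetical sketch, not Oban Pro's code: ChainStrategySketch.decide/2 takes the upstream job's state plus chain options and returns what the downstream job would do. The {:snooze, seconds} and {:cancel, reason} return values follow Oban's worker conventions, while :run and :wait are made-up markers for this illustration. The trailing lines show why halting on discard alone doesn't stop a chain.

```elixir
defmodule ChainStrategySketch do
  @moduledoc """
  Hypothetical model of the :ignore, :hold, and :halt strategies; not Oban
  Pro's implementation. Snooze and cancel tuples follow Oban's conventions.
  """

  # Mirrors the documented defaults: :ignore for both, 60s hold snooze
  @defaults [on_cancelled: :ignore, on_discarded: :ignore, hold_snooze: 60]

  # Decide what a downstream job does, given its upstream job's state
  def decide(upstream_state, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)

    case upstream_state do
      "completed" -> :run
      "cancelled" -> apply_strategy(opts[:on_cancelled], upstream_state, opts)
      "discarded" -> apply_strategy(opts[:on_discarded], upstream_state, opts)
      # Any other state (executing, scheduled, retryable, ...) means wait
      _other -> :wait
    end
  end

  # :ignore treats the upstream job as if it had completed successfully
  defp apply_strategy(:ignore, _state, _opts), do: :run

  # :hold snoozes the downstream job while it awaits manual intervention
  defp apply_strategy(:hold, _state, opts), do: {:snooze, opts[:hold_snooze]}

  # :halt cancels the downstream job, which cascades down the chain
  defp apply_strategy(:halt, state, _opts), do: {:cancel, "upstream job #{state}"}
end

# Halting only on discard: the cancelled job's successor then sees a
# "cancelled" upstream and, with the default on_cancelled: :ignore, runs.
{:cancel, "upstream job discarded"} = ChainStrategySketch.decide("discarded", on_discarded: :halt)
:run = ChainStrategySketch.decide("cancelled", on_discarded: :halt)
```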

Customizing Chains

Chains use conservative defaults for safe, and relatively quick, dependency resolution. You can customize waiting times and retry intensity by providing a few top-level options:

  • :wait_sleep — the number of milliseconds to wait between checks. This value, combined with :wait_retry, dictates how long a job waits before snoozing. Defaults to 1000ms.

  • :wait_retry — the number of times to repeat a check, sleeping for :wait_sleep between attempts. This value, combined with :wait_sleep, dictates how long a job waits before snoozing. Defaults to 10.

  • :wait_snooze — the number of seconds a job snoozes after waiting on upstream dependencies that are still executing. If upstream dependencies are scheduled or retryable, the job instead snoozes until the latest dependency's scheduled_at timestamp. Defaults to 5.
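The interplay of :wait_sleep, :wait_retry, and :wait_snooze is easiest to see as a polling loop. The sketch below is hypothetical: ChainWaitSketch.await/2 and the check function it accepts are assumptions for illustration, not Oban Pro's API. It checks upstream state, then retries up to wait_retry times with a wait_sleep pause before each retry, so with the defaults a job waits roughly 10 × 1000ms = 10 seconds before snoozing for wait_snooze seconds.

```elixir
defmodule ChainWaitSketch do
  @moduledoc """
  Hypothetical sketch of the documented waiting behaviour; not Oban Pro's code.
  `check` is a zero-arity function returning :ready, :executing, or
  {:scheduled, latest_scheduled_at} for the upstream dependencies.
  """

  # Mirrors the documented defaults
  @defaults [wait_sleep: 1_000, wait_retry: 10, wait_snooze: 5]

  def await(check, opts \\ []) do
    opts = Keyword.merge(@defaults, opts)
    loop(check, opts, opts[:wait_retry])
  end

  defp loop(check, opts, retries_left) do
    case check.() do
      :ready ->
        :run

      {:scheduled, %DateTime{} = at} ->
        # Snooze until the latest upstream dependency is due (at least 1s)
        {:snooze, max(DateTime.diff(at, DateTime.utc_now()), 1)}

      :executing when retries_left > 0 ->
        # Upstream is still running: sleep, then check again
        Process.sleep(opts[:wait_sleep])
        loop(check, opts, retries_left - 1)

      :executing ->
        # Out of retries: snooze for the configured period
        {:snooze, opts[:wait_snooze]}
    end
  end
end
```

For example, ChainWaitSketch.await(fn -> :ready end) returns :run immediately, while a check that keeps returning :executing ends in {:snooze, 5} with the defaults.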

Optimizing Chains

Running jobs from the same chain concurrently is inefficient, because only the head of the chain can actually execute while the others wait on it. To maximize throughput while still allowing concurrent execution across chains, partition the queue using the same options as the chain.

For example, here's how to configure a queue for a chain partitioned by worker and :user_id, allowing 20 concurrent jobs per node but only 1 globally per partition:

use Oban.Pro.Workers.Chain, queue: :chain_queue, by: [:worker, args: :user_id]

queues: [
  chain_queue: [limit: 20, global_limit: [allowed: 1, partition: [:worker, args: :user_id]]]
]
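Why does allowed: 1 per partition pair so well with a chain? Within a partition only the oldest pending job, the chain's head, can make progress, so admitting more than one job per partition just fills slots with jobs that wait. The hypothetical sketch below models what a single node should start: at most one job per partition, up to its local limit. ChainHeadsSketch and the job shape are illustrative assumptions, not Oban Pro's scheduler.

```elixir
defmodule ChainHeadsSketch do
  @moduledoc """
  Hypothetical illustration, not Oban Pro's scheduler: a node should start at
  most one job per chain partition (the head), up to its local limit.
  """

  # `jobs` must be ordered oldest first; `key_fun` maps a job to its partition
  def runnable(jobs, limit, key_fun) do
    jobs
    # Enum.uniq_by/2 keeps the first job seen for each key, i.e. each head
    |> Enum.uniq_by(key_fun)
    |> Enum.take(limit)
  end
end
```

Here limit stands in for the per-node limit: 20, and taking one job per key stands in for global_limit with allowed: 1 on the chain's partition.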

Types

@type chain_by() ::
  :worker
  | {:args, key_or_keys()}
  | {:meta, key_or_keys()}
  | [:worker | {:args, key_or_keys()} | {:meta, key_or_keys()}]

@type key_or_keys() :: atom() | [atom()]
@type on_action() :: :halt | :hold | :ignore
@type options() :: [
  by: chain_by(),
  hold_snooze: pos_integer(),
  on_cancelled: on_action(),
  on_discarded: on_action(),
  wait_retry: pos_integer(),
  wait_sleep: timeout(),
  wait_snooze: pos_integer()
]