Oban.Pro.Batch behaviour (Oban Pro v1.5.0-rc.6)

Batches link the execution of many jobs as a group and runs optional callbacks after jobs are processed. This allows your application to coordinate the execution of any number of jobs in parallel.

Usage

Batches are composed of one or more Pro workers that are linked with a shared batch_id and optional callbacks. As a simple example, let's define a worker that delivers daily emails:

defmodule MyApp.EmailBatch do
  use Oban.Pro.Worker, queue: :mailers

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Worker
  def process(%Job{args: %{"email" => email}}) do
    MyApp.Mailer.daily_update(email)
  end

  @impl Oban.Pro.Batch
  def batch_completed(_job) do
    Logger.info("BATCH COMPLETE")

    :ok
  end
end

Now, create a batch with new/1 by passing a list of job changesets:

emails
|> Enum.map(&MyApp.EmailWorker.new(%{email: &1}))
|> Batch.new()
|> Oban.insert_all()

After all jobs in the batch are completed, the batch_completed/1 callback will be triggered.

Handler Callbacks

After jobs in the batch are processed, a callback job may be inserted. There are four optional batch handler callbacks that a worker may define:

callbackenqueued after
batch_attempted/1all jobs executed at least once
batch_cancelled/1the first job is cancelled
batch_completed/1all jobs in the batch are completed
batch_discarded/1the first job is discarded
batch_exhausted/1all jobs are completed, cancelled, or discarded
batch_retryable/1the first job is retryable

Each callback runs in a separate, isolated job, so it may be retried or discarded like any other job. The callback function receives an Oban.Job struct with the batch_id in meta and should return :ok (or another valid Worker.result()).

Here we'll implement each of the optional handler callbacks and have them print out the batch status along with the batch_id:

defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Worker
  def process(_job), do: :ok

  @impl Oban.Pro.Batch
  def batch_attempted(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:attempted,  batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_cancelled(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:cancelled,  batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_completed(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:completed,  batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_discarded(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:discarded,  batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_exhausted(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:exhausted,  batch_id})
    :ok
  end

  @impl Oban.Pro.Batch
  def batch_retryable(%Job{meta: %{"batch_id" => batch_id}}) do
    IO.inspect({:retryable,  batch_id})
    :ok
  end
end

Hybrid Batches

Batches may also be built from a variety of workers, though you must provide an explicit worker for callbacks:

mail_jobs = Enum.map(mail_args, &MyApp.MailWorker.new/1)
push_jobs = Enum.map(push_args, &MyApp.PushWorker.new/1)

[callback_worker: MyApp.CallbackWorker]
|> Batch.new()
|> Batch.add(mail_jobs)
|> Batch.add(push_jobs)

Without an explicit callback_worker, any worker in the batch may be used for callback. That makes callback handling unpredictable and unexpected.

Customizing Batches

Batches accept a variety of options for grouping and callback customization.

Batch ID

By default a batch_id is generated as a time-ordered random UUIDv7. UUIDs are more than sufficient to ensure that batches are unique between workers and nodes for any period. However, if you require control, you can override batch_id generation at the worker level or pass a value directly to the new/2 function.

Batch.new(batch_id: "custom-batch-id")

Batch Name

Batches accept an optional name to describe the purpose of the batch, beyond the generated id or individual jobs in it. While the batch_id must be unique, the batch_name doesn't, so it can be used as a general purpose label.

Batch.new(batch_name: "nightly-etl")

Callback Workers

For some batches, notably those with heterogeneous jobs, it's necessary to specify a different worker for callbacks. That is easily accomplished by passing the :callback_worker option to new/2:

Batch.new(callback_worker: MyCallbackWorker)

The callback worker must be an Oban.Pro.Worker that defines one or more of the batch callback handlers.

Callback Options

By default, callback jobs have an empty args map and inherit other options from batch jobs. With callback_opts, you can set standard job options for batch callback jobs (only args, max_attempts, meta, priority, queue, and tags are allowed).

For example, here we're passing a webhook URLs as args:

Batch.new(callback_opts: [args: %{webhook: "https://web.hook"}])

Here, we change the callback queue and knock the priority down:

Batch.new(callback_opts: [queue: :callbacks, priority: 9])

Be careful to minimize callback_opts as they are stored in each batch job's meta.

Fetching Batch Jobs

To pull more context from batch jobs, it's possible to load all jobs from the batch with all_jobs/2 and stream_jobs/2. The functions take a single batch job and returns a list or stream of all non-callback jobs in the batch, which you can then operate on with Enum or Stream functions.

As an example, imagine you have a batch that ran for a few thousand accounts and you'd like to notify admins that the batch is complete.

defmodule MyApp.BatchWorker do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Batch

  @impl Oban.Pro.Batch
  def batch_completed(%Job{} = job) do
    {:ok, account_ids} =
        job
        |> Oban.Pro.Batch.all_jobs()
        |> Enum.map(& &1.args["account_id"])

    account_ids
    |> MyApp.Accounts.all()
    |> MyApp.Mailer.notify_admins_about_batch()
  end

Fetching a thousand jobs may be alright, but for larger batches you don't want to load that much data into memory. Instead, you can use stream_jobs to iterate through them lazily:

{:ok, account_ids} =
  MyApp.Repo.transaction(fn ->
    job
    |> Batch.stream_jobs()
    |> Stream.map(& &1.args["account_id"])
    |> Enum.to_list()
  end)

Streaming is provided by Ecto's Repo.stream, and it must take place within a transaction. While it may be overkill for small batches, for batches with tens or hundreds of thousands of jobs, it will prevent massive memory spikes or the database grinding to a halt.

Summary

Callbacks

Called after all jobs in the batch were attempted at least once.

Called after any jobs in the batch have a cancelled state.

Called after all jobs in the batch have a completed state.

Called after any jobs in the batch have a discarded state.

Called after all jobs in the batch have either a completed or discarded state.

Called after any jobs in the batch have a retryable state.

Functions

Add one or more jobs to a batch.

Get all non-callback jobs from a batch.

Append to a batch from an existing batch job.

Cancel all non-callback jobs in a batch.

Create a batch from a workflow.

Build a new batch from a list or stream of job changesets and some options.

Stream all non-callback jobs from a batch.

Types

Link to this type

batch_opts()

@type batch_opts() :: [
  batch_id: String.t(),
  batch_name: String.t(),
  callback_opts: callback_opts(),
  callback_worker: module()
]
Link to this type

callback_opts()

@type callback_opts() :: [
  args: Oban.Job.args(),
  max_attempts: pos_integer(),
  meta: map(),
  priority: 0..9,
  queue: atom() | String.t(),
  tags: Oban.Job.tags()
]
@type changeset() :: Oban.Job.changeset()
@type repo_opts() :: [{:timeout, timeout()}]
@type t() :: %Oban.Pro.Batch{changesets: Enumerable.t(changeset()), opts: map()}

Callbacks

Link to this callback

batch_attempted(job)

(optional)
@callback batch_attempted(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after all jobs in the batch were attempted at least once.

If a batch_attempted/1 function is defined it is executed by an isolated callback job.

Example

Print when all jobs were attempted:

def batch_attempted(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Attempted")
end
Link to this callback

batch_cancelled(job)

(optional)
@callback batch_cancelled(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after any jobs in the batch have a cancelled state.

If a batch_cancelled/1 function is defined it is executed by an isolated callback job.

Example

Print when any jobs are discarded:

def batch_cancelled(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Cancelled")
end
Link to this callback

batch_completed(job)

(optional)
@callback batch_completed(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after all jobs in the batch have a completed state.

If a batch_completed/1 function is defined it is executed by an isolated callback job.

Example

Print when all jobs are completed:

def batch_completed(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Completed")
end
Link to this callback

batch_discarded(job)

(optional)
@callback batch_discarded(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after any jobs in the batch have a discarded state.

If a batch_discarded/1 function is defined it is executed by an isolated callback job.

Example

Print when any jobs are discarded:

def batch_discarded(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Discarded")
end
Link to this callback

batch_exhausted(job)

(optional)
@callback batch_exhausted(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after all jobs in the batch have either a completed or discarded state.

If a batch_exhausted/1 function is defined it is executed by an isolated callback job.

Example

Print when all jobs are completed or discarded:

def batch_exhausted(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Exhausted")
end
Link to this callback

batch_retryable(job)

(optional)
@callback batch_retryable(job :: Oban.Job.t()) :: Oban.Worker.result()

Called after any jobs in the batch have a retryable state.

If a batch_retryable/1 function is defined it is executed by an isolated callback job.

Example

Print when any jobs are retryable:

def batch_retryable(%Job{meta: %{"batch_id" => batch_id}}) do
  IO.inspect(batch_id, label: "Retryable")
end

Functions

Link to this function

add(batch, changeset)

@spec add(t(), Enumerable.t(changeset())) :: t()

Add one or more jobs to a batch.

Examples

Add jobs to an existing batche:

Batch.add(batch, Enum.map(args, &MyWorker.new/1))

Add jobs to a batch one at a time:

Batch.new()
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
Link to this function

all_jobs(name \\ Oban, job_or_batch_id, opts \\ [])

@spec all_jobs(Oban.name(), Oban.Job.t() | String.t(), [repo_opts()]) :: [
  Oban.Job.t()
]

Get all non-callback jobs from a batch.

Examples

Fetch results from all jobs from within a batch_completed/1 callback:

def batch_completed(%Job{} = job) do
  results =
    job
    |> Batch.all_jobs()
    |> Enum.map(&fetch_recorded/1)

  ...
end

Get all batch jobs from anywhere using the batch_id:

Batch.all_jobs("some-uuid-1234-5678")

Get all jobs from anywhere with a custom Oban instance name:

Batch.all_jobs(MyApp.Oban, "some-uuid-1234-5678")
Link to this function

append(job, opts \\ [])

@spec append(Oban.Job.t(), batch_opts()) :: t()

Append to a batch from an existing batch job.

The batch_id and any other batch options are propagated to the newly created batch.

Appending and Callbacks

Batch callback jobs are only inserted once. Appending to a batch where callbacks were already triggered, e.g. batch_completed, won't re-trigger the callback.

Examples

Build an empty batch from an existing batch job:

job
|> Batch.append()
|> Batch.add(MyWorker.new(%{}))
|> Batch.add(MyWorker.new(%{}))
|> Oban.insert_all()

Build a batch from an existing job with overriden options:

Batch.append(job, callback_worker: MyApp.OtherWorker)
Link to this function

cancel_jobs(oban_name \\ Oban, job_or_batch_id)

@spec cancel_jobs(Oban.name(), Oban.Job.t() | String.t()) :: {:ok, non_neg_integer()}

Cancel all non-callback jobs in a batch.

Examples

Cancel jobs with the job in a process/1 function:

def process(job) do
  if should_stop_processing?(job.args) do
    Batch.cancel_jobs(job)
  else
    ...
  end
end

Cancel jobs from anywhere using the batch_id:

Batch.cancel_jobs("some-uuid-1234-5678")

Cancel jobs from anywhere with a custom Oban instance name:

Batch.cancel_jobs(MyApp.Oban, "some-uuid-1234-5678")
Link to this function

from_workflow(workflow, opts \\ [])

@spec from_workflow(Oban.Pro.Workflow.t(), batch_opts()) :: t()

Create a batch from a workflow.

All jobs in the workflow are augmented to also be part of a batch.

Examples

Create a batch from a workflow without any extra options:

Workflow.new()
|> Workflow.add(:step_1, MyWorker.new(%{id: 123}))
|> Workflow.add(:step_2, MyWorker.new(%{id: 345}), deps: [:step_1])
|> Workflow.add(:step_3, MyWorker.new(%{id: 456}), deps: [:step_2])
|> Batch.from_workflow()
|> Oban.insert_all()

Create a batch with callback options:

Batch.from_workflow(workflow, callback_opts: [priority: 3], callback_worker: MyApp.Worker)
Link to this function

new(changesets, opts)

@spec new(Enumerable.t(changeset()), batch_opts()) :: t()

Build a new batch from a list or stream of job changesets and some options.

Examples

Build an empty batch without any jobs or options:

Batch.new()

Build a batch from a list of changesets:

1..3
|> Enum.map(fn id -> MyWorker.new(%{id: id}) end)
|> Batch.new()

Build a batch from a stream:

stream_of_args
|> Stream.map(&MyWorker.new/1)
|> Batch.new()

Build a batch with callback options:

Batch.new(list_of_jobs, batch_id: "abc-123", callback_worker: MyApp.CallbackWorker)
Link to this function

stream_jobs(name \\ Oban, job_or_batch_id, opts \\ [])

@spec stream_jobs(Oban.name(), Oban.Job.t() | String.t(), [repo_opts()]) :: Enum.t()

Stream all non-callback jobs from a batch.

Examples

Stream all batch jobs from within a batch_completed/1 callback:

def batch_completed(%Job{} = job) do
  {:ok, account_ids} =
    MyApp.Repo.transaction(fn ->
      job
      |> Batch.stream_jobs()
      |> Enum.map(& &1.args["account_id"])
    end)

    # use_account_ids
end

Stream all batch jobs from anywhere using the batch_id:

MyApp.Repo.transaction(fn ->
  "some-uuid-1234-5678"
  |> Batch.stream_jobs()
  |> Enum.map(& &1.args["account_id"])
end)

Stream all batch jobs using a custom Oban instance name:

MyApp.Repo.transaction(fn ->
  MyApp.Oban
  |> Batch.stream_jobs("some-uuid-1234-5678")
  |> Enum.map(& &1.args["account_id"])
end)