Oban.Pro.Workers.Chunk (Oban Pro v0.13.2)

Chunk workers execute jobs together in groups based on a size or a timeout option, e.g. when 1000 jobs are available or after 10 minutes have elapsed.

Multiple chunks can run in parallel within a single queue, and each chunk may be composed of many thousands of jobs. Aside from improved throughput for a single queue, chunks are ideal as the initial stage of data-ingestion and data-processing pipelines.


Usage

Let's define a worker that sends SMS messages in chunks, rather than individually:

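defmodule MyApp.ChunkWorker do
  # Run chunks of up to 100 jobs, or whatever is available after 1000ms
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  @impl true
  def process([_ | _] = jobs) do
    # Collect each job's args and deliver them with a single batch call
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

    # Returning :ok marks every job in the chunk as complete
    :ok
  end
end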

Notice that we declared a size and a timeout along with a queue for the worker. If size or timeout is omitted, it falls back to its default: 100 for size and 1000ms for timeout. To process larger batches less frequently, we can increase both values:

use Oban.Pro.Workers.Chunk, size: 500, timeout: :timer.seconds(5)

Now chunks will run with up to 500 jobs or every 5 seconds, whichever comes first.
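To make "whichever comes first" concrete, here is a small back-of-the-envelope helper. It is a hypothetical sketch, not part of Oban Pro, and it assumes jobs arrive at a constant rate (in jobs per second), which real queues rarely do. It compares the time needed to fill a chunk of `size` jobs against `timeout`.

```elixir
defmodule ChunkTrigger do
  @moduledoc """
  Hypothetical helper, not part of Oban Pro: given a steady arrival rate,
  estimate which chunk limit fires first and roughly how many jobs the
  chunk will hold.
  """

  # rate: jobs per second, size: max jobs per chunk, timeout: milliseconds
  @spec first_trigger(number(), pos_integer(), pos_integer()) ::
          {:size | :timeout, non_neg_integer()}
  def first_trigger(rate, _size, _timeout) when rate <= 0, do: {:timeout, 0}

  def first_trigger(rate, size, timeout) do
    # milliseconds needed to accumulate a full chunk at this rate
    fill_ms = size / rate * 1000

    if fill_ms <= timeout do
      {:size, size}
    else
      # the timeout fires first; estimate how many jobs arrived by then
      {:timeout, trunc(rate * timeout / 1000)}
    end
  end
end
```

With the `size: 500, timeout: :timer.seconds(5)` settings above, `first_trigger(200, 500, 5000)` returns `{:size, 500}` because a full chunk fills in 2.5 seconds, while `first_trigger(50, 500, 5000)` returns `{:timeout, 250}`.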

As with other Pro workers, you define a process/1 callback rather than perform/1. The Chunk worker's process/1 is a little different, though: it receives a list of jobs rather than a single struct. Fittingly, the expected return values are different as well.


Chunk Results and Error Handling

The Chunk worker's result type is tailored to handling multiple jobs at once. For reference, the full type definitions for process/1 results are listed under Types at the end of this page.

The result types allow you to succeed an entire chunk or selectively fail parts of it. The possible results are:

  • :ok - The chunk succeeded and all jobs can be marked complete.

  • {:ok, result} - Like :ok, but with a result for testing.

  • {:error, reason, jobs} - One or more jobs in the chunk failed and may be retried; any unlisted jobs are a success.

  • {:cancel, reason, jobs} - One or more jobs in the chunk should be cancelled; any unlisted jobs are a success.

  • [error: {reason, jobs}, cancel: {reason, jobs}] - Retry some jobs and cancel others; any job in neither list is a success.
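The key rule in these results is that any job not explicitly listed counts as a success. The sketch below is a hypothetical model of that rule, not Oban Pro's implementation: jobs are plain maps with an `:id` key (an assumption for illustration), and it covers only the result shapes listed above.

```elixir
defmodule ChunkOutcome do
  @moduledoc """
  Hypothetical model of how a chunk result maps onto individual jobs.
  Jobs are plain maps with an :id key; this is not Oban Pro's code.
  """

  @doc "Returns %{completed: ids, retryable: ids, cancelled: ids}."
  def partition(jobs, result) do
    {retry, cancel} =
      case result do
        :ok -> {[], []}
        {:ok, _value} -> {[], []}
        {:error, _reason, failed} -> {failed, []}
        {:cancel, _reason, cancelled} -> {[], cancelled}
        opts when is_list(opts) -> {listed(opts, :error), listed(opts, :cancel)}
      end

    retry_ids = MapSet.new(retry, & &1.id)
    cancel_ids = MapSet.new(cancel, & &1.id)
    ids = Enum.map(jobs, & &1.id)

    %{
      # anything not explicitly listed counts as a success
      completed: Enum.reject(ids, &(&1 in retry_ids or &1 in cancel_ids)),
      retryable: Enum.filter(ids, &(&1 in retry_ids)),
      cancelled: Enum.filter(ids, &(&1 in cancel_ids))
    }
  end

  # pull the job list out of a keyword entry such as error: {reason, jobs}
  defp listed(opts, key) do
    case Keyword.get(opts, key) do
      {_reason, jobs} -> jobs
      nil -> []
    end
  end
end
```

For jobs with ids 1, 2 and 3, `partition(jobs, {:error, :timeout, [%{id: 2}]})` yields `completed: [1, 3]` and `retryable: [2]`.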

To illustrate chunk results, let's expand on the message-processing example from earlier. We'll extend it to complete the whole batch when every message is delivered, or to cancel any undeliverable messages:

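def process([_ | _] = jobs) do
  # send_batch/1 is expected to return one notification per job, in job order
  notifications =
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

  bad_token = fn %{response: response} -> response == :bad_token end

  if Enum.any?(notifications, bad_token) do
    # Pair each notification with its job and keep only the undeliverable jobs
    cancels =
      notifications
      |> Enum.zip(jobs)
      |> Enum.filter(fn {notification, _job} -> bad_token.(notification) end)
      |> Enum.map(&elem(&1, 1))

    # Cancel the bad-token jobs; every other job in the chunk completes
    {:cancel, :bad_token, cancels}
  else
    {:ok, notifications}
  end
end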
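The same idea extends to the mixed keyword-list result when some failures are worth retrying and others are not. The helper below is a hypothetical sketch: it assumes responses come back in job order (as the example above does) and invents a `:timeout` response that should be retried, alongside the permanent `:bad_token` failure.

```elixir
defmodule DeliveryResult do
  @moduledoc """
  Hypothetical helper that turns per-message responses into a chunk result.
  Assumes responses arrive in job order, :timeout is worth retrying, and
  :bad_token is permanent.
  """

  def build(jobs, responses) do
    pairs = Enum.zip(jobs, responses)

    # generator patterns skip pairs that don't match, acting as filters
    retry = for {job, %{response: :timeout}} <- pairs, do: job
    cancel = for {job, %{response: :bad_token}} <- pairs, do: job

    case {retry, cancel} do
      {[], []} -> :ok
      {_, []} -> {:error, :timeout, retry}
      {[], _} -> {:cancel, :bad_token, cancel}
      _ -> [error: {:timeout, retry}, cancel: {:bad_token, cancel}]
    end
  end
end
```

Returning the keyword list only when both kinds of failure occur keeps the common cases in their simpler tuple forms.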

In the event of an ephemeral crash, like a network issue, the entire batch may be retried if there are any remaining attempts.


Chunk Organization

Chunks are run by a leader job (which has nothing to do with peer leadership). When the leader executes, it determines whether a complete chunk is available or whether enough time has elapsed to run anyway. If neither applies, the leader delays until the timeout has elapsed and then executes with as many jobs as it can find.
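The leader's choice can be modeled as a pure function. This is a hypothetical sketch of the decision described above, not Oban Pro's implementation; in particular, what the `elapsed` clock measures (here, milliseconds since the leader started waiting) is an assumption.

```elixir
defmodule LeaderDecision do
  @moduledoc """
  Hypothetical sketch of a chunk leader's decision: run now when a full
  chunk is available or the timeout has passed, otherwise wait out the
  remaining time. Not Oban Pro's implementation.
  """

  # available: jobs ready to run, elapsed: ms since the leader started waiting
  def decide(available, elapsed, size, timeout) do
    cond do
      # a complete chunk is ready
      available >= size -> {:run, size}
      # time is up, so run with whatever was found
      elapsed >= timeout -> {:run, available}
      # otherwise delay for the rest of the timeout
      true -> {:wait, timeout - elapsed}
    end
  end
end
```

With `size: 100` and `timeout: 1000`, a leader that finds 40 jobs after 300ms would wait another 700ms, then run with however many jobs it finds.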

Only the leader job may be cancelled, as it is the only one tracked by a producer. Cancelling any other job in the chunk won't stop the chunk from running.


Types

@type jobs() :: [Oban.Job.t()]
@type reason() :: term()
@type result() ::
  :ok
  | {:ok, term()}
  | {:cancel | :discard | :error, reason(), jobs()}
  | [cancel: sub_result(), discard: sub_result(), error: sub_result()]

sub_result()

@type sub_result() :: {reason(), jobs()}