Oban.Pro.Workers.Chunk (Oban Pro v1.4.1)

Chunk workers execute jobs together in groups based on a size or a timeout option. e.g. when 1000 jobs are available or after 10 minutes have ellapsed.

Multiple chunks can run in parallel within a single queue, and each chunk may be composed of many thousands of jobs. Aside from improved throughput for a single queue, chunks are ideal as the initial stage of data-ingestion and data-processing pipelines.

Usage

Let's define a worker that sends SMS messages in chunks, rather than individually:

defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :messages, size: 100, timeout: 1000

  @impl true
  def process([_ | _] = jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

    :ok
  end
end

Notice that we declared a size and a timeout along with a queue for the worker. If size or timeout are omitted they fall back to their defaults: 100 size and 1000ms respectively.

To process larger batches less frequently, we can increase both values:

use Oban.Pro.Workers.Chunk, size: 500, timeout: :timer.seconds(5)

Now chunks will run with up to 500 jobs or every 5 seconds, whichever comes first.

Unlike other Pro workers, a Chunk worker's process/1 receives a list of jobs rather than a single job struct. Fittingly, the expected return values are different as well.

Chunk Partitioning

By default, chunks are divided into groups based on the queue and worker. This means that each chunk consists of workers belonging to the same queue, regardless of their args or meta. However, this approach may not always be suitable. For instance, you may want to group workers based on a specific argument such as :account_id instead of just the worker. In such cases, you can use the :by option to customize how chunks are partitioned.

Here are a few examples of the :by option that you can use to achieve fine-grained control over chunk partitioning:

# Explicitly chunk by :worker
use Oban.Pro.Workers.Chunk, by: :worker

# Chunk by a single args key without considering the worker
use Oban.Pro.Workers.Chunk, by: [args: :account_id]

# Chunk by multiple args keys without considering the worker
use Oban.Pro.Workers.Chunk, by: [args: [:account_id, :cohort]]

# Chunk by worker and a single args key
use Oban.Pro.Workers.Chunk, by: [:worker, args: :account_id]

# Chunk by a single meta key
use Oban.Pro.Workers.Chunk, by: [meta: :source_id]

When partitioning chunks of jobs, it's important to note that using only :args or :meta without :worker may result in heterogeneous chunks of jobs from different workers. Nevertheless, regardless of the partitioning method chunks always consist of jobs from the same queue.

Here's a simple example of partitioning by worker and an author_id field to batch analysis of recent messages per author:

defmodule MyApp.LLMAnalysis do
  use Oban.Pro.Workers.Chunk, by: [:worker, args: :author_id], size: 50, timeout: 30_000

  @impl true
  def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
    messages =
      jobs
      |> Enum.map(& &1.args["message_id"])
      |> MyApp.Messages.all()

    {:ok, sentiment} = MyApp.GPT.gauge_sentiment(messages)

    MyApp.Messages.record_sentiment(author_id)
  end
end

Chunk Results and Error Handling

Chunk worker's result type is tailored to handling multiple jobs at once. For reference, here are the types and callback definition for process/1:

The result types allow you to succeed an entire chunk, or selectively fail parts of it. Here are each of the possible results:

  • :ok — The chunk succeeded and all jobs can be marked complete

  • {:ok, result} — Like :ok, but with a result for testing.

  • {:error, reason, jobs} — One or more jobs in the chunk failed and may be retried, any unlisted jobs are a success.

  • {:cancel, reason, jobs} — One or more jobs in the chunk should be cancelled, any unlisted jobs are a success.

  • [error: {reason, jobs}, cancel: {reason, jobs}] — Retry some jobs and cancel some other jobs, leaving any jobs not in either list a success.

To illustrate using chunk results let's expand on the message processing example from earlier. We'll extend it to complete the whole batch when all messages are delivered or cancel undeliverable messages:

def process([_ | _] = jobs) do
  notifications =
    jobs
    |> Enum.map(& &1.args)
    |> MyApp.Messaging.send_batch()

  bad_token = fn %{response: response} -> response == :bad_token end

  if Enum.any?(notifications, bad_token) do
    cancels =
      notifications
      |> Enum.zip(jobs)
      |> Enum.filter(fn {notification, _job} -> bad_token.(notification) end)
      |> Enum.map(&elem(&1, 1))

    {:cancel, :bad_token, cancels}
  else
    {:ok, notifications}
  end
end

In the event of an ephemeral crash, like a network issue, the entire batch may be retried if there are any remaining attempts.

Cancelling any jobs in a chunk will cancel the entire chunk, including the leader job.

Chunk Organization

Chunks are ran by a leader job (which has nothing to do with peer leadership). When the leader executes it determines whether a complete chunk is available or if enough time has elapsed to run anyhow. If neither case applies, then the leader will delay until the timeout elapsed and execute with as many jobs as it can find.

Completion queries run every 1000ms by default. You can use the :sleep option to control how long the leader delays between queries to check for complete chunks:

use Oban.Pro.Workers.Chunk, size: 50, sleep: 2_000, timeout: 10_000

Optimizing Chunks

Queue's with high concurrency and low throughput may have multiple chunk leaders running simultaneously rather than getting bundled into a single chunk. The solution is to reduce the queue’s global concurrency to a smaller number so that new chunk leader jobs don’t start and the existing chunk can run a bigger batch.

queues: [
  chunked_queue: [global_limit: 1]
]

A low global limit will reduce overall throughput. Partitioning the queue with the same options as the chunk can preserve throughput by allowing one concurrent leader per chunk. For example, if you were chunking by :user_id in args, you'd partition the queue with the same option:

queues: [
  chunked_queue: [local_limit: 20, global_limit: [allowed: 1, partition: [args: :user_id]]]
]

Summary

Types

@type chunk_by() ::
  :worker
  | {:args, key_or_keys()}
  | {:meta, key_or_keys()}
  | [:worker | {:args, key_or_keys()} | {:meta, key_or_keys()}]
Link to this type

key_or_keys()

@type key_or_keys() :: atom() | [atom()]
@type options() :: [
  by: chunk_by(),
  size: pos_integer(),
  sleep: pos_integer(),
  timeout: pos_integer()
]
@type result() ::
  :ok
  | {:ok, term()}
  | {:cancel | :discard | :error, reason :: term(), [Oban.Job.t()]}
  | [cancel: sub_result(), discard: sub_result(), error: sub_result()]
Link to this type

sub_result()

@type sub_result() :: {reason :: term(), [Oban.Job.t()]}