Changelog for Oban Pro v0.14

This release includes engine and supervisor improvements that require Oban v2.15+.


💸 Horizontal Auto-Scaling with DynamicScaler

The new DynamicScaler plugin monitors queue throughput and horizontally scales cloud infrastructure to optimize processing. Horizontal scaling is applied at the node level, not the queue level, so you can distribute processing over more physical hardware.

With auto-scaling you can spin up additional nodes during high traffic events, and pare down to a single node during a lull. Beyond optimizing throughput, scaling can save money in environments with little to no usage at off-peak times, e.g. staging.

DynamicScaler calculates the optimal scale by predicting the future size of a queue based on recent trends. You provide an acceptable range of node sizes, which queues to track, and a cloud module—auto-scaling takes care of the rest (with observability, of course).

It's designed to integrate with popular cloud infrastructure like AWS, GCP, K8S, and Fly via a simple, flexible behaviour. For example, here we declare auto-scaling rules for two distinct queues:

{Oban.Pro.Plugins.DynamicScaler,
 scalers: [
   [queue: :reports, range: 0..1, cloud: {MyApp.Cloud, asg: "rep-asg"}],
   [queue: :exports, range: 1..4, cloud: {MyApp.Cloud, asg: "exp-asg"}]
 ]}

There are also options to filter scaling by a particular queue, optimize responsiveness with scaling steps, and tune cooldown periods to prevent unnecessary scaling.

See the DynamicScaler docs for a complete reference and a guide on defining cloud modules.
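
To make the idea of "predicting the future size of a queue based on recent trends" concrete, here is a minimal sketch in plain Elixir. It is not DynamicScaler's actual algorithm: the ScaleSketch module, the linear extrapolation, and the per_node capacity figure are all assumptions chosen for illustration. It only shows how a predicted size could be clamped to a configured range such as 1..4.

```elixir
defmodule ScaleSketch do
  @moduledoc "Illustrative only; not DynamicScaler's actual algorithm."

  # Extrapolate the next queue size from the average change between samples.
  # `sizes` is a list of recent queue sizes, oldest first.
  def predict_size([]), do: 0
  def predict_size([only]), do: only

  def predict_size(sizes) do
    first = List.first(sizes)
    last = List.last(sizes)
    slope = (last - first) / (length(sizes) - 1)

    # A shrinking queue can't drop below zero jobs
    max(round(last + slope), 0)
  end

  # Convert the predicted size into a node count, clamped to the allowed range.
  # `per_node` is a hypothetical number of jobs one node can absorb.
  def desired_nodes(sizes, %Range{first: lo, last: hi}, per_node) when per_node > 0 do
    needed = ceil(predict_size(sizes) / per_node)

    needed |> max(lo) |> min(hi)
  end
end

# A steadily growing :exports queue, limited to between 1 and 4 nodes
ScaleSketch.desired_nodes([100, 200, 300], 1..4, 150)
```

A range that starts at 0, like the one for :reports above, lets the same calculation scale down to no nodes at all when the queue is idle.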


🏗️ Args Schema Macro for Structured Workers

Structured args are indispensable for enforcing an args schema. However, the legacy keyword list syntax with field: :type, implicit enums, and an asterisk symbol for required fields was simply awkward. We're correcting that awkwardness with a new args_schema/1 macro for defining structured workers. The args_schema macro defines a DSL that's a subset of Ecto's schema, optimized for JSON compatibility and without the need for dedicated changeset functions.

Here's a taste that demonstrates multiple field types, the required option, enums, and embedded structures:

use Oban.Pro.Worker

alias __MODULE__, as: Args

args_schema do
  field :id, :id, required: true
  field :mode, :enum, values: ~w(enabled disabled paused)a

  embeds_one :data do
    field :office_id, :uuid, required: true
    field :has_notes, :boolean
    field :addons, {:array, :string}
  end

  embeds_many :address do
    field :street, :string
    field :city, :string
    field :country, :string
  end
end

@impl Oban.Pro.Worker
def process(%{args: %Args{id: id, mode: mode} = args}) do
  # Embedded structs live under the field names declared in the schema
  %Args.Data{office_id: office_id} = args.data
  [%Args.Address{street: street} | _] = args.address

  # Work with the validated args here
  :ok
end


The legacy (and legacy-legacy) syntax is still viable and it generates the appropriate field declarations automatically. However, we strongly recommend updating to the new syntax for your own sanity.

See the Structured Jobs docs for specifics.


🍪 Declarative Chunk Partitioning

Previously, chunk workers executed jobs in groups based on size or timeout, with the grouping always consisting of jobs from the same queue, regardless of worker, args, or other job attributes. However, sometimes there's a need for more flexibility in grouping jobs based on different criteria.

To address this, we have introduced partitioning, which allows grouping chunks by worker and/or a subset of args or meta. This improvement enables you to methodically compose chunks of jobs with the same args or meta, instead of running a separate queue for each chunk.

Here's an example that demonstrates using GPT to summarize groups of messages from a particular author:

defmodule MyApp.MessageSummarizer do
  use Oban.Pro.Workers.Chunk,
    by: [:worker, args: :author_id],
    size: 100,
    timeout: :timer.minutes(5)

  @impl true
  def process([%{args: %{"author_id" => author_id}} | _] = jobs) do
    # Every job in the chunk shares the same author_id
    messages =
      jobs
      |> Enum.map(& &1.args["message_id"])
      |> MyApp.Messages.all()

    {:ok, summary} = MyApp.GPT.summarize(messages)

    # Push the summary
  end
end

By leveraging the enhanced partitioning capabilities, you can now effectively group and process jobs based on specific criteria, providing a more streamlined approach to managing your workloads.

Chunk queries have been optimized for tables of any size to compensate for the added complexity of partitioning. As a result, even with hundreds of thousands of available jobs, queries run in milliseconds.

See the Chunk Partitioning docs for details.
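
For intuition about what partitioning by worker and a subset of args means, here is a small sketch in plain Elixir. It is not how Oban Pro builds chunks (that happens in database queries); the PartitionSketch module and the bare maps standing in for jobs are hypothetical. It only illustrates the grouping rule: jobs land in the same chunk when they share a worker and the selected args, and each group is capped at the chunk size.

```elixir
defmodule PartitionSketch do
  # Group jobs by worker plus a subset of args keys, then split each
  # group into chunks of at most `size` jobs.
  def chunks(jobs, arg_keys, size) do
    jobs
    |> Enum.group_by(fn job -> {job.worker, Map.take(job.args, arg_keys)} end)
    |> Enum.flat_map(fn {_partition, group} -> Enum.chunk_every(group, size) end)
  end
end

jobs = [
  %{worker: "MyApp.MessageSummarizer", args: %{"author_id" => 1, "message_id" => 10}},
  %{worker: "MyApp.MessageSummarizer", args: %{"author_id" => 2, "message_id" => 11}},
  %{worker: "MyApp.MessageSummarizer", args: %{"author_id" => 1, "message_id" => 12}}
]

# Messages 10 and 12 share a chunk; message 11 gets a chunk of its own
PartitionSketch.chunks(jobs, ["author_id"], 100)
```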


🗄️ Batch Performance

Some teams run batches containing upwards of 50k jobs, and often run multiple batches simultaneously, to the tune of millions of jobs a day. That load level exposed some opportunities for performance tuning that we're excited to provide for batches of all sizes.

In short, batches are lazier, their queries are faster, and they'll put less load on your database. If query tuning and performance optimizations are your things, read on for the details!

  • Debounce batch callback queries so that only one database call is made for each batch within a short window of time. By default, batching debounces for 100ms, but it's configurable at the batch level if you'd prefer to debounce for longer.

  • Only query for the exact details needed by a batch's supported callbacks. If a batch only has a cancelled handler, then no other states are checked.

  • Optimize state-checking queries to avoid using exact counts and ignore completed jobs entirely, as that's typically the state with the most jobs.

  • Use index-powered match queries to find existing callback jobs, again, only for the current batch's supported callbacks.
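
The debouncing in the first item can be pictured with a small, pure Elixir sketch. It is an assumption about the general technique, not Oban Pro's implementation: the DebounceSketch module is hypothetical, and it assumes the first event in a window triggers the callback query while later events for the same batch inside that window are absorbed.

```elixir
defmodule DebounceSketch do
  # Given `{batch_id, time_ms}` events, return the events that would trigger a
  # callback query when each batch is limited to one query per `window_ms`.
  def coalesce(events, window_ms \\ 100) do
    {fired, _last} =
      events
      |> Enum.sort_by(fn {_batch_id, at} -> at end)
      |> Enum.reduce({[], %{}}, fn {batch_id, at} = event, {fired, last} ->
        case last do
          # Still inside this batch's window: absorb the event
          %{^batch_id => prev} when at - prev < window_ms -> {fired, last}
          # First event for the batch, or the window has elapsed: run the query
          _ -> {[event | fired], Map.put(last, batch_id, at)}
        end
      end)

    Enum.reverse(fired)
  end
end

# Six job completions across two batches collapse into four queries
DebounceSketch.coalesce([{"a", 0}, {"a", 40}, {"b", 50}, {"a", 99}, {"a", 120}, {"b", 200}])
```

Passing a larger window, such as `DebounceSketch.coalesce(events, 1_000)`, corresponds to debouncing for longer at the batch level.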


v0.14.0 — 2023-04-13


Enhancements

  • [DynamicLifeline] Remove transactional wrapper and add a :timeout option for rescue and discard queries.

  • [Chunk] Apply cancellation to all jobs within a chunk

    Only the leader job is registered as "running" by the queue producer. Now a running chunk listens for cancel messages for any job in the chunk and cancels the leader. Previously, cancellation didn't reach the leader and the chunk's jobs were orphaned.

  • [SmartEngine] Bypass transaction on insert for small batches

    Inserting a list of non-unique jobs smaller than the batch size is no longer wrapped in a transaction.


Bug Fixes

  • [Chunk] Monitor chunks with timeouts to prevent orphans

    The queue executor uses :timer.exit_after/2 to kill jobs that exceed a timeout. Then, the queue's producer catches the :DOWN message and uses that to ack a job error. Producers aren't aware of the chunks, so we spawn a sentinel process to monitor the parent process and ack the chunk jobs.

  • [SmartEngine] Cast jsonb to text before casting to integer for global tracking

    A recent improvement to global tracking broke a previous compatibility trick used for legacy Postgres support. While we officially support PG 12+ now, there's no reason to break backward compatibility for a simple cast.

  • [SmartEngine] Use a CTE for fetch_jobs query to prevent overfetching

    The CTE prevents Postgres from optimizing the subquery and ignoring the limit. More details are available in