Oban Releases

Pro v0.9.0

Partitioned Rate Limiting

Driven by popular demand, v0.9 brings partitions to the Smart Engine’s rate limiter. With partitions, rate limits are applied per-worker, on args, or on a subset of args fields rather than across the entire queue. This enables your application to enforce limits per-customer or respect external throttling, without splitting jobs into multiple queues.

rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker]]]

Alternatively, you can partition by the job’s account_id field:

rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]

Naturally, a combination of worker, args, and any number of keys works.
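
For example, a sketch that partitions on the combination of worker and an account_id key within args (reusing the allowed and period values from above):

rate_limit: [allowed: 100, period: 5, partition: [fields: [:worker, :args], keys: [:account_id]]]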

Check out the SmartEngine Guide for options and details.
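
For orientation, here’s a minimal sketch of where the option sits in a queue configuration; the queue name, limits, and repo are illustrative, and rate limiting requires the Smart Engine:

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  repo: MyApp.Repo,
  queues: [
    messages: [
      local_limit: 10,
      rate_limit: [allowed: 100, period: 5, partition: [fields: [:args], keys: [:account_id]]]
    ]
  ]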

Batch Worker Improvements

Batches are the oldest worker in Pro and, as such, they predate the meta field. That difference is finally rectified. Building batches on meta enables a handful of ergonomic improvements and new functionality:

  • Forward args through the batch_callback_args option
  • Build heterogeneous batches with an alternative callback module via the batch_callback_worker option
  • Fetch all jobs in a batch with stream_batch_jobs/2 for map/reduce processing (see the sketch after this list)
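
As a sketch of that last point, a callback defined in a module that uses Oban.Pro.Workers.Batch could stream the batch’s jobs inside a transaction and reduce over them; the repo, callback choice, and args shape here are illustrative:

# Runs once every job in the batch is completed. stream_batch_jobs/2 streams
# from the database, so it must run inside a transaction.
def handle_completed(%Job{} = job) do
  {:ok, account_ids} =
    MyApp.Repo.transaction(fn ->
      job
      |> stream_batch_jobs()
      |> Enum.map(& &1.args["account_id"])
      |> Enum.uniq()
    end)

  IO.inspect(account_ids, label: "accounts in batch")

  :ok
end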

IMPORTANT: For in-flight batch callback jobs to run after an upgrade, you’ll need to migrate batch_id and callback into meta. Run the following SQL in a migration immediately prior to, or following, the upgrade.

UPDATE oban_jobs
SET meta = jsonb_set_lax(
             jsonb_set(meta, '{batch_id}', args->'batch_id'),
             '{callback}',
             args->'callback',
             true,
             'return_target'
           )
WHERE state NOT IN ('cancelled', 'completed', 'discarded')
  AND args ? 'batch_id'
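
If you prefer to ship that statement as an Ecto migration, a minimal sketch might look like the following (the module name is illustrative, and the change isn’t meaningfully reversible):

defmodule MyApp.Repo.Migrations.MoveBatchFieldsToMeta do
  use Ecto.Migration

  def up do
    execute """
    UPDATE oban_jobs
    SET meta = jsonb_set_lax(
                 jsonb_set(meta, '{batch_id}', args->'batch_id'),
                 '{callback}',
                 args->'callback',
                 true,
                 'return_target'
               )
    WHERE state NOT IN ('cancelled', 'completed', 'discarded')
      AND args ? 'batch_id'
    """
  end

  def down, do: :ok
end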

Workflow Worker Improvements

Workflows got a little ergonomic love, too. Now you can dynamically extend workflows at runtime with the new append_workflow function:

def process(%Job{} = job) do
  jobs =
    job
    |> append_workflow(check_deps: false)
    |> add(:d, WorkerD.new(%{}), deps: [:a])
    |> add(:e, WorkerE.new(%{}), deps: [:b])
    |> add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

  {:ok, jobs}
end

Note the use of check_deps: false to skip dependency validation. To be safe and still validate dependencies while appending, use the new stream_workflow_jobs/1 function to load all of the previous jobs and feed them in:

def process(%Job{} = job) do
  {:ok, jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> stream_workflow_jobs()
      |> Enum.to_list()
    end)

  jobs
  |> append_workflow()
  |> add(:d, WorkerD.new(%{}), deps: [:a])
  |> add(:e, WorkerE.new(%{}), deps: [:b])
  |> add(:f, WorkerF.new(%{}), deps: [:c])
  |> Oban.insert_all()

  :ok
end

Changes

  • [Oban] Require Oban ~> v2.9 to support the new cancel_all_jobs engine callback.

  • [Oban.Pro.Queue.SmartEngine] Make operations like refresh more efficient (less data transport) and more failure tolerant to prevent producer crashes.

Pro v0.9.1

Bug Fixes

  • [SmartEngine] Respect configured log level when performing transaction lock queries. Previously, the Smart Engine could log every time it made an advisory lock query with a global_limit or rate_limit set.

Pro v0.9.2

Bug Fixes

  • [SmartEngine] Verify the presence of a rate-limit period before calculating window time. This fixes situations where a producer record for the same queue existed, but lacked a rate limit structure.

  • [Producer] Improve legacy Ecto support by lazily calculating the local limit, without assuming the params are coerced into a map.

Pro v0.9.3

Enhancements

  • [DynamicPruner] Support :infinity as a duration in dynamic pruning so users don’t have to specify ludicrous values like {999, :years} (see the sketch below).
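
    As a rough sketch, assuming the plugin’s mode and state_overrides options (values illustrative):

    plugins: [
      {Oban.Pro.Plugins.DynamicPruner,
       mode: {:max_age, {7, :days}},
       state_overrides: [cancelled: {:max_age, :infinity}]}
    ]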

Bug Fixes

  • [Relay] Attach the Relay telemetry handler using module function capture syntax to prevent warnings.

  • [DynamicCron] Include :expression as an available update option to prevent dialyzer errors.

  • [SmartEngine] Preserve existing rate limit fields when scaling or otherwise changing a producer’s meta values.

  • [SmartEngine] Ensure that the total fetch demand is never negative.

    When running queues are converted to global mode, there may be a deficit between the total jobs and the global limit. In that case, we must fetch 0 jobs rather than passing a negative number.

Pro v0.9.4

Bug Fixes

  • [BatchManager] Consider all states for batch callback uniqueness.

    Previously, if a callback job failed enough times, it could be discarded and not considered for subsequent uniqueness checks.