Oban Releases

Pro v0.7.0

Pluggable Smart Engine

Pro’s SmartEngine enables truly global concurrency and global rate limiting by handling demand at the queue level, without any conflict or churn. The engine uses centralized producer records to coordinate with minimal load on the database.

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  queues: [
    alpha: 1,
    gamma: [global_limit: 1],
    delta: [local_limit: 2, global_limit: 5],
    kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
    omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
  ],
  ...
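To make the limits concrete, here is a rough sketch of how a per-node limit and a cluster-wide limit might combine. The `LimitMath` module and `max_running/2` are hypothetical helpers, not part of Oban Pro. The sketch assumes `local_limit` caps each node, `global_limit` caps the whole cluster, and a queue with only a `global_limit` is bounded by that value alone.

```elixir
defmodule LimitMath do
  # Hypothetical helper for illustration only; not part of Oban Pro.
  # Assumes local_limit applies per node and global_limit applies cluster-wide.
  def max_running(nodes, opts) when is_integer(nodes) and nodes > 0 do
    local = Keyword.get(opts, :local_limit)
    global = Keyword.get(opts, :global_limit)

    case {local, global} do
      {nil, nil} -> raise ArgumentError, "expected :local_limit or :global_limit"
      {local, nil} -> nodes * local
      {nil, global} -> global
      {local, global} -> min(nodes * local, global)
    end
  end
end

# The delta queue on three nodes: min(3 * 2, 5) jobs at once.
LimitMath.max_running(3, local_limit: 2, global_limit: 5)
# => 5
```

With two nodes the same queue would top out at four jobs, because the local limit becomes the tighter bound.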

Relay Plugin

The Relay plugin lets you insert jobs and await their results locally or remotely, across any number of nodes. In effect, it provides persistent distributed tasks. Once the plugin is running, you can seamlessly distribute Oban jobs and await the results synchronously:

alias Oban.Pro.Plugins.Relay

1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Relay.async/1)
|> Relay.await_many(timeout: :timer.seconds(1))

# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
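The `DoubleWorker` used in this example isn't shown in the release notes. A minimal sketch of what such a worker might look like, assuming Relay hands back whatever `perform/1` returns and that the job's args arrive with string keys after JSON encoding:

```elixir
defmodule DoubleWorker do
  # Hypothetical worker for illustration; the real module is not shown in the notes.
  use Oban.Worker

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"int" => int}}) do
    # Returning {:ok, value} marks the job complete and carries the doubled value.
    {:ok, int * 2}
  end
end
```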

Chunk Worker

Process jobs “Broadway style”, in groups based on size or a timeout, but with the robust error handling semantics of Oban. Chunking operates at the worker level, allowing many chunks to run in parallel within the same queue.

defmodule MyApp.ChunkWorker do
  use Oban.Pro.Workers.Chunk, queue: :alpha, size: 10, timeout: :timer.seconds(5)

  @impl Chunk
  def process(jobs) do
    jobs
    |> Enum.map(& &1.args)
    |> Business.process_messages()

    :ok
  end
end

Pro Worker Testing Module

Testing batch, workflow, and chunk workers can be tricky because they override the perform/1 function. The new Oban.Pro.Testing module provides a process_job/2,3 function that mirrors the functionality of perform_job/2,3 in the base Oban.Testing module.

With process_job/2,3 you can test Pro workers in isolation, without worrying about whether a manager is running or the state of a workflow. To start using it, import it into your test case or test file:

import Oban.Pro.Testing

Then test worker logic in isolation:

assert :ok = process_job(MyApp.ProWorker, %{"id" => 1})

Changes

  • [Oban.Pro.Plugins.WorkflowManager] This plugin is no longer necessary and can be safely removed from your configuration.

  • Bump Oban dependency to ~> 2.6
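As a rough sketch of the WorkflowManager change, a configuration after upgrading might look like the following. The plugin list is hypothetical and other Oban options are omitted; keep whichever plugins and settings you already use.

```elixir
# config/config.exs (other Oban options omitted; plugin list is hypothetical)
import Config

config :my_app, Oban,
  engine: Oban.Pro.Queue.SmartEngine,
  plugins: [
    Oban.Plugins.Pruner
    # Oban.Pro.Plugins.WorkflowManager was listed here before v0.7.0 and can be deleted.
  ]
```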

Pro v0.7.1

Changes

  • [Relay] Restore compatibility with Elixir 1.9 by removing use of the is_map_key/2 guard, which wasn’t introduced until Elixir 1.10.

  • [SmartEngine] Apply jitter and use the mutex for global coordination more selectively. The engine also avoids touching or reloading producer records to minimize connection use.

  • [Workflow] Improve deps check performance by using an index-powered operator. On instances with millions of jobs this may improve performance by roughly 600x.
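For the Relay compatibility fix, one common way to avoid the is_map_key/2 guard is to match on the key in the function head. The sketch below is illustrative only; `GuardCompat` and the `:ref` key are hypothetical and this is not the actual Relay code.

```elixir
defmodule GuardCompat do
  # Hypothetical module for illustration; not the actual Relay code.

  # Elixir 1.10+ only, because is_map_key/2 is unavailable on 1.9:
  # def has_ref?(map) when is_map_key(map, :ref), do: true

  # Works on Elixir 1.9 as well: match on the key in the function head.
  def has_ref?(%{ref: _}), do: true
  def has_ref?(_other), do: false
end

GuardCompat.has_ref?(%{ref: "abc"})
# => true
GuardCompat.has_ref?(%{})
# => false
```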

Pro v0.7.2

Changes

  • [Lifeline] The plugin now runs only alongside the SmartEngine. Running it with the BasicEngine is flawed and causes inadvertent orphan rescues.

    You can either switch to the SmartEngine or remove Lifeline from your plugins.
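A configuration that pairs Lifeline with the SmartEngine might look roughly like this. The module name `Oban.Pro.Plugins.Lifeline` is an assumption based on the naming of the other Pro plugins in these notes, and other Oban options are omitted.

```elixir
# config/config.exs (other Oban options omitted)
import Config

config :my_app, Oban,
  # Lifeline requires the SmartEngine as of v0.7.2.
  engine: Oban.Pro.Queue.SmartEngine,
  # Assumed module name; check it against your installed Pro version.
  plugins: [Oban.Pro.Plugins.Lifeline]
```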

Bug Fixes

  • [SmartEngine] Consider the configured prefix for all ack and update options.

Pro v0.7.3

Changes

  • [SmartEngine] Guard all SmartEngine database calls against connection failures.

    Database connections may be unstable for many reasons: pool contention, database overload, crashes. We shouldn’t crash the producer due to a flickering database call.
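The general idea of keeping a process alive through a flaky call can be sketched as follows. `SafeCall` is a hypothetical helper, not the SmartEngine implementation; it simply converts raised exceptions and exits into error tuples so the caller can decide whether to retry.

```elixir
defmodule SafeCall do
  # Hypothetical helper for illustration; not the SmartEngine implementation.
  # Runs fun and converts raised exceptions and exits into error tuples,
  # so the calling process keeps running and can retry later.
  def run(fun) when is_function(fun, 0) do
    {:ok, fun.()}
  rescue
    exception -> {:error, exception}
  catch
    :exit, reason -> {:error, {:exit, reason}}
  end
end

SafeCall.run(fn -> 1 + 1 end)
# => {:ok, 2}
SafeCall.run(fn -> raise "connection lost" end)
# => {:error, %RuntimeError{message: "connection lost"}}
SafeCall.run(fn -> exit(:timeout) end)
# => {:error, {:exit, :timeout}}
```

Catching exits matters here because pool checkout timeouts typically surface as exits in the calling process instead of raised exceptions.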

Bug Fixes

  • [DynamicCron] Explicitly declare cron timestamps as DateTime to compensate for apps that configure Ecto to use NaiveDateTime by default.