Pluggable Smart Engine
Pro’s SmartEngine
enables truly global concurrency and global rate limiting by
handling demand at the queue level, without any conflict or churn. The engine
uses centralized producer records to coordinate with minimal load on the
database.
config :my_app, Oban,
engine: Oban.Pro.Queue.SmartEngine,
queues: [
alpha: 1,
gamma: [global_limit: 1],
delta: [local_limit: 2, global_limit: 5],
kappa: [local_limit: 5, rate_limit: [allowed: 30, period: {1, :minute}]],
omega: [global_limit: 1, rate_limit: [allowed: 500, period: {1, :hour}]]
],
...
Relay Plugin
The Relay plugin lets you insert and await the results of jobs locally or
remotely, across any number of nodes, i.e. persistent distributed tasks. Once
the plugin is running, you can seamlessly distribute oban jobs and await the
results synchronously:
alias Oban.Pro.Plugins.Relay
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Relay.async/1)
|> Relay.await_many(timeout: :timer.seconds(1))
# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
Chunk Worker
Process jobs “broadway style”, in groups based on size or a timeout, but with
the robust error handling semantics of Oban. Chunking operates at the worker
level, allowing many chunks to run in parallel within the same queue.
defmodule MyApp.ChunkWorker do
use Oban.Pro.Workers.Chunk, queue: :alpha, size: 10, timeout: :timer.seconds(5)
@impl Chunk
def process(jobs) do
jobs
|> Enum.map(& &1.args)
|> Business.process_messages()
:ok
end
end
Pro Worker Testing Module
Testing batch, workflows and chunk workers can be tricky due to how they
override the perform/1
function. The new Oban.Pro.Testing
module provides a
process_job/2,3
function that mirrors the functionality of perform_job/2,3
in the base Oban.Testing
module.
With process_job/2,3
you can test pro workers in isolation, without worrying
about whether a manager is running or the state of a workflow. To start using
it, import it into your test case or test file:
import Oban.Pro.Testing
Then test worker logic in isolation:
assert :ok = process_job(MyApp.ProWorker, %{"id" => 1})
Changed