🔗 Chain Worker
Chain workers link jobs together to ensure they run in a strict, sequential, FIFO order. Downstream
jobs automatically suspend execution until the upstream job is completed. Jobs in a chain only
run after the previous job completes successfully, regardless of snoozing or retries.
Through a declarative syntax, chains can partition by worker, args, or meta, and each
partition runs sequentially while the queue runs with any level of concurrency:
defmodule MyApp.WebhookWorker do
  use Oban.Pro.Workers.Chain,
    by: [:worker, args: :account_id],
    on_cancelled: :halt
  ...
Previously, you could approximate chain behaviour with a global limit of 1, but it lacked
guarantees around retries, cancellation, or even scheduled jobs.
See the Chain docs for details and examples of how to customize error handling.
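To make the partitioning concrete, here is a minimal plain-Elixir sketch (no Oban involved) of one way to read by: [:worker, args: :account_id]. Jobs that share a worker and account_id form one chain and keep their insertion order, while separate chains are independent of each other. The ChainSketch module, the job maps, and the partition_key/1 helper are hypothetical illustrations, not Oban Pro's implementation.

```elixir
defmodule ChainSketch do
  # Hypothetical helper: derives a chain key from the worker and one args
  # field, mirroring the shape of `by: [:worker, args: :account_id]`.
  def partition_key(%{worker: worker, args: %{account_id: account_id}}) do
    {worker, account_id}
  end

  # Groups jobs into chains. Enum.group_by/2 keeps the elements of each
  # group in their original (FIFO) order.
  def chains(jobs), do: Enum.group_by(jobs, &partition_key/1)
end

# Illustrative job maps, not real Oban.Job structs.
jobs = [
  %{id: 1, worker: "MyApp.WebhookWorker", args: %{account_id: 1}},
  %{id: 2, worker: "MyApp.WebhookWorker", args: %{account_id: 2}},
  %{id: 3, worker: "MyApp.WebhookWorker", args: %{account_id: 1}}
]

chains = ChainSketch.chains(jobs)
```

In this sketch jobs 1 and 3 land in the same chain and must run one after the other, while job 2 sits in its own chain and is free to run concurrently.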
📐 Structured Additions
Structured jobs, as declared with args_schema, gain a few new capabilities in this release.
- :default - Any field can have a default value, calculated at compilation time and applied at
  runtime.
- {:array, :enum} - Now it’s possible to cast and validate the values of a list with an array of
  enums, e.g. {:array, :enum}, values: ~w(foo bar baz)a
- :term - Safely encodes any Elixir term as a string for storage, then decodes it back to the
  original term on load. This is similar to :any, but works with terms like tuples or pids that
  can’t usually be serialized. For safety, terms are encoded with the :safe option to prevent
  decoding data that may be used to attack the runtime.
Here’s a toy example that demonstrates all three additions:
args_schema do
  field :pid, :term, required: true
  field :flags, {:array, :enum}, values: ~w(fast heavy lazy)a
  field :debug, :boolean, default: false
end

def process(job) do
  send(job.args.pid, :pids_work!)
end
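The storage mechanics behind :term aren't spelled out here, so the following is only a sketch of the general technique using the standard library: serialize with :erlang.term_to_binary/1, Base64-encode the result so it can be stored as a string, and decode with :erlang.binary_to_term/2 and the :safe option. The TermSketch module and its dump/1 and load/1 functions are hypothetical names, not part of Oban Pro.

```elixir
defmodule TermSketch do
  # Serialize any term into a string that can be stored alongside JSON args.
  def dump(term) do
    term
    |> :erlang.term_to_binary()
    |> Base.encode64()
  end

  # Decode the string back into a term. With :safe, decoding raises instead
  # of creating new atoms or external function references.
  def load(string) do
    string
    |> Base.decode64!()
    |> :erlang.binary_to_term([:safe])
  end
end

# Tuples and pids survive the round trip, unlike with plain JSON encoding.
encoded = TermSketch.dump({:ok, self()})
{:ok, pid} = TermSketch.load(encoded)
```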
🧰 New assert_enqueue/2 and refute_enqueue/2 helpers
Assert or refute that jobs were enqueued during a function call. These new test helpers receive a
function and detect only jobs that were (or weren’t) enqueued while running the function.
More specifically, the helpers:
- Ignore any jobs enqueued before the function ran
- Return whatever the function returned when the assertion passes
- Respect the same filtering syntax as other assertion helpers
For example, this combines assert_enqueue/2 with perform_job/2 to assert that a job is
enqueued by a worker’s perform:
assert_enqueue [worker: MyApp.OtherWorker, queue: :business], fn ->
  perform_job(MyApp.SomeWorker, %{id: account.id})
end
Enhancements
- [Smart] Record snoozes without incrementing attempts
  Rather than increasing attempts and max_attempts on snooze, we now roll back the attempt count
  and increment a snoozed counter in meta. With this new strategy it’s simple to differentiate
  between “real” attempts and snoozes, and backoff calculation remains accurate.
- [Smart] Cleaner partition syntax to match the by format for Chain and Chunk workers.
  The newer syntax is more expressive and less verbose. It is translated into the legacy
  format, which is also still accepted for seamless backward compatibility.
  Here’s an example of using the newer format:
  - partition: [fields: [:worker, :args], keys: [:account_id]]
  + partition: [:worker, args: :account_id]
- [Smart] Include producer uuid in check_meta output for more detailed tracking of producer
  gossip events.
- [Worker] Allow fetch_recorded/1 from unrecorded workers
  The previous implementation required that the calling worker was recorded, rather than the
  called worker. That forced marking extraneous workers as recorded to access
  fetch_recorded/1.
- [Worker] Include caught stacktrace in logging from unhandled hook errors
- [Relay] Use a dedicated channel for relay messages
  Due to historic restrictions, relay was built to subscribe and publish messages on the gossip
  channel. That channel was also used for true node “gossip” about activity, and it could generate
  a lot of messages. Now relay messages use a dedicated channel to minimize accumulating
  notifications in long running processes.
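The snooze bookkeeping described in the first enhancement above can be pictured with a small plain-Elixir sketch. It assumes the attempt is counted when a job starts executing and that the counter lives under a "snoozed" string key in meta. The SnoozeSketch module and the job map are hypothetical, not the Smart engine's actual code.

```elixir
defmodule SnoozeSketch do
  # Hypothetical stand-in for what happens when a job snoozes: undo the
  # attempt that was counted when execution started, leave max_attempts
  # untouched, and track the snooze in meta instead.
  def record_snooze(%{attempt: attempt, meta: meta} = job) do
    %{job | attempt: attempt - 1, meta: Map.update(meta, "snoozed", 1, &(&1 + 1))}
  end
end

# Illustrative job map that has just started its first attempt.
job = %{attempt: 1, max_attempts: 20, meta: %{}}

snoozed_twice =
  job
  |> SnoozeSketch.record_snooze()
  # The job is picked up again, which counts as a new attempt...
  |> Map.update!(:attempt, &(&1 + 1))
  # ...and then it snoozes a second time.
  |> SnoozeSketch.record_snooze()
```

After two snoozes the sketch leaves attempt at 0 and max_attempts unchanged, so a backoff derived from attempt isn't inflated by snoozing.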
Bug Fixes
- [Batch] Bump batch hash key by 1 to maintain atomics range
  Atomics have an inclusive range starting at 1, while phash has an exclusive range of range - 1.
  That combination would cause batch debouncing to fail when the hashed batch id was 0.
- [Batch] Trigger handle_cancelled callback after cancel_* function calls.
  Cancelling jobs that weren’t executing, i.e. scheduled, failed to trigger the
  handle_cancelled callback job.
- [Smart] Reverse xact to uniq check for larger transactions.
  Within insert_all_jobs the xact_limit logic was reversed and unique inserts over the default
  xact_limit were inserted in a single transaction.
- [Testing] Set engine to Smart in perform_job/3 function calls to ensure the recorded
  feature, or anything else that relies on the Smart engine, works.
- [Testing] Preprocess jobs before process in perform_chunk/3
  The perform_chunk/3 helper didn’t preprocess jobs prior to processing the way running a real
  chunk worker did. That prevented Pro.Worker features like structured args or inline
  hooks from functioning.
- [Testing] Handle all supported Oban.Config opts in testing functions
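The off-by-one behind the batch hash key fix listed above can be reproduced with the standard library alone. This sketch assumes phash refers to :erlang.phash2/2, which returns values from 0 to range - 1, while :atomics indexes run from 1 to the array size. The AtomicsIndexSketch module, the array size, and the "batch-N" ids are hypothetical and only illustrate the bump by 1.

```elixir
defmodule AtomicsIndexSketch do
  # :erlang.phash2/2 returns an integer in 0..size-1, while :atomics indexes
  # run from 1..size, so the hash is bumped by 1 before use as an index.
  def index(term, size), do: :erlang.phash2(term, size) + 1
end

size = 8
ref = :atomics.new(size, signed: false)

# Hash a spread of illustrative batch ids into atomics indexes.
indexes =
  for batch_id <- 1..1_000 do
    AtomicsIndexSketch.index("batch-#{batch_id}", size)
  end

# Every index is valid, including the one produced when the hash is 0.
Enum.each(indexes, &:atomics.add(ref, &1, 1))
```

Without the + 1, any term hashing to 0 would be passed to :atomics.add/3 as index 0, which is outside the valid range and raises.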