Changelog for Oban Pro v0.12
Worker Hooks
Worker hooks are guaranteed to trigger after a job finishes executing. They're defined as callback functions on the worker, or in separate modules for reuse across workers.
Think of hooks as a convenient alternative to globally attached telemetry handlers, but with clearer scoping and much more safety. They are purely for side-effects such as cleanup, logging, broadcasting notifications, updating other records, error notifications, etc.
There are three mechanisms for defining and attaching hooks:
- Implicitly—hooks are defined directly on the worker and they only run for that worker
- Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
- Globally—hooks are executed for all Pro workers (only Pro workers)
It's possible to combine each type of hook on a single worker. When multiple hooks are stacked they're executed in the order: implicit, explicit, and then global.
Here's a simple example of defining an inline hook on a worker to let us know when a job is discarded:
defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
  end

  @impl Oban.Pro.Worker
  def after_process(:discard, %Job{} = job) do
    IO.puts("This job will never complete: #{job.id}")
  end

  def after_process(_state, _job), do: :ok
end
Look at the Worker Hooks section for a complete usage guide and examples. Note, as part of an ongoing documentation enhancement we've replaced the old guide with proper moduledocs.
Global Partitioning
Rate limits aren't the only SmartEngine-powered limiters that can be partitioned! Now, with partitioned global limiting, you can enforce concurrency limits by the worker, args, and sub-keys within a queue, across all nodes (without clustering).
Here's an example of partitioning an exporters queue by the worker to minimize resource contention:
queues: [
  exporters: [
    local_limit: 6,
    global_limit: [allowed: 3, partition: [fields: [:worker]]]
  ]
]
In this example, we're partitioning a scrapers queue by the service_id value so that only one job per service runs at a time across all nodes:
queues: [
  scrapers: [
    local_limit: 10,
    global_limit: [allowed: 1, partition: [fields: [:args], keys: [:service_id]]]
  ]
]
To achieve this style of global limiting before, you'd need one queue for every partition, which is challenging to manage and puts additional strain on the database.
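To make the idea of a partition concrete, here is a minimal sketch in plain Elixir. It is not the SmartEngine's implementation: the `PartitionSketch` module, its functions, and the job maps are hypothetical, and the args use atom keys purely for brevity. It derives a partition key from the configured fields and keys, then admits at most `allowed` jobs per partition.

```elixir
defmodule PartitionSketch do
  @moduledoc """
  Hypothetical illustration of partitioned global limits. Every name here is
  made up for the example and none of it is SmartEngine code.
  """

  # Build a partition key from the configured fields. When :args is one of
  # the fields and keys are given, only those args keys contribute.
  def partition_key(job, fields, keys \\ []) do
    Enum.map(fields, fn
      :worker -> job.worker
      :args when keys == [] -> job.args
      :args -> Map.take(job.args, keys)
    end)
  end

  # Walk the jobs in order and admit at most `allowed` jobs per partition.
  def runnable(jobs, allowed, fields, keys \\ []) do
    {picked, _counts} =
      Enum.reduce(jobs, {[], %{}}, fn job, {picked, counts} ->
        key = partition_key(job, fields, keys)
        count = Map.get(counts, key, 0)

        if count < allowed do
          {[job | picked], Map.put(counts, key, count + 1)}
        else
          {picked, counts}
        end
      end)

    Enum.reverse(picked)
  end
end

jobs = [
  %{worker: "MyApp.Scraper", args: %{service_id: 1, url: "a"}},
  %{worker: "MyApp.Scraper", args: %{service_id: 1, url: "b"}},
  %{worker: "MyApp.Scraper", args: %{service_id: 2, url: "c"}}
]

# Mirrors `allowed: 1, partition: [fields: [:args], keys: [:service_id]]`:
# the second job for service 1 is held back, the job for service 2 is not.
picked = PartitionSketch.runnable(jobs, 1, [:args], [:service_id])
IO.inspect(Enum.map(picked, & &1.args.url))
```

With a single queue, the partition key plays the role that a dedicated queue per service would otherwise have to play.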
Learn more in the revamped Partitioning section of the SmartEngine guide.
Unique Jobs and Batching for Insert All
A long-standing "gotcha" of bulk inserting jobs with insert_all was the lack of unique support. To apply uniqueness, you had to insert jobs individually within a transaction (which is as slow as it sounds and led to frequent timeouts). Now, unique support is baked into the SmartEngine's insert_all implementation and works without any configuration:
- Repo.transaction(fn ->
-   Enum.each(lots_of_unique_jobs, &Oban.insert/1)
- end)
+ Oban.insert_all(lots_of_unique_jobs)
There's also automatic batching for insert_all. No more manually chunking massive job inserts to avoid PostgreSQL's parameter limits; the engine handles it all for you and enforces uniqueness the whole time:
- Repo.transaction(fn ->
-   lots_of_jobs
-   |> Enum.chunk_every(500)
-   |> Enum.each(&Oban.insert_all/1)
- end)
+ Oban.insert_all(lots_of_jobs, batch_size: 500)
Automatic batching, combined with insert placeholders, optimized locks, and streamlined replace, means bulk inserts with the SmartEngine are faster and more efficient.
Explore a little more (and we do mean little, there isn't much more to it) in the SmartEngine's Unique Bulk Insert section.
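As a rough mental model of what batching with uniqueness involves, consider the plain Elixir sketch below. It is an illustration under assumptions, not the engine's code: `BulkInsertSketch` and its uniqueness rule (same worker and args) are hypothetical, and the real engine does this work in the database rather than in memory.

```elixir
defmodule BulkInsertSketch do
  @moduledoc """
  Hypothetical illustration of batched, unique bulk inserts. Nothing here is
  actual SmartEngine code.
  """

  # Assumption for the sketch: two jobs are duplicates when their worker and
  # args match. Real unique options can also consider queue, states, and a
  # time period.
  defp unique_key(job), do: {job.worker, job.args}

  # Drop duplicates across the whole list first, then split the remainder
  # into batches, so uniqueness holds both within and between batches.
  def batches(jobs, batch_size) do
    jobs
    |> Enum.uniq_by(&unique_key/1)
    |> Enum.chunk_every(batch_size)
  end
end

jobs = for id <- [1, 2, 2, 3, 1], do: %{worker: "MyApp.Worker", args: %{id: id}}

batches = BulkInsertSketch.batches(jobs, 2)
IO.inspect(Enum.map(batches, fn batch -> Enum.map(batch, & &1.args.id) end))
```

Five changesets collapse to three unique jobs, which are then split into one batch of two and one batch of one.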
v0.12.4 — 2022-08-18
Bug Fixes
- [SmartEngine] Prevent corruption of global and rate limit counters.
  Some combinations of signal commands, acking, and fetching could cause the internal counts used for global and rate limit commands to become corrupted, which eventually prevented continued job fetching.
v0.12.3 — 2022-08-09
Bug Fixes
- [SmartEngine] Restore global functionality with Postgres v10
  Prior to PG 11 there wasn't support for casting jsonb to integers. Oban officially supports back to PG 10, and we can work around the limitation by casting to text first.
- [SmartEngine] Apply uniqueness within batches of insert_all_jobs
  Previously, uniqueness was only applied between batches. That meant uniqueness wasn't enforced when inserting duplicates within a small-ish (<500) list.
- [SmartEngine] Switch rate window time to full unix timestamp
  Some queues don't have new jobs for days. When that happens, the current time may be later in the day than the previous time, which breaks rate limit estimation. This switches to full unix timestamps for rate limit windows to prevent this issue and retain window accuracy across days.
- [Chunk] Support manually cancelling jobs
  Chunk has always supported discarding some or all jobs from a chunk. Now that discarding is deprecated, chunk also supports cancelling as an alternative.
- [DynamicQueues] Accept global partition options
- [Testing] Remove use of tap/2 to restore Elixir v1.11 compatibility
- [Producer] Explicitly set timestamp fields for all schemas
  Some applications override Ecto's default timestamp field, which breaks all DynamicCron and DynamicQueues queries and triggers a crash loop.
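The rate window fix in v0.12.3 is easier to see with numbers. The sketch below is a hypothetical illustration in plain Elixir, not the SmartEngine's code: `RateWindowSketch` and both function names are invented here. It compares the elapsed time you get from time-of-day values with the elapsed time from full unix timestamps when a queue sits idle for two days.

```elixir
defmodule RateWindowSketch do
  @moduledoc """
  Hypothetical comparison of two ways to measure the gap between rate limit
  windows. Not SmartEngine code.
  """

  # Elapsed seconds when only the time of day is tracked. Whole days between
  # the two readings are lost.
  def elapsed_by_time_of_day(%DateTime{} = prev, %DateTime{} = curr) do
    Time.diff(DateTime.to_time(curr), DateTime.to_time(prev), :second)
  end

  # Elapsed seconds when full unix timestamps are tracked.
  def elapsed_by_unix(%DateTime{} = prev, %DateTime{} = curr) do
    DateTime.to_unix(curr) - DateTime.to_unix(prev)
  end
end

prev = ~U[2022-08-01 10:00:00Z]
curr = ~U[2022-08-03 10:00:30Z]

# Time of day suggests the previous window was only 30 seconds ago.
IO.inspect(RateWindowSketch.elapsed_by_time_of_day(prev, curr))

# Unix timestamps show the real gap of two days and 30 seconds.
IO.inspect(RateWindowSketch.elapsed_by_unix(prev, curr))
```

An estimator that believes only 30 seconds have passed would still count jobs from a window that expired days ago, which is the kind of misestimation that full timestamps avoid.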
v0.12.2 — 2022-07-26
Bug Fixes
- [SmartEngine] Fix stalling in queues using global limits
  Tracking global counts was subject to a race condition between an in-memory copy of the producer and async job acking. That led to inconsistent counts that would stop the queue from processing jobs correctly.
- [SmartEngine] Remove use of Map.reject for pre Elixir 1.13 compatibility
- [SmartEngine] Ensure timestamps are set without database interaction
- [Testing] Deduplicate jobs for run_workflow when given with_summary: false
  Workflow jobs don't run in a predictable order due to dependencies. That means the same job may execute two or more times, but even still, it should only be returned from run_jobs once.
- [Testing] Include cancelled state in drain_jobs summary now that it's possible to have inline cancelled jobs.
- [Worker] Fix the attach_hook spec to account for {:error, reason} return
- [DynamicPruner] Forward log and prefix to delete_all call in Multi
  Batch operations in a Multi don't have shared options applied automatically.
v0.12.1 — 2022-07-22
Bug Fixes
- [SmartEngine] Safely cancel jobs that have never run.
  When a job was cancelled without ever having run, it would cause an acking error within the engine.
- [SmartEngine] Record reason from {:cancel, reason} in the job's errors.
  The cancel_job callback signature remained the same, so this slipped through the cracks when the functionality was added to the BasicEngine.
v0.12.0 — 2022-07-21
Enhancements
- [DynamicPruner] Add a before_delete option to operate on jobs before they're deleted, e.g. for logging, reporting, cold-storage, etc.
- [Batch] Accept batch_callback_queue option to override where callback jobs are enqueued.
Bug Fixes
- [Workflow] Safely handle deleted upstream dependencies. Workflow jobs with a missing dependency no longer crash, and their status can be controlled with the new ignore_deleted workflow option.
- [Workflow] Verify that jobs implement the workflow behaviour as they are added to a workflow.
- [DynamicQueues] Default the local_limit to global_limit to match the behaviour of standard queues.
- [SmartEngine] Restore accepting refresh_interval to control down-time record refreshing. As a virtual field, it wasn't included in the list of allowed keys.
- [SmartEngine] Use the new shutdown callback for safer pausing during queue shutdown. During tests, or other situations where a database connection was no longer available, pausing could raise an unexpected error.
- [SmartEngine] Prevent running jobs that exceed rate limit when there are large gaps between insertion/execution.
For changes prior to v0.12 see the v0.11 docs.