Changelog for Oban Pro v0.12

Worker Hooks

Worker hooks are guaranteed to trigger after a job finishes executing. They're defined as callback functions on the worker, or in separate modules for reuse across workers.

Think of hooks as a convenient alternative to globally attached telemetry handlers, but with clearer scoping and greater safety. They are purely for side effects such as cleanup, logging, broadcasting notifications, updating other records, and error notifications.

There are three mechanisms for defining and attaching hooks:

  1. Implicitly—hooks are defined directly on the worker and they only run for that worker
  2. Explicitly—hooks are listed when defining a worker and they run anywhere they are listed
  3. Globally—hooks are executed for all Pro workers (only Pro workers)

It's possible to combine each type of hook on a single worker. When multiple hooks are stacked they're executed in the order: implicit, explicit, and then global.

Here's a simple example of defining an inline hook on a worker to let us know when a job is discarded:

defmodule MyApp.HookWorker do
  use Oban.Pro.Worker

  alias Oban.Job

  @impl Oban.Pro.Worker
  def process(_job) do
    # ...
    :ok
  end

  @impl Oban.Pro.Worker
  def after_process(:discard, %Job{} = job) do
    IO.puts("Job #{job.id} was discarded and will never complete")
  end

  def after_process(_state, _job), do: :ok
end
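The example above uses an implicit hook. For the explicit and global mechanisms, the hook lives in its own module that defines after_process/2. The sketch below is plain Elixir so it runs on its own; the module name MyApp.DiscardHook and its message/1 helper are hypothetical, and the hook only reads the id and worker fields from the job.

```elixir
defmodule MyApp.DiscardHook do
  # Hypothetical reusable hook module. It only reads :id and :worker from
  # the job, so it also works with plain maps outside of Oban.
  require Logger

  # Log a warning when a job is discarded.
  def after_process(:discard, job) do
    Logger.warning(message(job))
  end

  # Ignore every other state.
  def after_process(_state, _job), do: :ok

  # Build the log line separately so it is easy to test.
  def message(job) do
    "Job #{job.id} (#{job.worker}) was discarded and will never complete"
  end
end
```

To attach it explicitly, list it when defining a worker with use Oban.Pro.Worker, hooks: [MyApp.DiscardHook]. To run it for every Pro worker, attach it globally, for example with Oban.Pro.Worker.attach_hook(MyApp.DiscardHook) during application start (check the Worker Hooks docs for the exact call).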

See the Worker Hooks section for a complete usage guide and examples. Note that, as part of an ongoing documentation effort, we've replaced the old guide with proper moduledocs.

Global Partitioning

Rate limits aren't the only SmartEngine-powered limiters that can be partitioned! Now, with partitioned global limiting, you can enforce concurrency limits by the worker, args, and sub-keys within a queue, across all nodes (without clustering).

Here's an example of partitioning an exporters queue by the worker to minimize resource contention:

queues: [
  exporters: [
    local_limit: 6,
    global_limit: [allowed: 3, partition: [fields: [:worker]]]
  ]
]

In this example, we partition a scrapers queue by the service_id value so that only one job per service runs at a time across all nodes:

queues: [
  scrapers: [
    local_limit: 10,
    global_limit: [allowed: 1, partition: [fields: [:args], keys: [:service_id]]]
  ]
]

To achieve this style of global limiting before, you'd need one queue for every partition, which is challenging to manage and puts additional strain on the database.
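As a conceptual illustration only, the in-memory sketch below shows how a partition key could be derived from the :worker field or from selected :args keys, and how allowed caps the number of jobs admitted per partition. The SmartEngine tracks these counts in the database across all nodes; the PartitionSketch module and its functions are hypothetical and are not its implementation. Jobs are plain maps with string-keyed args, as they are after a round trip through JSON.

```elixir
defmodule PartitionSketch do
  # Hypothetical, in-memory model of global limit partitioning. The real
  # SmartEngine tracks partition counts in the database across all nodes.

  # Derive a partition key from the configured fields and optional arg keys.
  def partition_key(job, opts) do
    fields = Keyword.fetch!(opts, :fields)
    keys = opts |> Keyword.get(:keys, []) |> Enum.map(&to_string/1)

    Enum.map(fields, fn
      :worker -> {:worker, job.worker}
      :args -> {:args, args_subset(job.args, keys)}
    end)
  end

  # Admit jobs in order, allowing at most `allowed` per partition.
  def admit(jobs, allowed, opts) do
    {admitted, _counts} =
      Enum.reduce(jobs, {[], %{}}, fn job, {acc, counts} ->
        key = partition_key(job, opts)
        count = Map.get(counts, key, 0)

        if count < allowed do
          {[job | acc], Map.put(counts, key, count + 1)}
        else
          {acc, counts}
        end
      end)

    Enum.reverse(admitted)
  end

  # Without keys, the whole args map forms the partition.
  defp args_subset(args, []), do: args
  defp args_subset(args, keys), do: Map.take(args, keys)
end
```

With fields: [:args] and keys: [:service_id], two jobs for the same service share a partition, so only the first is admitted while a job for a different service runs alongside it.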

Learn more in the revamped Partitioning section of the SmartEngine guide.

Unique Jobs and Batching for Insert All

A long-standing "gotcha" of bulk inserting jobs with insert_all was the lack of unique support. To apply uniqueness, you had to insert jobs individually within a transaction (which is as slow as it sounds and led to frequent timeouts). Now, unique support is baked into the SmartEngine's insert_all implementation and works without any configuration:

- Repo.transaction(fn ->
-   Enum.each(lots_of_unique_jobs, &Oban.insert/1)
- end)

+ Oban.insert_all(lots_of_unique_jobs)

There's also automatic batching for insert_all. No more manually chunking massive job inserts to avoid PostgreSQL's parameter limits; the engine handles it for you and enforces uniqueness the whole time:

- Repo.transaction(fn ->
-   lots_of_jobs
-   |> Enum.chunk_every(500)
-   |> Enum.each(&Oban.insert_all/1)
- end)

+ Oban.insert_all(lots_of_jobs, batch_size: 500)
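Conceptually, enforcing uniqueness the whole time means duplicates are dropped both against jobs that already exist and within the incoming list, before it is split into batches. The UniqueBatchSketch module below is a hypothetical, in-memory illustration of that planning step, not the SmartEngine's code; the key function stands in for whatever unique fields a worker configures.

```elixir
defmodule UniqueBatchSketch do
  # Hypothetical planning step for a unique bulk insert:
  #   1. skip jobs whose unique key already exists,
  #   2. drop duplicates within the incoming list itself,
  #   3. split the remainder into batches of at most `batch_size`.
  def plan(jobs, existing_keys, batch_size, key_fun) do
    jobs
    |> Enum.reject(&MapSet.member?(existing_keys, key_fun.(&1)))
    |> Enum.uniq_by(key_fun)
    |> Enum.chunk_every(batch_size)
  end
end
```

Deduplicating the whole list before chunking matters: checking only between batches would miss duplicates that land in the same batch.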

Automatic batching, combined with insert placeholders, optimized locks, and streamlined replace, makes bulk inserts with the SmartEngine faster and more efficient.

Explore a little more (and we do mean little, there isn't much more to it) in the SmartEngine's Unique Bulk Insert section.

v0.12.7 — 2022-11-06

Enhancements

  • [Batch] Add a distinct handle_cancelled/1 callback to complement handle_discarded/1, since the two aren't interchangeable.

Bug Fixes

  • [SmartEngine] Delete unused producers from stopped dynamic queues.

    Producers from dynamic instances or dynamic queues can accumulate and cause performance issues. Now those producers are cleaned up automatically after approximately 5 minutes of inactivity.

  • [SmartEngine] Delete tracked global partitions when they're no longer tracking anything.

    Deleting tracked partitions once they're empty means they no longer accumulate indefinitely, which prevents query errors.

  • [SmartEngine] Disable prepared statements for partitioned fetch queries.

    Partitioned queries for a large number of tenants (many hundreds) generate a tremendous number of distinct queries. As prepared statements, those queries could consume enough memory to cause OOM errors in some situations.

  • [SmartEngine] Prevent transaction errors for calls to insert_all with large batches of unique jobs.

    Only ~12k xact locks are allowed in a single transaction. Because insert_all auto-batches within a single transaction, a large insert of unique jobs could exceed that limit and fail the entire transaction with an out-of-memory error. To prevent this, we automatically switch to a separate transaction for each batch.

    Batch sizes can be controlled with a new xact_limit option for rare cases where Postgres is tuned to allow more xact locks.

  • [SmartEngine] Flip containment check for global or rate partitions without a particular field.

    For workers with empty args, or partitions with missing keys, the tracked args are an empty map. The standard JSONB containment check (@>) treats an empty map as contained in any other map, which subtly broke limit tracking.

  • [Batch] Consider cancelled state when evaluating the handle_exhausted/1 callback.
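The containment fix above hinges on how @> treats an empty object: in PostgreSQL, any JSONB object contains '{}', while '{}' does not contain any non-empty object. The sketch below models top-level containment for flat Elixir maps only (real JSONB containment also recurses into nested objects and arrays); the ContainmentSketch module is hypothetical.

```elixir
defmodule ContainmentSketch do
  # Hypothetical model of `left @> right` for flat maps: every key/value
  # pair in `right` must also be present in `left`. Nested values are
  # compared with ==, unlike PostgreSQL's recursive containment.
  def contains?(left, right) when is_map(left) and is_map(right) do
    Enum.all?(right, fn {key, value} -> Map.fetch(left, key) == {:ok, value} end)
  end
end
```

Because Enum.all?/2 is true for an empty collection, an empty map on the right-hand side is contained in everything, so the order of the operands decides whether empty tracked args match every job.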

v0.12.6 — 2022-10-04

Enhancements

  • [DynamicPruner] Configure prune timing using a cron schedule.

    Pruning a high volume of jobs could cause excessive vacuuming, which could impact query times. This replaces the old :interval option with a cron schedule that defaults to the same 60s interval but can be configured to run on a predictable schedule.

    For example, you can prune once an hour instead:

    {Oban.Pro.Plugins.DynamicPruner, schedule: "0 * * * *"}

    Or, to prune once a day at midnight in your local timezone:

    {Oban.Pro.Plugins.DynamicPruner, schedule: "0 0 * * *", timezone: "America/Chicago"}

Bug Fixes

  • [Chunk] Break lengthy chunk timeouts into smaller periods for evaluation.

    A new sleep option prevents waiting the full timeout while jobs accumulate in the background. For example, if a job had a timeout of 10s it would wait the full 10s before executing a chunk even if more jobs accumulated while it waited. Now that same job would process after the sleep period, which defaults to 1s.

  • [Worker] Always trigger implicit local execution hooks.

    Previously, local hooks (those defined within the worker itself) never fired unless explicit hooks were also given via the hooks: [...] option.

  • [SmartEngine] Optimize partitioned limit fetching queries.

    To avoid unbounded, unoptimizable queries, we now sort and limit the jobs into a smaller subset and then apply the window to the subset alone. Another minor optimization bypasses partitioned queries when all partitions have a demand greater than or equal to the current limit.

  • [Relay] Lift the 8kb payload size restriction for awaited job messages when using the PG notifier, which relies on Erlang process groups rather than Postgres. Now you'll only encounter {:error, :result_too_large} with the Postgres notifier.

  • [Relay] Handle :cancel, :discard, and :snooze returns from relay await.

    Previously, any non-:ok return value was treated as an error with a generic response. That made it impossible to extract meaningful errors or to identify jobs that exhausted all attempts. It is now possible to receive the following async returns:

    # A cancelled job
    {:cancel, reason} = Relay.await(relay)
    
    # A discarded job with a reason
    {:discard, reason} = Relay.await(relay)
    
    # A snoozed job with the snooze period
    {:snooze, snooze} = Relay.await(relay)

  • [Relay] Return results from concurrent unique jobs.

    Prevent returning {:error, :timeout} for conflicting unique jobs when the concurrent job is still executing. Now multiple processes can await the same job concurrently.
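With the expanded Relay returns, a single set of clauses can distinguish every outcome. The sketch below keeps that matching in a pure function so it can be exercised without a running Oban instance. The RelayResultSketch module and its messages are hypothetical; it assumes a successful job yields {:ok, result}, that {:error, reason} covers failures such as :timeout or :result_too_large, and that the snooze period is in seconds.

```elixir
defmodule RelayResultSketch do
  # Hypothetical helper describing each Relay.await/1 return shape.
  def describe({:ok, result}), do: "completed with #{inspect(result)}"
  def describe({:cancel, reason}), do: "cancelled: #{inspect(reason)}"
  def describe({:discard, reason}), do: "discarded: #{inspect(reason)}"
  # Assumes the snooze period is in seconds, as with worker snooze returns.
  def describe({:snooze, period}), do: "snoozed for #{period}s"
  def describe({:error, reason}), do: "failed: #{inspect(reason)}"
end
```

In application code this would be called on the awaited value, for example relay |> Relay.await() |> RelayResultSketch.describe().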

v0.12.5 — 2022-09-07

Bug Fixes

  • [SmartEngine] Treat transactionally locked unique jobs as dupes

    Without this change, valid changesets for unique jobs looked like they couldn't be inserted. This odd circumstance only occurred during concurrent, single insert attempts when no duplicates were found.

  • [SmartEngine] Forward shared opts to all job insert operations

    Option overrides like timeout and log weren't respected by insert_job or insert_all_jobs operations.

  • [SmartEngine] Wrap insert_all batches in a single transaction

    Rather than wrapping each batch in a separate transaction, all batches are wrapped in a top-level transaction, which guards against partial inserts.

  • [DynamicQueues] Ignore unknown signal notifications

    Unexpected signal notifications weren't handled and would crash the plugin.

  • [Worker] Expose structured errors in the changeset rather than causing a pattern match error.

  • [Batch] Log when unable to use structured batch callback

    Batch callbacks don't work for structured workers when there is a mismatch between required keys and the callback args. This prevents raising errors when that happens and instead logs an informative message about how to remedy the situation.

v0.12.4 — 2022-08-18

Bug Fixes

  • [SmartEngine] Prevent corruption of global and rate limit counters.

    Some combinations of signal commands, acking, and fetching could cause the internal counts used for global and rate limit commands to become corrupted, which eventually prevented continued job fetching.

v0.12.3 — 2022-08-09

Bug Fixes

  • [SmartEngine] Restore global functionality with Postgres v10

    Prior to PG 11, there wasn't support for casting jsonb to integers. Oban officially supports PG 10 and later, and we can work around the limitation by casting to text first.

  • [SmartEngine] Apply uniqueness within batches of insert_all_jobs

    Previously, uniqueness was only applied between batches. That meant uniqueness wasn't enforced when inserting duplicates within a small-ish (<500) list.

  • [SmartEngine] Switch rate window time to full unix timestamp

    Some queues don't have new jobs for days. When that happens, the current time may be later in the day than the previous time even though days have passed, which breaks rate limit estimation. This switches to full unix timestamps for rate limit windows to prevent this issue and retain window accuracy across days.

  • [Chunk] Support manually cancelling jobs

    Chunk has always supported discarding some or all jobs from a chunk. Now that discarding is deprecated, chunk also supports cancelling as an alternative.

  • [DynamicQueues] Accept global partition options

  • [Testing] Remove use of tap/2 to restore Elixir v1.11 compatibility

  • [Producer] Explicitly set timestamp fields for all schemas

    Some applications override Ecto's default timestamp field, which breaks all DynamicCron and DynamicQueues queries and triggers a crash loop.
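To see why time-of-day windows break when a queue sits idle for days, the sketch below compares elapsed time computed from seconds since midnight with elapsed time computed from full unix timestamps. The RateWindowSketch module is a hypothetical illustration that assumes the old behavior compared times of day; it is not the SmartEngine's rate limiting code.

```elixir
defmodule RateWindowSketch do
  # Hypothetical comparison of two ways to measure time since the last
  # rate-limited job; not the SmartEngine's implementation.

  # Assumed old approach: seconds since midnight, ignoring the date.
  def elapsed_time_of_day(prev, now), do: seconds_of_day(now) - seconds_of_day(prev)

  # Fixed approach: full unix timestamps, accurate across days.
  def elapsed_unix(prev, now), do: DateTime.to_unix(now) - DateTime.to_unix(prev)

  defp seconds_of_day(%DateTime{hour: hour, minute: minute, second: second}) do
    hour * 3600 + minute * 60 + second
  end
end
```

For two jobs two days and one hour apart, the time-of-day version reports a single hour, so a window that should have expired long ago still looks active, while the unix version reports the true 49 hours.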

v0.12.2 — 2022-07-26

Bug Fixes

  • [SmartEngine] Fix stalling in queues using global limits

    Tracking global counts was subject to a race condition between an in-memory copy of the producer and async job acking. That led to inconsistent counts that would stop the queue from processing jobs correctly.

  • [SmartEngine] Remove use of Map.reject for pre Elixir 1.13 compatibility

  • [SmartEngine] Ensure timestamps are set without database interaction

  • [Testing] Deduplicate jobs for run_workflow when given with_summary: false

    Workflow jobs don't run in a predictable order due to dependencies. That means the same job may execute two or more times, but even so, it should only be returned from run_jobs once.

  • [Testing] Include cancelled state in drain_jobs summary now that it's possible to have inline cancelled jobs.

  • [Worker] Fix the attach_hook spec to account for {:error, reason} return

  • [DynamicPruner] Forward log and prefix to delete_all call in Multi

    Batch operations in a Multi don't have shared options applied automatically.

v0.12.1 — 2022-07-22

Bug Fixes

  • [SmartEngine] Safely cancel jobs that have never run.

    When a job was cancelled without ever having run, it would cause an ack'ing error within the engine.

  • [SmartEngine] Record reason from {:cancel, reason} in the job's errors.

    The cancel_job callback signature remained the same, so this slipped through the cracks when the functionality was added to the BasicEngine.

v0.12.0 — 2022-07-21

Enhancements

  • [DynamicPruner] Add a before_delete option to operate on jobs before they're deleted, e.g. for logging, reporting, cold-storage, etc.

  • [Batch] Accept batch_callback_queue option to override where callback jobs are enqueued.

Bug Fixes

  • [Workflow] Safely handle deleted upstream dependencies. Workflow jobs with a missing dependency no longer crash, and their status can be controlled with the new ignore_deleted workflow option.

  • [Workflow] Verify that jobs implement the workflow behaviour as they are added to a workflow.

  • [DynamicQueues] Default the local_limit to global_limit to match the behaviour of standard queues.

  • [SmartEngine] Restore accepting refresh_interval to control down-time record refreshing. As a virtual field, it wasn't included in the list of allowed keys.

  • [SmartEngine] Use the new shutdown callback for safer pausing during queue shutdown. During tests, or other situations where a database connection was no longer available, pausing could raise an unexpected error.

  • [SmartEngine] Prevent running jobs that exceed rate limit when there are large gaps between insertion/execution.

For changes prior to v0.12 see the v0.11 docs.