Oban.Pro.Decorator (Oban Pro v1.5.0-rc.0)

The Decorator extension converts functions into Oban jobs with a simple annotation.

Decorated functions, such as those in contexts or other non-worker modules, can be executed as fully fledged background jobs with retries, scheduling, and the other guarantees you'd expect from Oban jobs.

Usage

To get started, use the Decorator module, then annotate functions with the @job attribute. Here's a simple example that uses @job true to decorate a function without any options:

defmodule MyApp.Business do
  use Oban.Pro.Decorator

  @job true
  def weekly_report(tps_id, opts) do
    ...
  end
end

The @job attribute also accepts all standard Oban.Worker options, e.g. max_attempts, priority, queue. This example swaps out @job true for a list of basic options:

@job [max_attempts: 3, priority: 1, queue: :reports]
def weekly_report(tps_id, opts) do
  ...
end

Now you can build and insert a job by calling insert_weekly_report/2:

{:ok, job} = MyApp.Business.insert_weekly_report(123, pdf: true, rtf: true)

Notice that the second argument is a keyword list of options, which isn't normally allowed in job args because it's not JSON serializable.

Generated Functions

Functions decorated with @job generate three variants of the original function:

  • new_* — builds an Oban.Job changeset ready to be inserted for execution
  • insert_* — builds and inserts an Oban.Job using Oban.insert/2
  • relay_* — inserts a job, awaits execution, then returns the job's results

See the comp_opts/0 typespec for the subset of job options that are supported at compile time. Additional options are available at runtime, as described in the Runtime Options section below.
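
To make the idea of generated variants concrete, here is a toy sketch in plain Elixir. It is not how Oban Pro implements the decorator. ToyDecorator, ToyBusiness, and the map returned by the generated function are all hypothetical, and only the new_* naming convention is taken from the list above. The sketch uses the standard @on_definition and @before_compile hooks to record functions that follow a @job attribute and to emit one extra function for each.

```elixir
defmodule ToyDecorator do
  # Hypothetical stand-in for illustration only; not Oban.Pro.Decorator.
  defmacro __using__(_opts) do
    quote do
      Module.register_attribute(__MODULE__, :toy_jobs, accumulate: true)
      @on_definition ToyDecorator
      @before_compile ToyDecorator
    end
  end

  # Called for every definition; records public functions preceded by @job.
  def __on_definition__(env, :def, name, args, _guards, _body) do
    if Module.get_attribute(env.module, :job, nil) do
      Module.put_attribute(env.module, :toy_jobs, {name, length(args)})
      Module.delete_attribute(env.module, :job)
    end
  end

  def __on_definition__(_env, _kind, _name, _args, _guards, _body), do: :ok

  # Emits one new_* function per recorded name/arity pair.
  defmacro __before_compile__(env) do
    defs =
      for {name, arity} <- Enum.uniq(Module.get_attribute(env.module, :toy_jobs)) do
        args = Macro.generate_arguments(arity, env.module)

        quote do
          def unquote(:"new_#{name}")(unquote_splicing(args)) do
            # A plain map stands in for the Oban.Job changeset.
            %{fun: unquote(name), args: unquote(args)}
          end
        end
      end

    {:__block__, [], defs}
  end
end

defmodule ToyBusiness do
  use ToyDecorator

  @job true
  def weekly_report(tps_id, opts), do: {:ok, {tps_id, opts}}

  # Not decorated, so no new_helper/1 is generated.
  def helper(value), do: value
end
```

In this toy version, ToyBusiness.new_weekly_report/2 exists alongside the original weekly_report/2, while helper/1 gets no companion function.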

Using New

The new_ variant will build an Oban.Job changeset that's ready for insertion, but not persisted to the database. This is identical to the output from calling Oban.Worker.new/2 on a standard worker.

changeset = Business.new_weekly_report(123)

The returned changeset is perfect for bulk inserts via Oban.insert_all/1:

[123, 456, 789]
|> Enum.map(&Business.new_weekly_report/1)
|> Oban.insert_all()

It can also be used to compose batches or workflows:

alias Oban.Pro.Workflow

Workflow.new()
|> Workflow.add(:rep_1, Business.new_weekly_report(1))
|> Workflow.add(:rep_2, Business.new_weekly_report(2))
|> Workflow.add(:rep_3, Business.new_weekly_report(3))
|> Workflow.add(:fin, Business.new_finish_up(), deps: ~w(rep_1 rep_2 rep_3)a)
|> Oban.insert_all()

Using Insert

The insert_ variant builds a changeset and immediately calls Oban.insert/3 to enqueue it. The result is a success tuple containing the job, or a changeset with errors.

{:ok, job} = Business.insert_weekly_report(123)

By default, jobs are inserted using the standard Oban instance. For applications that run multiple Oban instances, or use a non-standard name, you can override the instance with the :oban option:

{:ok, job} = Business.insert_weekly_report(123, oban: SomeOban)

Using Relay

The relay_ variant builds a changeset, inserts it, then leverages Oban.Pro.Relay to await execution and return a result:

{:ok, result} = Business.relay_weekly_report(123)

The default timeout is a brief 5000ms, which doesn't account for scheduling or queueing time. Provide an alternate timeout to wait longer:

case Business.relay_weekly_report(123, timeout: 30_000) do
  {:ok, result} -> IO.inspect(result, label: "RESULT")
  {:error, reason} -> IO.inspect(reason, label: "ERROR")
end

Note that the timeout option only controls how long the local process will block while awaiting a result. The job will keep executing regardless of the timeout period.
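
The same "stop waiting, keep working" behaviour can be seen with a plain Elixir Task. This is only an analogy: it does not use Oban.Pro.Relay, and AwaitSketch is a hypothetical module. Task.yield/2 returns nil when the timeout elapses, yet the task process carries on and can still be awaited later.

```elixir
defmodule AwaitSketch do
  # Hypothetical helper: simulate `work_ms` of work, wait at most `timeout` ms.
  def run(work_ms, timeout) do
    task =
      Task.async(fn ->
        Process.sleep(work_ms)
        :done
      end)

    case Task.yield(task, timeout) do
      {:ok, result} -> {:ok, result}
      # The caller stops waiting here, but the task process keeps running.
      nil -> {:timeout, task}
    end
  end
end

# The caller gives up after 20ms while the work needs about 200ms.
{:timeout, task} = AwaitSketch.run(200, 20)

# The work still completes after the caller's timeout has elapsed.
:done = Task.await(task)
```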

Considerations and Caveats

Decorated functions are a convenient way to run functions in the background, and suitable in many situations. However, there are circumstances where they're unsuitable and you should exercise care:

  • Advanced worker functionality such as custom backoffs, hooks, structured args, or callbacks requires a dedicated worker module and isn't suitable for decoration.

  • Args of any type are safely serialized, but dumping large amounts of data may cause performance problems because it must be serialized, stored, and deserialized.

  • Changing function signatures while jobs are in-flight can cause jobs to fail, just like changing the shape of args passed to a process/1 callback.
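
The last caveat can be pictured with a small plain Elixir sketch. It assumes, purely for illustration, that an in-flight job remembers a function name and an argument list; SignatureSketch and that storage shape are hypothetical. If the function has since gained a second argument, applying the old single-element argument list fails.

```elixir
defmodule SignatureSketch do
  # The "new" version of the function, which now requires two arguments.
  def weekly_report(tps_id, _opts), do: {:ok, tps_id}

  # Applies stored args to the current version of the function.
  def run({fun, args}) do
    apply(__MODULE__, fun, args)
  rescue
    UndefinedFunctionError -> {:error, :undefined_function}
  end
end

# Args stored after the signature change still work.
{:ok, 123} = SignatureSketch.run({:weekly_report, [123, []]})

# Args stored before the change no longer match any function.
{:error, :undefined_function} = SignatureSketch.run({:weekly_report, [123]})
```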

Runtime Options

Each decorated function has an additional generated clause that accepts job options, e.g. new_weekly_report/1 also has a new_weekly_report/2 variant.

All compile time options can be overridden at runtime. For example, to override the queue and max_attempts:

Business.insert_weekly_report(123, queue: "default", max_attempts: 10)
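
One way to picture the override is as a keyword merge in which runtime values win. The sketch below is an assumption about the precedence only, not a description of Oban Pro's internals, and OptsSketch is a hypothetical module.

```elixir
defmodule OptsSketch do
  # Options as they might appear in a @job annotation.
  @compile_opts [max_attempts: 3, priority: 1, queue: :reports]

  # Runtime options take precedence; untouched compile time options remain.
  def effective(runtime_opts), do: Keyword.merge(@compile_opts, runtime_opts)
end

opts = OptsSketch.effective(queue: "default", max_attempts: 10)

"default" = Keyword.get(opts, :queue)
10 = Keyword.get(opts, :max_attempts)
# priority was not overridden, so the compile time value is kept.
1 = Keyword.get(opts, :priority)
```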

In addition to compile time options, runtime options accepted by Oban.Job.new/2 (other than worker) are also allowed. This makes it possible to schedule decorated jobs:

Business.insert_weekly_report(123, schedule_in: {1, :minute})

See full_opts/0 for the complete typespec of runtime options.

Patterns and Guards

Each generated variant retains pattern matches and guards from the original function. That allows expressive, defensive data validation before a job executes.

For example, use a guard to ensure the id argument is an integer:

@job true
def notify_user(id) when is_integer(id), do: ...

Or, pattern match on a map with a user_id key and ensure the id is an integer:

@job true
def notify_user(%{user_id: id}) when is_integer(id), do: ...
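
The practical effect is that bad input is rejected when the variant is called, rather than later when the job runs. The sketch below shows this with a hand-written stand-in for a generated variant. GuardSketch, its insert_notify_user/1 function, and the returned map are hypothetical and only mirror the guard from the example above.

```elixir
defmodule GuardSketch do
  # Original function with a guard.
  def notify_user(id) when is_integer(id), do: {:ok, id}

  # Hand-written stand-in for a generated variant that keeps the same guard.
  def insert_notify_user(id) when is_integer(id), do: {:ok, %{args: [id]}}

  # Wraps the call to show what a caller sees for invalid input.
  def try_insert(id) do
    insert_notify_user(id)
  rescue
    FunctionClauseError -> {:error, :no_matching_clause}
  end
end

{:ok, %{args: [42]}} = GuardSketch.try_insert(42)

# A string id never reaches the job; the call itself fails to match.
{:error, :no_matching_clause} = GuardSketch.try_insert("42")
```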

Complex Types

Any Elixir term may be passed as an argument, not just JSON encodable values. That enables passing native data-types such as tuples, keywords, or structs that can't easily be used in regular jobs.

@job true
def add_discount(%User{id: _}, amount: 10_000), do: ...
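
The source doesn't say how such terms are stored. As an illustration of why arbitrary terms can survive a round trip when JSON can't represent them, the sketch below uses Erlang's external term format with Base64. The encoding choice and the ComplexSketch module are assumptions, not Oban Pro's documented behaviour.

```elixir
defmodule ComplexSketch do
  # Encode any term as a string that could be placed in a JSON field.
  def dump(term), do: term |> :erlang.term_to_binary() |> Base.encode64()

  # Decode the string back into the original term.
  def load(string), do: string |> Base.decode64!() |> :erlang.binary_to_term()
end

# A tuple, a keyword list, and a struct (Date) all round trip unchanged.
args = [{:user, 1}, [amount: 10_000], ~D[2024-01-01]]

true = args |> ComplexSketch.dump() |> ComplexSketch.load() == args
```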

Avoid Non-Portable Types

Pass non-portable data types such as pid, reference, or port with caution. There's no guarantee that a job will run on the same node, or that those specific values will still be available there. Furthermore, be careful passing anonymous functions because they are closures over the local environment.

Return Values

Decorated jobs respect the same standard return types as Oban.Pro.Worker.process/1. That means you can return an {:error, reason} tuple to signify an error, or {:cancel, reason} to quietly cancel a job. However, because decorated functions weren't necessarily designed to be executed in a job, unexpected returns are considered a success.

While there's no harm in returning nil, a struct, or some other non-standard value, it's best to return an explicitly supported type such as :ok or {:ok, value}.

 @job true
 def update_account(user_id, params) do
   user = Repo.get(User, user_id)

   do_update(user, params)

-  user
+  {:ok, user}
 end
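
The rules in this section can be summarised as a small classifier. This is a sketch of the behaviour described above, not Oban Pro's code. ReturnSketch and its result atoms are hypothetical, and only the return shapes named in this section are modelled.

```elixir
defmodule ReturnSketch do
  # Explicitly supported success values.
  def classify(:ok), do: :success
  def classify({:ok, _value}), do: :success

  # An error tuple signifies a failed attempt.
  def classify({:error, _reason}), do: :failure

  # A cancel tuple quietly cancels the job.
  def classify({:cancel, _reason}), do: :cancelled

  # Anything unexpected (nil, a struct, ...) is treated as a success.
  def classify(_other), do: :success
end

:success = ReturnSketch.classify({:ok, %{id: 1}})
:failure = ReturnSketch.classify({:error, :not_found})
:cancelled = ReturnSketch.classify({:cancel, :obsolete})
:success = ReturnSketch.classify(nil)
```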

Unique and Replace

Unique and replace options are available for decorated jobs. However, they have purposeful limitations for compatibility with the decorated worker:

  • unique — only supports states, period, and timestamp options. The fields and keys options aren't supported.
  • replace — expects the newer, per-state syntax, and it doesn't support replacing the worker or args.

Both options can be defined at compile time in the @job annotation:

@job [unique: [period: :infinity], replace: [scheduled: [:scheduled_at]]]

Or as runtime options:

Business.send_notice(user, schedule_in: 60, replace: [scheduled: [:scheduled_at]])

Limiting Decoration

To avoid generating unnecessary functions, you can disable generation of the new, insert, or relay variants by passing flags to use:

use Oban.Pro.Decorator, new: false
use Oban.Pro.Decorator, insert: false
use Oban.Pro.Decorator, relay: false

Filtering options can be combined to restrict generation to one variant. For example, to only generate insert_* functions:

use Oban.Pro.Decorator, new: false, relay: false
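
The way flags combine can be sketched as a filter over the three variant names. VariantSketch is a hypothetical module, and the assumption that every flag defaults to true is inferred from the examples above rather than stated by the source.

```elixir
defmodule VariantSketch do
  @variants [:new, :insert, :relay]

  # Each flag is assumed to default to true; passing false drops that variant.
  def enabled(use_opts) do
    Enum.filter(@variants, &Keyword.get(use_opts, &1, true))
  end
end

# No flags: all three variants would be generated.
[:new, :insert, :relay] = VariantSketch.enabled([])

# Disabling new and relay leaves only the insert variant.
[:insert] = VariantSketch.enabled(new: false, relay: false)
```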

Summary

Types

comp_opts()
Options allowed in @job annotations at compile time.

full_opts()
Options allowed at runtime when calling generated new_, insert_, or relay_ functions.

Types

@type comp_opts() :: [
  max_attempts: pos_integer(),
  oban: GenServer.name(),
  priority: 0..9,
  queue: atom() | binary(),
  replace: [Oban.Job.replace_by_state_option()],
  tags: Oban.Job.tags(),
  unique:
    true
    | [
        period: timeout(),
        state: Oban.Job.unique_state(),
        timestamp: :inserted_at | :scheduled_at
      ]
]

Options allowed in @job annotations at compile time.

@type full_opts() :: [
  max_attempts: pos_integer(),
  meta: map(),
  oban: GenServer.name(),
  priority: 0..9,
  queue: atom() | binary(),
  replace: [Oban.Job.replace_by_state_option()],
  scheduled_at: DateTime.t(),
  schedule_in: Oban.Job.schedule_in_option(),
  tags: Oban.Job.tags(),
  timeout: timeout(),
  unique:
    true
    | [
        period: timeout(),
        state: Oban.Job.unique_state(),
        timestamp: :inserted_at | :scheduled_at
      ]
]

Options allowed at runtime when calling generated new_, insert_, or relay_ functions.