Oban.Pro.Decorator (Oban Pro v1.5.0-rc.0)
The Decorator
extension converts functions into Oban jobs with a simple annotation.
Decorated functions, such as those in contexts or other non-worker modules, can be executed as fully fledged background jobs with retries, scheduling, and the other guarantees you'd expect from Oban jobs.
Usage
To get started, use the Decorator
module, then annotate functions with the @job
attribute.
Here's a simple example that uses @job true
to decorate a function without any options:
defmodule MyApp.Business do
use Oban.Pro.Decorator
@job true
def weekly_report(tps_id, opts) do
...
end
end
The @job
attribute also accepts all standard Oban.Worker
options, e.g. max_attempts
,
priority
, queue
. This example swaps out @job true
for a list of basic options:
@job [max_ttempts: 3, priority: 1, queue: :reports]
def weekly_report(tps_id, opts) do
...
end
Now you can build and insert a job by calling insert_weekly_report/2
:
{:ok, job} = MyApp.Business.insert_weekly_report(123, pdf: true, rtf: true)
Notice that the second argument is a keyword list of options, which isn't normally allowed in job args because it's not JSON serializable.
Generated Functions
Functions decorated with @job
generate three variants of the original function:
new_*
— builds anOban.Job
changeset ready to be inserted for executioninsert_*
— builds and inserts anOban.Job
usingOban.insert/2
relay_*
— inserts a job, awaits execution, then returns the job's results
See the comp_opts/0
typespec for the subset of job options that are supported at compile
time. Additional options are available at runtime, as described in the Runtime
Options section below.
Using New
The new_
variant will build an Oban.Job
changeset that's ready for insertion, but not
persisted to the database. This is identical to the output from calling Oban.Worker.new/2
on
a standard worker.
changeset = Business.new_weekly_report(123)
The returned changeset is perfect for bulk inserts via Oban.insert_all/1
:
[123, 456, 789]
|> Enum.map(&Business.new_weekly_report/1)
|> Oban.insert_all()
It can also be used to compose batches or workflows:
alias Oban.Pro.Workflow
Workflow.new()
|> Workflow.add(:rep_1, Business.new_weekly_report(1))
|> Workflow.add(:rep_2, Business.new_weekly_report(2))
|> Workflow.add(:rep_3, Business.new_weekly_report(3))
|> Workflow.add(:fin, Business.new_finish_up(), deps: ~w(rep_1 rep_2 rep_3)a)
|> Oban.insert_all()
Using Insert
The insert_
variant builds a changeset and immediately calls Oban.insert/3
to enqueue it.
The result is a success tuple containing the job, or a changeset with errors.
{:ok, job} = Business.insert_weekly_report(123)
By default, jobs are inserted using the standard Oban
instance. For applications that run
multiple Oban instances, or use a non-standard name, you can override the instance with the
:oban
option:
{:ok, job} = Business.insert_weekly_report(123, oban: SomeOban)
Using Relay
The relay_
variant builds a changeset, inserts it, then leverages Oban.Pro.Relay
to await
execution and return a result:
{:ok, result} = Business.relay_weekly_report(123)
The default timeout is a brief 5000ms, which doesn't account for scheduling or queueing time. Provide an alternate timeout to wait longer:
case Business.relay_weekly_report(123, timeout: 30_000) do
{:ok, result} -> IO.inspect(result, label: "RESULT")
{:error, reason} -> IO.inspect(reason, label: "ERROR")
end
Note that the timeout
option only controls how long the local process will block while
awaiting a result. The job will keep executing regardless of the timeout period.
Considerations and Caveats
Decorated functions are a convenient way to run functions in the background, and suitable in many situations. However, there are circumstances where they're unsuitable and you should exercise care:
Advanced worker functionality such as custom backoffs, hooks, structured args, or callbacks requires a dedicated worker module and isn't suitable for decoration.
Args of any type are safely serialized, but dumping large amounts of data may cause performance problems because it must be serialized, stored, and deserialized.
Changing function signatures while jobs are in-flight can cause jobs to fail, just like changing the shape of args passed to a
process/1
callback.
Runtime Options
Each decorated function has an additional generated clause that accepts job options, e.g.
new_weekly_report/1
also has a new_weekly_report/2
variant.
All compile time options can be overridden at runtime. For example, to override the queue
and
max_attempts
:
Business.insert_weekly_report(123, queue: "default", max_attempts: 10)
In addition to compile time options, runtime options accepted by Oban.Job.new/2
(other than
worker
) are also allowed. This makes it possible to schedule decorated jobs:
Business.insert_weekly_report(123, schedule_in: {1, :minute})
See full_opts/0
for the complete typespec of runtime options.
Patterns and Guards
Each generated variant retains pattern matches and guards from the original function. That allows expressive, defensive data validation before a job executes.
For example, use a guard to ensure the id
argument is an integer:
@job true
def notify_user(id) when is_integer(id), do: ...
Or, pattern match on a map with a user_id
key and ensure the id
is an integer:
@job true
def notify_user(%{user_id: id}) when is_integer(id), do: ...
Complex Types
Any Elixir term may be passed as an argument, not just JSON encodable values. That enables passing native data-types such as tuples, keywords, or structs that can't easily be used in regular jobs.
@job true
def add_discount(%User{id: _}, amount: 10_000), do: ...
Avoid Non-Portable Types
Pass non-portable data types such as
pid
,reference
,port
with caution. There's no guarantee that a job will run on the same node and those specific values available. Furthermore, be careful passing anonymous functions because they are closures over the local environment.
Return Values
Decorated jobs respect the same standard return types as Oban.Pro.Worker.process/1
. That
means you can return an {:error, reason}
tuple to signify an error, or {:cancel, reason}
to
quietly cancel a job. However, because decorated functions weren't necessarily designed to be
executed in a job, unexpected returns are considered a success.
While there's no harm in returning nil
, a struct, or some other non-standard value, it's best
to return an explicitly support type such as :ok
or {:ok, value}
.
@job true
def update_account(user_id, params) do
user = Repo.get(User, user_id)
do_update(user, params)
- user
+ {:ok, user}
end
Unique and Replace
Unique and replace options are available for decorated jobs. However, they have purposeful limitations for compatibility with the decorated worker:
unique
— only supportsstates
,period
, andtimestamp
options. Thefields
andkeys
options aren't supported.replace
— expects the newer, per-state syntax, and it doesn't support replacing theworker
orargs
.
Both options can be defined at compilie in the @job
annotation:
@job [unique: [period: :infinity], replace: [scheduled: [:scheduled_at]]]
Or as runtime options:
Business.send_notice(user, schedule_in: 60, replace: [scheduled: [:scheduled_at]])
Limiting Decoration
To avoid generating unnecessary functions you can disable generating new
, insert
, or relay
functions via passing flags to use
:
use Oban.Pro.Decorator, new: false
use Oban.Pro.Decorator, insert: false
use Oban.Pro.Decorator, relay: false
Filtering options can be combined to restrict generation to one variant. For example, to only
generate insert_*
functions:
use Oban.Pro.Decorator, new: false, relay: false
Summary
Types
Options allowed in @job
annotations at compile time.
Options allowed at runtime when calling generated new_
, insert_
, or relay_
functions.
Types
comp_opts()
@type comp_opts() :: [ max_attempts: pos_integer(), oban: GenServer.name(), priority: 0..9, queue: atom() | binary(), replace: [Oban.Job.replace_by_state_option()], tags: Oban.Job.tags(), unique: true | [ period: timeout(), state: Oban.Job.unique_state(), timestamp: :inserted_at | :scheduled_at ] ]
Options allowed in @job
annotations at compile time.
full_opts()
@type full_opts() :: [ max_attempts: pos_integer(), meta: map(), oban: GenServer.name(), priority: 0..9, queue: atom() | binary(), replace: [Oban.Job.replace_by_state_option()], scheduled_at: DateTime.t(), schedule_in: Oban.Job.schedule_in_option(), tags: Oban.Job.tags(), timeout: timeout(), unique: true | [ period: timeout(), state: Oban.Job.unique_state(), timestamp: :inserted_at | :scheduled_at ] ]
Options allowed at runtime when calling generated new_
, insert_
, or relay_
functions.