Relay
The Relay extension lets you insert and await the results of jobs locally or remotely, across any number of nodes, so you can seamlessly distribute jobs and await the results synchronously. Think of Relay as persistent distributed tasks.
Usage
Typespecs
📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.
@type t :: %{job: Job.t(), pid: pid(), ref: UUID.t()}
@type await_result ::
:ok
| {:ok, term()}
| {:error, :discarded}
| {:error, :result_too_large}
| {:error, :snoozed}
| {:error, :timeout}
| {:error, Exception.t()}
Async
@spec async(Job.changeset()) :: t() | {:error, Job.changeset()}
@spec async(Oban.name(), Job.changeset()) :: t() | {:error, Job.changeset()}
Use Oban.Pro.Relay.async/1,2 to insert a job for asynchronous execution. The single-arity version takes a job changeset and inserts it:
relay =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two-arity version to select an instance:
changeset = MyApp.Worker.new(%{id: 123})
relay = Oban.Pro.Relay.async(MyOban, changeset)
The returned map contains the caller's pid and a unique ref that is used to await the results.
Await
@spec await(relay :: t()) :: await_result()
@spec await(relay :: t(), timeout()) :: await_result()
After inserting a job and constructing a relay, use await/1,2 to await the job's execution and return the result:
{:ok, result} =
%{id: 123}
|> MyApp.Worker.new()
|> Oban.Pro.Relay.async()
|> Oban.Pro.Relay.await()
By default, await will time out after 5 seconds and return an {:error, :timeout} tuple. The job itself may continue to run; only the local process stops waiting on it. The second argument to await/2 is a timeout value:
Oban.Pro.Relay.await(relay, :timer.seconds(30))
When the executed job fails, crashes, snoozes, or is discarded, the result comes back as an error tuple containing :discarded, :snoozed, or the exception. Note that results are encoded using term_to_binary/2 with compression and then decoded locally using binary_to_term/2 with safety enabled to prevent leaking atoms.
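Since await can return several shapes, callers usually need to branch on them. The following is a minimal sketch of one way to do that, matching only the variants listed in the await_result typespec. The module name RelayResultSketch and the :done, :pending, and :failed tags are hypothetical and not part of Oban; treating :timeout and :snoozed as "pending" is an assumption about how an application might want to react.

```elixir
defmodule RelayResultSketch do
  # Hypothetical helper (not part of Oban): maps each documented
  # await_result() shape onto a tag the calling code can act on.

  # The job finished, with or without a returned value.
  def handle(:ok), do: {:done, nil}
  def handle({:ok, value}), do: {:done, value}

  # Assumption: the job may still run later, so it is not treated as failed.
  def handle({:error, :timeout}), do: {:pending, :timeout}
  def handle({:error, :snoozed}), do: {:pending, :snoozed}

  # No usable result will arrive for this relay.
  def handle({:error, :discarded}), do: {:failed, :discarded}
  def handle({:error, :result_too_large}), do: {:failed, :result_too_large}

  # Any exception struct returned from the executed job.
  def handle({:error, %{__exception__: true} = exception}) do
    {:failed, Exception.message(exception)}
  end
end
```

For instance, RelayResultSketch.handle({:ok, 4}) returns {:done, 4}, and an {:error, %RuntimeError{message: "boom"}} result becomes {:failed, "boom"}.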
Any value smaller than 8kb when compressed can be returned. Values larger than 8kb will return an {:error, :result_too_large} tuple.
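To get a feel for which values fit under the limit, the encode and decode steps can be approximated with the standard library alone. This is only a sketch of the idea, not Relay's actual implementation: the module name RelayEncodingSketch is hypothetical, and reading "8kb" as 8 * 1024 bytes is an assumption.

```elixir
defmodule RelayEncodingSketch do
  # Assumption: "8kb" is read as 8 * 1024 bytes; the exact threshold
  # used by Relay may differ.
  @max_bytes 8 * 1024

  # Encode a term with compression, then reject it when the encoded
  # binary is not smaller than the assumed limit.
  def encode(term) do
    binary = :erlang.term_to_binary(term, [:compressed])

    if byte_size(binary) < @max_bytes do
      {:ok, binary}
    else
      {:error, :result_too_large}
    end
  end

  # Decode with :safe so that decoding never creates new atoms.
  def decode(binary), do: :erlang.binary_to_term(binary, [:safe])
end
```

Because the check happens after compression, a large but repetitive value such as String.duplicate("a", 100_000) fits comfortably, while a much shorter list of random integers may not.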
Await Many
@spec await_many(relays :: [t()]) :: [await_result() | nil]
@spec await_many(relays :: [t()], timeout()) :: [await_result() | nil]
Use await_many/1,2 to await replies from multiple relays and return the results. It returns a list of the results in the same order as the relays supplied as the first argument.
relayed =
1..3
|> Enum.map(&DoubleWorker.new(%{int: &1}))
|> Enum.map(&Oban.Pro.Relay.async(&1))
|> Oban.Pro.Relay.await_many()
# [{:ok, 2}, {:ok, 4}, {:ok, 6}]
Unlike Task.await_many or Task.yield_many, Relay.await_many may return partial results when the timeout is reached. When a job hasn't finished executing, its value will be an {:error, :timeout} tuple.
relayed =
[1, 2, 300_000_000]
|> Enum.map(&SlowWorker.new(%{int: &1}))
|> Enum.map(&Oban.Pro.Relay.async(&1))
|> Oban.Pro.Relay.await_many(100)
# [{:ok, 2}, {:ok, 4}, {:error, :timeout}]
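Because results come back in the same order as the relays, they can be zipped with the original inputs to see which jobs finished. The sketch below shows one way to separate finished from unfinished work using only Enum functions. The module name AwaitManySketch is hypothetical, and the result list in the usage note is copied from the example above rather than produced by a real await_many/2 call.

```elixir
defmodule AwaitManySketch do
  # Hypothetical helper (not part of Oban): pairs each input with its
  # result, then splits the pairs into finished and unfinished groups.
  def partition(inputs, results) do
    inputs
    |> Enum.zip(results)
    |> Enum.split_with(fn {_input, result} ->
      # Both :ok and {:ok, value} count as finished.
      result == :ok or match?({:ok, _}, result)
    end)
  end
end
```

Calling AwaitManySketch.partition([1, 2, 300_000_000], [{:ok, 2}, {:ok, 4}, {:error, :timeout}]) returns {[{1, {:ok, 2}}, {2, {:ok, 4}}], [{300_000_000, {:error, :timeout}}]}, which leaves the slow input available for whatever follow-up the application chooses.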
Implementation Notes
Some additional notes and requirements to be aware of:
Relay uses PubSub to transmit results. That means it will work without Erlang distribution or clustering, but it does require functional PostgreSQL based PubSub.
Results are encoded using :erlang.term_to_binary/2 and decoded using :erlang.binary_to_term/2 with the :safe option to prevent the creation of new atoms or function references. If you're returning results with atoms, you must be sure that those atoms are defined locally, where the await/2 or await_many/2 function is called.
It doesn't matter which node executes a job; the result will still be broadcast back.
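The atom requirement is easiest to see by decoding a binary that names an atom the local node has never seen. The sketch below hand-builds such a binary in the Erlang external term format, so the atom is never created as a side effect of writing the example. The module name SafeDecodeSketch and the :unsafe_term tag are hypothetical, and Relay's own error handling for this case may differ.

```elixir
defmodule SafeDecodeSketch do
  # Build the external term format for an atom by hand: version byte
  # 131, the SMALL_ATOM_UTF8_EXT tag 119, a one-byte length, the name.
  def atom_binary(name) when is_binary(name) and byte_size(name) < 256 do
    <<131, 119, byte_size(name), name::binary>>
  end

  # With :safe, decoding raises ArgumentError instead of creating an
  # atom that does not already exist on this node.
  def safe_decode(binary) do
    {:ok, :erlang.binary_to_term(binary, [:safe])}
  rescue
    ArgumentError -> {:error, :unsafe_term}
  end
end
```

Decoding SafeDecodeSketch.atom_binary("timeout") succeeds because :timeout already exists locally, whereas a name that no loaded code references comes back as {:error, :unsafe_term}.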