Relay Plugin

🌟 This plugin is available through Oban.Pro

The Relay plugin lets you insert and await the results of jobs locally or remotely, across any number of nodes, i.e. persistent distributed tasks. Once the plugin is running, you can seamlessly distribute jobs and await the results synchronously.

installation

Installation

The Relay plugin must be started to coordinate events and messages for async/await functions.

Start by adding it to your plugins:

plugins: [Oban.Pro.Plugins.Relay]

There isn't any extra configuration necessary (or possible).

usage

Usage

typespecs

Typespecs

📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.

@type t :: %{job: Job.t(), pid: pid(), ref: UUID.t()}

@type await_result ::
        :ok
        | {:ok, term()}
        | {:error, :discarded}
        | {:error, :result_too_large}
        | {:error, :snoozed}
        | {:error, :timeout}
        | {:error, Exception.t()}

async

Async

@spec async(Job.changeset()) :: t() | {:error, Job.changeset()}
@spec async(Oban.name(), Job.changeset()) :: t() | {:error, Job.changeset()}

Use Relay.async/1,2 to insert a job for asynchronous execution. The single arity version takes a job changeset and inserts it:

relay =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Relay.async()

When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two arity version to select an instance:

changeset = MyApp.Worker.new(%{id: 123})
relay = Relay.async(MyOban, changeset)

The returned map contains the caller's pid and a unique ref that is used to await the results.

await

Await

@spec await(relay :: t()) :: await_result()
@spec await(relay :: t(), timeout()) :: await_result()

After inserting a job and constructing a relay, use Relay.await/1,2 to await the job's execution and return the result:

{:ok, result} =
  %{:ok, 4}
  |> MyApp.Worker.new()
  |> Relay.async()
  |> Relay.await()

By default, await will timeout after 5 seconds and return an {:error, :timeout} tuple. The job itself may continue to run, only the local process stops waiting on it. The second argument to await/2 is a timeout value:

Relay.await(relay, :timer.seconds(30))

When the executed job fails, crashes, snoozes or is discarded the result comes back as an error tuple with :discarded, :snoozed, or the exception. Note that results are encoded using term_to_binary/2 with compression and then decoded locally using binary_to_term/2 with safety enabled to prevent leaking atoms.

Any value smaller than 8kb when compressed is can be returned. Values larger than 8kb will return {:error, :result_too_large} tuple.

await-many

Await Many

@spec await_many(relays :: [t()]) :: [await_result() | nil]
@spec await_many(relays :: [t()], timeout()) :: [await_result() | nil]

Use Relay.await_many/1,2 to await replies from multiple relays and return the results. It returns a list of the results in the same order as the relays supplied as the first argument.

relayed =
  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Relay.async(&1))
  |> Relay.await_many()

# [{:ok, 2}, {:ok, 4}, {:ok, 6}]

Unlike Task.await_many or Task.yield_many, Relay.await_many may return partial results when the timeout is reached. When a job hasn't finished executing the value will be a timeout error tuple.

relayed =
  [1, 2, 300_000_000]
  |> Enum.map(&SlowWorker.new(%{int: &1}))
  |> Enum.map(&Relay.async(&1))
  |> Relay.await_many(100)

# [{:ok, 2}, {:ok, 4}, {:error, :timeout}]

implementation-notes

Implementation Notes

Some additional notes and requirements to be aware of:

  • Relay uses PubSub for to transmit results. That means it will work without Erlang distribution or clustering, but it does require functional PostgreSQL based PubSub.

  • It doesn't matter where a job is executed, the result will still be broadcast back provided that the Oban instance which processed it is running the Relay plugin as well.

  • Results are encoded using :erlang.term_to_binary/2 and decoded using :erlang.binary_to_term/2 using the :safe option to prevent the creation of new atoms or function references. If you're returning results with atoms you must be sure that those atoms are defined locally, where the await/2 or await_many/2 function is called.