Relay

The Relay extension lets you insert and await the results of jobs locally or remotely, across any number of nodes, so you can seamlessly distribute jobs and await the results synchronously. Think of Relay as persistent distributed tasks.

usage

Usage

typespecs

Typespecs

📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.

@type t :: %{job: Job.t(), pid: pid(), ref: UUID.t()}

@type await_result ::
        :ok
        | {:ok, term()}
        | {:error, :discarded}
        | {:error, :result_too_large}
        | {:error, :snoozed}
        | {:error, :timeout}
        | {:error, Exception.t()}

async

Async

@spec async(Job.changeset()) :: t() | {:error, Job.changeset()}
@spec async(Oban.name(), Job.changeset()) :: t() | {:error, Job.changeset()}

Use Oban.Pro.Relay.async/1,2 to insert a job for asynchronous execution. The single arity version takes a job changeset and inserts it:

relay =
  %{id: 123}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()

When the Oban instance has a custom name, or an app has multiple Oban instances, you can use the two arity version to select an instance:

changeset = MyApp.Worker.new(%{id: 123})
relay = Oban.Pro.Relay.async(MyOban, changeset)

The returned map contains the caller's pid and a unique ref that is used to await the results.

await

Await

@spec await(relay :: t()) :: await_result()
@spec await(relay :: t(), timeout()) :: await_result()

After inserting a job and constructing a relay, use await/1,2 to await the job's execution and return the result:

{:ok, result} =
  %{:ok, 4}
  |> MyApp.Worker.new()
  |> Oban.Pro.Relay.async()
  |> Oban.Pro.Relay.await()

By default, await will timeout after 5 seconds and return an {:error, :timeout} tuple. The job itself may continue to run, only the local process stops waiting on it. The second argument to await/2 is a timeout value:

Oban.Pro.Relay.await(relay, :timer.seconds(30))

When the executed job fails, crashes, snoozes or is discarded the result comes back as an error tuple with :discarded, :snoozed, or the exception. Note that results are encoded using term_to_binary/2 with compression and then decoded locally using binary_to_term/2 with safety enabled to prevent leaking atoms.

Any value smaller than 8kb when compressed is can be returned. Values larger than 8kb will return {:error, :result_too_large} tuple.

await-many

Await Many

@spec await_many(relays :: [t()]) :: [await_result() | nil]
@spec await_many(relays :: [t()], timeout()) :: [await_result() | nil]

Use await_many/1,2 to await replies from multiple relays and return the results. It returns a list of the results in the same order as the relays supplied as the first argument.

relayed =
  1..3
  |> Enum.map(&DoubleWorker.new(%{int: &1}))
  |> Enum.map(&Relay.async(&1))
  |> Oban.Pro.Relay.await_many()

# [{:ok, 2}, {:ok, 4}, {:ok, 6}]

Unlike Task.await_many or Task.yield_many, Relay.await_many may return partial results when the timeout is reached. When a job hasn't finished executing the value will be a timeout error tuple.

relayed =
  [1, 2, 300_000_000]
  |> Enum.map(&SlowWorker.new(%{int: &1}))
  |> Enum.map(&Oban.Pro.Relay.async(&1))
  |> Oban.Pro.Relay.await_many(100)

# [{:ok, 2}, {:ok, 4}, {:error, :timeout}]

implementation-notes

Implementation Notes

Some additional notes and requirements to be aware of:

  • Relay uses PubSub for to transmit results. That means it will work without Erlang distribution or clustering, but it does require functional PostgreSQL based PubSub.

  • Results are encoded using :erlang.term_to_binary/2 and decoded using :erlang.binary_to_term/2 using the :safe option to prevent the creation of new atoms or function references. If you're returning results with atoms you must be sure that those atoms are defined locally, where the await/2 or await_many/2 function is called.

  • It doesn't matter which node executes a job, the result will still be broadcast back.