Oban.Pro.Workflow behaviour (Oban Pro v1.6.0-rc.1)

Workflows compose jobs together with arbitrary dependencies, allowing sequential, fan-out, and fan-in execution workflows. Workflows are fault tolerant and scale horizontally across all available nodes.

Usage

Workflows link jobs together into a DAG (directed acyclic graph). Dependency resolution guarantees that jobs execute in the prescribed order regardless of scheduling or retries. Each workflow job will wait for all upstream dependencies to complete before it is made available to run.

As a trivial example, let's define an EchoWorker that only inspects the args, and then use it in a workflow to show how jobs execute in order. First, here's the worker:

defmodule MyApp.EchoWorker do
  use Oban.Pro.Worker, queue: :default

  @impl true
  def process(%{args: args}) do
    IO.inspect(args)

    :ok
  end
end

Now use new/1 to initialize a workflow, and add/4 to add named jobs with dependencies to the workflow:

alias MyApp.EchoWorker
alias Oban.Pro.Workflow

Workflow.new()
|> Workflow.add(:a, EchoWorker.new(%{id: 1}))
|> Workflow.add(:b, EchoWorker.new(%{id: 2}), deps: [:a])
|> Workflow.add(:c, EchoWorker.new(%{id: 3}), deps: [:b])
|> Workflow.add(:d, EchoWorker.new(%{id: 4}), deps: [:b])
|> Workflow.add(:e, EchoWorker.new(%{id: 5}), deps: [:c, :d])
|> Oban.insert_all()

When the workflow executes, it will print out each job's args in the prescribed order. However, because steps c and d each depend on b, they may execute in parallel.

Visually, the workflow composes like this:

[Diagram: a -> b; b -> c and d (in parallel); c and d -> e]
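As a rough illustration of the dependency graph (not how Oban Pro resolves dependencies internally), the same five jobs can be modeled with Erlang's :digraph module from the standard library. The deps list below simply mirrors the names and deps passed to add/4 above, and :digraph_utils.topsort/1 then yields one valid execution order.

```elixir
# Mirror the example's names and deps; this is only a model of the DAG.
deps = [a: [], b: [:a], c: [:b], d: [:b], e: [:c, :d]]

# The :acyclic option makes add_edge/3 reject any edge that would form a cycle.
graph = :digraph.new([:acyclic])

Enum.each(deps, fn {name, _upstream} -> :digraph.add_vertex(graph, name) end)

for {name, upstream} <- deps, dep <- upstream do
  # Edges point from the upstream dependency to the downstream job.
  :digraph.add_edge(graph, dep, name)
end

# A topological sort lists every job after all of its dependencies.
order = :digraph_utils.topsort(graph)
IO.inspect(order, label: "one valid execution order")

:digraph.delete(graph)
```

The relative position of :c and :d in the result is not significant, which matches the note above that they may execute in parallel.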

Dynamic Workflows

Many workflows aren't static—the number of jobs and their interdependencies aren't known beforehand.

The following worker accepts a count and generates a workflow that fans-out and back in twice, using a variable number of dependencies. The key is using Enum.reduce to accumulate a workflow with interpolated names, i.e. "a_0", "a_1", etc.

defmodule MyApp.Dynamic do
  use Oban.Pro.Worker

  alias Oban.Pro.Workflow

  @impl true
  def process(%{meta: %{"name" => name}}) do
    IO.puts(name)
  end

  def insert_workflow(count) when is_integer(count) do
    range = Range.new(0, count)
    a_deps = Enum.map(range, &"a_#{&1}")
    b_deps = Enum.map(range, &"b_#{&1}")

    Workflow.new()
    |> Workflow.add(:a, new(%{}), [])
    |> fan_out(:a, range)
    |> Workflow.add(:b, new(%{}), deps: a_deps)
    |> fan_out(:b, range)
    |> Workflow.add(:c, new(%{}), deps: b_deps)
    |> Oban.insert_all()
  end

  defp fan_out(workflow, base, range) do
    Enum.reduce(range, workflow, fn key, acc ->
      Workflow.add(acc, "#{base}_#{key}", new(%{}), deps: [base])
    end)
  end
end

Calling MyApp.Dynamic.insert_workflow(3) generates a workflow that fans out to 4 a jobs and 4 b jobs, because Range.new(0, 3) includes both endpoints:

[Diagram: a -> a_0, a_1, a_2, a_3 -> b -> b_0, b_1, b_2, b_3 -> c]

Using Upstream Results

Directed dependencies between jobs, paired with the recorded option, allow a workflow's downstream jobs to fetch the output of upstream jobs.

To demonstrate, let's make a workflow that leverages get_recorded/3 to simulate a multi-step API interaction.

The first worker simulates fetching an authentication token using an api_key:

defmodule MyApp.WorkerA do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    token =
      api_key
      |> String.graphemes()
      |> Enum.shuffle()
      |> to_string()

    {:ok, token}
  end
end

The second worker fetches the token from the first job by calling get_recorded/3 with the name :a, which we'll set while building the workflow later.

defmodule MyApp.WorkerB do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    token = Oban.Pro.Workflow.get_recorded(job, :a)

    {:ok, {token, url}}
  end
end

Then the final worker uses all_recorded/3 with the only_deps option to fetch the results from all upstream jobs, then it prints out everything that was fetched.

defmodule MyApp.WorkerC do
  use Oban.Pro.Worker

  @impl true
  def process(job) do
    job
    |> Oban.Pro.Workflow.all_recorded(only_deps: true)
    |> IO.inspect()

    :ok
  end
end

The final step is to build a workflow that composes all of the jobs together with names, args, and deps:

alias MyApp.{WorkerA, WorkerB, WorkerC}

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{api_key: "23kl239bjljlk309af"}))
|> Workflow.add(:b, WorkerB.new(%{url: "elixir-lang.org"}), deps: [:a])
|> Workflow.add(:c, WorkerB.new(%{url: "www.erlang.org"}), deps: [:a])
|> Workflow.add(:d, WorkerB.new(%{url: "oban.pro"}), deps: [:a])
|> Workflow.add(:e, WorkerC.new(%{}), deps: [:b, :c, :d])
|> Oban.insert_all()

[Diagram: a -> b, c, and d (in parallel) -> e]

When the workflow runs the final step, e, prints out something like the following:

%{
  "b" => {"93l2jlj3kl90baf2k3", "elixir-lang.org"},
  "c" => {"93l2jlj3kl90baf2k3", "www.erlang.org"},
  "d" => {"93l2jlj3kl90baf2k3", "oban.pro"}
}

Customizing Workflows

Workflow ID

The default workflow_id is a time-ordered, random UUIDv7. This is more than sufficient to ensure that workflows are unique for any period of time. However, if you require more control you can pass a value directly to new/1.

Workflow.new(workflow_id: "custom-but-still-unique-id")
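
A custom id is mostly useful when the workflow has to be looked up later without storing a generated UUID. The sketch below is one possible approach: MyApp.WorkflowIds is a hypothetical helper, and the date-based scheme assumes at most one such workflow is inserted per day, which your application would need to guarantee.

```elixir
defmodule MyApp.WorkflowIds do
  @moduledoc "Hypothetical helper for building predictable workflow ids."

  # Combine a prefix with an ISO 8601 date, e.g. "nightly-etl-2024-01-15".
  def daily(prefix, %Date{} = date) when is_binary(prefix) do
    prefix <> "-" <> Date.to_iso8601(date)
  end
end

workflow_id = MyApp.WorkflowIds.daily("nightly-etl", ~D[2024-01-15])
IO.puts(workflow_id)

# The id can then be passed to new/1 and reused later with functions that
# accept a workflow_id, such as status/2:
#
#     Workflow.new(workflow_id: workflow_id)
#     Workflow.status(workflow_id)
```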

Workflow Name

Workflows accept an optional name to describe the purpose of the workflow, beyond the individual jobs in it. While the workflow_id must be unique, the workflow_name need not be, so it can be used as a general purpose label.

Workflow.new(workflow_name: "nightly-etl")

Dependency Handling

Workflows use conservative defaults for dependency handling. You can customize the safety checks by providing a few top-level options:

  • ignore_cancelled - regard cancelled dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

  • ignore_discarded - regard discarded dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

  • ignore_deleted - regard deleted (typically pruned) dependencies as completed rather than cancelling remaining jobs in the workflow. Defaults to false.

The following example creates a workflow with all of the available options:

Workflow.new(ignore_cancelled: true, ignore_deleted: true, ignore_discarded: true)

Options may also be applied to individual workflow jobs. For example, configure a single job to ignore cancelled dependencies, another to ignore discarded, and another to ignore deleted:

Workflow.new()
|> Workflow.add(:a, MyWorkflow.new(%{}))
|> Workflow.add(:b, MyWorkflow.new(%{}), deps: [:a], ignore_cancelled: true)
|> Workflow.add(:c, MyWorkflow.new(%{}), deps: [:b], ignore_discarded: true)
|> Workflow.add(:d, MyWorkflow.new(%{}), deps: [:c], ignore_deleted: true)
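
To make the effect of the ignore_* options concrete, here is a small model of the rule as it is described above. MyApp.DepRules and its return values are invented for illustration; this is not Oban Pro's implementation, only a sketch of how a dependency's state and the options combine.

```elixir
defmodule MyApp.DepRules do
  @moduledoc "Illustrative model of the ignore_* options; not Oban Pro's implementation."

  # Decide what a dependency in the given state means for a downstream job.
  def resolve(dep_state, opts \\ [])
  def resolve(:completed, _opts), do: :satisfied
  def resolve(:cancelled, opts), do: unless_ignored(opts, :ignore_cancelled)
  def resolve(:discarded, opts), do: unless_ignored(opts, :ignore_discarded)
  def resolve(:deleted, opts), do: unless_ignored(opts, :ignore_deleted)
  # Any other state (scheduled, executing, and so on) means the job keeps waiting.
  def resolve(_other, _opts), do: :wait

  # All three options default to false, which cancels the downstream job.
  defp unless_ignored(opts, key) do
    if Keyword.get(opts, key, false), do: :satisfied, else: :cancel
  end
end

IO.inspect(MyApp.DepRules.resolve(:cancelled))
IO.inspect(MyApp.DepRules.resolve(:cancelled, ignore_cancelled: true))
```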

Stuck Workflows

Be sure that you're running the DynamicLifeline to rescue stuck workflows when upstream dependencies are deleted unexpectedly.

 config :my_app, Oban,
   plugins: [Oban.Pro.Plugins.DynamicLifeline],
   ...

Fetching Workflow Jobs

Workflow jobs are tied together through meta attributes. The get_job/3, all_jobs/3, and stream_jobs/3 functions use those attributes to load other jobs in a workflow. This is particularly useful from a worker's process/1 function. For example, to fetch all of the jobs in a workflow:

defmodule MyApp.Workflow do
  use Oban.Pro.Worker

  @impl true
  def process(%Job{} = job) do
    job
    |> Oban.Pro.Workflow.all_jobs()
    |> do_things_with_jobs()

    :ok
  end
end

It's also possible to scope fetching to only dependencies of the current job with only_deps:

deps = Workflow.all_jobs(job, only_deps: true)

Or, fetch only a single explicit dependency by name with get_job/3:

dep_job = Workflow.get_job(job, :a)

For large workflows it may be inefficient to load all jobs in memory at once. In that case, you can use stream_jobs/3 to fetch jobs lazily. For example, to stream all of the completed jobs in a workflow:

defmodule MyApp.Workflow do
  use Oban.Pro.Worker

  @impl true
  def process(%Job{} = job) do
    {:ok, workflow_jobs} =
      MyApp.Repo.transaction(fn ->
        job
        |> Oban.Pro.Workflow.stream_jobs()
        |> Stream.filter(& &1.state == "completed")
        |> Enum.to_list()
      end)

    do_things_with_jobs(workflow_jobs)

    :ok
  end
end

Streaming is provided by Ecto's Repo.stream, and it must take place within a transaction. Using a stream lets you control the number of jobs loaded from the database, minimizing memory usage for large workflows.
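
To show the lazy pipeline without a database, the sketch below uses a plain Stream of maps as a stand-in for the result of stream_jobs/3 (the maps and their fields are made up) and handles completed jobs in batches of two. With a real workflow the same pipeline would run inside the Repo transaction.

```elixir
# Stand-in for the stream returned by stream_jobs/3: ten fake jobs where
# even ids are "completed" and odd ids are "cancelled".
fake_jobs =
  Stream.map(1..10, fn id ->
    state = if rem(id, 2) == 0, do: "completed", else: "cancelled"
    %{id: id, state: state}
  end)

batches =
  fake_jobs
  |> Stream.filter(&(&1.state == "completed"))
  # Jobs are pulled through the pipeline one batch at a time.
  |> Stream.chunk_every(2)
  |> Enum.map(fn batch -> Enum.map(batch, & &1.id) end)

IO.inspect(batches)
```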

Appending Workflow Jobs

Sometimes all jobs aren't known when the workflow is created. In that case, you can add more jobs with optional dependency checking using append/2. An appended workflow starts from one or more existing jobs, reuses the original workflow_id, and optionally builds a set of dependencies to check against.

In this example we disable deps checking with check_deps: false:

defmodule MyApp.WorkflowWorker do
  use Oban.Pro.Worker

  alias Oban.Pro.Workflow

  @impl true
  def process(%Job{} = job) do
    jobs =
      job
      |> Workflow.append(check_deps: false)
      |> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
      |> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
      |> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
      |> Oban.insert_all()

    {:ok, jobs}
  end
end

The new jobs specify deps on preexisting jobs named :a, :b, and :c, but there isn't any guarantee those jobs actually exist. That could lead to an incomplete workflow where the new jobs may never complete.

To be safe and check jobs while appending we'll fetch all of the previous jobs with all_jobs/3 and feed them in:

defmodule MyApp.WorkflowWorker do
  use Oban.Pro.Worker

  alias Oban.Pro.Workflow

  @impl true
  def process(%Job{} = job) do
    # all_jobs/3 returns a plain list of jobs, not an {:ok, jobs} tuple
    jobs = Workflow.all_jobs(job)

    jobs
    |> Workflow.append()
    |> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
    |> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
    |> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

    :ok
  end
end

Now there isn't any risk of an incomplete workflow from missing dependencies, at the expense of loading some extraneous jobs.

Adding Sub Workflows

Workflows can be nested hierarchically using sub workflows. This allows you to compose complex workflows from simpler ones, making it easier to organize and reuse workflow patterns.

The add_workflow/4 function lets you add an entire workflow as a dependency of another workflow. Like add/4, it accepts a name and optional dependencies, but instead of a job changeset, it takes another workflow.

Here's an example of creating a primary workflow and adding sub workflows with dependencies:

alias MyApp.{WorkerA, WorkerB, WorkerC}
alias Oban.Pro.Workflow

extr_flow =
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{type: "normalize"}), deps: :extract)

note_flow =
  Workflow.new(workflow_name: "notify")
  |> Workflow.add(:prepare, WorkerA.new(%{template: "report"}))
  |> Workflow.add(:send, WorkerB.new(%{method: "email"}), deps: :prepare)

# Create the main workflow and add sub workflows to it
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add_workflow(:notify, note_flow, deps: :extract)
|> Workflow.add(:finalize, WorkerC.new(%{mode: "cleanup"}), deps: :notify)
|> Oban.insert_all()

In this example, the main workflow has a single job named :setup, followed by two sub workflows, and ends with a :finalize job. The dependencies ensure proper execution order: setup -> extraction -> notification -> finalize.

Sub workflows are powerful for organizing large job dependencies into logical units, enabling reuse across applications, and simplifying the maintenance of complex job graphs.

Handling Cancellations

Workflow jobs are automatically cancelled when their upstream dependencies are cancelled, discarded, or deleted (unless specifically overridden using the ignore_* options as described earlier). Those workflow jobs are cancelled before they execute, which means standard Oban.Pro.Worker.after_process/3 hooks won't be called. Instead, there's an optional after_cancelled/2 callback specifically for workflows.

Here's a trivial after_cancelled hook that logs a warning when a workflow job is cancelled:

defmodule MyApp.Workflow do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Workflow

  require Logger

  @impl Oban.Pro.Workflow
  def after_cancelled(reason, job) do
    # Logger.warning/1 replaces the deprecated Logger.warn/1 and returns :ok
    Logger.warning("Workflow job #{job.id} cancelled because a dependency was #{reason}")
  end
end

Visualizing Workflows

Workflows are a type of Directed Acyclic Graph, which means we can describe a workflow as a graph of jobs and dependencies, where execution flows between jobs. By converting the workflow into a standard graph description language such as mermaid or graphviz, we can visualize workflows.

Along with the general purpose to_graph/1 that outputs a digraph, there are a few built-in visualization options: to_dot/1 for graphviz digraphs and to_mermaid/1 for mermaid flowcharts.

The following example generates a mermaid flowchart from an account archiving workflow, built by an archive_account_workflow/1 function that is not shown here:

Workflow.to_mermaid(archive_account_workflow(123))

This generates the following mermaid output, where each vertex is the job's name and the label is the worker:

flowchart TD
    backup_1[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    backup_2[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    backup_3[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    receipt[MyApp.FinalReceipt] --> backup_1[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> backup_2[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> backup_3[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> email_1[MyApp.EmailSubscriber]
    receipt[MyApp.FinalReceipt] --> email_2[MyApp.EmailSubscriber]
    email_1[MyApp.EmailSubscriber] --> delete[MyApp.DeleteAccount]
    email_2[MyApp.EmailSubscriber] --> delete[MyApp.DeleteAccount]

Now we can take that output and render it using the command line with a mermaid binary, or directly in LiveBook using Kino. The following example pipes a workflow through to_mermaid/1 and then into Kino.Mermaid:

workflow
|> Workflow.to_mermaid()
|> Kino.Mermaid.new()

That will generate the following SVG of the workflow:

[Flowchart: MyApp.FinalReceipt fans out to three MyApp.BackupPost jobs and two MyApp.EmailSubscriber jobs, which all fan in to MyApp.DeleteAccount]

Looking at the flowchart we can clearly see how the workflow starts with a single receipt job, fans out to multiple email and backup jobs, and finally fans in to the delete job, exactly as we planned!
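
To make the output format easier to read, here is a hand-rolled sketch that produces the same kind of edge lines from a plain keyword list. MyApp.MermaidSketch and the shape of the jobs list are invented for illustration; to_mermaid/1 remains the supported way to generate a flowchart from a real workflow.

```elixir
defmodule MyApp.MermaidSketch do
  @moduledoc "Illustrative only; to_mermaid/1 is the supported way to do this."

  # jobs is a keyword list of name => {worker, deps}.
  def render(jobs) do
    edges =
      for {name, {worker, deps}} <- jobs, dep <- deps do
        {dep_worker, _deps} = Keyword.fetch!(jobs, dep)
        # One line per dependency: upstream[Worker] --> downstream[Worker]
        "    #{dep}[#{inspect(dep_worker)}] --> #{name}[#{inspect(worker)}]"
      end

    Enum.join(["flowchart TD" | edges], "\n")
  end
end

jobs = [
  receipt: {MyApp.FinalReceipt, []},
  backup_1: {MyApp.BackupPost, [:receipt]},
  email_1: {MyApp.EmailSubscriber, [:receipt]},
  delete: {MyApp.DeleteAccount, [:backup_1, :email_1]}
]

IO.puts(MyApp.MermaidSketch.render(jobs))
```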

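Summary

Callbacks

after_cancelled(cancel_reason, job)

Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

Functions

add(workflow, name, changeset, opts \\ [])

Add a named job to the workflow along with optional dependencies.

add_many(workflow, name, changesets, opts \\ [])

Add multiple named jobs as a sub workflow along with optional dependencies.

add_workflow(workflow, name, sub_workflow, opts \\ [])

Add a sub workflow with a name and optional dependencies to another workflow.

all_jobs(oban_name, job_or_wid, opts)

Get all jobs for a workflow, optionally filtered by upstream deps.

all_recorded(oban_name, job_or_wid, opts)

Get all recordings for workflow jobs, optionally filtered by name or relationship.

append(jobs, opts \\ [])

Instantiate a new workflow from one or more existing workflow jobs.

cancel_jobs(oban_name, job_or_wid, opts)

Cancel one or more workflow jobs.

get_context(oban_name \\ Oban, job_or_wid)

Gets a workflow's context value.

get_job(oban_name \\ Oban, job_or_wid, deps_name)

Get a single workflow job by name.

get_recorded(oban_name \\ Oban, job_or_wid, deps_name)

Fetch the recorded output from a workflow job.

new(opts \\ [])

Instantiate a new workflow struct with a unique workflow id.

put_context(workflow, value)

Adds a context value to the workflow that can be accessed by all jobs.

retry_jobs(oban_name \\ Oban, job_or_wid, opts \\ [])

Retry all jobs in a workflow.

status(oban_name \\ Oban, job_or_wid)

Get detailed status information about a workflow.

stream_jobs(oban_name, job_or_wid, opts)

Stream all jobs for a workflow.

to_dot(workflow)

Converts the given workflow to a graphviz dot digraph.

to_graph(workflow)

Converts the given workflow to a :digraph.

to_mermaid(workflow)

Generate a Mermaid flowchart in top-down orientation from a workflow.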

Types

add_opts()

@type add_opts() :: [
  deps: name() | [name()],
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean()
]

add_workflow_opts()

@type add_workflow_opts() :: [{:deps, name() | [name()]}]

append_opts()

@type append_opts() :: [
  check_deps: boolean(),
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean(),
  workflow_id: String.t(),
  workflow_name: String.t()
]

cancel_opts()

@type cancel_opts() :: [{:names, [name()]}]

cancel_reason()

@type cancel_reason() :: :deleted | :discarded | :cancelled

fetch_opts()

@type fetch_opts() :: [
  log: Logger.level(),
  names: [name()],
  only_deps: boolean(),
  timeout: timeout(),
  with_subs: boolean()
]

job_or_wid()

@type job_or_wid() :: Oban.Job.t() | workflow_id()

name()

@type name() :: atom() | String.t()

new_opts()

@type new_opts() :: [
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean(),
  workflow_id: String.t(),
  workflow_name: String.t()
]

t()

@type t() :: %Oban.Pro.Workflow{
  changesets: [Oban.Job.changeset()],
  check_deps: boolean(),
  id: workflow_id(),
  names: MapSet.t(),
  opts: map(),
  subs: term()
}

workflow_id()

@type workflow_id() :: String.t()

Callbacks

after_cancelled(cancel_reason, job)

(optional)
@callback after_cancelled(cancel_reason(), job :: Oban.Job.t()) :: :ok

Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

This callback is only called when a job is cancelled because of an upstream dependency. It is never called after normal job execution. For that, use Oban.Pro.Worker.after_process/3.
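
Because cancel_reason() has exactly three values, a callback can branch on it. The sketch below is one hypothetical way to pick a log level and message per reason; MyApp.CancelNotice and the chosen levels are assumptions for illustration, not part of Oban Pro.

```elixir
defmodule MyApp.CancelNotice do
  @moduledoc "Hypothetical helper that maps a cancel_reason() to a log level and message."

  # Pruned dependencies are often expected, discarded ones usually are not.
  def level(:cancelled), do: :info
  def level(:deleted), do: :warning
  def level(:discarded), do: :error

  def message(reason, job_id) when reason in [:cancelled, :deleted, :discarded] do
    "Workflow job #{job_id} cancelled because a dependency was #{reason}"
  end
end

IO.inspect(MyApp.CancelNotice.level(:discarded))
IO.puts(MyApp.CancelNotice.message(:discarded, 42))

# Inside a worker, after `require Logger`, the callback could then be written as:
#
#     def after_cancelled(reason, job) do
#       Logger.log(MyApp.CancelNotice.level(reason), MyApp.CancelNotice.message(reason, job.id))
#     end
```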

Functions

add(workflow, name, changeset, opts \\ [])

@spec add(t(), name(), Oban.Job.changeset(), add_opts()) :: t()

Add a named job to the workflow along with optional dependencies.

Examples

Add jobs to a workflow with dependencies:

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{id: id}))
|> Workflow.add(:b, WorkerB.new(%{id: id}), deps: [:a])
|> Workflow.add(:c, WorkerC.new(%{id: id}), deps: [:a])
|> Workflow.add(:d, WorkerC.new(%{id: id}), deps: [:b, :c])

add_many(workflow, name, changesets, opts \\ [])

@spec add_many(t(), name(), Enum.t(), new_opts() | add_opts()) :: t()

Add multiple named jobs as a sub workflow along with optional dependencies.

Examples

Add multiple jobs as a sub workflow with a list, where the keys will be the job's position, starting at 0:

Workflow.add_many(workflow, :sub_1, [WorkerA.new(%{}), WorkerB.new(%{})])

Add multiple jobs as a sub workflow with a map, where the keys are the job names:

Workflow.add_many(workflow, :sub_1, %{a: WorkerA.new(%{}), b: WorkerB.new(%{})})

Add multiple jobs as sub workflows and add a dependency on them:

Workflow.new()
|> Workflow.add_many(:a, changesets_a)
|> Workflow.add_many(:b, changesets_b)
|> Workflow.add(:c, WorkerC.new(%{}), deps: [:a, :b])

Workflow options may be provided to name or otherwise customize the sub-workflow:

Workflow.add_many(workflow, :sub_1, changesets, ignore_cancelled: true)

add_workflow(workflow, name, sub_workflow, opts \\ [])

@spec add_workflow(t(), name(), t(), add_workflow_opts()) :: t()

Add a sub workflow with a name and optional dependencies to another workflow.

This function lets you compose workflows by adding one workflow as a dependency within another workflow. Workflows may depend on jobs or other workflows, and jobs may depend on entire workflows. If a job depends on a workflow, it won't execute until every job in the workflow is in a final state (completed, cancelled, or discarded).

Examples

Add a sub workflow to a workflow with dependencies:

sub_1 =
  Workflow.new()
  |> Workflow.add(:a, WorkerB.new(%{id: id}))
  |> Workflow.add(:b, WorkerC.new(%{id: id}))

sub_2 =
  Workflow.new()
  |> Workflow.add(:a, WorkerB.new(%{id: id}))
  |> Workflow.add(:b, WorkerC.new(%{id: id}))

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{id: id}))
|> Workflow.add_workflow(:b, sub_1, deps: :a)
|> Workflow.add_workflow(:c, sub_2, deps: :b)
|> Workflow.add(:d, WorkerD.new(%{id: id}), deps: [:b, :c])

all_jobs(oban_name, job_or_wid, opts)

@spec all_jobs(Oban.name(), job_or_wid(), fetch_opts()) :: [Oban.Job.t()]

Get all jobs for a workflow, optionally filtered by upstream deps.

Examples

Retrieve all workflow jobs within a process/1 function:

def process(%Job{} = job) do
  job
  |> Workflow.all_jobs()
  |> do_things_with_jobs()

  :ok
end

Retrieve all of the current job's deps:

jobs = Workflow.all_jobs(job, only_deps: true)

Retrieve an explicit list of dependencies:

[job_a, job_b] = Workflow.all_jobs(job, names: [:a, :b])

Include sub workflow jobs:

[sub_a, sub_b] = Workflow.all_jobs(job, names: [:sub], with_subs: true)

Retrieve all jobs using a workflow_id:

jobs = Workflow.all_jobs("some-uuid-1234-5678")

Retrieve only some named jobs using a workflow_id:

[job_a, job_b] = Workflow.all_jobs("some-uuid-1234-5678", names: [:a, :b])

Retrieve all jobs using a workflow_id and a custom Oban instance name:

jobs = Workflow.all_jobs(MyApp.Oban, "some-uuid-1234-5678")

all_recorded(oban_name, job_or_wid, opts)

@spec all_recorded(Oban.name(), job_or_wid(), fetch_opts()) :: map()

Get all recordings for workflow jobs, optionally filtered by name or relationship.

The result is a map of each job's name to its recorded output. Unrecorded jobs, or those without a recording, will be present in the map with a nil value.

Examples

Retrieve results for all workflow jobs within a process/1 function:

def process(%Job{} = job) do
  %{"a" => job_a_return, "b" => job_b_return} = Workflow.all_recorded(job)

  :ok
end

Retrieve recordings for all of the current job's deps:

recorded = Workflow.all_recorded(job, only_deps: true)

Retrieve recordings for a list of dependencies:

%{"a" => _, "b" => _} = Workflow.all_recorded(job, names: [:a, :b])

Retrieve recordings for some named jobs using the workflow_id:

%{"a" => _, "b" => _} = Workflow.all_recorded("some-uuid-1234-5678", names: [:a, :b])
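
Since names without a recording appear in the map with a nil value, it can help to separate them before using the results. The helper below is a hypothetical sketch that works on any map shaped like the all_recorded/3 result; the sample data is made up.

```elixir
defmodule MyApp.Recorded do
  @moduledoc "Hypothetical helper for a map shaped like the all_recorded/3 result."

  # Separate names that have a recording from names whose value is nil.
  def partition(recorded) when is_map(recorded) do
    {missing, present} = Enum.split_with(recorded, fn {_name, value} -> is_nil(value) end)

    {Map.new(present), missing |> Enum.map(&elem(&1, 0)) |> Enum.sort()}
  end
end

# Sample data; the names and values are invented for this example.
recorded = %{"a" => "token", "b" => nil, "c" => {"token", "oban.pro"}}

{present, missing} = MyApp.Recorded.partition(recorded)
IO.inspect(present, label: "present")
IO.inspect(missing, label: "missing")
```

Note that this cannot distinguish a job that deliberately recorded nil from one that has no recording at all.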

append(jobs, opts \\ [])

@spec append(Oban.Job.t() | [Oban.Job.t()], append_opts()) :: t()

Instantiate a new workflow from one or more existing workflow jobs.

Examples

Append to a workflow seeded with all other jobs in the workflow:

jobs
|> Workflow.append()
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

Append to a workflow from a single job and bypass checking deps:

job
|> Workflow.append(check_deps: false)
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

cancel_jobs(oban_name, job_or_wid, opts)

@spec cancel_jobs(Oban.name(), job_or_wid(), cancel_opts()) :: {:ok, integer()}

Cancel one or more workflow jobs.

This uses Oban.cancel_all_jobs/2 internally, and adheres to the same cancellation rules. Namely, it won't touch jobs in a completed, cancelled, or discarded state.

Examples

Cancel jobs with a workflow job from a process/1 function:

def process(job) do
  if should_stop_processing?(job.args) do
    Workflow.cancel_jobs(job)
  else
    ...
  end
end

Cancel specific workflow jobs:

Workflow.cancel_jobs("some-uuid-1234-5678", names: [:a, :b])

Cancel all jobs in a workflow by workflow_id:

Workflow.cancel_jobs("some-uuid-1234-5678")

Cancel jobs from anywhere with a custom Oban instance name:

Workflow.cancel_jobs(MyApp.Oban, "some-uuid-1234-5678")

get_context(oban_name \\ Oban, job_or_wid)

@spec get_context(Oban.name(), job_or_wid()) :: nil | map()

Gets a workflow's context value.

This is a convenience function that internally calls get_recorded/2 with the :context name.

Examples

Retrieve the context within a worker:

def process(job) do
  context = Workflow.get_context(job)
end

Retrieve the context with a workflow id:

Workflow.get_context("some-uuid-1234-5678")

Retrieve the context using a custom Oban instance name:

Workflow.get_context(MyApp.Oban, "some-uuid-1234-5678")

get_job(oban_name \\ Oban, job_or_wid, deps_name)

@spec get_job(Oban.name(), job_or_wid(), name()) :: nil | Oban.Job.t()

Get a single workflow job by name.

Examples

Get the workflow job named :step_1 from within process/1:

def process(job) do
  case Workflow.get_job(job, :step_1) do
    nil -> ...
    dep_job -> use_the_job(dep_job)
  end
end

Get a named workflow job with the workflow_id:

Workflow.get_job("some-uuid-1234-5678", :step_2)

Get a named workflow job with the workflow_id and a custom Oban instance name:

Workflow.get_job(MyApp.Oban, "some-uuid-1234-5678", :step_2)

get_recorded(oban_name \\ Oban, job_or_wid, deps_name)

@spec get_recorded(Oban.name(), job_or_wid(), name()) :: any()

Fetch the recorded output from a workflow job.

This function provides an optimized way to get recorded output with a single query.

A nil value is returned when a dependency is incomplete, missing, unrecorded, or lacks recorded output.

Examples

Get the recorded output from :step_1 within process/1:

def process(job) do
  case Workflow.get_recorded(job, :step_1) do
    nil -> ...
    val -> use_recorded_output(val)
  end
end

Get recorded output using the workflow id:

Workflow.get_recorded("some-uuid-1234-5678", :step_2)

Get recorded output using the workflow_id and a custom Oban instance name:

Workflow.get_recorded(MyApp.Oban, "some-uuid-1234-5678", :step_2)

new(opts \\ [])

@spec new(opts :: new_opts()) :: t()

Instantiate a new workflow struct with a unique workflow id.

Examples

Create a standard workflow without any options:

Workflow.new()

Create a workflow with a custom name:

Workflow.new(workflow_name: "logistics")

Create a workflow with a static id and some options:

Workflow.new(workflow_id: "workflow-id", ignore_cancelled: true, ignore_discarded: true)

put_context(workflow, value)

@spec put_context(t(), map()) :: t()

Adds a context value to the workflow that can be accessed by all jobs.

This is a convenience function that creates a recorded job named :context containing any value that needs to be shared across multiple workflow jobs.

Examples

Add a context value to a workflow:

Workflow.new()
|> Workflow.put_context(%{user_id: 123, action: "process"})
|> Workflow.add(:job_a, WorkerA.new(%{}))
|> Workflow.add(:job_b, WorkerB.new(%{}))
|> Oban.insert_all()

Later, retrieve the context in a worker:

def process(job) do
  context = Workflow.get_context(job)
  # Use context map...
end

retry_jobs(oban_name \\ Oban, job_or_wid, opts \\ [])

@spec retry_jobs(Oban.name(), job_or_wid(), keyword()) :: {:ok, integer()}

Retry all jobs in a workflow.

By default, this function will retry all jobs that aren't executing or available. Jobs with dependencies will be placed on hold to run again once their dependencies are complete.

Examples

Retry a workflow's jobs from within a process function:

def process(job) do
  Workflow.retry_jobs(job)
end

Retry a workflow by ID:

Workflow.retry_jobs("some-uuid-1234-5678")

Retry a workflow with a custom Oban instance:

Workflow.retry_jobs(MyApp.Oban, "some-uuid-1234-5678")

status(oban_name \\ Oban, job_or_wid)

@spec status(Oban.name(), job_or_wid()) :: map()

Get detailed status information about a workflow.

Returns a map with the following fields:

  • :name - The name of the workflow
  • :total - Total number of jobs in the workflow
  • :state - Overall status of the workflow (e.g. executing, completed, cancelled, discarded)
  • :counts - Map of job state counts (e.g. %{completed: 10, available: 2})
  • :duration - Elapsed time of the workflow in milliseconds (nil if not yet run)
  • :started_at - When the first job in the workflow was inserted
  • :stopped_at - When the last job in the workflow finished

Examples

Get a workflow's status using a job:

status = Workflow.status(job)

Get a workflow's status using the workflow_id:

status = Workflow.status("some-uuid-1234-5678")

Get a workflow's status using a custom Oban instance name:

status = Workflow.status(MyApp.Oban, "some-uuid-1234-5678")
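
As a sketch of how the returned map might be used, the hypothetical helper below computes a completion percentage from the :total and :counts fields. It assumes :counts is keyed by atoms, as in the %{completed: 10, available: 2} example above, and the sample status map is made up.

```elixir
defmodule MyApp.WorkflowProgress do
  @moduledoc "Hypothetical helper for a map shaped like the status/2 result."

  # Percentage of jobs in the completed state, rounded to one decimal place.
  def percent_complete(%{total: 0}), do: 0.0

  def percent_complete(%{total: total, counts: counts}) when total > 0 do
    completed = Map.get(counts, :completed, 0)
    Float.round(completed / total * 100, 1)
  end
end

# Sample status map containing only the fields used here.
status = %{total: 12, counts: %{completed: 10, available: 2}}

IO.inspect(MyApp.WorkflowProgress.percent_complete(status))
```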

stream_jobs(oban_name, job_or_wid, opts)

@spec stream_jobs(Oban.name(), Oban.Job.t() | String.t(), fetch_opts()) :: Enum.t()

Stream all jobs for a workflow.

This function behaves identically to all_jobs/3, except it streams jobs lazily from within a Repo transaction.

Examples

Stream with filtering to only preserve completed jobs:

def process(%Job{} = job) do
  {:ok, workflow_jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> Workflow.stream_jobs()
      |> Stream.filter(& &1.state == "completed")
      |> Enum.to_list()
    end)

  do_things_with_jobs(workflow_jobs)

  :ok
end

Stream workflow jobs from anywhere using the workflow_id:

MyApp.Repo.transaction(fn ->
  "some-uuid-1234-5678"
  |> Workflow.stream_jobs()
  |> Enum.map(& &1.args["account_id"])
end)

Stream workflow jobs using a custom Oban instance name:

MyApp.Repo.transaction(fn ->
  MyApp.Oban
  |> Workflow.stream_jobs("some-uuid-1234-5678")
  |> Enum.map(& &1.args["account_id"])
end)

to_dot(workflow)

@spec to_dot(flow :: t()) :: String.t()

Converts the given workflow to a graphviz dot digraph.

Examples

Generate a dot digraph:

Workflow.to_dot(workflow)

to_graph(workflow)

@spec to_graph(flow :: t()) :: :digraph.graph()

Converts the given workflow to a :digraph.

The resulting digraph can be explored, evaluated, and then be converted to a number of other formats.

Examples

Generate a digraph from a workflow:

graph = Workflow.to_graph(workflow)

Check the path between jobs in a workflow:

workflow
|> Workflow.to_graph()
|> :digraph.get_path("step_1", "step_5")
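
For reference, :digraph.get_path/3 returns the list of vertices along a path, or false when no path exists. The sketch below builds a five-step chain by hand to show both results; it assumes vertices are named with strings such as "step_1", as in the example above, whereas a real graph would come from to_graph/1.

```elixir
# Build a five-step chain by hand as a stand-in for Workflow.to_graph/1 output.
graph = :digraph.new()

steps = Enum.map(1..5, &"step_#{&1}")
Enum.each(steps, fn step -> :digraph.add_vertex(graph, step) end)

# Connect each step to the next one: step_1 -> step_2 -> ... -> step_5.
steps
|> Enum.chunk_every(2, 1, :discard)
|> Enum.each(fn [from, to] -> :digraph.add_edge(graph, from, to) end)

forward = :digraph.get_path(graph, "step_1", "step_5")
backward = :digraph.get_path(graph, "step_5", "step_1")

IO.inspect(forward, label: "forward")
IO.inspect(backward, label: "backward")

# Digraphs are backed by ETS tables, so delete them when finished.
:digraph.delete(graph)
```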

to_mermaid(workflow)

@spec to_mermaid(flow :: t()) :: String.t()

Generate a Mermaid flowchart in top-down orientation from a workflow.

Examples

Generate a flowchart:

Workflow.to_mermaid(workflow)