Oban.Pro.Workflow behaviour (Oban Pro v1.6.0-rc.2)

Workflows compose jobs with flexible dependency relationships, enabling sequential execution, fan-out parallelization, and fan-in convergence patterns. By declaratively defining jobs and their connections, you can build complex processing pipelines that are both fault-tolerant and horizontally scalable across all nodes.

Basic Usage

Workflows are composed of regular jobs or decorated functions linked together through declarative dependencies. A workflow consists of these key elements:

  • Jobs - Individual units of work identified by unique names
  • Dependencies - Relationships specifying execution order

Let's create a workflow using a decorated echo/1 function that demonstrates how jobs execute in order:

defmodule EchoWorkflow do
  use Oban.Pro.Decorator

  alias Oban.Pro.Workflow

  def insert do
    Workflow.new()
    |> Workflow.add(:a, new_echo(1))
    |> Workflow.add(:b, new_echo(2), deps: :a)
    |> Workflow.add(:c, new_echo(3), deps: :b)
    |> Workflow.add(:d, new_echo(4), deps: :b)
    |> Workflow.add(:e, new_echo(5), deps: [:c, :d])
    |> Oban.insert_all()
  end

  @job true
  def echo(value), do: IO.inspect(value)
end

The workflow is initialized with Workflow.new/1, jobs are added with Workflow.add/4, connections between jobs are declared with the deps option, and finally the workflow is inserted with Oban.insert_all/1.

When executed, this workflow will print each job's value in the prescribed order. Steps :c and :d both depend on :b, so they may execute in parallel. Visually, the workflow's dependency graph looks like this (shown in Mermaid syntax):
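
flowchart TD
    a --> b
    b --> c
    b --> d
    c --> e
    d --> e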

Common Patterns

Workflows support several execution patterns. These patterns form the building blocks of workflow design, allowing you to create both simple linear processes and complex dependency graphs. Each pattern serves a specific purpose in orchestrating job execution.

Example Workers

The following examples all use a generic Worker module to help focus on declaring dependencies. The patterns apply to any type of worker module or decorated function.
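
For reference, a minimal stand-in for that Worker might look like this (a sketch for illustration, not part of Oban Pro):

defmodule Worker do
  use Oban.Pro.Worker

  @impl true
  def process(_job), do: :ok
end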

Sequential Execution

Jobs run one after another, each starting only after the previous job completes (including any scheduled or retry delays):

Workflow.new()
|> Workflow.add(:first, Worker.new(%{}))
|> Workflow.add(:second, Worker.new(%{}), deps: :first)
|> Workflow.add(:third, Worker.new(%{}), deps: :second)

Fan-Out (1-to-many)

One job triggers multiple parallel jobs:

Workflow.new()
|> Workflow.add(:parent, Worker.new(%{}))
|> Workflow.add(:child_1, Worker.new(%{}), deps: :parent)
|> Workflow.add(:child_2, Worker.new(%{}), deps: :parent)
|> Workflow.add(:child_3, Worker.new(%{}), deps: :parent)

Fan-In (many-to-1)

Multiple parallel jobs converge to a single job:

Workflow.new()
|> Workflow.add(:step_1, Worker.new(%{}))
|> Workflow.add(:step_2, Worker.new(%{}))
|> Workflow.add(:step_3, Worker.new(%{}))
|> Workflow.add(:final, Worker.new(%{}), deps: [:step_1, :step_2, :step_3])

Diamond Pattern

Combines fan-out and fan-in:

Workflow.new()
|> Workflow.add(:start, Worker.new(%{}))
|> Workflow.add(:left, Worker.new(%{}), deps: :start)
|> Workflow.add(:right, Worker.new(%{}), deps: :start)
|> Workflow.add(:end, Worker.new(%{}), deps: [:left, :right])

These patterns can be combined to create complex workflows tailored to your specific business processes.

Advanced Patterns

Once you're comfortable with basic workflows, you can leverage more advanced patterns to handle complex scenarios. These patterns enable shared state, modular composition, simplified dependencies, and dynamic generation of workflows.

Sub-Workflows

Workflows can be nested hierarchically using sub workflows. This allows you to compose complex workflows from simpler ones, making it easier to organize and reuse workflow patterns.

The add_workflow/4 function lets you add an entire workflow as a dependency of another workflow. Like add/4, it accepts a name and optional dependencies, but instead of a job changeset, it takes another workflow.

Here's an example of creating a primary workflow and adding sub workflows with dependencies:

alias MyApp.{WorkerA, WorkerB, WorkerC}
alias Oban.Pro.Workflow

extr_flow =
  Workflow.new(workflow_name: "extract")
  |> Workflow.add(:extract, WorkerA.new(%{source: "database"}))
  |> Workflow.add(:transform, WorkerB.new(%{type: "normalize"}), deps: :extract)

note_flow =
  Workflow.new(workflow_name: "notify")
  |> Workflow.add(:prepare, WorkerA.new(%{template: "report"}))
  |> Workflow.add(:send, WorkerB.new(%{method: "email"}), deps: :prepare)

# Create the main workflow and add sub workflows to it
Workflow.new()
|> Workflow.add(:setup, WorkerC.new(%{mode: "initialize"}))
|> Workflow.add_workflow(:extract, extr_flow, deps: :setup)
|> Workflow.add_workflow(:notify, note_flow, deps: :extract)
|> Workflow.add(:finalize, WorkerC.new(%{mode: "cleanup"}), deps: :notify)
|> Oban.insert_all()

In this example, the main workflow has a single job named :setup, followed by two sub workflows, and ends with a :finalize job. The dependencies ensure proper execution order: setup -> extraction -> notification -> finalize.

Sub-workflows are a powerful pattern for:

  • Organizing large job dependencies into logical units
  • Enabling reuse across applications
  • Simplifying the maintenance of complex job graphs

Sharing Results

Directed dependencies between jobs, paired with the recorded option, allow downstream jobs to fetch the output of upstream jobs. This is particularly useful for multi-step processes where each step builds on previous results.

Consider this workflow that simulates a multi-step API interaction:

defmodule MyApp.WorkerA do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: %{"api_key" => api_key}}) do
    token =
      api_key
      |> String.graphemes()
      |> Enum.shuffle()
      |> to_string()

    {:ok, token}  # This return value will be recorded
  end
end

The second worker fetches the token from the first job by calling get_recorded/2 with the name :a, which we'll set while building the workflow later.

defmodule MyApp.WorkerB do
  use Oban.Pro.Worker, recorded: true

  @impl true
  def process(%Job{args: %{"url" => url}} = job) do
    token = Oban.Pro.Workflow.get_recorded(job, :a)

    {:ok, {token, url}}
  end
end

Then the final worker uses all_recorded/3 with the only_deps option to fetch the results from all upstream jobs and prints everything that was fetched.

defmodule MyApp.WorkerC do
  use Oban.Pro.Worker

  @impl true
  def process(job) do
    job
    |> Oban.Pro.Workflow.all_recorded(only_deps: true)
    |> IO.inspect()

    :ok
  end
end

Now compose the workers together:

alias MyApp.{WorkerA, WorkerB, WorkerC}
alias Oban.Pro.Workflow

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{api_key: "23kl239bjljlk309af"}))
|> Workflow.add(:b, WorkerB.new(%{url: "elixir-lang.org"}), deps: [:a])
|> Workflow.add(:c, WorkerB.new(%{url: "www.erlang.org"}), deps: [:a])
|> Workflow.add(:d, WorkerB.new(%{url: "oban.pro"}), deps: [:a])
|> Workflow.add(:e, WorkerC.new(%{}), deps: [:b, :c, :d])
|> Oban.insert_all()

When the workflow runs, the final step prints something like:

%{
  "b" => {"93l2jlj3kl90baf2k3", "elixir-lang.org"},
  "c" => {"93l2jlj3kl90baf2k3", "www.erlang.org"},
  "d" => {"93l2jlj3kl90baf2k3", "oban.pro"}
}

Sharing results between jobs is a powerful building block for processing pipelines. While the approach above works well, cascading functions provide an even more elegant way to share and transform data between workflow steps.

Cascading Functions

Cascade mode allows you to build workflows using function captures that automatically receive context and previous step results. Each function receives a map containing the workflow context and the results of its dependencies. Results from each step are recorded and made available to subsequent steps.

Here's an ETL (Extract, Transform, Load) pipeline using cascading functions:

defmodule MyApp.ETL do
  alias Oban.Pro.Workflow

  def insert(source, date) do
    Workflow.new()
    |> Workflow.put_context(%{source: source, date: date})
    |> Workflow.add_cascade(:extract, &extract/1)
    |> Workflow.add_cascade(:transform, &transform/1, deps: :extract)
    |> Workflow.add_cascade(:load, &load/1, deps: :transform)
    |> Workflow.add_cascade(:notify, &notify/1, deps: [:transform, :load])
    |> Oban.insert_all()
  end

  def extract(%{source: source}) do
    if source =~ ~r/[a-z0-9]+/ do
      %{records: [1, 2, 3], extracted_at: DateTime.utc_now()}
    else
      {:cancel, "unprocessable source"}
    end
  end

  def transform(%{extract: %{records: records}}) do
    transformed = Enum.map(records, & &1 * 2)
    %{records: transformed, count: length(transformed)}
  end

  def load(%{transform: %{records: records}}) do
    %{loaded: true, count: length(records)}
  end

  def notify(%{transform: transform, load: load}) do
    IO.puts("Processed #{transform.count} records, loaded #{load.count}")
  end
end

When this workflow runs, each function is automatically called with a context map containing the shared context (source and date) and the results of its dependencies (accessible by their step names).
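
For instance, the map passed to transform/1 would look roughly like this (values are illustrative):

%{
  source: "crm",
  date: ~D[2025-04-10],
  extract: %{records: [1, 2, 3], extracted_at: ~U[2025-04-10 00:00:00Z]}
}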

There's no need to manually fetch recorded results as with standard workflow jobs!

Cascading functions are especially valuable for:

  • Data pipelines where each step needs the output of previous steps
  • Validating aggregate context before proceeding
  • Minimizing worker and function definition boilerplate

Cascading Sub-Workflow

Cascades can be added as sub-workflows that efficiently fan out workloads across multiple items. This works similarly to add_many/4, but for cascading functions.

To create a fan-out cascade, provide a tuple containing an enumerable (like a list, map, or range) and a function capture with arity 2. The function will be called once for each item in the enumerable, with the first argument receiving the current item and the second argument receiving the cumulative context.

defmodule MyApp.BatchETL do
  alias Oban.Pro.Workflow

  def insert(sources, date) do
    Workflow.new()
    |> Workflow.put_context(%{date: date})
    |> Workflow.add_cascade(:sources, {sources, &extract_source/2})
    |> Workflow.add_cascade(:transform, &transform_all/1, deps: :sources)
    |> Workflow.add_cascade(:load, &load_data/1, deps: :transform)
    |> Workflow.add_cascade(:notify, &send_notification/1, deps: [:transform, :load])
    |> Oban.insert_all()
  end

  # Each `source` from the list is passed as the first argument
  def extract_source(source, _context) do
    %{source: source, records: [1, 2, 3], extracted_at: DateTime.utc_now()}
  end

  def transform_all(%{sources: sources}) do
    total_records = Enum.sum_by(sources, fn {_key, result} -> length(result.records) end)

    transformed =
      sources
      |> Enum.flat_map(fn {_key, %{records: records}} -> records end)
      |> Enum.map(&(&1 * 2))

    %{records: transformed, count: total_records}
  end

  def load_data(%{transform: %{records: records}}) do
    %{loaded: true, count: length(records)}
  end

  def send_notification(%{transform: transform, load: load, date: date}) do
    IO.puts("On #{date}, processed #{transform.count} records, loaded #{load.count}")
  end
end

This example processes multiple data sources in parallel, then consolidates the results in subsequent steps. The extract_source/2 function is automatically called for each source in the provided list, with the results collected into a map.
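
For instance, with sources = ["alpha", "beta"], the :sources key in the downstream context would hold something like this (assuming list items are keyed by their position, as with add_many/4):

%{
  "0" => %{source: "alpha", records: [1, 2, 3], extracted_at: ~U[2025-04-10 00:00:00Z]},
  "1" => %{source: "beta", records: [1, 2, 3], extracted_at: ~U[2025-04-10 00:00:00Z]}
}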

Cascading sub-workflows are useful for:

  • Processing collections of items in parallel with shared context
  • Building data pipelines that operate on multiple sources simultaneously
  • Implementing fan-out/fan-in patterns with minimal boilerplate
  • Maintaining clean separation between item processing and result aggregation

Dynamic Workflows

Many workflows aren't static—the number of jobs and their interdependencies aren't known beforehand. You can generate workflows dynamically based on runtime conditions.

The following worker creates a workflow that fans out and back in twice, using a variable number of dependencies:

defmodule MyApp.Dynamic do
  use Oban.Pro.Worker

  alias Oban.Pro.Workflow

  @impl true
  def process(%{meta: %{"name" => name}}) do
    IO.puts(name)
  end

  def insert_workflow(count) when is_integer(count) do
    range = Range.new(0, count)
    a_deps = Enum.map(range, &"a_#{&1}")
    b_deps = Enum.map(range, &"b_#{&1}")

    Workflow.new()
    |> Workflow.add(:a, new(%{}))
    |> fan_out(:a, range)
    |> Workflow.add(:b, new(%{}), deps: a_deps)
    |> fan_out(:b, range)
    |> Workflow.add(:c, new(%{}), deps: b_deps)
    |> Oban.insert_all()
  end

  defp fan_out(workflow, base, range) do
    Enum.reduce(range, workflow, fn key, acc ->
      Workflow.add(acc, "#{base}_#{key}", new(%{}), deps: [base])
    end)
  end
end

This approach is useful for:

  • Processing variable-sized collections
  • Creating workflows based on database queries (see the sketch below)
  • Building complex workflows from configuration
  • Implementing multi-tenant workflows with different requirements
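
For instance, the size of the fan-out might be determined by a query at runtime (MyApp.Repo and MyApp.Tenant are hypothetical):

count = MyApp.Repo.aggregate(MyApp.Tenant, :count)

MyApp.Dynamic.insert_workflow(count)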

Appending Jobs

Sometimes all jobs aren't known when the workflow is created. In that case, you can add more jobs with optional dependency checking using append/2. An appended workflow starts from one or more existing jobs, reuses the original workflow_id, and optionally builds a set of dependencies to check against.

In this example we disable deps checking with check_deps: false:

def process(job) do
  jobs =
    job
    |> Workflow.append(check_deps: false)
    |> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
    |> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
    |> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
    |> Oban.insert_all()

  {:ok, jobs}
end

The new jobs specify deps on preexisting jobs named :a, :b, and :c, but there isn't any guarantee those jobs actually exist. That could lead to an incomplete workflow where the new jobs may never complete.

To be safe and check jobs while appending, we'll fetch all of the previous jobs with all_jobs/3 and feed them in:

def process(job) do
  jobs = Workflow.all_jobs(job)

  jobs
  |> Workflow.append()
  |> Workflow.add(:d, WorkerD.new(%{}), deps: :a)
  |> Workflow.add(:e, WorkerE.new(%{}), deps: :b)
  |> Workflow.add(:f, WorkerF.new(%{}), deps: :c)
  |> Oban.insert_all()

  :ok
end

Now there isn't any risk of an incomplete workflow from missing dependencies, at the expense of loading some extraneous jobs.

Appending jobs is particularly useful for:

  • Extending workflows based on user interaction or external events
  • Adding additional reporting or cleanup steps after seeing initial results
  • Breaking large workflows into stages that are built incrementally

Customization

Workflows provide several customization options to control their behavior, from identification to error handling. This section covers how to tailor workflows to your specific requirements.

Workflow ID

Every workflow needs a unique identifier. By default, workflow_id is a time-ordered, random UUIDv7, which guarantees uniqueness while remaining sortable by insertion time. For more control, you can provide a custom ID:

Workflow.new(workflow_id: "custom-but-still-unique-id")

Workflow Name

Additionally, workflows accept an optional name that describes their purpose:

Workflow.new(workflow_name: "nightly-etl")

While the workflow_id must be unique, the workflow_name doesn't have to be, so it can serve as a general-purpose label for categorizing or grouping workflows.

Dependency Handling

By default, workflows use conservative dependency handling: if an upstream job is cancelled, discarded, or deleted, dependent jobs are automatically cancelled. You can customize this behavior with these options:

  • ignore_cancelled — Treat cancelled dependencies as completed rather than cancelling downstream jobs. Defaults to false.

  • ignore_discarded — Treat discarded dependencies as completed rather than cancelling downstream jobs. Defaults to false.

  • ignore_deleted — Treat deleted (typically pruned) dependencies as completed rather than cancelling downstream jobs. Defaults to false.

Apply these options to the entire workflow:

Workflow.new(ignore_cancelled: true, ignore_deleted: true, ignore_discarded: true)

Or configure individual jobs within a workflow:

Workflow.new()
|> Workflow.add(:a, MyWorkflow.new(%{}))
|> Workflow.add(:b, MyWorkflow.new(%{}), deps: :a, ignore_cancelled: true)
|> Workflow.add(:c, MyWorkflow.new(%{}), deps: :b, ignore_discarded: true)
|> Workflow.add(:d, MyWorkflow.new(%{}), deps: :c, ignore_deleted: true)

Preventing Stuck Workflows

Be sure that you're running the DynamicLifeline to rescue stuck workflows when upstream dependencies are deleted unexpectedly.

config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicLifeline],
  ...

Cancellation Callbacks

Workflow jobs are automatically cancelled when their upstream dependencies are cancelled, discarded, or deleted (unless specifically configured with the ignore_* options mentioned above). Since these workflow jobs are cancelled before execution, standard Oban.Pro.Worker.after_process/3 hooks won't be called.

Instead, you can implement the optional after_cancelled/2 callback specifically for workflows:

defmodule MyApp.Workflow do
  use Oban.Pro.Worker

  @behaviour Oban.Pro.Workflow

  require Logger

  @impl Oban.Pro.Workflow
  def after_cancelled(reason, job) do
    Logger.warning("Workflow job #{job.id} cancelled because a dependency was #{reason}")
    
    # Perform any cleanup required
    notify_monitoring_system(job, reason)

    :ok
  end
end

The reason parameter will be one of:

  • :cancelled - A dependency was cancelled
  • :discarded - A dependency was discarded
  • :deleted - A dependency was deleted

Handling Failures with Status Checking

For situations where you can't use the after_cancelled callback, or when you have many different workers in a workflow, you can combine the ignore_cancelled option with status checking in a downstream job.

This pattern allows a final "cleanup" job to examine the state of all upstream jobs and take appropriate action:

Workflow.new(ignore_cancelled: true)
|> Workflow.add(:step_1, StepOne.new(%{}))
|> Workflow.add(:step_2, StepTwo.new(%{}), deps: :step_1)
|> Workflow.add(:step_3, StepThree.new(%{}), deps: :step_2)
|> Workflow.add(:report, Reporter.new(%{}), deps: ~w(step_1 step_2 step_3))

Then, in your final worker, examine the state of the upstream jobs using status/1:

defmodule MyApp.Reporter do
  use Oban.Pro.Worker

  alias Oban.Pro.Workflow

  @impl true
  def process(job) do
    status = Workflow.status(job)

    if status.counts.cancelled > 0 do
      # Some results were cancelled
    else
      :ok
    end
  end
end

This approach offers several advantages over hooks:

  • It works with any type of worker, including decorated functions and cascades
  • You can centralize error handling in a single place
  • The workflow continues to the reporting step even if some steps fail

Introspection

Workflow jobs are tied together through meta attributes. You can retrieve these jobs using several functions, each with different optimization characteristics.

Fetching Jobs

There are two functions for retrieving jobs, get_job/2 and all_jobs/3, which are optimized for different use cases:

# Get a single specific job by name
job_a = Workflow.get_job(job, :a)

# Get all jobs in a workflow
all_jobs = Workflow.all_jobs(job)

# Get only the dependencies of the current job
deps = Workflow.all_jobs(job, only_deps: true)

# Get specific named jobs
[job_a, job_b] = Workflow.all_jobs(job, names: [:a, :b])

# Access from outside a worker using workflow_id
jobs = Workflow.all_jobs("some-uuid-1234-5678")

Streaming Jobs

For large workflows where loading all jobs at once would consume too much memory, use stream_jobs/3:

def process(job) do
  {:ok, workflow_jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> Oban.Pro.Workflow.stream_jobs()
      |> Stream.filter(& &1.state == "completed")
      |> Enum.to_list()
    end)

  process_completed_jobs(workflow_jobs)

  :ok
end

Transaction Required

Streaming is provided by Ecto's Repo.stream, and must take place within a transaction. This approach lets you control the number of jobs loaded from the database, minimizing memory usage for large workflows.

Retrieving Results

To fetch recorded output from workflow jobs without loading the full jobs first, use get_recorded/2 and all_recorded/3:

# Get results from a specific job
result_a = Workflow.get_recorded(job, :a)

# Get results from all jobs
results = Workflow.all_recorded(job)

# Get only results from dependencies
dep_results = Workflow.all_recorded(job, only_deps: true)

Checking Status

To get detailed status information about a workflow, including the overall workflow state, job counts, timing information, and sub-workflow details, use status/1:

status = Workflow.status(job)

This returns a map with details like the following:

%{
  id: "some-uuid-1234-5678",
  name: "my-workflow",
  total: 3,
  state: :completed,
  duration: 72,
  counts: %{
    cancelled: 0,
    available: 0,
    completed: 3,
    scheduled: 0,
    discarded: 0,
    executing: 0,
    retryable: 0
  },
  started_at: ~U[2025-04-10 16:34:34.114730Z],
  stopped_at: ~U[2025-04-10 16:34:34.186935Z],
  subs: %{}
}

Visualization

Workflows are a type of Directed Acyclic Graph (DAG), which means we can represent them visually as nodes (jobs) connected by edges (dependencies). By converting workflows into standard graph description languages, we can create clear visual representations of job execution patterns.

Converting to Graph Formats

Along with the general-purpose to_graph/1, which outputs a :digraph, there are built-in converters for Graphviz (to_dot/1) and Mermaid (to_mermaid/1).

For example, let's generate a mermaid flowchart from our account workflow:

Workflow.to_mermaid(archive_account_workflow(123))

This produces the following mermaid output, where each vertex represents a job's name and the label shows the worker:

flowchart TD
    backup_1[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    backup_2[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    backup_3[MyApp.BackupPost] --> delete[MyApp.DeleteAccount]
    receipt[MyApp.FinalReceipt] --> backup_1[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> backup_2[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> backup_3[MyApp.BackupPost]
    receipt[MyApp.FinalReceipt] --> email_1[MyApp.EmailSubscriber]
    receipt[MyApp.FinalReceipt] --> email_2[MyApp.EmailSubscriber]
    email_1[MyApp.EmailSubscriber] --> delete[MyApp.DeleteAccount]
    email_2[MyApp.EmailSubscriber] --> delete[MyApp.DeleteAccount]

Rendering Visualizations

You can render these visualizations using the command line with a mermaid binary, directly in LiveBook using Kino, or in documentation sites.
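
For the command line, one option is writing the Mermaid source to a file and rendering it with the Mermaid CLI (assuming the mmdc binary from mermaid-cli is installed):

File.write!("workflow.mmd", Workflow.to_mermaid(workflow))

# Then, from a shell: mmdc -i workflow.mmd -o workflow.svg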

For LiveBook, you can pipe a workflow through to_mermaid/1 and then into Kino.Mermaid:

workflow
|> Workflow.to_mermaid()
|> Kino.Mermaid.new()

This generates an SVG diagram of the workflow graph.

The visualization clearly shows how our workflow starts with a single receipt job, fans out to multiple email and backup jobs, and finally fans in to the delete job, making complex dependency relationships immediately apparent.

Summary

Callbacks

after_cancelled/2 - Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

Functions

add/4 - Add a named job to the workflow along with optional dependencies.

add_cascade/4 - Add a cascading function to the workflow with optional dependencies.

add_many/4 - Add multiple named jobs as a sub workflow along with optional dependencies.

add_workflow/4 - Add a sub workflow with a name and optional dependencies to another workflow.

all_jobs/3 - Get all jobs for a workflow, optionally filtered by upstream deps.

all_recorded/3 - Get all recordings for workflow jobs, optionally filtered by name or relationship.

append/2 - Instantiate a new workflow from one or more existing workflow jobs.

cancel_jobs/3 - Cancel one or more workflow jobs.

get_context/2 - Gets a workflow's context value.

get_job/3 - Get a single workflow job by name.

get_recorded/3 - Fetch the recorded output from a workflow job.

new/1 - Instantiate a new workflow struct with a unique workflow id.

put_context/2 - Adds a context value to the workflow that can be accessed by all jobs.

retry_jobs/3 - Retry all jobs in a workflow.

status/2 - Get detailed status information about a workflow.

stream_jobs/3 - Stream all jobs for a workflow.

to_dot/1 - Converts the given workflow to a graphviz dot digraph.

to_graph/1 - Converts the given workflow to a :digraph.

to_mermaid/1 - Generate a Mermaid flowchart in top-down orientation from a workflow.

Types

add_cascade_opts()

@type add_cascade_opts() :: [Oban.Job.option() | add_opts()]

add_opts()

@type add_opts() :: [
  deps: name() | [name()],
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean()
]

add_workflow_opts()

@type add_workflow_opts() :: [{:deps, name() | [name()]}]

append_opts()

@type append_opts() :: [
  check_deps: boolean(),
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean(),
  workflow_id: String.t(),
  workflow_name: String.t()
]

cancel_opts()

@type cancel_opts() :: [{:names, [name()]}]

cancel_reason()

@type cancel_reason() :: :deleted | :discarded | :cancelled

cascade_capture()

@type cascade_capture() :: (map() -> any()) | {Enum.t(), (any(), map() -> any())}

fetch_opts()

@type fetch_opts() :: [
  log: Logger.level(),
  names: [name()],
  only_deps: boolean(),
  timeout: timeout(),
  with_subs: boolean()
]

job_or_wid()

@type job_or_wid() :: Oban.Job.t() | workflow_id()

name()

@type name() :: atom() | String.t()

new_opts()

@type new_opts() :: [
  ignore_cancelled: boolean(),
  ignore_deleted: boolean(),
  ignore_discarded: boolean(),
  workflow_id: String.t(),
  workflow_name: String.t()
]

status()

@type status() :: %{
  id: workflow_id(),
  name: String.t() | nil,
  total: non_neg_integer(),
  state: :executing | :completed | :cancelled | :discarded,
  counts: %{required(atom()) => non_neg_integer()},
  duration: non_neg_integer() | nil,
  started_at: DateTime.t(),
  stopped_at: DateTime.t() | nil,
  subs: %{required(String.t()) => map()}
}

t()

@type t() :: %Oban.Pro.Workflow{
  changesets: [Oban.Job.changeset()],
  check_deps: boolean(),
  id: workflow_id(),
  names: MapSet.t(),
  opts: map(),
  subs: term()
}

workflow_id()

@type workflow_id() :: String.t()

Callbacks

after_cancelled(cancel_reason, job)

(optional)
@callback after_cancelled(cancel_reason(), job :: Oban.Job.t()) :: :ok

Called after a workflow job is cancelled due to upstream jobs being cancelled, deleted, or discarded.

This callback is only called when a job is cancelled because of an upstream dependency. It is never called after normal job execution. For that, use Oban.Pro.Worker.after_process/3.

Functions

add(workflow, name, changeset, opts \\ [])

@spec add(t(), name(), Oban.Job.changeset(), add_opts()) :: t()

Add a named job to the workflow along with optional dependencies.

Examples

Add jobs to a workflow with dependencies:

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{id: id}))
|> Workflow.add(:b, WorkerB.new(%{id: id}), deps: :a)
|> Workflow.add(:c, WorkerC.new(%{id: id}), deps: :a)
|> Workflow.add(:d, WorkerC.new(%{id: id}), deps: [:b, :c])

add_cascade(workflow, name, fun, opts \\ [])

@spec add_cascade(t(), name(), cascade_capture(), add_cascade_opts()) :: t()

Add a cascading function to the workflow with optional dependencies.

Cascading functions receive a context map containing the workflow's shared context and any results from their upstream dependencies. Results of the function are automatically recorded, unless they're one of the following standard control tuples:

  • {:cancel, reason}
  • {:error, reason}
  • {:snooze, seconds}
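
For example, a cascade step can snooze itself instead of recording a result (MyApp.check_availability/1 is a hypothetical helper):

def fetch_when_ready(%{source: source}) do
  case MyApp.check_availability(source) do
    :ready -> %{fetched: true}
    :pending -> {:snooze, 60}
  end
end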

Function Captures Only

Cascading functions must be captures, like &MyApp.foo/1 for remote functions or &foo/1 for local functions. Anonymous functions won't work. The function always receives a single map argument containing all context and previous results.

Cascading Sub-Workflow

Cascades can be added as sub-workflows that efficiently fan out workloads across multiple items. This works similarly to add_many/4, but for cascading functions.

To create a fan-out cascade, provide a tuple containing an enumerable (like a list, map, or range) and a function capture with arity 2. The function will be called once for each item in the enumerable, with the first argument receiving the current item and the second argument receiving the cumulative context.

user_ids = [101, 202, 303]

Workflow.new()
|> Workflow.put_context(%{operation: "sync"})
|> Workflow.add_cascade(:users, {user_ids, &MyApp.sync_user/2})
|> Workflow.add_cascade(:notify, &MyApp.send_summary/1, deps: :users)

def sync_user(user_id, context) do
  # ...
end

Options

A subset of standard job options can be passed directly to customize the cascade jobs. The available options are:

  • :max_attempts
  • :meta
  • :priority
  • :queue
  • :tags

Pass the additional options alongside deps or other add/4 workflow options.

Examples

Add a cascading function to a workflow:

Workflow.new()
|> Workflow.put_context(%{user_id: 123})
|> Workflow.add_cascade(:activate, &MyApp.activate/1)
|> Workflow.add_cascade(:notify, &MyApp.notify/1, deps: :activate)
|> Oban.insert_all()

Customize the cascade job:

Workflow.new()
|> Workflow.add_cascade(:activate, &MyApp.activate/1, queue: :activation)
|> Workflow.add_cascade(:retrofit, &MyApp.retrofit/1, max_attempts: 3)

Add cascade steps from a range:

Workflow.new()
|> Workflow.put_context(%{user_id: 123})
|> Workflow.add_cascade(:activations, {1..5, &MyApp.activate/2})

Add cascade steps from a map, using the map keys as custom names:

accounts = %{admin: 1, user: 2, guest: 3}

Workflow.new()
|> Workflow.put_context(%{action: "sync"})
|> Workflow.add_cascade(:accounts, {accounts, &MyApp.sync_account/2})

add_many(workflow, name, changesets, opts \\ [])

@spec add_many(t(), name(), Enum.t(), new_opts() | add_opts()) :: t()

Add multiple named jobs as a sub workflow along with optional dependencies.

Examples

Add multiple jobs as a sub workflow with a list, where each job's key is its position in the list, starting at 0:

Workflow.add_many(workflow, :sub_1, [WorkerA.new(%{}), WorkerB.new(%{})])

Add multiple jobs as a sub workflow with a map, where the keys are the job names:

Workflow.add_many(workflow, :sub_1, %{a: WorkerA.new(%{}), b: WorkerB.new(%{})})

Add multiple jobs as sub workflows and add a dependency on them:

Workflow.new()
|> Workflow.add_many(:a, changesets_a)
|> Workflow.add_many(:b, changesets_b)
|> Workflow.add(:c, WorkerC.new(%{}), deps: [:a, :b])

Workflow options may be provided to name or otherwise customize the sub-workflow:

Workflow.add_many(workflow, :sub_1, changesets, ignore_cancelled: true)

add_workflow(workflow, name, sub_workflow, opts \\ [])

@spec add_workflow(t(), name(), t(), add_workflow_opts()) :: t()

Add a sub workflow with a name and optional dependencies to another workflow.

This function lets you compose workflows by adding one workflow as a dependency within another workflow. Workflows may depend on jobs or other workflows, and jobs may depend on entire workflows. If a job depends on a workflow, it won't execute until every job in the workflow is in a final state (completed, cancelled, or discarded).

Examples

Add a sub workflow to a workflow with dependencies:

sub_1 =
  Workflow.new()
  |> Workflow.add(:a, WorkerB.new(%{id: id}))
  |> Workflow.add(:b, WorkerC.new(%{id: id}))

sub_2 =
  Workflow.new()
  |> Workflow.add(:a, WorkerB.new(%{id: id}))
  |> Workflow.add(:b, WorkerC.new(%{id: id}))

Workflow.new()
|> Workflow.add(:a, WorkerA.new(%{id: id}))
|> Workflow.add_workflow(:b, sub_1, deps: :a)
|> Workflow.add_workflow(:c, sub_2, deps: :b)
|> Workflow.add(:d, WorkerD.new(%{id: id}), deps: [:b, :c])

all_jobs(oban_name, job_or_wid, opts)

@spec all_jobs(Oban.name(), job_or_wid(), fetch_opts()) :: [Oban.Job.t()]

Get all jobs for a workflow, optionally filtered by upstream deps.

Examples

Retrieve all workflow jobs within a process/1 function:

def process(job) do
  job
  |> Workflow.all_jobs()
  |> do_things_with_jobs()

  :ok
end

Retrieve all of the current job's deps:

jobs = Workflow.all_jobs(job, only_deps: true)

Retrieve an explicit list of dependencies:

[job_a, job_b] = Workflow.all_jobs(job, names: [:a, :b])

Include sub workflow jobs:

[sub_a, sub_b] = Workflow.all_jobs(job, names: [:sub], with_subs: true)

Retrieve all jobs using a workflow_id:

jobs = Workflow.all_jobs("some-uuid-1234-5678")

Retrieve only some named jobs using a workflow_id:

[job_a, job_b] = Workflow.all_jobs("some-uuid-1234-5678", deps: [:a, :b])

Retrieve all jobs using a workflow_id and a custom Oban instance name:

jobs = Workflow.all_jobs(MyApp.Oban, "some-uuid-1234-5678")

all_recorded(oban_name, job_or_wid, opts)

@spec all_recorded(Oban.name(), job_or_wid(), fetch_opts()) :: map()

Get all recordings for workflow jobs, optionally filtered by name or relationship.

The result is a map of job names to recorded output. Unrecorded jobs, or those without a recording yet, will be present in the map with a nil value.

Examples

Retrieve results for all workflow jobs within a process/1 function:

def process(job) do
  %{"a" => job_a_return, "b" => job_b_return} = Workflow.all_recorded(job)

  :ok
end

Retrieve recordings for all of the current job's deps:

recorded = Workflow.all_recorded(job, only_deps: true)

Retrieve recordings for a list of dependencies:

%{"a" => _, "b" => _} = Workflow.all_recorded(job, names: [:a, :b])

Retrieve recordings for some named jobs using the workflow_id:

%{"a" => _, "b" => _} = Workflow.all_recorded("some-uuid-1234-5678", names: [:a, :b])

append(jobs, opts \\ [])

@spec append(Oban.Job.t() | [Oban.Job.t()], append_opts()) :: t()

Instantiate a new workflow from one or more existing workflow jobs.

Examples

Append to a workflow seeded with all other jobs in the workflow:

jobs
|> Workflow.append()
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

Append to a workflow from a single job and bypass checking deps:

job
|> Workflow.append(check_deps: false)
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()

cancel_jobs(oban_name, job_or_wid, opts)

@spec cancel_jobs(Oban.name(), job_or_wid(), cancel_opts()) :: {:ok, integer()}

Cancel one or more workflow jobs.

This uses Oban.cancel_all_jobs/2 internally, and adheres to the same cancellation rules. Namely, it won't touch jobs in a completed, cancelled, or discarded state.

Examples

Cancel jobs with a workflow job from a process/1 function:

def process(job) do
  if should_stop_processing?(job.args) do
    Workflow.cancel_jobs(job)
  else
    ...
  end
end

Cancel specific workflow jobs:

Workflow.cancel_jobs("some-uuid-1234-5678", names: [:a, :b])

Cancel all jobs in a workflow by workflow_id:

Workflow.cancel_jobs("some-uuid-1234-5678")

Cancel jobs from anywhere with a custom Oban instance name:

Workflow.cancel_jobs(MyApp.Oban, "some-uuid-1234-5678")

get_context(oban_name \\ Oban, job_or_wid)

@spec get_context(Oban.name(), job_or_wid()) :: nil | map()

Gets a workflow's context value.

This is a convenience function that internally calls get_recorded/2 with the :context name.

Examples

Retrieve the context within a worker:

def process(job) do
  context = Workflow.get_context(job)
end

Retrieve the context with a workflow id:

Workflow.get_context("some-uuid-1234-5678")

Retrieve the context using a custom Oban instance name:

Workflow.get_context(MyApp.Oban, "some-uuid-1234-5678")

get_job(oban_name \\ Oban, job_or_wid, deps_name)

@spec get_job(Oban.name(), job_or_wid(), name()) :: nil | Oban.Job.t()

Get a single workflow job by name.

Examples

Get the workflow job named :step_1 from within process/1:

def process(job) do
  case Workflow.get_job(job, :step_1) do
    nil -> ...
    dep_job -> use_the_job(dep_job)
  end
end

Get a named workflow job with the workflow_id:

Workflow.get_job("some-uuid-1234-5678", :step_2)

Get a named workflow job with the workflow_id and a custom Oban instance name:

Workflow.get_job(MyApp.Oban, "some-uuid-1234-5678", :step_2)

get_recorded(oban_name \\ Oban, job_or_wid, deps_name)

@spec get_recorded(Oban.name(), job_or_wid(), name()) :: any()

Fetch the recorded output from a workflow job.

This function provides an optimized way to get recorded output with a single query.

A nil value is returned when a dependency is incomplete, missing, unrecorded, or lacks recorded output.

Examples

Get the recorded output from :step_1 within process/1:

def process(job) do
  case Workflow.get_recorded(job, :step_1) do
    nil -> ...
    val -> use_recorded_output(val)
  end
end

Get recorded output using the workflow id:

Workflow.get_recorded("some-uuid-1234-5678", :step_2)

Get recorded output using the workflow_id and a custom Oban instance name:

Workflow.get_recorded(MyApp.Oban, "some-uuid-1234-5678", :step_2)

new(opts \\ [])

@spec new(opts :: new_opts()) :: t()

Instantiate a new workflow struct with a unique workflow id.

Examples

Create a standard workflow without any options:

Workflow.new()

Create a workflow with a custom name:

Workflow.new(workflow_name: "logistics")

Create a workflow with a static id and some options:

Workflow.new(workflow_id: "workflow-id", ignore_cancelled: true, ignore_discarded: true)

put_context(workflow, value)

@spec put_context(t(), map()) :: t()

Adds a context value to the workflow that can be accessed by all jobs.

This is a convenience function that creates a recorded job named :context containing any value that needs to be shared across multiple workflow jobs.

Examples

Add a context value to a workflow:

Workflow.new()
|> Workflow.put_context(%{user_id: 123, action: "process"})
|> Workflow.add(:job_a, WorkerA.new(%{}))
|> Workflow.add(:job_b, WorkerB.new(%{}))
|> Oban.insert_all()

Later, retrieve the context in a worker:

def process(job) do
  context = Workflow.get_context(job)
  # Use context map...
end

retry_jobs(oban_name \\ Oban, job_or_wid, opts \\ [])

@spec retry_jobs(Oban.name(), job_or_wid(), keyword()) :: {:ok, integer()}

Retry all jobs in a workflow.

By default, this function will retry all jobs that aren't executing or available. Jobs with dependencies will be placed on hold to run again once their dependencies are complete.

Examples

Retry only failed jobs in a workflow from within a process function:

def process(job) do
  Workflow.retry_jobs(job)
end

Retry a workflow by ID:

Workflow.retry_jobs("some-uuid-1234-5678")

Retry a workflow with a custom Oban instance:

Workflow.retry_jobs(MyApp.Oban, "some-uuid-1234-5678")

status(oban_name \\ Oban, job_or_wid)

@spec status(Oban.name(), job_or_wid()) :: status()

Get detailed status information about a workflow.

Returns a map with the following fields:

  • :id - The workflow's unique id
  • :name - The name of the workflow, if it has one
  • :total - Total number of jobs in the workflow
  • :state - Overall status of the workflow (e.g. executing, completed, cancelled, discarded)
  • :counts - Map of job state counts (e.g. %{completed: 10, available: 2})
  • :duration - Elapsed time of the workflow in milliseconds (nil if it hasn't run yet)
  • :started_at - When the first job in the workflow was inserted
  • :stopped_at - When the last job in the workflow finished
  • :subs - A map of status output for sub-workflows, where the key is the sub name and the value is status/1 output

An example of the full output:

%{
  id: "some-uuid-1234-5678",
  name: "my-workflow",
  total: 3,
  state: :completed,
  duration: 72,
  counts: %{
    cancelled: 0,
    available: 0,
    completed: 3,
    scheduled: 0,
    discarded: 0,
    executing: 0,
    retryable: 0
  },
  started_at: ~U[2025-04-10 16:34:34.114730Z],
  stopped_at: ~U[2025-04-10 16:34:34.186935Z],
  subs: %{}
}

Examples

Get a workflow's status using a job:

status = Workflow.status(job)

Get a workflow's status using the workflow_id:

status = Workflow.status("some-uuid-1234-5678")

Get a workflow's status using a custom Oban instance name:

status = Workflow.status(MyApp.Oban, "some-uuid-1234-5678")

stream_jobs(oban_name, job_or_wid, opts)

@spec stream_jobs(Oban.name(), Oban.Job.t() | String.t(), fetch_opts()) :: Enum.t()

Stream all jobs for a workflow.

This function behaves identically to all_jobs/3, except it streams jobs lazily from within a Repo transaction.

Examples

Stream jobs, filtering to keep only completed jobs:

def process(job) do
  {:ok, workflow_jobs} =
    MyApp.Repo.transaction(fn ->
      job
      |> Workflow.stream_jobs()
      |> Stream.filter(& &1.state == "completed")
      |> Enum.to_list()
    end)

  do_things_with_jobs(workflow_jobs)

  :ok
end

Stream workflow jobs from anywhere using the workflow_id:

MyApp.Repo.transaction(fn ->
  "some-uuid-1234-5678"
  |> Workflow.stream_jobs()
  |> Enum.map(& &1.args["account_id"])
end)

Stream workflow jobs using a custom Oban instance name:

MyApp.Repo.transaction(fn ->
  MyApp.Oban
  |> Workflow.stream_jobs("some-uuid-1234-5678")
  |> Enum.map(& &1.args["account_id"])
end)

to_dot(workflow)

@spec to_dot(flow :: t()) :: String.t()

Converts the given workflow to a graphviz dot digraph.

Examples

Generate a dot digraph:

Workflow.to_dot(workflow)

to_graph(workflow)

@spec to_graph(flow :: t()) :: :digraph.graph()

Converts the given workflow to a :digraph.

The resulting digraph can be explored, evaluated, and then be converted to a number of other formats.

Examples

Generate a digraph from a workflow:

graph = Workflow.to_graph(workflow)

Check the path between jobs in a workflow:

workflow
|> Workflow.to_graph()
|> :digraph.get_path("step_1", "step_5")

to_mermaid(workflow)

@spec to_mermaid(flow :: t()) :: String.t()

Generate a Mermaid flowchart in top-down orientation from a workflow.

Examples

Generate a flowchart:

Workflow.to_mermaid(workflow)