Oban.Pro.Workers.Workflow behaviour (Oban Pro v1.4.2)
Workflow workers compose together with arbitrary dependencies between jobs, allowing sequential, fan-out, and fan-in execution workflows. Workflows are fault tolerant, can be homogeneous or heterogeneous, and scale horizontally across all available nodes.
Workflow jobs aren't executed until all upstream dependencies have completed. This includes waiting on retries, scheduled jobs, or snoozing.
Usage
Each worker within a workflow can use Oban.Pro.Workers.Workflow
rather than Oban.Worker
to
automatically inject helper functions like new_workflow/1
and add/4
.
Our example workflow will coordinate archiving an account. To properly archive the account we must perform the following workers:
FinalReceipt
— generate a final receipt and email it to the account ownerEmailSubscriber
— email each subscriber on the accountBackupPost
— up all important posts on the account to external storageDeleteAccount
— delete the account from the database
We need step 1 to run before step 2 or 3, but both steps 2 and 3 can run in parallel. After all
of the jobs in steps 2 and 3 finish we can finally delete the account from the database. For
demonstration purposes we'll define one of the workers with a stubbed-in process/1
function:
defmodule MyApp.FinalReceipt do
use Oban.Pro.Workers.Workflow, queue: :default
@impl true
def process(%Job{} = _job) do
# Generate the final receipt.
:ok
end
end
Note that we define a process/1
callback instead of perform/1
because the perform/1
function coordinates the workflow. The process/1
function receives an Oban.Job
struct, just
like perform/1
and it should return similar values, i.e. :ok
, {:ok, value}
.
Assuming that you've defined the other workers, we can construct a workflow to enforce sequential execution:
alias MyApp.{BackupPost, DeleteAccount, EmailSubscriber, FinalReceipt}
def archive_account(acc_id) do
FinalReceipt.new_workflow()
|> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
|> FinalReceipt.add(:email, EmailSubscriber.new(%{id: acc_id}), deps: [:receipt])
|> FinalReceipt.add(:backup, BackupPost.new(%{id: acc_id}), deps: [:receipt])
|> FinalReceipt.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: [:backup, :receipt])
|> Oban.insert_all()
end
Here we use new_workflow/1
to initialize a new workflow (later on we will look at how to
customize workflow behaviour with various options). Jobs are then added to the workflow with
add/3,4
, where the first argument is an existing workflow, the second argument is unique name,
the third is a job changeset, and the fourth an optional list of dependencies. Finally, the
workflow passes directly to Oban.insert_all/1,2
, which handles inserting each of the jobs into
the database atomically.
All workers that use Workflow
have new_workflow/1
and add/4
functions defined, so it
doesn't matter which module you use to initialize or to add new jobs.
Dependency resolution guarantees that jobs execute in the order receipt -> email -> backup -> delete
, even if one of the jobs fails and needs to retry. Any job that defines a dependency
will wait for each upstream dependency to complete before it starts.
Recall how our original specification stated that there could be multiple email
and backup
jobs? The workflow we've built only handles one email or backup job at a time.
Let's modify the workflow to fan-out to multiple email
and backup
jobs and then fan-in to
the final delete
job:
def archive_account_workflow(acc_id) do
subscribers = subscribers_for_account(acc_id)
documents = documents_for_account(acc_id)
email_deps = Enum.map(subscribers, &"email_#{&1.id}")
backup_deps = Enum.map(documents, &"backup_#{&1.id}")
delete_deps = email_deps ++ backup_deps
FinalReceipt.new_workflow()
|> FinalReceipt.add(:receipt, FinalReceipt.new(%{id: acc_id}))
|> add_email_jobs(subscribers)
|> add_backup_jobs(documents)
|> DeleteAccount.add(:delete, DeleteAccount.new(%{id: acc_id}), deps: delete_deps)
end
defp add_email_jobs(workflow, subscribers) do
Enum.reduce(subscribers, workflow, fn %{id: id, email: email}, acc ->
EmailSubscriber.add(acc, "email_#{id}", EmailSubscriber.new(%{email: email}), deps: [:receipt])
end)
end
defp add_backup_jobs(workflow, subscribers) do
Enum.reduce(subscribers, workflow, fn %{id: id}, acc ->
BackupPost.add(acc, "backup_#{id}", BackupPost.new(%{id: id}), deps: [:receipt])
end)
end
Now the workflow will run all of the email
and backup
jobs we need, before deleting the
account. To confirm the flow without inserting and executing jobs we can visualize it.
Creating Workflows
Workflows use conservative defaults for safe, and relatively quick, dependency resolution. You can customize the safety checks, waiting times and resolution intensity by providing a few top-level options:
ignore_cancelled
— regardcancelled
dependencies as completed rather than halting the workflow. Defaults tofalse
.ignore_discarded
— regarddiscarded
dependencies as completed rather than halting the workflow. Defaults tofalse
.ignore_deleted
— regard deleted (typically pruned) dependencies as completed rather than halting the workflow. Defaults tofalse
.
The following example creates a workflow with all of the available options:
alias Oban.Pro.Workers.Workflow
workflow = Workflow.new(
ignore_cancelled: true,
ignore_deleted: true,
ignore_discarded: true,
)
Internally, the meta
field stores options for each job. That makes it possible to set or
override workflow options per-job. For example, configure a single job to ignore cancelled
dependencies and another to ignore discarded:
MyWorkflow.new_workflow()
|> MyWorkflow.add(:a, MyWorkflow.new(%{}))
|> MyWorkflow.add(:b, MyWorkflow.new(%{}, deps: [:a], ignore_cancelled: true)
|> MyWorkflow.add(:c, MyWorkflow.new(%{}, deps: [:b], ignore_discarded: true)
Dependency resolution relies on jobs lingering in the database after execution. If your system
prunes job dependencies then the workflow may never complete. To override this behaviour, set
ignore_deleted: true
on your workflows.
Generating Workflow IDs
By default workflow_id
is a time-ordered random UUIDv7. This is more than sufficient
to ensure that workflows are unique for any period of time. However, if you require more control
you can override workflow_id
generation at the worker level, or pass a value directly to the
new_workflow/1
function.
To override the workflow_id
for a particular workflow you override the gen_id/0
callback:
defmodule MyApp.Workflow do
use Oban.Pro.Workers.Workflow
# Generate a 24 character long random string instead
@impl true
def gen_id do
24
|> :crypto.strong_rand_bytes()
|> Base.encode64()
end
...
end
The gen_id/0
callback works for random/non-deterministic id generation. If you'd prefer to
use a deterministic id instead you can pass the workflow_id
in as an option to
new_workflow/1
:
MyApp.Workflow.new_workflow(workflow_id: "custom-id")
Using this technique you can verify the workflow_id
in tests or append to the workflow
manually after it was originally created.
Appending to a Workflow
Sometimes all jobs aren't known when the workflow is created. In that case, you can add more
jobs with optional dependency checking using append/2
. An appended workflow starts with one or
more jobs, which reuses the original workflow id, and optionally builds a set of dependencies
for checking.
In this example we disable deps checking with check_deps: false
:
defmodule MyApp.WorkflowWorker do
use Oban.Pro.Workers.Workflow
@impl true
def process(%Job{} = job) do
jobs =
job
|> Workflow.append(check_deps: false)
|> add(:d, WorkerD.new(%{}), deps: [:a])
|> add(:e, WorkerE.new(%{}), deps: [:b])
|> add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
{:ok, jobs}
end
end
The new jobs specify deps on preexisting jobs named :a
, :b
, and :c
, but there isn't any
guarantee those jobs actually exist. That could lead to an incomplete workflow where the new
jobs may never complete.
To be safe and check jobs while appending we'll fetch all of the previous jobs and feed them in:
defmodule MyApp.WorkflowWorker do
use Oban.Pro.Workers.Workflow
@impl true
def process(%Job{} = job) do
{:ok, jobs} =
MyApp.Repo.transaction(fn ->
job
|> Workflow.stream_jobs()
|> Enum.to_list()
end)
jobs
|> Workflow.append()
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Workflow.add(:f, WorkerF.new(%{}), deps: [:c])
|> Oban.insert_all()
:ok
end
end
Now there isn't any risk of an incomplete workflow, at the expense of loading some extraneous jobs.
Fetching Workflow Jobs
The all_jobs/1,2
function simplifies loading all jobs in a workflow from within a worker. For
example, to fetch all of the jobs in a workflow:
defmodule MyApp.Workflow do
use Oban.Pro.Workers.Workflow
@impl Workflow
def process(%Job{} = job) do
job
|> Workflow.all_jobs()
|> do_things_with_jobs()
:ok
end
end
It's also possible to scope fetching to only dependencies of the current job:
deps = Workflow.all_jobs(job, only_deps: true)
Or only a single explicit dependency:
[dep_job] = Workflow.all_jobs(job, names: [:a])
For large workflows it's not efficient to load all jobs in memory at once. In that case, you can
the stream_jobs/1,2
callback instead to fetch jobs lazily. For example, to stream all of the
completed
jobs in a workflow:
defmodule MyApp.Workflow do
use Oban.Pro.Workers.Workflow
@impl Workflow
def process(%Job{} = job) do
{:ok, workflow_jobs} =
MyApp.Repo.transaction(fn ->
job
|> Workflow.stream_jobs()
|> Stream.filter(& &1.state == "completed")
|> Enum.to_list()
end)
do_things_with_jobs(workflow_jobs)
:ok
end
end
Streaming is provided by Ecto's Repo.stream
, and it must take place within a transaction.
Using a stream lets you control the number of jobs loaded from the database, minimizing memory
usage for large workflows.
Visualizing Workflows
Workflows are a type of Directed Acyclic Graph, also known as a DAG. That means we can describe a workflow as a graph of jobs and dependencies, where execution flows between jobs. By converting the workflow into DOT notation, a standard graph description language, we can render visualizations!
Dot generation relies on libgraph, which is an optional dependency. You'll need to specify it as a dependency before generating dot output:
def deps do
[{:libgraph, "~> 0.7"}]
end
Once you've installed libgraph
, we can use to_dot/1
to convert a workflow. As with
new_workflow
and add
, all workflow workers define a to_dot/1
function that takes a
workflow and returns a dot formatted string. For example, calling to_dot/1
with the account
archiving workflow from above:
FinalReceipt.to_dot(archive_account_workflow(123))
Generates the following dot output, where each vertex is a combination of the job's name in the workflow and its worker module:
strict digraph {
"delete (MyApp.DeleteAccount)"
"backup_1 (MyApp.BackupPost)"
"backup_2 (MyApp.BackupPost)"
"backup_3 (MyApp.BackupPost)"
"receipt (MyApp.FinalReceipt)"
"email_1 (MyApp.EmailSubscriber)"
"email_2 (MyApp.EmailSubscriber)"
"backup_1 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"backup_2 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"backup_3 (MyApp.BackupPost)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_1 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_2 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "backup_3 (MyApp.BackupPost)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "email_1 (MyApp.EmailSubscriber)" [weight=1]
"receipt (MyApp.FinalReceipt)" -> "email_2 (MyApp.EmailSubscriber)" [weight=1]
"email_1 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
"email_2 (MyApp.EmailSubscriber)" -> "delete (MyApp.DeleteAccount)" [weight=1]
}
Now we can take that dot output and render it using a tool like graphviz. The following example function accepts a workflow and renders it out as an SVG:
defmodule WorkflowRenderer do
alias Oban.Pro.Workers.Workflow
def render(workflow) do
dot_path = "workflow.dot"
svg_path = "workflow.svg"
File.write!(dot_path, Workflow.to_dot(workflow))
System.cmd("dot", ["-T", "svg", "-o", svg_path, dot_path])
end
end
With graphviz installed, that will generate a SVG of the workflow:
Looking at the visualized graph we can clearly see how the workflow starts with a single
render
job, fans-out to multiple email
and backup
jobs, and finally fans-in to the
delete
job—exactly as we planned!
Summary
Callbacks
Delegates to add/4
.
Delegates to all_jobs/2
.
Delegates to append/2
.
Generate a unique string to identify the workflow.
Instantiate a new workflow struct with a unique workflow id.
Delegates to stream_jobs/2
.
Delegates to to_dot/1
.
Functions
Add a named job to the workflow along with optional dependencies.
Get all jobs for a workflow, optionally filtered by upstream deps.
Instantiate a new workflow from an existing workflow job or jobs.
Generates a UUIDv7 based workflow id.
Instantiate a new workflow struct with a unique workflow id.
Stream all jobs for a workflow.
Converts the given workflow to DOT format, which can then be converted to a number of other
formats via Graphviz, e.g. dot -Tpng out.dot > out.png
.
Types
add_option()
@type add_option() :: {:deps, [name()]}
append_option()
@type append_option() :: new_option() | {:check_deps, boolean()}
chan()
@type chan() :: Oban.Job.changeset()
fetch_opts()
@type fetch_opts() :: {:log, Logger.level()} | {:names, [name()]} | {:only_deps, boolean()} | {:timeout, timeout()}
name()
new_option()
@type new_option() :: {:ignore_cancelled, boolean()} | {:ignore_deleted, boolean()} | {:ignore_discarded, boolean()} | {:workflow_id, String.t()} | {:workflow_debounce, non_neg_integer()}
Callbacks
add(flow, name, changeset, opts)
@callback add(flow :: t(), name :: name(), changeset :: chan(), opts :: [add_option()]) :: t()
Delegates to add/4
.
all_workflow_jobs(t, list)
@callback all_workflow_jobs(Oban.Job.t(), [fetch_opts()]) :: [Oban.Job.t()]
Delegates to all_jobs/2
.
append_workflow(jobs, list)
@callback append_workflow(jobs :: Oban.Job.t() | [Oban.Job.t()], [append_option()]) :: t()
Delegates to append/2
.
gen_id()
@callback gen_id() :: String.t()
Generate a unique string to identify the workflow.
Defaults to a 128bit UUIDv7.
Examples
Generate a workflow id using random bytes instead of a UUID:
@impl Workflow
def gen_id do
24
|> :crypto.strong_rand_bytes()
|> Base.encode64()
end
new_workflow(opts)
@callback new_workflow(opts :: [new_option()]) :: t()
Instantiate a new workflow struct with a unique workflow id.
Delegates to new/1
and uses the module's gen_id/0
to generate the workflow id.
stream_workflow_jobs(t, list)
@callback stream_workflow_jobs(Oban.Job.t(), [fetch_opts()]) :: Enum.t()
Delegates to stream_jobs/2
.
to_dot(flow)
Delegates to to_dot/1
.
Functions
add(workflow, name, changeset, opts \\ [])
@spec add(flow :: t(), name :: name(), changeset :: chan(), opts :: [add_option()]) :: t()
Add a named job to the workflow along with optional dependencies.
Examples
Add jobs to a workflow with dependencies:
Workflow.new()
|> Workflow.add(:a, MyApp.WorkerA.new(%{id: id}))
|> Workflow.add(:b, MyApp.WorkerB.new(%{id: id}), deps: [:a])
|> Workflow.add(:c, MyApp.WorkerC.new(%{id: id}), deps: [:a])
|> Workflow.add(:d, MyApp.WorkerC.new(%{id: id}), deps: [:b, :c])
all_jobs(job, opts)
@spec all_jobs(Oban.Job.t(), [fetch_opts()]) :: [Oban.Job.t()]
Get all jobs for a workflow, optionally filtered by upstream deps.
Examples
Retrieve all workflow jobs:
@impl Workflow
def process(%Job{} = job) do
job
|> Workflow.all_jobs()
|> do_things_with_jobs()
:ok
end
Retrieve only the current job's deps:
workflow_jobs = Workflow.all_jobs(job, only_deps: true)
Retrieve an explicit list of dependencies:
[job_a, job_b] = Workflow.all_jobs(job, names: [:a, :b])
append(jobs, opts)
@spec append(jobs :: Oban.Job.t() | [Oban.Job.t()], [append_option()]) :: t()
Instantiate a new workflow from an existing workflow job or jobs.
Examples
Append to a workflow seeded with all other jobs in the workflow:
jobs
|> Workflow.append()
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()
Append to a workflow from a single job and bypass checking deps:
job
|> Workflow.append(check_deps: false)
|> Workflow.add(:d, WorkerD.new(%{}), deps: [:a])
|> Workflow.add(:e, WorkerE.new(%{}), deps: [:b])
|> Oban.insert_all()
gen_id()
@spec gen_id() :: String.t()
Generates a UUIDv7 based workflow id.
Examples
iex> Workflow.gen_id()
"018e5d3b-1bb6-7f60-9c12-d6ed50cfff59"
new(opts \\ [])
@spec new(opts :: [new_option()]) :: t()
Instantiate a new workflow struct with a unique workflow id.
Examples
Create a standard workflow without any options:
Workflow.new()
Create a workflow with a static id and some options:
Workflow.new(workflow_id: "workflow-id", ignore_cancelled: true, ignore_discarded: true)
stream_jobs(job, opts)
@spec stream_jobs(Oban.Job.t(), [fetch_opts()]) :: Enum.t()
Stream all jobs for a workflow.
Examples
Stream with filtering to only preserve completed
jobs:
@impl true def process(%Job{} = job) do
{:ok, workflow_jobs} =
MyApp.Repo.transaction(fn ->
job
|> Workflow.stream_jobs()
|> Stream.filter(& &1.state == "completed")
|> Enum.to_list()
end)
do_things_with_jobs(workflow_jobs)
:ok
end
to_dot(workflow)
Converts the given workflow to DOT format, which can then be converted to a number of other
formats via Graphviz, e.g. dot -Tpng out.dot > out.png
.
The default implementation relies on libgraph.
Examples
Generate a DOT graph format from a workflow:
Workflow.to_dot(workflow)