Testing Pro Workers

This guide builds on Testing Workers, which you should read to acquaint yourself with the basics of unit testing workers. With Oban.Pro.Testing helpers, testing Pro workers is identical to testing basic workers, but with a few powerful additions. To see those additions in action, let's step through how to test each of the Pro specific workers.

Testing Batches

Batches link the execution of multiple jobs and optional callbacks after jobs are processed. To test a batch exhaustively, you can exercise process/1 with Oban.Pro.Testing.perform_job/2,3 and use Oban.Pro.Testing.perform_callback/2,3 for any callback handlers.

To demonstrate, we'll define a basic batch worker with a handler for when the batch completes successfully:

defmodule MyApp.MyBatch do
  use Oban.Worker

  @behaviour Oban.Pro.Batch

  @impl true
  def process(%{args: %{"email" => email}}) do
    if MyApp.valid_email?(email) do
      MyApp.deliver_welcome(email)
    else
      {:error, :invalid_email}
    end
  end

  @impl true
  def batch_completed(%{args: %{"admin_email" => email}, meta: %{"batch_id" => bid}}) do
    MyApp.batch_complete(bid, email)

    :ok
  end
end

Testing the worker's process/1 function is straight forward with perform_job/2:

test "delivering welcome emails to valid addresses" do
  assert :ok = perform_job(MyBatch, %{email: "[email protected]"})
  assert {:error, _} = perform_job(MyBatch, %{email: "fake-email"})
end

Similarly, there is a helper for testing callback functions. The helper produces a batch callback job and verifies that the callback function is exported. Here we are verifying the handle_completed/1 callback:

test "notifying admins that a batch completed" do
  assert :ok = perform_callback(MyBatch, :completed, %{admin_email: "[email protected]"})
end

Integration Testing Batches

Oban inserts callback jobs automatically based on the results of each job in the batch; e.g. if each job is completed then there will be a handle_completed/1 callback job. The Oban.Pro.Testing.run_batch/2 helper handles inserting and executing all jobs in a batch, including any appropriate callbacks.

test "running all jobs in a batch and the callbacks" do
  batch =
    ["[email protected]", "[email protected]", "[email protected]"]
    |> Enum.map(&MyBatch.new(%{email: &1}))
    |> MyBatch.new_batch(batch_callback_args: %{admin_email: "[email protected]"})

  assert %{completed: 4} = run_batch(batch)
end

When you're application code inserts a batch on its own, outside the context of your test, you can't call run_batch/1,2. In that case, you can use Oban.Pro.Testing.drain_jobs/1 instead to execute the jobs and the callback:

test "draining batches inserted by application code" do
  :ok = MyApp.welcome_recent_users()

  assert %{completed: 4} = drain_jobs(queue: :all)
end

Testing Chunks

Chunk workers process jobs in groups based on size or a timeout. They are an outlier amongst other workers because the process/1 callback receives a list of jobs rather than a single job. That difference prevents chunks from working with perform_job/2, and instead you can use the Oban.Pro.Testing.perform_chunk/3 helper.

To demonstrate testing chunks we'll define a worker that checks a batch of password hashes against a pwned database and notifies admins when a significant ratio of hashes are compromised.

defmodule MyApp.MyChunk do
  use Oban.Pro.Workers.Chunk, size: 100, timeout: :timer.seconds(30)

  @impl true
  def process([_ | _] = jobs) do
    pwned_count = Enum.count(jobs, fn %{args: args} -> MyApp.was_pwned?(args["hash"]) end)
    pwned_ratio = pwned_count / length(jobs)

    if pwned_ratio > 0.1, do: MyApp.deliver_pwned_alert()

    {:ok, pwned_count}
  end
end

Now, exercise process/1 with perform_chunk/3:

test "calculating the ratio of pwned password hashes" do
  clear_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:clear)}
  pwned_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:pwned)}

  assert {:ok, 3} = perform_chunk(MyChunk, clear_args ++ pwned_args)
end

Unit testing chunks is convenient for checking edge cases, but it lacks the depth and reality of real chunked execution. For that, we need integration testing.

Integration Testing Chunks

During normal execution, chunk size is limited to the configured size. In our example above, the size is set to 100, which means that a chunk may process up to 100 jobs at once. To verify our chunking we'll insert and execute the chunk jobs with Oban.Pro.Testing.run_chunk/2:

test "running up to 100 jobs at a time" do
  jobs = Enum.map(1..150, fn _ -> MyChunk.new(%{hash: MyApp.gen_hash(:clear)}) end)

  assert %{completed: 2} = run_chunk(jobs)

  # Integration tests are about side effects; assert no alert email was delivered
  refute email_delivered()
end

As with the other run_* functions, if you need to execute jobs that were inserted within application code, use drain_jobs instead:

test "draining chunks inserted by application code" do
  # Prepare the database with less than 100 recent users
  :ok = MyApp.check_pwned_signups()

  assert %{completed: 1} = drain_jobs(queue: :chunked)
end

Testing Chains

Chain workers link jobs together to ensure they run in a strict sequential order. Because they lack callbacks like Batches or an alternative process/ signature like Chunks, individual jobs are testable with the standard perform_job/3 helper.

To demonstrate testing a chain, we'll define an account management worker that receives external balance notifications and synchronizes them locally.

defmodule MyApp.MyChain do
  use Oban.Pro.Worker, chain: [by: [args: :account_id]]

  @impl true
  def process(%Job{args: %{"account_id" => account_id, "balance" => balance}}) do
    account_id
    |> MyApp.Account.find()
    |> MyApp.Account.update_balance(balance)
  end
end

Now, use perform_job/3 to verify that updating works as expected:

test "updating account balances" do
  args = %{account_id: insert(:account).id, balance: 100}

  assert {:ok, %{balance: 100}} = perform_job(MyChain, args)
end

The real power of Chains only shows when multiple jobs run in sequence. For that, we need integration testing.

Integration Testing Chains

Downstream execution halts when a Chain job encounters an error. Subsequent jobs snooze while waiting for the upstream job to complete successfully. We can test this with run_chain/2 and by disabling the with_recursion option to prevent the failing job from retrying until exhaustion.

test "hold balance processing when an error occurs" do
  jobs = [
    MyChain.new(%{account_id: account.id, balance: 0.0}),
    MyChain.new(%{account_id: account.id, balance: 100}),
    MyChain.new(%{account_id: account.id, balance: 150})
  ]

  assert %{retryable: 1, scheduled: 2} = run_chain(changesets, with_recursion: false)
end

Testing Workflows

Workflows jobs compose together with arbitrary dependencies that effect if and when jobs are executed. Individual jobs in a workflow are easily tested with perform_job/3, but the real challenge is testing the interplay between jobs as they execute.

For this demonstration we'll build a workflow that applies various natural language processing to a text submission.

defmodule MyApp.Analysis do
  use Oban.Pro.Worker, recorded: true

  args_schema do
    field :text, :string
    field :mode, :enum, values: ~w(complexity expressiveness sentiment syntax)a
  end

  @impl true
  def process(%{args: %{text: text, mode: mode}}) do
    apply(MyApp, mode, [text])
  end

  def process(job) do
    expressiveness =
      job
      |> Workflow.all_recorded(job)
      |> MyApp.expressiveness()

    {:ok, expressiveness}
  end
end

Our workflow is defined as a single worker with multiple process/1 clauses. Typically, workflows are composed of multiple workers, but there isn't any practical difference.

Each of the clauses can be exercised with perform_job/3:

test "analyzing text sentiment" do
  assert {:ok, :positive} = perform_job(Analysis, %{text: text(), mode: :sentiment})
end

Dependencies between jobs are what defines a workflow, and to test those dependencies we need integration tests.

Integration Testing Workflows

Downstream workflow jobs only run when their upstream dependencies have completed successfully. To verify ordered execution between dependencies we'll insert and execute jobs using Oban.Pro.Testing.run_workflow/2.

test "running through a complete NLP analysis workflow" do
  text = text_sample()

  workflow =
    Workflow.new()
    |> Workflow.add(:com, Analysis.new(%{text: text, mode: :complexity}))
    |> Workflow.add(:sen, Analysis.new(%{text: text, mode: :sentiment}))
    |> Workflow.add(:syn, Analysis.new(%{text: text, mode: :syntax}))
    |> Workflow.add(:exp, Analysis.new(%{}), deps: [:com, :sen, :syn])

  assert [_com, _sen, _syn, exp_job] = run_workflow(workflow, with_summary: false)

  assert {:ok, 0.8} = Analysis.fetch_recorded(exp_job)
end

The test executes all upstream jobs and then uses the results to compute a score in the final downstream job. Because we want to verify the result of the final job, we use with_summary: false to give us the completed jobs rather than a count summary.

Finally, you can use drain_jobs directly when your application code inserts the workflow:

test "draining workflows inserted by application code" do
  :ok = MyApp.analyze_text(text_sample())

  assert %{completed: 4} = drain_jobs(queue: :all)
end