Testing Pro Workers

This guide builds on Testing Workers, which you should read to aquaint yourself with the basics of unit testing workers. With Oban.Pro.Testing helpers, testing Pro workers is identical to testing basic workers, but with a few powerful additions. To see those additions in action, let's step through how to test each of the Pro specific workers.

testing-batches

Testing Batches

Batches link the execution of multiple jobs and optional callbacks after jobs are processed. To test a batch exhaustively, you can exercise process/1 with Oban.Pro.Testing.perform_job/2,3 and use Oban.Pro.Testing.perform_callback/2,3 for any callback handlers.

To demonstrate, we'll define a basic batch worker with a handler for when the batch completes successfully:

defmodule MyApp.MyBatch do
  use Oban.Pro.Workers.Batch

  @impl true
  def process(%{args: %{"email" => email}}) do
    if MyApp.valid_email?(email) do
      MyApp.deliver_welcome(email)
    else
      {:error, :invalid_email}
    end
  end

  @impl true
  def handle_completed(%{args: %{"admin_email" => email}, meta: %{"batch_id" => bid}}) do
    MyApp.batch_complete(bid, email)

    :ok
  end
end

Testing the worker's process/1 function is straight forward with perform_job/2:

test "delivering welcome emails to valid addresses" do
  assert :ok = perform_job(MyBatch, %{email: "[email protected]"})
  assert {:error, _} = perform_job(MyBatch, %{email: "fake-email"})
end

Similarly, there is a helper for testing callback functions. The helper produces a batch callback job and verifies that the callback function is exported. Here we are verifying the handle_completed/1 callback:

test "notifying admins that a batch completed" do
  assert :ok = perform_callback(MyBatch, :completed, %{admin_email: "[email protected]"})
end

integration-testing-batches

Integration Testing Batches

Oban inserts callback jobs automatically based on the results of each job in the batch; e.g. if each job is completed then there will be a handle_completed/1 callback job. The Oban.Pro.Testing.run_batch/2 helper handles inserting and executing all jobs in a batch, including any appropriate callbacks.

test "running all jobs in a batch and the callbacks" do
  batch =
    ["[email protected]", "[email protected]", "[email protected]"]
    |> Enum.map(&MyBatch.new(%{email: &1}))
    |> MyBatch.new_batch(batch_callback_args: %{admin_email: "[email protected]"})

  assert %{completed: 4} = run_batch(batch)
end

When you're application code inserts a batch on its own, outside the context of your test, you can't call run_batch/1,2. In that case, you can use Oban.Pro.Testing.drain_jobs/1 instead to execute the jobs and the callback:

test "draining batches inserted by application code" do
  :ok = MyApp.welcome_recent_users()

  assert %{completed: 4} = drain_jobs(queue: :all)
end

testing-chunks

Testing Chunks

Chunk workers process jobs in groups based on size or a timeout. They are an outlier amongst other workers because the process/1 callback receives a list of jobs rather than a single job. That difference prevents chunks from working with perform_job/2, and instead you can use the Oban.Pro.Testing.perform_chunk/3 helper.

To demonstrate testing chunks we'll define a worker that checks a batch of password hashes against a pwned database and notifies admins when a significant ratio of hashes are compromised.

defmodule MyApp.MyChunk do
  use Oban.Pro.Workers.Chunk, size: 100, timeout: :timer.seconds(30)

  @impl true
  def process([_ | _] = jobs) do
    pwned_count = Enum.count(jobs, fn %{args: args} -> MyApp.was_pwned?(args["hash"]) end)
    pwned_ratio = pwned_count / length(jobs)

    if pwned_ratio > 0.1, do: MyApp.deliver_pwned_alert()

    {:ok, pwned_count}
  end
end

Now, exercise process/1 with perform_chunk/3:

test "calculating the ratio of pwned password hashes" do
  clear_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:clear)}
  pwned_args = for _ <- 1..3, do: %{hash: MyApp.gen_hash(:pwned)}

  assert {:ok, 3} = perform_chunk(MyChunk, clear_args ++ pwned_args)
end

Unit testing chunks is convenient for checking edge cases, but it lacks the depth and reality of real chunked execution. For that, we need to integration testing.

integration-testing-chunks

Integration Testing Chunks

During normal execution chunk size is limited to the configured size. In our example above, the size is set to 100, which means that a chunk may process up to 100 jobs at once. To verify our chunking we'll insert and execute the chunk jobs with Oban.Pro.Testing.run_chunk/2:

test "running up to 100 jobs at a time" do
  jobs = Enum.map(1..150, fn _ -> MyChunk.new(%{hash: MyApp.gen_hash(:clear)}) end)

  assert %{completed: 2} = run_chunk(jobs)

  # Integration tests are about side effects; assert no alert email was delivered
  refute email_delivered()
end

As with the other run_* functions, if you need to execute jobs that were inserted within application code, use drain_jobs instead:

test "draining chunks inserted by application code" do
  # Prepare the database with less than 100 recent users
  :ok = MyApp.check_pwned_signups()

  assert %{completed: 1} = drain_jobs(queue: :chunked)
end

testing-workflows

Testing Workflows

Workflows jobs compose together with arbitrary dependencies that effect if and when jobs are executed. Individual jobs in a workflow are easily tested with perform_job/3, but the real challenge is testing the interplay between jobs as they execute.

For this demonstration we'll build a workflow that applies various natural language processing to a text submission.

defmodule MyApp.MyWorkflow do
  use Oban.Pro.Workers.Workflow, recorded: true

  @impl true
  def process(%{args: %{"text" => text, "mode" => mode}}) do
    analysis_fun =
      case mode do
        "complexity" => :complexity_analysis
        "sentiment" => :sentiment_analysis
        "syntax" => :syntax_analysis
      end

    apply(MyApp, analysis_fun, [text])
  end

  def process(job) do
    expressiveness =
      job
      |> all_workflow_jobs(only_deps: true)
      |> Enum.map(fn job, acc -> {job.args["mode"], fetch_result(job)} end)
      |> MyApp.expressiveness()

    {:ok, expressiveness}
  end
end

Our workflow is defined as a single worker with multiple process/1 clauses. Typically, workflows are composed of multiple workers, but there isn't any practical difference.

Each of the clauses can be exercised with perform_job/3:

test "analyzing text sentiment" do
  assert {:ok, :positive} = perform_job(MyWorkflow, %{text: text(), mode: :sentiment})
end

Dependencies between jobs are what defines a workflow, and to test those dependencies we need integration tests.

integration-testing-workflows

Integration Testing Workflows

Downstream workflow jobs only run when their upstream dependencies have completed successfully. To verify ordered execution between dependencies we'll insert and execute jobs using Oban.Pro.Testing.run_workflow/2.

test "running through a complete NLP analysis workflow" do
  text = text_sample()

  workflow =
    MyWorkflow.new_workflow()
    |> MyWorkflow.add(:com, MyWorkflow.new(%{text: text, mode: :complexity}))
    |> MyWorkflow.add(:sen, MyWorkflow.new(%{text: text, mode: :sentiment}))
    |> MyWorkflow.add(:syn, MyWorkflow.new(%{text: text, mode: :syntax}))
    |> MyWorkflow.add(:exp, MyWorkflow.new(%{}), deps: [:com, :sen, :syn])

  # Using with_summary: false gives us a list of executed jobs
  assert [_com, _sen, _syn, exp_job] = run_workflow(workflow, with_summary: false)

  assert {:ok, 0.8} = MyWorkflow.fetch_result(exp_job)
end

The test executes all upstream jobs and then uses the results to compute a score in the final downstream job. Because we want to verify the result of the final job, we use with_summary: false to give us the completed jobs rather than a count summary.

📓 The with_summary option is available to all run and drain functions.

Finally, you can use drain_jobs directly when your application code inserts the workflow:

test "draining workflows inserted by application code" do
  :ok = MyApp.analyze_text(text_sample())

  assert %{completed: 4} = drain_jobs(queue: :all)
end