Dynamic Cron Plugin
The DynamicCron plugin enhances Oban's built-in cron scheduler by making it
configurable at runtime, globally, across your entire cluster. DynamicCron
supports adding, updating, deleting, and pausing cron entries at boot time or
runtime. It is an ideal solution for applications that must dynamically start
and manage scheduled tasks at runtime.
Installation
Before running the DynamicCron plugin you must run a migration to add the
oban_cron table to your database.
mix ecto.gen.migration add_oban_cron
Open the generated migration in your editor and delegate the change function to
Oban.Pro.Migrations.DynamicCron:
defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end
As with the base Oban tables you can optionally provide a prefix to
"namespace" the table within your database. Here we specify a "private"
prefix:
defmodule MyApp.Repo.Migrations.AddObanCron do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicCron.change("private")
end
Run the migration to create the table:
mix ecto.migrate
Now you can use the DynamicCron plugin and start scheduling periodic jobs!
Using and Configuring
To begin using DynamicCron, add the module to your list of Oban plugins in
config.exs:
config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicCron]
  ...
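If the oban_cron table was created under a prefix, as in the "private" migration earlier, the Oban instance presumably needs to point at the same prefix. The fragment below is a sketch under that assumption: it uses Oban's standard :repo and :prefix options, and MyApp.Repo is a placeholder for your own repo module.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  # Placeholder repo module; substitute your application's Ecto repo.
  repo: MyApp.Repo,
  # Assumed to match the prefix passed to the DynamicCron migration.
  prefix: "private",
  plugins: [Oban.Pro.Plugins.DynamicCron]
```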
By itself, without providing a crontab or dynamically inserting cron entries,
the plugin doesn't have anything to schedule. To get scheduling started, provide
a list of {cron, worker} or {cron, worker, options} tuples to the plugin.
The syntax is identical to Oban's built-in :crontab option, which means you
can copy an existing standard :crontab list into the plugin's :crontab.
plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  crontab: [
    {"* * * * *", MyApp.MinuteJob},
    {"0 * * * *", MyApp.HourlyJob, queue: :scheduled},
    {"0 0 * * *", MyApp.DailyJob, max_attempts: 1},
    {"0 12 * * MON", MyApp.MondayWorker, tags: ["scheduled"]},
    {"@daily", MyApp.AnotherDailyWorker}
  ]
}]
For more details about periodic jobs and cron expressions see the documentation on Periodic Jobs.
Now, when dynamic cron initializes, it will persist those cron entries to the
database and start scheduling them according to their CRON expression. The
plugin's crontab format is nearly identical to Oban's standard crontab, with a
few important enhancements we'll look at soon.
Each crontab entry is persisted to the database and referenced
globally, by all the other connected Oban instances. That allows us to insert,
update, or delete cron entries at any time. In fact, changing the schedule or
options of an entry in the crontab provided to the plugin will automatically
update the persisted entry. To demonstrate, let's modify the MinuteJob we
specified so that it runs every other minute in the :scheduled queue:
crontab: [
  {"*/2 * * * *", MyApp.MinuteJob, queue: :scheduled},
  ...
]
Now it isn't really a "minute job" any more, and the name is no longer suitable.
However, we didn't provide a name for the entry and it's using the module name
instead. To provide more flexibility we can add a :name override; then we can
update the worker's name as well:
crontab: [
  {"*/2 * * * *", MyApp.FrequentJob, name: "frequent", queue: :scheduled},
  ...
]
All entries are referenced by name, which defaults to the worker's name and must be unique. You may define the same worker multiple times as long as you provide a name override:
crontab: [
  {"*/3 * * * *", MyApp.BasicJob, name: "client-1", args: %{client_id: 1}},
  {"*/3 * * * *", MyApp.BasicJob, name: "client-2", args: %{client_id: 2}},
  ...
]
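When many entries share one worker, the list can be generated rather than written by hand. The following is a minimal sketch, assuming a hypothetical MyApp.CrontabBuilder module and client_entries/3 helper (neither is part of Oban or Oban Pro). It only builds tuples in the {cron, worker, options} format shown above, giving each a unique name.

```elixir
defmodule MyApp.CrontabBuilder do
  @moduledoc "Hypothetical helper for illustration; not part of Oban or Oban Pro."

  # Builds one crontab tuple per client id. Each tuple gets a unique :name,
  # which is what allows the same worker to appear more than once.
  def client_entries(client_ids, worker, expression \\ "*/3 * * * *") do
    for id <- client_ids do
      {expression, worker, name: "client-#{id}", args: %{client_id: id}}
    end
  end
end

# MyApp.BasicJob is only used as a module name here; it is never called.
entries = MyApp.CrontabBuilder.client_entries([1, 2], MyApp.BasicJob)
IO.inspect(entries)
```

The resulting list could be used as the plugin's :crontab value, alone or concatenated with hand-written entries.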
To temporarily disable scheduling jobs you can set the paused flag:
crontab: [
  {"* * * * *", MyApp.BasicJob, paused: true},
  ...
]
To resume the job you must supply paused: false (or use update/2 to resume
it manually); simply removing the paused option will have no effect.
crontab: [
  {"* * * * *", MyApp.BasicJob, paused: false},
  ...
]
It is also possible to delete a persisted entry during initialization by passing
the :delete option:
crontab: [
  {"* * * * *", MyApp.MinuteJob, delete: true},
  ...
]
One or more entries can be deleted this way. Deleting entries is idempotent: nothing happens if no matching entry can be found.
In the next section we'll look at how to list, insert, update and delete jobs dynamically at runtime.
Runtime Updates
Dynamic cron entries are persisted to the database, making it easy to manipulate
them through typical CRUD operations. The DynamicCron plugin provides
convenience functions to simplify working with those operations. In this section
we'll walk through each of the available functions and look at some examples.
Typespecs
📚 In order to bridge the gap between module level docs and a guide, each section includes a typespec for the corresponding function. The snippet below defines the types listed in each section.
@type cron_expr :: String.t()

@type cron_name :: String.t() | atom()

@type cron_opt ::
        {:args, Oban.Job.args()}
        | {:expression, cron_expr()}
        | {:max_attempts, pos_integer()}
        | {:paused, boolean()}
        | {:priority, 0..3}
        | {:name, cron_name()}
        | {:queue, atom() | String.t()}
        | {:tags, Oban.Job.tags()}
        | {:timezone, String.t()}

@type cron_input :: {cron_expr(), module()} | {cron_expr(), module(), [cron_opt()]}
Listing Cron Entries
@spec all() :: [CronEntry.t()]
Use all/0 to retrieve all persisted cron entries:
entries = DynamicCron.all()
This returns a list of Oban.Pro.Cron schemas with raw attributes. The all/0
function is provided as a convenience to inspect persisted entries.
As Oban.Pro.Cron is an Ecto schema you're free to query the table however you
wish using Ecto.Query. For example, you can list all of the entries with a
name like "client-":
import Ecto.Query, only: [where: 3]
Oban.Pro.Cron
|> where([c], ilike(c.name, "client-%"))
|> MyApp.Repo.all()
You can use Repo functions like update or update_all to modify cron jobs in
place, but it is highly recommended that you use update/2 to ensure that
options are set correctly and to prevent breakage.
Inserting Cron Entries
@spec insert([cron_input()]) :: {:ok, [CronEntry.t()]} | {:error, Ecto.Changeset.t()}
The insert/1 function takes a list of one or more tuples with the same
{expression, worker} or {expression, worker, options} format as the plugin's
crontab option:
DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])
Be aware that insert/1 acts like an "upsert", making it possible to modify
existing entries if the worker or name matches. Still, it is better to use
update/2 to make targeted updates.
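Because insert/1 returns either {:ok, entries} or {:error, changeset}, callers will usually want to branch on the result. What follows is a minimal sketch, assuming a hypothetical MyApp.CronSync wrapper that is not part of Oban Pro. The insert function is passed as an argument, defaulting to DynamicCron.insert/1, so the wrapper can be exercised with a stub instead of a database.

```elixir
defmodule MyApp.CronSync do
  @moduledoc "Hypothetical wrapper for illustration; not part of Oban Pro."

  # Inserts (or upserts) the given entries and reports how many were persisted.
  # `insert_fun` defaults to the real plugin function; pass a stub in tests.
  def sync(entries, insert_fun \\ &Oban.Pro.Plugins.DynamicCron.insert/1) do
    case insert_fun.(entries) do
      {:ok, inserted} -> {:ok, length(inserted)}
      {:error, reason} -> {:error, reason}
    end
  end
end

# Exercising the wrapper with a stub that echoes its input back:
stub = fn entries -> {:ok, entries} end
IO.inspect(MyApp.CronSync.sync([{"@daily", MyApp.GenericWorker}], stub))
```

In application code the second argument would normally be omitted so that the call reaches the plugin itself.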
Updating Cron Entries
@spec update(cron_name(), [cron_opt()]) :: {:ok, CronEntry.t()} | {:error, Ecto.Changeset.t()}
The update/2 function updates a single cron entry, as identified by the worker
or name. Any option available when specifying an entry in the crontab list or
when calling insert/1 can be updated, including the cron expression and
the worker.
The following call demonstrates updating every possible option:
{:ok, _} =
  DynamicCron.update(
    "cron-1",
    expression: "1 * * * *",
    max_attempts: 10,
    name: "special-cron",
    paused: false,
    priority: 0,
    queue: "dedicated",
    tags: ["client", "scheduled"],
    timezone: "Europe/Amsterdam",
    worker: Other.Worker
  )
Naturally, individual options may be updated instead. For example, set paused: true to pause an entry:
{:ok, _} = DynamicCron.update(MyApp.ClientWorker, paused: true)
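Pausing and resuming are common enough that it can help to name them. The sketch below assumes a hypothetical MyApp.CronToggle module (not part of Oban Pro) whose functions only forward to update/2 with the paused option described above. The update function is injectable so the example runs without a database.

```elixir
defmodule MyApp.CronToggle do
  @moduledoc "Hypothetical convenience wrappers; not part of Oban Pro."

  # Pauses the entry identified by worker or name.
  def pause(name, update_fun \\ &Oban.Pro.Plugins.DynamicCron.update/2) do
    update_fun.(name, paused: true)
  end

  # Resumes a paused entry. An explicit `paused: false` is passed because
  # merely dropping the option has no effect.
  def resume(name, update_fun \\ &Oban.Pro.Plugins.DynamicCron.update/2) do
    update_fun.(name, paused: false)
  end
end

# Stub standing in for DynamicCron.update/2; it echoes its arguments.
stub = fn name, opts -> {:ok, {name, opts}} end
IO.inspect(MyApp.CronToggle.pause("client-1", stub))
```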
Since update/2 operates on a single entry at a time, it is possible to rename
an entry without doing a delete/insert dance:
# With the worker as the entry name
{:ok, _} = DynamicCron.update(MyApp.ClientWorker, name: "client-worker")
# With a custom entry name already set
{:ok, _} = DynamicCron.update("cron-1", name: "special-cron")
Deleting Cron Entries
@spec delete(cron_name()) :: {:ok, CronEntry.t()} | {:error, Ecto.Changeset.t()}
The delete/1 function operates on individual entries, by worker or name. You
can use it to delete entries at runtime, rather than hard-coding the :delete
flag into the crontab list at compile time.
# With the worker as the entry name
{:ok, _} = DynamicCron.delete(Worker)
# With a custom name
{:ok, _} = DynamicCron.delete("cron-1")
Overriding the Timezone
Without any configuration the default timezone is Etc/UTC. You can override
that for all cron entries by passing a timezone option to the plugin:
plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  # ...
You can also override the timezone for individual entries by passing it as an
option to the crontab list or to DynamicCron.insert/1:
DynamicCron.insert([
  {"0 0 * * *", MyApp.Pinger, name: "oslo", timezone: "Europe/Oslo"},
  {"0 0 * * *", MyApp.Pinger, name: "chicago", timezone: "America/Chicago"},
  {"0 0 * * *", MyApp.Pinger, name: "zagreb", timezone: "Europe/Zagreb"}
])
Isolation and Namespacing
All DynamicCron functions have an alternate clause that accepts an Oban
instance name as the first argument. This is in line with base Oban functions
such as Oban.insert/2, which allow you to seamlessly work with multiple
Oban instances and across multiple database prefixes. For example, you can use
all/1 to list all cron entries for the instance named ObanPrivate:
entries = DynamicCron.all(ObanPrivate)
Likewise, to insert a new entry using the configuration associated with the
ObanPrivate instance:
{:ok, _} = DynamicCron.insert(ObanPrivate, [{"* * * * *", PrivateWorker}])
Instrumenting with Telemetry
The DynamicCron plugin adds the following metadata to the [:oban, :plugin, :stop]
event:
:jobs - a list of jobs that were inserted into the database
See the docs on Plugin Events for details.
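As an illustration, a handler for this event might look like the sketch below. MyApp.CronTelemetry and the handler id are hypothetical names. The sketch assumes the event metadata also carries a :plugin key identifying the emitting plugin, as Oban's plugin events generally do, and that the :telemetry library is available when attach/0 is called.

```elixir
defmodule MyApp.CronTelemetry do
  @moduledoc "Hypothetical telemetry handler for illustration."

  # Registers the handler; call once at application start.
  # Requires the :telemetry library at runtime.
  def attach do
    :telemetry.attach(
      "dynamic-cron-reporter",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Matches only stop events emitted by DynamicCron and reports how many
  # jobs were inserted, using the :jobs metadata described above.
  def handle_event(
        [:oban, :plugin, :stop],
        _measurements,
        %{plugin: Oban.Pro.Plugins.DynamicCron, jobs: jobs},
        _config
      ) do
    IO.puts("DynamicCron inserted #{length(jobs)} job(s)")
  end

  # Ignores stop events from every other plugin.
  def handle_event(_event, _measurements, _metadata, _config), do: :ok
end
```

Matching on the plugin in the function head keeps the handler from reacting to other plugins that emit the same event.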