Oban.Pro.Plugins.DynamicCron (Oban Pro v1.4.2)
Enhanced cron scheduling that's configurable at runtime across your entire cluster.
DynamicCron can configure cron scheduling before boot or during runtime, globally, with
scheduling guarantees and per-entry timezone overrides. It's an ideal solution for applications
that can't miss a cron job or must dynamically start and manage scheduled jobs at runtime.
Installation
Before running the DynamicCron plugin you must run a migration to add the oban_crons table
to your database.
mix ecto.gen.migration add_oban_crons
Open the generated migration in your editor and delegate the migration's change/0 function
to Oban.Pro.Migrations.DynamicCron:
defmodule MyApp.Repo.Migrations.AddObanCrons do
  use Ecto.Migration

  defdelegate change, to: Oban.Pro.Migrations.DynamicCron
end
As with the base Oban tables, you can optionally provide a prefix to "namespace" the table
within your database. Here we specify a "private" prefix:
defmodule MyApp.Repo.Migrations.AddObanCrons do
  use Ecto.Migration

  def change, do: Oban.Pro.Migrations.DynamicCron.change(prefix: "private")
end
Run the migration to create the table:
mix ecto.migrate
Now you can use the DynamicCron plugin and start scheduling periodic jobs!
Using and Configuring
To begin using DynamicCron, add the module to your list of Oban plugins in config.exs:
config :my_app, Oban,
  plugins: [Oban.Pro.Plugins.DynamicCron]
  ...
By itself, without providing a crontab or dynamically inserting cron entries, the plugin doesn't
have anything to schedule. To get scheduling started, provide a list of {cron, worker} or
{cron, worker, options} tuples to the plugin. The syntax is identical to Oban's built-in
:crontab option, which means you can copy an existing standard :crontab list into the
plugin's :crontab.
plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  crontab: [
    {"* * * * *", MyApp.MinuteJob},
    {"0 * * * *", MyApp.HourlyJob, queue: :scheduled},
    {"0 0 * * *", MyApp.DailyJob, max_attempts: 1},
    {"0 12 * * MON", MyApp.MondayWorker, tags: ["scheduled"]},
    {"@daily", MyApp.AnotherDailyWorker}
  ]
}]
Now, when dynamic cron initializes, it will persist those cron entries to the database and start
scheduling them according to their CRON expression. The plugin's crontab format is nearly
identical to Oban's standard crontab, with a few important enhancements we'll look at soon.
Each crontab entry is persisted to the database and referenced globally by all the other
connected Oban instances. That allows us to insert, update, or delete cron entries at any
time. In fact, changing the schedule or options of an entry in the crontab provided to the
plugin will automatically update the persisted entry. To demonstrate, let's modify the MinuteJob
we specified so that it runs every other minute in the :scheduled queue:
crontab: [
  {"*/2 * * * *", MyApp.MinuteJob, queue: :scheduled},
  ...
]
Now it isn't really a "minute job" any more, and the name is no longer suitable. However, we
didn't provide a name for the entry, so it's using the module name instead. To provide more
flexibility we can add a :name override, and then we can update the worker's name as well:
crontab: [
  {"*/2 * * * *", MyApp.FrequentJob, name: "frequent", queue: :scheduled},
  ...
]
All entries are referenced by name, which defaults to the worker's name and must be unique. You may define the same worker multiple times as long as you provide a name override:
crontab: [
  {"*/3 * * * *", MyApp.BasicJob, name: "client-1", args: %{client_id: 1}},
  {"*/3 * * * *", MyApp.BasicJob, name: "client-2", args: %{client_id: 2}},
  ...
]
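Because names must be unique, it can help to check a crontab list for collisions before handing it to the plugin. The following is a minimal sketch in plain Elixir; the CrontabNames module and its functions are hypothetical helpers, not part of Oban Pro, and representing the default name as the inspected worker module is an assumption made for illustration.

```elixir
defmodule CrontabNames do
  @moduledoc false
  # Hypothetical helper, not part of Oban Pro.

  # Without options the entry name falls back to the worker module.
  def entry_name({_expr, worker}), do: inspect(worker)

  # With options, an explicit :name wins over the worker module.
  def entry_name({_expr, worker, opts}) do
    case Keyword.get(opts, :name) do
      nil -> inspect(worker)
      name when is_binary(name) -> name
      name when is_atom(name) -> inspect(name)
    end
  end

  # Returns the sorted list of names that appear more than once.
  def duplicates(crontab) do
    crontab
    |> Enum.map(&entry_name/1)
    |> Enum.frequencies()
    |> Enum.filter(fn {_name, count} -> count > 1 end)
    |> Enum.map(fn {name, _count} -> name end)
    |> Enum.sort()
  end
end
```

Calling CrontabNames.duplicates/1 on a crontab list returns an empty list when every entry has a distinct name, so it could run as a sanity check in a test or at boot.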
To temporarily disable scheduling jobs you can set the paused flag:
crontab: [
  {"* * * * *", MyApp.BasicJob, paused: true},
  ...
]
To resume the job you must supply paused: false (or use update/2 to resume it manually).
Simply removing the paused option has no effect.
crontab: [
  {"* * * * *", MyApp.BasicJob, paused: false},
  ...
]
It is also possible to delete a persisted entry during initialization by passing the :delete
option:
crontab: [
  {"* * * * *", MyApp.MinuteJob, delete: true},
  ...
]
One or more entries can be deleted this way. Deleting entries is idempotent: nothing happens if no matching entry is found.
Automatic Synchronization
Synchronizing persisted entries manually requires two deploys: one to flag the entry with
delete: true and another to clean up the entry entirely. That extra step isn't ideal for
applications that don't insert or delete jobs at runtime.
To automatically delete entries that are no longer listed in the crontab, set the sync_mode
option to :automatic:
[
  sync_mode: :automatic,
  crontab: [
    {"0 * * * *", MyApp.BasicJob},
    {"0 0 * * *", MyApp.OtherJob}
  ]
]
To remove unwanted entries, simply delete them from the crontab:
crontab: [
{"0 * * * *", MyApp.BasicJob},
- {"0 0 * * *", MyApp.OtherJob}
]
With :automatic sync, the entry for MyApp.OtherJob will be deleted on the next deployment.
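Conceptually, automatic sync removes every persisted entry whose name no longer appears in the configured crontab. The sketch below only illustrates that set difference in plain Elixir; SyncSketch is a hypothetical module and this is not the plugin's actual implementation.

```elixir
defmodule SyncSketch do
  @moduledoc false
  # Hypothetical illustration, not Oban Pro's implementation.

  # Given the names stored in the database and the names currently listed in
  # the crontab, return the names that are persisted but no longer configured.
  def stale_names(persisted_names, crontab_names) do
    persisted_names
    |> MapSet.new()
    |> MapSet.difference(MapSet.new(crontab_names))
    |> MapSet.to_list()
    |> Enum.sort()
  end
end
```

With the crontab above, the persisted names would be those of MyApp.BasicJob and MyApp.OtherJob; once MyApp.OtherJob is removed from the list, it is the only stale name.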
Scheduling Guarantees
Depending on an application's restart timing or as the result of unexpected downtime, a job's
scheduled insert period may be missed. To compensate, enable guaranteed mode for the entire
crontab or on an individual basis.
Here's an example of enabling guaranteed insertion for the entire crontab:
[
  guaranteed: true,
  crontab: [
    {"0 * * * *", MyApp.HourlyJob},
    {"0 0 * * *", MyApp.DailyJob},
    {"0 0 1 * *", MyApp.MonthlyJob},
    {"0 0 1 */2 *", MyApp.BiMonthlyJob}
  ]
]
Guaranteed mode may be enabled or disabled for individual jobs instead if it's a poor fit for the entire crontab:
[
  crontab: [
    {"@daily", MyApp.DailyJob, guaranteed: true},
    {"@monthly", MyApp.MonthlyJob, guaranteed: false}
  ]
]
Guaranteed insertion is enforced by cron name, so it's durable across schedule changes.
Overriding the Timezone
Without any configuration the default timezone is Etc/UTC. You can override that for all cron
entries by passing a timezone option to the plugin:
plugins: [{
  Oban.Pro.Plugins.DynamicCron,
  timezone: "America/Chicago",
  # ...
You can also override the timezone for individual entries by passing it as an option to the
crontab list or to DynamicCron.insert/1:
DynamicCron.insert([
  {"0 0 * * *", MyApp.Pinger, name: "oslo", timezone: "Europe/Oslo"},
  {"0 0 * * *", MyApp.Pinger, name: "chicago", timezone: "America/Chicago"},
  {"0 0 * * *", MyApp.Pinger, name: "zagreb", timezone: "Europe/Zagreb"}
])
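Elixir ships with a UTC-only time zone database, so resolving zones such as "Europe/Oslo" most likely requires a full database to be configured. The fragment below is a sketch that assumes the tz package is a dependency of your application; another database such as tzdata would be configured in the same place with its own module.

```elixir
# config/config.exs
# Assumption: the `tz` package is listed in your mix.exs dependencies.
import Config

config :elixir, :time_zone_database, Tz.TimeZoneDatabase
```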
Runtime Updates
Dynamic cron entries are persisted to the database, making it easy to manipulate them through
typical CRUD operations. The DynamicCron plugin provides convenience functions to simplify
working with those operations.
The insert/1 function takes a list of one or more tuples with the same {expression, worker}
or {expression, worker, options} format as the plugin's crontab option:
DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])
Be aware that insert/1 acts like an "upsert", making it possible to modify existing entries if
the worker or name matches. Still, it is better to use update/2 to make targeted updates.
Isolation and Namespacing
All DynamicCron functions have an alternate clause that accepts an Oban instance name as the
first argument. This is in line with base Oban functions such as Oban.insert/2, which allow
you to seamlessly work with multiple Oban instances and across multiple database prefixes. For
example, you can use all/1 to list all cron entries for the instance named ObanPrivate:
entries = DynamicCron.all(ObanPrivate)
Likewise, to insert a new entry using the configuration associated with the ObanPrivate
instance:
{:ok, _} = DynamicCron.insert(ObanPrivate, [{"* * * * *", PrivateWorker}])
Instrumenting with Telemetry
The DynamicCron plugin adds the following metadata to the [:oban, :plugin, :stop] event:
:jobs - a list of jobs that were inserted into the database
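As a sketch of how that metadata might be consumed, the handler below attaches to the plugin stop event and logs how many jobs were inserted. MyApp.DynamicCronLogger and the handler id are hypothetical names; the sketch assumes the :telemetry package is available (it is a dependency of Oban) and that the event metadata carries a :plugin key identifying the emitting plugin, as Oban's plugin events do.

```elixir
defmodule MyApp.DynamicCronLogger do
  @moduledoc false
  # Hypothetical module name; adapt to your application.
  require Logger

  @plugin Oban.Pro.Plugins.DynamicCron

  # Attach once, for example from your application's start/2 callback.
  def attach do
    :telemetry.attach(
      "dynamic-cron-logger",
      [:oban, :plugin, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Only react to stop events emitted by the DynamicCron plugin.
  def handle_event([:oban, :plugin, :stop], _measurements, %{plugin: @plugin} = meta, _config) do
    Logger.info(summary(meta))
  end

  # Ignore stop events from every other plugin.
  def handle_event(_event, _measurements, _meta, _config), do: :ok

  # Build a log line from the :jobs metadata, tolerating a missing key.
  def summary(meta) do
    jobs = Map.get(meta, :jobs, [])
    "DynamicCron inserted #{length(jobs)} job(s)"
  end
end
```

Keeping the message construction in a separate summary/1 function makes the handler easy to exercise without a running Oban instance.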
Types
cron_expr()
@type cron_expr() :: String.t()
cron_input()
cron_name()
cron_opt()
@type cron_opt() ::
  {:args, Oban.Job.args()}
  | {:expression, cron_expr()}
  | {:guaranteed, boolean()}
  | {:max_attempts, pos_integer()}
  | {:paused, boolean()}
  | {:priority, 0..9}
  | {:meta, map()}
  | {:name, cron_name()}
  | {:queue, atom() | String.t()}
  | {:tags, Oban.Job.tags()}
  | {:timezone, String.t()}
option()
@type option() ::
  {:conf, Oban.Config.t()}
  | {:guaranteed, boolean()}
  | {:crontab, [cron_input()]}
  | {:name, Oban.name()}
  | {:sync_mode, sync_mode()}
  | {:timezone, Calendar.time_zone()}
  | {:timeout, timeout()}
sync_mode()
@type sync_mode() :: :manual | :automatic
Functions
all(oban_name \\ Oban)
@spec all(term()) :: [Ecto.Schema.t()]
Used to retrieve all persisted cron entries.
The all/0 function is provided as a convenience to inspect persisted entries.
Examples
Return a list of cron schemas with raw attributes:
entries = DynamicCron.all()
delete(oban_name \\ Oban, name)
@spec delete(term(), cron_name()) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t() | String.t()}
Delete individual entries, by worker or name.
Use delete/1 to remove entries at runtime, rather than hard-coding the :delete flag into the
crontab list at compile time.
Examples
With the worker as the entry name:
{:ok, _} = DynamicCron.delete(Worker)
With a custom name:
{:ok, _} = DynamicCron.delete("cron-1")
insert(oban_name \\ Oban, crontab)
@spec insert(term(), [cron_input()]) :: {:ok, [Ecto.Schema.t()]} | {:error, Ecto.Changeset.t()}
Insert cron entries into the database to start scheduling new jobs.
Be aware that insert/1 acts like an "upsert", making it possible to modify existing entries if
the worker or name matches. Still, it is better to use update/2 to make targeted updates.
Examples
Insert a list of tuples with the same {expression, worker} or {expression, worker, options}
format as the plugin's crontab option.
DynamicCron.insert([
  {"0 0 * * *", MyApp.GenericWorker},
  {"* * * * *", MyApp.ClientWorker, name: "client-1", args: %{client_id: 1}},
  {"* * * * *", MyApp.ClientWorker, name: "client-2", args: %{client_id: 2}},
  {"* * * * *", MyApp.ClientWorker, name: "client-3", args: %{client_id: 3}}
])
update(oban_name \\ Oban, name, opts)
@spec update(term(), cron_name(), [cron_opt()]) :: {:ok, Ecto.Schema.t()} | {:error, Ecto.Changeset.t() | String.t()}
Update a single cron entry, as identified by worker or name.
Any option available when specifying an entry in the crontab list or when calling insert/2
can be updated, including the cron expression and the worker.
Examples
The following call demonstrates updating many options at once:
{:ok, _} =
  DynamicCron.update(
    "cron-1",
    expression: "1 * * * *",
    max_attempts: 10,
    name: "special-cron",
    paused: false,
    priority: 0,
    queue: "dedicated",
    tags: ["client", "scheduled"],
    timezone: "Europe/Amsterdam",
    worker: Other.Worker
  )
Naturally, individual options may be updated instead. For example, set paused: true to pause
an entry:
{:ok, _} = DynamicCron.update(MyApp.ClientWorker, paused: true)
Since update/2 operates on a single entry at a time, it is possible to rename an entry without
doing a delete/insert dance:
{:ok, _} = DynamicCron.update(MyApp.ClientWorker, name: "client-worker")
Or, update an entry with a custom entry name already set:
{:ok, _} = DynamicCron.update("cron-1", name: "special-cron")