How to write a background job in Elixir
The easy, built-in way to run a background job
Because Elixir runs on the BEAM, concurrency is one of the language’s strongest selling points. A background job is as easy as starting a Task. The Task runs in a separate process, so our code can continue without worrying about how long the task will take.
defmodule MyApp.Users do
  def create_user(params) do
    {:ok, user} = MyApp.Repo.insert(User.changeset(params))

    # launch a background job to create and send a welcome email
    Task.start(fn ->
      send_welcome_email(user)
    end)

    # return the created user
    {:ok, user}
  end

  def send_welcome_email(user) do
    # … send welcome email
  end
end
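To see the fire-and-forget behaviour in isolation, here is a minimal sketch that can be run as a standalone script. The sleep and the message names are illustrative assumptions standing in for real work; they are not part of the application code above.

```elixir
parent = self()

# Task.start/1 returns {:ok, pid} immediately; the function runs in its own process
{:ok, pid} =
  Task.start(fn ->
    # stand-in for slow work such as sending an email
    Process.sleep(100)
    send(parent, {:job_done, self()})
  end)

IO.puts("caller continues while the task is still running")

# Wait for the task's message only so the script can show the outcome
result =
  receive do
    {:job_done, ^pid} -> :finished
  after
    1_000 -> :timed_out
  end

IO.inspect(result)
```

A task started with Task.start/1 is neither linked to the caller nor supervised, so a crash inside it goes unnoticed by the calling code.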
While super easy to do, this leaves some questions. What if sending the email fails? Let’s add some resiliency by running our job under a Task supervisor that restarts it if it fails.
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.JobSupervisor}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
defmodule MyApp.Users do
  require Logger

  def create_user(params) do
    {:ok, user} = MyApp.Repo.insert(User.changeset(params))

    # launch a supervised background job to create and send a welcome email
    # The job will continue to run even if our process completes (i.e. web request ends)
    Task.Supervisor.start_child(
      MyApp.JobSupervisor,
      fn ->
        try do
          # Log the start of the job
          Logger.info("Sending welcome email for #{user.id}")
          :ok = send_welcome_email(user)
          Logger.info("Sending welcome email for #{user.id} complete")
        rescue
          error ->
            # Log the job failure for later investigation
            Logger.error(
              "Failed to send welcome email for #{user.id} - #{inspect(error)}, #{inspect(__STACKTRACE__)}"
            )

            reraise error, __STACKTRACE__
        end
      end,
      # restart: :transient means the job will be restarted if it fails
      restart: :transient
    )

    # return the created user
    {:ok, user}
  end

  def send_welcome_email(user) do
    # … send welcome email
  end
end
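The restart behaviour can be tried out in a standalone script. This is a minimal sketch, assuming an unnamed supervisor and an Agent-based attempt counter that exist only for the demonstration; the job fails twice on purpose and succeeds on the third run.

```elixir
# Start an unnamed Task.Supervisor just for this sketch
{:ok, sup} = Task.Supervisor.start_link()

# Counts attempts; it lives outside the task so it survives task crashes
{:ok, counter} = Agent.start_link(fn -> 0 end)
parent = self()

{:ok, _pid} =
  Task.Supervisor.start_child(
    sup,
    fn ->
      attempt = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
      # Fail the first two attempts to trigger restarts
      if attempt < 3, do: raise("simulated failure on attempt #{attempt}")
      send(parent, {:succeeded_on, attempt})
    end,
    restart: :transient
  )

result =
  receive do
    {:succeeded_on, attempt} -> {:ok, attempt}
  after
    2_000 -> :gave_up
  end

IO.inspect(result)
```

Restarts happen immediately, with no delay between attempts, and they are bounded by the supervisor’s restart intensity (the :max_restarts and :max_seconds options, 3 restarts within 5 seconds by default). A job that keeps failing past that limit takes the supervisor down with it, which is one reason to reach for a dedicated job library later on.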
Schedule a task to run from Cron
Because Elixir runs on the BEAM, a virtual machine that manages its own processes, it’s not like many other languages where you can just launch a script in its own process. Even launching a Mix task starts a copy of the BEAM runtime, your app, and its dependencies, so that’s not the best choice. If you’re using releases, there is a way to run a function on a running app by calling into it from the command line.
First, let’s start with a basic job module. Our Job module exposes an enqueue/0 function that adds the job to the supervisor, which then runs it by calling the run/0 function.
defmodule MyApp.Job do
  def enqueue() do
    Task.Supervisor.start_child(MyApp.JobSupervisor, __MODULE__, :run, [])
  end

  def run() do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end
end
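Before wiring this up to cron, the enqueue/run pair can be exercised in a standalone script. The sketch below assumes hypothetical Demo.Job and Demo.JobSupervisor names and a shortened sleep; it monitors the task only to show that enqueue/0 returns straight away while the job finishes on its own.

```elixir
{:ok, _sup} = Task.Supervisor.start_link(name: Demo.JobSupervisor)

defmodule Demo.Job do
  # Hands the job to the supervisor and returns {:ok, pid} without waiting
  def enqueue do
    Task.Supervisor.start_child(Demo.JobSupervisor, __MODULE__, :run, [])
  end

  def run do
    IO.puts("Job started")
    Process.sleep(200)
    IO.puts("Job complete")
  end
end

{:ok, pid} = Demo.Job.enqueue()
ref = Process.monitor(pid)

# A clean finish shows up as a :DOWN message with reason :normal
exit_reason =
  receive do
    {:DOWN, ^ref, :process, ^pid, reason} -> reason
  after
    2_000 -> :timeout
  end

IO.inspect(exit_reason)
```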
A release provides a command line entry point into your application. Once the app has started, you can use bin/myapp rpc COMMAND to run a command on the running system. With this we can launch our job from cron like so:
* * * * * /full/path/to/myapp/bin/myapp rpc 'MyApp.Job.enqueue()'
If you launch your app with bin/myapp start then you should see “Job started” followed by “Job complete” 5 seconds later, every minute as the cron job runs.
Enterprise level background jobs
While the code above is very easy to set up, it lacks a lot of key features for a reliable background job system:
- Job scheduling
- Periodic jobs (cron scheduling)
- Multiple job queues
- Priority control
- Retry backoff strategies
- etc.
Oban is a great library for robust job processing with enterprise grade features (as its description puts it).
Oban is built on top of PostgreSQL, which is likely already part of your project. It uses PostgreSQL to store jobs, handle multi-node access without Erlang clustering, and ensure uniqueness of jobs when you need it.
A job is a module that implements the Oban.Worker behaviour and must expose a perform/1 function.
We can rewrite the simple job from above as:
defmodule MyApp.Job do
  use Oban.Worker

  # This is a convenience function so we can still call MyApp.Job.enqueue()
  def enqueue(args \\ %{}, opts \\ []) do
    Oban.insert(MyApp.Job.new(args, opts))
  end

  @impl Worker
  def perform(%{args: _args} = _job) do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end
end
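For the worker to run at all, Oban itself has to be configured and started alongside the application. The fragment below is a minimal sketch, assuming the OTP app is named :my_app, the Ecto repo is MyApp.Repo, and a single default queue with a concurrency of 10; all three are assumptions to adjust for your project. Oban’s installation guide also covers the database migration that creates its jobs table.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # queue name => number of jobs processed concurrently (assumed value)
  queues: [default: 10]

# lib/my_app/application.ex, inside start/2
children = [
  MyApp.Repo,
  # Oban is started after the repo it depends on
  {Oban, Application.fetch_env!(:my_app, Oban)}
]
```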
Job Scheduling
Oban provides a number of job scheduling options.
You can schedule a job to run in the future by adding the option [schedule_in: 5] (start in 5 seconds’ time).
You can schedule a job to run at a specific time by adding the option [schedule_at: DateTime.add(DateTime.now!("Etc/UTC"), 1, :hour)] (start in an hour’s time).
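As a small sketch of how these options are built, the script below constructs both keyword lists in plain Elixir. It uses DateTime.utc_now/0 and a delay in seconds, which is equivalent to the DateTime.now!("Etc/UTC") form; the variable names are illustrative assumptions, and the actual enqueue calls are left as comments because they need a running Oban instance.

```elixir
now = DateTime.utc_now()

# Relative delay, in seconds
in_five_seconds = [schedule_in: 5]

# Absolute time: one hour (3600 seconds) from now, in UTC
in_one_hour = [schedule_at: DateTime.add(now, 3600, :second)]

IO.inspect(in_five_seconds)
IO.inspect(DateTime.diff(in_one_hour[:schedule_at], now))

# With Oban running and the MyApp.Job module above, these would be passed as:
#   MyApp.Job.enqueue(%{}, in_five_seconds)
#   MyApp.Job.enqueue(%{}, in_one_hour)
```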
You can also use the Cron plugin to set up periodic jobs.
config :my_app, Oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Plugins.Cron,
     crontab: [
       # run MyApp.Job every minute
       {"* * * * *", MyApp.Job, args: %{}}
     ]}
  ]
Of course, with releases you can still use an RPC call to run jobs via the system crontab.
Job Queuing and priorities
Oban can run several job queues, with different priorities within each queue. This gives you a lot of flexibility over when a job might run. You can have long-running jobs on one queue, and important jobs running on a different queue. You can then prioritise jobs within each queue up to four levels. By default everything is queued at the highest priority and you can set jobs to run at lower priorities when they are less important.
# Insert the job on the `:long_running` queue with the lowest priority of 3 (0 is highest)
Oban.insert(MyApp.Job.new(%{}, queue: :long_running, priority: 3))
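A queue only processes jobs if it is listed in the Oban configuration, so the :long_running queue needs an entry there. A minimal sketch, assuming the :my_app app name and MyApp.Repo from earlier; the concurrency limits are arbitrary example values.

```elixir
# config/config.exs
import Config

config :my_app, Oban,
  repo: MyApp.Repo,
  # queue name => maximum number of jobs run concurrently on this node
  queues: [default: 10, long_running: 2]
```

Giving slow jobs their own queue with a low limit keeps them from occupying every slot that quicker, more important jobs would otherwise use.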
Retrying, back-off strategies, and timeouts
When a job fails you want it to retry, but it’s useful to have a backoff strategy that spaces out the retries over longer and longer periods. Oban has this built in with a default exponential backoff strategy, up to the max_attempts option (default 20 attempts).
This can be overridden using the backoff/1 callback (see more at Customizing Backoff in the docs).
By default jobs are allowed to execute indefinitely, but you can set a timeout in a similar way, through the timeout/1 callback.
Here is our code modified with a custom backoff and timeout strategy.
defmodule MyApp.Job do
  # Limit the job to 3 attempts in total
  use Oban.Worker, max_attempts: 3

  # This is a convenience function so we can still call MyApp.Job.enqueue()
  def enqueue(args \\ %{}, opts \\ []) do
    Oban.insert(MyApp.Job.new(args, opts))
  end

  @impl Worker
  def perform(%{args: _args} = _job) do
    IO.puts("Job started")
    Process.sleep(5000)
    IO.puts("Job complete")
  end

  @impl Worker
  def backoff(%{attempt: _attempt}) do
    # wait a fixed thirty seconds between each attempt (value is in seconds)
    30
  end

  @impl Worker
  def timeout(%{attempt: attempt}) do
    # Allow the job 5 additional seconds on each attempt (value is in milliseconds)
    attempt * 5000
  end
end
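If a fixed delay is too blunt, backoff/1 can return a growing delay instead. The sketch below uses a hypothetical BackoffSketch helper (not part of Oban) that computes a capped exponential delay in seconds; the base of 15 seconds and the one hour cap are assumed values.

```elixir
defmodule BackoffSketch do
  @moduledoc "Hypothetical helper for illustration; not part of Oban."

  # Seconds to wait before the next attempt: base + 2^attempt, capped at max
  def exponential(attempt, base \\ 15, max \\ 3600) when attempt >= 1 do
    min(base + Integer.pow(2, attempt), max)
  end
end

Enum.each(1..5, fn attempt ->
  IO.puts("attempt #{attempt}: retry in #{BackoffSketch.exponential(attempt)}s")
end)

# Inside a worker it could be wired up as:
#   @impl Worker
#   def backoff(%Oban.Job{attempt: attempt}), do: BackoffSketch.exponential(attempt)
```

The cap matters with a high max_attempts, since an uncapped exponential delay grows to days within a couple of dozen attempts.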
Logging and notification on failure
At Sitesure we want to ensure that everything runs correctly at all times so this article wouldn’t be complete without a way to report your job execution to a monitor.
Oban uses Telemetry to instrument everything that happens within the system. We can leverage that to report on job success and job failure by writing a handler that attaches to :oban events.
This uses the simple Req library for calling out to the Sitesure monitor URL.
defmodule MyApp.ObanNotifier do
  # Job finished without raising: report 0 on success, 1 for any other state
  def handle_event([:oban, :job, :stop], _measure, %{job: job, state: state} = _metadata, _) do
    notify_url = Map.get(job.meta, "notify_url")

    if notify_url do
      Req.get!(notify_url, params: %{s: if(state == :success, do: 0, else: 1)})
    end
  end

  # Job raised or crashed: report -1
  def handle_event([:oban, :job, :exception], _measure, %{job: job} = _metadata, _) do
    notify_url = Map.get(job.meta, "notify_url")

    if notify_url do
      Req.get!(notify_url, params: %{s: -1})
    end
  end
end
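One thing to watch for: Telemetry detaches a handler that raises, and Req.get!/2 raises when the request fails, so a single network error would silently stop all further notifications. A minimal sketch of one way to guard against that, using a hypothetical NotifySketch.safe/1 wrapper (the module and function names are assumptions, not part of Req, Oban, or Telemetry):

```elixir
defmodule NotifySketch do
  require Logger

  # Runs `fun` and turns any raised exception into {:error, exception}
  # instead of letting it propagate to the caller.
  def safe(fun) when is_function(fun, 0) do
    {:ok, fun.()}
  rescue
    error ->
      Logger.warning("notification failed: #{Exception.message(error)}")
      {:error, error}
  end
end

IO.inspect(NotifySketch.safe(fn -> :sent end))
IO.inspect(NotifySketch.safe(fn -> raise "connection refused" end))

# In the handler this could wrap the HTTP call:
#   NotifySketch.safe(fn -> Req.get!(notify_url, params: %{s: 0}) end)
```

Telemetry handlers also run synchronously in the process that emits the event, so a slow monitor endpoint adds to the time the job’s process stays busy.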
The notifier handler can be attached in your application.ex:
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Task.Supervisor, name: MyApp.JobSupervisor}
    ]

    # attach to the oban job stop and exception events
    events = [[:oban, :job, :stop], [:oban, :job, :exception]]
    :telemetry.attach_many("oban-notifier", events, &MyApp.ObanNotifier.handle_event/4, [])

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
And finally, add the notify_url to the meta option of each job you want to monitor.
MyApp.Job.enqueue(%{}, meta: %{notify_url: "https://notify.do/my-token"})