Optimize Elixir/Phoenix Service using OTP (GenServers, Supervisors, Registry) Part 2

In our previous post, we identified some of the issues that affect the performance of our Mgate service. These issues include lost requests, a bad user experience, and a single point of failure, among others. If you have not read it, please do so before proceeding with this article. This is the link.

In this article, we are going to leverage OTP to make some improvements to our system. We intend to first save the request in the database once it is validated and then initiate an async operation in which each request has a life of its own until the transaction is complete. The diagram below illustrates this. If you want to jump ahead, the source code is here.

Request Life Cycle

Each request will have two stages (we are ignoring the initialization and termination stage). The first stage is the creation where we make an HTTP call to the Mbanking service and update our transaction with a pending status. If for any reason, we encounter an error, we shall retry 5 times before failing the transaction.

If the creation stage is successful, we initiate the polling stage. In this stage, we keep making an HTTP call to the Mbanking service until we get a response with either a failed or a completed status. If we exceed 10 requests, we stop the polling mechanism and update our transaction with a stale status. This allows admins to recheck it manually later. The flow diagram below illustrates what I have just described.

Open Telecom Platform (OTP)

To implement the architecture described thus far, we need to explore some of the features of OTP in Elixir. Specifically, we shall need a Registry, which is a key-value process store, a GenServer, which is going to keep track of each request, and a Supervisor that will supervise our GenServers. Let’s explore each one of these.

GenServer – Transfer Process

Each of our requests is going to be encapsulated in a GenServer. The choice is simple: we need to keep track of each request while making asynchronous requests to a third-party service. You can find the full implementation here, but we are going to explain some of the important parts.

Initialization
...
  def start_link(%Transfer{} = transfer) do
    GenServer.start_link(__MODULE__, transfer,
      name: {:via, Registry, {Mgate.Gateway.TransferRegistry, transfer.uuid}}
    )
  end
...

The function above defines how to start our Transfer Process. We intentionally name the process with a :via tuple so that we can look it up later using the uuid of the transfer. The transfer is passed to the init/1 callback, which every GenServer must define. The code is below.

  
...
  @impl true
  def init(%Transfer{} = transfer) do
    meta = %{
      max_poll_retries: @max_poll_retries,
      max_create_retries: @max_create_retries,
      create_retries: 0,
      poll_retries: 0,
      create_ref: nil
    }

    {:ok, {transfer, meta}}
  end
...

As you can see, the second argument passed to GenServer.start_link/3 is what init/1 receives as its parameter. @max_poll_retries and @max_create_retries are module attributes that define the maximum number of retries for the create and polling mechanisms. The init/1 function returns the initial state of our transfer process.

Because our module calls use GenServer, it also gets a default child_spec/1 function, which a supervisor can use to start, restart, and terminate our process.
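To make the generated child_spec/1 concrete, here is a minimal sketch. ChildSpecDemo is a hypothetical module that is not part of Mgate, and the restart: :transient option passed to use GenServer is an assumption chosen only to show how options end up in the spec.

```elixir
# Hypothetical module, used only to show what `use GenServer` generates.
defmodule ChildSpecDemo do
  # Options given here are merged into the default child spec.
  use GenServer, restart: :transient

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}
end

# The argument given to child_spec/1 becomes the argument of start_link/1.
spec = ChildSpecDemo.child_spec(:some_transfer)
IO.inspect(spec)
```

The returned map has the keys id, start, and restart, with start set to {ChildSpecDemo, :start_link, [:some_transfer]}. A supervisor accepts either such a generated spec or a hand-written map of the same shape.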

Request Creation Stage

Once the transfer process is up and running, we need to make an HTTP request to the third-party service and possibly retry if it fails. In our case, we don’t want the caller to wait for the response from the third party, so we use the GenServer.cast/2 function. The code is as follows.

...
  def initiate_request(pid) do
    GenServer.cast(pid, :initiate_request)
  end

...

The server-side code that will process this request is as follows:

...
  @impl true
  def handle_cast(:initiate_request, state) do
    Process.send(self(), :create, [])
    {:noreply, state}
  end
...

The cast handler does very little: it sends a :create message to the current process to kick off the creation stage and returns immediately. That message is then processed by the handle_info/2 callback.

...
  @impl true
  def handle_info(:create, {transfer, %{create_retries: @max_create_retries} = meta}) do
    {_transfer, changeset} = update_transfer_state(transfer, %{status: "failed"})
    new_transfer = Repo.update!(changeset)
    {:stop, :normal, {new_transfer, meta}}
  end

  @impl true
  def handle_info(:create, {transfer, meta} = state) do
    with {:ok, res} <- Mbanking.create(transfer),
         {updated_transfer, changeset} <-
           update_transfer_state(transfer, %{
             status: "pending",
             response: %{initial: res},
             response_id: to_string(res["id"])
           }) do
      {:noreply, {updated_transfer, meta}, {:continue, {:save_and_poll, changeset}}}
    else
      _ ->
        retry_create(state)
    end
  end
...

The handle_info/2 callback tries to make the HTTP call to the third-party service. If everything is okay, we initiate the polling stage by returning a tuple like this: {:noreply, {updated_transfer, meta}, {:continue, {:save_and_poll, changeset}}}. If the call fails, we retry. The retry logic for the HTTP request is as follows.

  defp retry_create({transfer, meta}) do
    meta = %{meta | create_retries: meta.create_retries + 1}
    timer_ref = schedule_create(transfer, meta)
    meta = %{meta | create_ref: timer_ref}
    {:noreply, {transfer, meta}}
  end

  defp schedule_create(transfer, meta) do
    in_mill = 2_000 * meta.create_retries

    Logger.info(
      "CREATE JOB FOR #{transfer.uuid}. RETRY COUNT: #{meta.create_retries} after #{in_mill} ms "
    )

    Process.send_after(self(), :create, in_mill)
  end

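Notice that when we reach the maximum allowed retries, we return {:stop, :normal, {new_transfer, meta}}, which causes the transfer process to terminate. Notice also that the retry delay grows with each attempt (2_000 ms multiplied by the retry count). Strictly speaking, this is linear backoff rather than exponential backoff, since the delay increases by a fixed step instead of doubling.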
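The delay in schedule_create/2 is computed as 2_000 * meta.create_retries, so it grows by a fixed two seconds per retry. The sketch below compares that formula with a doubling schedule. BackoffDemo and its exponential/1 function are hypothetical and not part of Mgate. They are shown only for comparison.

```elixir
defmodule BackoffDemo do
  @base_ms 2_000

  # Same formula as schedule_create/2: the delay grows by a fixed step.
  def linear(retry), do: @base_ms * retry

  # Hypothetical alternative: the delay doubles on every retry.
  def exponential(retry), do: @base_ms * Integer.pow(2, retry - 1)
end

linear = Enum.map(1..4, &BackoffDemo.linear/1)
exponential = Enum.map(1..4, &BackoffDemo.exponential/1)

IO.inspect(linear, label: "linear")           # linear: [2000, 4000, 6000, 8000]
IO.inspect(exponential, label: "exponential") # exponential: [2000, 4000, 8000, 16000]
```

The two schedules agree for the first two retries and diverge afterwards. Which one is appropriate depends on how quickly the third-party service is expected to recover.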

Request Polling Stage

When the handle_info/2 callback returns {:noreply, {updated_transfer, meta}, {:continue, {:save_and_poll, changeset}}}, the handle_continue/2 callback is triggered. Its definition is below.

  @impl true
  def handle_continue({:save_and_poll, changeset}, {_transfer, meta}) do
    transfer = Repo.update!(changeset)
    new_meta = %{meta | poll_retries: 1}
    schedule_poll(transfer, new_meta)
    {:noreply, {transfer, new_meta}}
  end

This function saves the current changes in the database and schedules polling. Let us explore the function that does the scheduling.

  defp schedule_poll(transfer, meta) do
    in_mill = 2_000 * meta.poll_retries

    Logger.info(
      "POLL JOB FOR #{transfer.uuid}. RETRY COUNT: #{meta.poll_retries} after #{in_mill} ms "
    )

    Process.send_after(self(), :poll, in_mill)
  end

As you can see, we use Process.send_after/3 to send the message to ourselves after a delay. This message is processed by the handle_info/2 callback, and here is the code for it.

  @impl true
  def handle_info(:poll, {transfer, %{poll_retries: @max_poll_retries} = meta}) do
    {_transfer, changeset} = update_transfer_state(transfer, %{status: "stale"})
    new_transfer = Repo.update!(changeset)
    {:stop, :normal, {new_transfer, meta}}
  end

  @impl true
  def handle_info(:poll, {transfer, meta} = state) do
    case Mbanking.get_status(transfer) do
      {:ok, res} ->
        case res["status"] do
          "completed" ->
            {_transfer, changeset} =
              update_transfer_state(transfer, %{status: "completed", response: %{final: res}})

            new_transfer = Repo.update!(changeset)
            {:stop, :normal, {new_transfer, meta}}

          "failed" ->
            {_transfer, changeset} =
              update_transfer_state(transfer, %{status: "failed", response: %{final: res}})

            new_transfer = Repo.update!(changeset)
            {:stop, :normal, {new_transfer, meta}}

          _ ->
            retry_poll(state)
        end

      {:error, _res} ->
        retry_poll(state)
    end
  end

When we reach the maximum allowed retries, the handle_info/2 callback returns {:stop, :normal, {new_transfer, meta}}, which instructs the process to terminate. Otherwise, it keeps retrying, with a delay that grows linearly with the retry count, until we get a status of either failed or completed. The retry functionality is as follows.

  defp retry_poll({transfer, meta}) do
    new_meta = %{meta | poll_retries: meta.poll_retries + 1}
    schedule_poll(transfer, new_meta)
    {:noreply, {transfer, new_meta}}
  end
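The polling stage can be reduced to the following self-contained sketch. PollDemo, its status_fun argument (a stand-in for Mbanking.get_status/1), the notify_pid argument, and the small limits are all assumptions made so the example runs quickly without a database or HTTP client.

```elixir
defmodule PollDemo do
  use GenServer

  @max_polls 3
  @interval_ms 10

  # `status_fun` is a hypothetical stand-in for Mbanking.get_status/1.
  def start(status_fun, notify_pid) do
    GenServer.start(__MODULE__, {status_fun, notify_pid})
  end

  @impl true
  def init({status_fun, notify_pid}) do
    Process.send_after(self(), :poll, @interval_ms)
    {:ok, %{status_fun: status_fun, notify: notify_pid, polls: 1}}
  end

  # Give up once the retry budget is spent.
  @impl true
  def handle_info(:poll, %{polls: @max_polls} = state) do
    send(state.notify, {:done, "stale"})
    {:stop, :normal, state}
  end

  def handle_info(:poll, state) do
    case state.status_fun.(state.polls) do
      status when status in ["completed", "failed"] ->
        send(state.notify, {:done, status})
        {:stop, :normal, state}

      _pending ->
        polls = state.polls + 1
        # The delay grows with the poll count, as in schedule_poll/2.
        Process.send_after(self(), :poll, @interval_ms * polls)
        {:noreply, %{state | polls: polls}}
    end
  end
end

# Reaches a terminal status on the second poll.
{:ok, _pid} = PollDemo.start(fn n -> if n >= 2, do: "completed", else: "pending" end, self())

completed =
  receive do
    {:done, status} -> status
  after
    1_000 -> :timeout
  end

# Never reaches a terminal status, so the retry budget runs out.
{:ok, _pid} = PollDemo.start(fn _n -> "pending" end, self())

stale =
  receive do
    {:done, status} -> status
  after
    1_000 -> :timeout
  end
```

Because the give-up clause matches when the counter equals the maximum, a limit of N results in at most N - 1 actual status calls. The same holds for the pattern match on @max_poll_retries above.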

Termination Stage

When any of the callbacks returns a tuple of the form {:stop, :normal, state}, the terminate/2 callback is invoked and the process exits. In our case, we just log a message.

  @impl true
  def terminate(:normal, {%Transfer{uuid: ref_id}, _meta}) do
    Logger.debug("TERMINATING WORKER - #{ref_id}")
  end

  @impl true
  def terminate(_reason, {%Transfer{uuid: ref_id}, _meta}) do
    Logger.debug("BAD TERMINATION SIGNAL - WORKER - #{ref_id}")
  end

The terminate/2 callback receives the reason for termination and the current state of the process. We could do other cleanups here, but we chose to just log the termination message. Now that we are done with the Transfer Process, let’s wind up with the Registry and the Supervisor.
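Before moving on, here is a small self-contained sketch of the termination path. TerminateDemo is a hypothetical module. Instead of logging, it sends the termination reason back to the caller so the behaviour can be observed.

```elixir
defmodule TerminateDemo do
  use GenServer

  def start(notify_pid), do: GenServer.start(__MODULE__, notify_pid)

  @impl true
  def init(notify_pid), do: {:ok, notify_pid}

  # Returning a :stop tuple makes the GenServer call terminate/2 and then exit.
  @impl true
  def handle_cast(:finish, notify_pid), do: {:stop, :normal, notify_pid}

  @impl true
  def terminate(reason, notify_pid) do
    send(notify_pid, {:terminated, reason})
  end
end

{:ok, pid} = TerminateDemo.start(self())
GenServer.cast(pid, :finish)

reason =
  receive do
    {:terminated, r} -> r
  after
    1_000 -> :timeout
  end
```

Here reason is :normal. Keep in mind that terminate/2 is not guaranteed to run in every situation. For example, it is skipped when the process is killed with Process.exit(pid, :kill), so it should not be the only place where critical cleanup happens.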

Registry – Transfer Registry

In our Transfer Process, we mentioned that we start it using the :via option, which relies on a Registry. Its definition is below, and you can check the full implementation here.

defmodule Mgate.Gateway.TransferRegistry do
  def start_link() do
    Registry.start_link(keys: :unique, name: __MODULE__, partitions: System.schedulers_online())
  end

  def child_spec(_) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, []},
      restart: :permanent,
      shutdown: 10_000
    }
  end

  def lookup(transfer_id) do
    case Registry.lookup(__MODULE__, transfer_id) do
      [{transfer_pid, _}] -> {:ok, transfer_pid}
      [] -> {:error, :not_found}
    end
  end
end

The start_link/0 function just defines how to start the Registry. Notice that the keys are unique; this is because each transfer has a unique uuid. We also partition the registry by the number of schedulers online, which typically matches the number of cores available on the machine.

We also defined the child_spec/1 function, which the supervisor uses to start the registry. We give it a shutdown value of 10_000 ms so that it has time to terminate gracefully when the application shuts down.

The lookup/1 function is just a convenience function that helps us look up the process ID using the transfer uuid. Since the Registry uses ETS tables, this lookup is quite fast. Now let’s explore the Supervisor next.
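Before moving on, here is a minimal sketch of how :via registration and lookup fit together. DemoRegistry, DemoWorker, and the uuid strings are hypothetical names used only for illustration.

```elixir
# Hypothetical registry name, started directly instead of under a supervisor.
{:ok, _registry} = Registry.start_link(keys: :unique, name: DemoRegistry)

defmodule DemoWorker do
  use GenServer

  # The :via tuple registers the process in DemoRegistry under `uuid`.
  def start_link(uuid) do
    GenServer.start_link(__MODULE__, uuid, name: {:via, Registry, {DemoRegistry, uuid}})
  end

  @impl true
  def init(uuid), do: {:ok, uuid}
end

{:ok, pid} = DemoWorker.start_link("abc-123")

# A registered key returns the pid together with its value (nil for :via names).
found = Registry.lookup(DemoRegistry, "abc-123")

# An unknown key returns an empty list.
missing = Registry.lookup(DemoRegistry, "unknown")

# With unique keys, a second process cannot take the same name.
duplicate = DemoWorker.start_link("abc-123")
```

The last call returns {:error, {:already_started, pid}}. This matters for any caller that pattern matches on {:ok, pid} only, because a duplicate uuid would then raise a MatchError.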

DynamicSupervisor – Transfer Supervisor

In Elixir, supervisors are a special type of process that monitors other processes, called child processes. They are at the core of a fault-tolerant system and encapsulate how an application starts and shuts down. Elixir provides the Supervisor behaviour, which handles children known when the supervisor starts, and the DynamicSupervisor behaviour, which handles children that are started on demand. DynamicSupervisor is what we need because we are going to use it to start a Transfer Process for each request. Here is the full implementation, and you can also check it here.

defmodule Mgate.Gateway.TransferSupervisor do
  use DynamicSupervisor
  alias Mgate.Gateway.TransferProcess
  alias Mgate.Transfers.Transfer

  def start_link(options) do
    DynamicSupervisor.start_link(__MODULE__, options, name: __MODULE__)
  end

  @impl true
  def init(_options) do
    DynamicSupervisor.init(strategy: :one_for_one)
  end

  def init_transfer_process(%Transfer{} = transfer) do
    child_spec = %{
      id: TransferProcess,
      start: {TransferProcess, :start_link, [transfer]},
      restart: :transient
    }

    {:ok, pid} = DynamicSupervisor.start_child(__MODULE__, child_spec)
    {:ok, pid}
  end
end

The init/1 callback sets the :one_for_one strategy, which is the only strategy supported by dynamic supervisors. The init_transfer_process/1 function is a convenience function that helps us start the Transfer Process. Notice that the restart value is :transient, which tells the supervisor to restart the Transfer Process only if it terminates abnormally.
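The effect of restart: :transient can be observed with the sketch below. TransientDemo and its :finish and :crash messages are hypothetical, and the fixed sleep is a crude assumption about how long the supervisor needs to process both exits.

```elixir
defmodule TransientDemo do
  use GenServer

  def start_link(arg), do: GenServer.start_link(__MODULE__, arg)

  @impl true
  def init(arg), do: {:ok, arg}

  @impl true
  def handle_cast(:finish, state), do: {:stop, :normal, state}
  # A reason other than :normal or :shutdown counts as an abnormal exit.
  def handle_cast(:crash, state), do: {:stop, :boom, state}
end

{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

spec = %{
  id: TransientDemo,
  start: {TransientDemo, :start_link, [:ok]},
  restart: :transient
}

{:ok, normal_pid} = DynamicSupervisor.start_child(sup, spec)
{:ok, crash_pid} = DynamicSupervisor.start_child(sup, spec)

GenServer.cast(normal_pid, :finish)
GenServer.cast(crash_pid, :crash)

# Crude wait so the supervisor can handle both exits (an error is logged for :boom).
Process.sleep(200)

# Only the crashed child is restarted; the normally stopped one is removed.
%{active: active} = DynamicSupervisor.count_children(sup)
[{_id, restarted_pid, _type, _modules}] = DynamicSupervisor.which_children(sup)
```

After the sleep, active is 1 and restarted_pid is a new pid that differs from crash_pid. Note that a restarted child starts again from init/1 with the original arguments, so any work that was triggered by a later message has to be triggered again.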

Next, let’s make sure that the Transfer Registry and the Transfer Supervisor are started when the application starts. In the start/2 function of the application file, add the following.

def start(_type, _args) do
    children = [
      MgateWeb.Telemetry,
      Mgate.Repo,
      {DNSCluster, query: Application.get_env(:mgate, :dns_cluster_query) || :ignore},
     ..
      Mgate.Gateway.TransferRegistry,
      Mgate.Gateway.TransferSupervisor,
      MgateWeb.Endpoint
    ]

Here, we are making sure that the Registry is started followed by the Transfer Supervisor and then the Endpoint. Now that we are done, start the server and run mix run benchmark/create_api_benchmark.exs. Here are the results.

Conclusion

From the metrics above, the average is around 2ms, but this may vary in other environments. Before I conclude, let me put out a disclaimer. If the third-party service has a rate limit, this architecture will probably drop many requests or mark them as failed for no good reason. Additionally, we are re-inventing the wheel here when we could use battle-tested libraries such as GenStage. Anyhow, we have learned how to use GenServer, Registry, and Supervisor in this article, and I welcome comments below.
