A more generic supervised GenEvent in Elixir

Some Elixir developers wonder how to supervise GenEvent properly, and some of them are waiting for the upcoming GenBroker ;). An important fact about the GenEvent implementation is that handlers are not separate processes, which makes supervising them problematic. In the Elixir documentation you can find a really useful function (available only in Elixir, as an extension of the standard OTP behaviour):

    add_mon_handler(manager, handler, term)

According to the documentation, it adds a monitored event handler to the event manager. If the event handler fails, it is deleted and the event manager sends the following message to the calling process:

    {:gen_event_EXIT, handler, reason}
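A minimal sketch of add_mon_handler in isolation, assuming an Elixir version that still ships GenEvent (it was deprecated in later releases). Demo.CrashyHandler is a hypothetical handler that only understands :ok, so any other event makes it crash; the manager removes it and the calling process receives the :gen_event_EXIT message:

```elixir
defmodule Demo.CrashyHandler do
  use GenEvent

  # Only :ok is handled; any other event raises FunctionClauseError,
  # which makes the event manager remove this handler.
  def handle_event(:ok, state), do: {:ok, state}
end

{:ok, manager} = GenEvent.start_link()

# Add the handler and ask the manager to notify *this* process
# when the handler is removed.
:ok = GenEvent.add_mon_handler(manager, Demo.CrashyHandler, [])

# sync_notify/2 returns only after the handlers have processed the event.
:ok = GenEvent.sync_notify(manager, :unexpected)

receive do
  {:gen_event_EXIT, Demo.CrashyHandler, reason} ->
    IO.inspect(reason, label: "handler removed")
after
  1_000 -> raise "expected a :gen_event_EXIT message"
end
```

Note that the manager itself keeps running; only the crashed handler is gone.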

The documentation also states an important fact: this message is not guaranteed to be delivered if the manager crashes. So if you want to guarantee that the message is delivered, you have two options:

  • monitor the event manager
  • link to the event manager and then set Process.flag(:trap_exit, true) in your handler callback
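The first option can be sketched on its own. Monitoring a locally registered name delivers a :DOWN message whose object is a {name, node} tuple, which is the shape HandlerWatcher matches on later in this post. The name :demo_bus is hypothetical, and Process.exit/2 with :kill merely simulates a crash (assuming GenEvent is available):

```elixir
{:ok, pid} = GenEvent.start_link(name: :demo_bus)
# Unlink so that killing the manager does not take this process down too.
true = Process.unlink(pid)

# Monitor by registered name rather than by pid.
ref = Process.monitor(:demo_bus)

# Simulate a crash of the event manager.
Process.exit(pid, :kill)

down =
  receive do
    {:DOWN, ^ref, :process, {:demo_bus, node_name}, reason} -> {node_name, reason}
  after
    1_000 -> raise "expected a :DOWN message"
  end
```

Unlike :gen_event_EXIT, the :DOWN message is always delivered, even when the process dies by :kill.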

Let's take a closer look at the first approach. But before we do so, I would like to define two simple (and, for the purpose of this example, identical) event handlers:

    defmodule Cqrs.CommandHandler do
      use GenEvent

      def handle_event(:ok, state) do
        {:ok, state}
      end
    end

and

    defmodule Cqrs.EventHandler do
      use GenEvent

      def handle_event(:ok, state) do 
        {:ok, state}
      end
    end

Let's assume that both handlers, as their names suggest, have different purposes, but both subscribe to events from the same event manager. I'm skipping the implementation of those handlers for simplicity. For the purpose of this example, let's also introduce a simple (and almost empty) event manager:

    defmodule Cqrs.Bus do
      def start_link() do
        GenEvent.start_link(name: __MODULE__)
      end
    end

As you can see, the event manager is just a named process.
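Because the manager is registered under its module name, any process can reach it without knowing its pid. A short sketch that repeats Cqrs.Bus and Cqrs.EventHandler so it runs on its own (again assuming GenEvent is available); adding the handler by hand here is only for illustration:

```elixir
defmodule Cqrs.Bus do
  def start_link() do
    GenEvent.start_link(name: __MODULE__)
  end
end

defmodule Cqrs.EventHandler do
  use GenEvent

  def handle_event(:ok, state) do
    {:ok, state}
  end
end

{:ok, _pid} = Cqrs.Bus.start_link()

# Both calls address the manager by its registered name, not by pid.
:ok = GenEvent.add_handler(Cqrs.Bus, Cqrs.EventHandler, [])
:ok = GenEvent.notify(Cqrs.Bus, :ok)
```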

The first step, in the case of a monitored event manager, is implementing a simple watcher process. But wait, let's go a little bit further and try to implement it in a generic way.

I strongly follow the TDD approach, so let's first write a very simple test case for it:

    defmodule Cqrs.HandlerWatcherTest do
      use ExUnit.Case

      alias Cqrs.HandlerWatcher

      defmodule ExampleHandler do
        use GenEvent

        def init(%{callback_pid: pid} = state) do
          send pid, :init_called
          {:ok, state}
        end
     
        def handle_event({:expected}, %{callback_pid: pid} = state) do
          send pid, :handle_event_called
          {:ok, state}
        end
      end

      setup do
        name = __MODULE__
        {:ok, _pid} = GenEvent.start_link(name: name)
        {:ok, name: name}
      end
    end

As you can see above, I defined a simple ExampleHandler for testing purposes, which I will use to verify the implementation.

Here are the basic test cases for my HandlerWatcher:

    test "forwards event to named process and handles it",
      %{name: name} do
      assert {:ok, _pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:expected})
      assert_receive :handle_event_called
    end

    # {:wrong} makes the handler crash; capture_log hides the expected report.
    @tag capture_log: true
    test "handler is re-added automatically",
      %{name: name} do
      assert {:ok, _pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:wrong})
      # init/1 runs again when the watcher re-adds the handler
      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:expected})
      assert_receive :handle_event_called
    end

    test "handler is removed and watcher process stopped",
      %{name: name} do
      Process.flag(:trap_exit, true)
      assert {:ok, pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok =
        HandlerWatcher.remove_handler(ExampleHandler, %{callback_pid: self()})
      assert_receive {:EXIT, ^pid, :normal}
    end

As you can see, these examples use message passing to verify the implementation. Also worth mentioning is the capture_log tag, which hides the expected error output of a specific test (the captured log is printed only if the test fails). If you have trouble understanding how the tests work, please refer to the ExUnit documentation.
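Besides the tag, ExUnit offers ExUnit.CaptureLog.capture_log/1, which captures the log output of a single function call and returns it as a string, so a test can also assert on the expected error. A minimal sketch; the test name and log message are made up, and the suite is run explicitly so the script can inspect the result:

```elixir
# autorun: false lets the script call ExUnit.run/0 itself.
ExUnit.start(autorun: false)

defmodule CaptureLogExampleTest do
  use ExUnit.Case
  import ExUnit.CaptureLog
  require Logger

  test "expected error output is captured instead of printed" do
    log = capture_log(fn -> Logger.error("handler crashed as expected") end)
    assert log =~ "handler crashed as expected"
  end
end

result = ExUnit.run()
```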

OK, let's now take a look at our implementation of HandlerWatcher:

    defmodule Cqrs.HandlerWatcher do
      use GenServer

      defmodule State do
        defstruct handler: nil, args: [], event_manager: nil, monitor_ref: nil
      end

      # The watcher is registered under the handler's name, so a given
      # handler module can be watched by only one watcher at a time.
      def start_link(event_manager, handler, args \\ []) do
        GenServer.start_link(__MODULE__,
          [event_manager, handler, args], name: handler)
      end

      def remove_handler(handler, args) do
        GenServer.cast(handler, {:remove_handler, handler, args})
      end

      def init([event_manager, handler, args]) do
        # Monitor the manager, because :gen_event_EXIT is not guaranteed
        # to be delivered when the manager itself crashes.
        monitor_ref = Process.monitor(event_manager)
        state = %State{event_manager: event_manager,
                       handler: handler,
                       args: args,
                       monitor_ref: monitor_ref}
        case start_handler(state) do
          {:ok, ^event_manager} -> {:ok, state}
          {:stop, reason} -> {:stop, reason}
        end
      end

      # Matches only when handler and args equal the watched ones.
      def handle_cast({:remove_handler, handler, args},
        %State{event_manager: event_manager,
               monitor_ref: monitor_ref,
               handler: handler,
               args: args} = state) do
        :ok = GenEvent.remove_handler(event_manager, handler, args)
        Process.demonitor(monitor_ref)
        {:stop, :normal, state}
      end

      # The event manager is gone: exit abnormally so that the supervisor
      # (restart: :transient) restarts this watcher.
      def handle_info({:DOWN, _ref, :process, {_event_manager, _node}, _reason}, state) do
        {:stop, :event_manager_down, state}
      end

      # The watched handler was removed from the manager: add it again.
      def handle_info({:gen_event_EXIT, handler, _reason},
        %State{event_manager: event_manager, handler: handler} = state) do
        {:ok, ^event_manager} = start_handler(state)
        {:noreply, state}
      end

      defp start_handler(
        %State{event_manager: event_manager,
               handler: handler,
               args: args}) do
        case GenEvent.add_mon_handler(event_manager, handler, args) do
          :ok -> {:ok, event_manager}
          {:error, reason} -> {:stop, reason}
        end
      end
    end

The purpose of this process is to:

  • monitor the event manager
  • add the monitored handler from a dedicated process (the watcher itself), so that the watcher receives the :gen_event_EXIT messages
  • react to {:gen_event_EXIT, handler, _reason} by “restarting” (re-adding) the handler
  • react to {:DOWN, _ref, :process, {_event_manager, _node}, _reason} by stopping the process.

In this case I'm using defstruct to keep the server state. In Erlang I always used records for this purpose. The problem with structs is that I can't reuse pattern matching on the payload and on particular defstruct fields in the same way. For anyone interested, I recommend taking a look at the __struct__ field and the Kernel.struct/2 function.
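A small sketch of what those two hints give you, using a copy of the watcher's State struct (MyHandler is just a placeholder atom). Kernel.struct/2 builds a struct from a keyword list and silently drops unknown keys, and every struct carries its module in the __struct__ field, which can be matched like any other map key:

```elixir
defmodule State do
  defstruct handler: nil, args: [], event_manager: nil, monitor_ref: nil
end

# struct/2 builds a %State{} from a keyword list; keys that are not
# fields of the struct (:unknown here) are ignored.
state = struct(State, handler: MyHandler, event_manager: :demo_bus, unknown: 1)

# Matching particular fields, as done in HandlerWatcher's callbacks.
%State{handler: handler, event_manager: manager} = state

# A struct is a map with an extra __struct__ key holding its module...
%{__struct__: module} = state

# ...so the same value can also be matched as a plain map.
%{handler: ^handler} = state
```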

So far, so simple. Now it's time for the supervisor:

    defmodule Cqrs.HandlerWatcher.Supervisor do
      use Supervisor

      defmodule Spec do
        import Supervisor.Spec, warn: false

        def gen_event_supervisor(name, event_handlers \\ []) do
          supervisor(Cqrs.HandlerWatcher.Supervisor, [name, event_handlers])
        end

        def event_handler(name, args \\ []) do
          {name, args}
        end
      end

      def start_link(event_manager, handlers) do
        Supervisor.start_link(__MODULE__, [event_manager, handlers], name: __MODULE__)
      end

      def init([event_manager, handlers]) do
        # Pass each handler's own args on to its watcher.
        handlers = for {handler, args} <- handlers do
                     worker(Cqrs.HandlerWatcher, [event_manager, handler, args], id: handler, restart: :transient)
                   end

        children = [worker(event_manager, [])|handlers]
        supervise(children, [strategy: :one_for_one])
      end
    end

The purpose of this supervisor is simple: supervise the event manager and all handlers (through their watchers). I also took a few steps further here to make it more generic; take a look at the simple helpers in the Spec module. I skipped all type definitions to avoid overloading the examples and to show my intent.
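To make the Spec helpers more concrete: event_handler/2 merely builds a {handler, args} tuple, and gen_event_supervisor/2 delegates to Supervisor.Spec.supervisor/2, which returns a classic child-spec tuple. The sketch below copies the helpers so it runs standalone (Supervisor.Spec is deprecated in newer Elixir releases but still available) and only builds the spec without starting anything:

```elixir
defmodule Cqrs.HandlerWatcher.Supervisor.Spec do
  import Supervisor.Spec, warn: false

  def gen_event_supervisor(name, event_handlers \\ []) do
    supervisor(Cqrs.HandlerWatcher.Supervisor, [name, event_handlers])
  end

  def event_handler(name, args \\ []) do
    {name, args}
  end
end

import Cqrs.HandlerWatcher.Supervisor.Spec

handlers = [event_handler(Cqrs.EventHandler), event_handler(Cqrs.CommandHandler, [])]

# Old-style child spec: {id, start_mfa, restart, shutdown, type, modules}
spec = gen_event_supervisor(Cqrs.Bus, handlers)
```

The resulting tuple's start entry calls Cqrs.HandlerWatcher.Supervisor.start_link(Cqrs.Bus, handlers), which is exactly what the root supervisor needs.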

To see the benefit of this approach, let me show you the root application supervisor:

    defmodule Cqrs do
      use Application

      def start(_type, _args) do
        import Supervisor.Spec, warn: false
        import Cqrs.HandlerWatcher.Supervisor.Spec

        children = [
          gen_event_supervisor(Cqrs.Bus,
            [event_handler(Cqrs.EventHandler, []),
             event_handler(Cqrs.CommandHandler, [])]),

          # ...
          supervisor(Cqrs.Repo, [])
        ]

        opts = [strategy: :one_for_one, name: Cqrs.Supervisor]
        Supervisor.start_link(children, opts)
      end

      # ...
    end 

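Running this application should result in the following supervision tree: Cqrs.Supervisor supervises Cqrs.HandlerWatcher.Supervisor and Cqrs.Repo (along with the elided children), while Cqrs.HandlerWatcher.Supervisor supervises the Cqrs.Bus event manager and two HandlerWatcher processes, registered as Cqrs.EventHandler and Cqrs.CommandHandler.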

The main point here is reusing the HandlerWatcher implementation for different handlers to avoid code duplication. Some Erlang/Elixir developers re-implement the same watcher functionality from scratch for each handler. I strongly believe that OTP is not a closed set of patterns; there is a lot of room to extend it (please take a look at erlangpatterns.com, introduced by Garrett Smith).

I encourage you to look for common patterns in your code, but only at the refactoring step. The first implementation should be simple and readable, which lets you spot those repetitive patterns.

I leave the evaluation of this solution to you, especially by killing or crashing the Cqrs.Bus event manager or the GenEvent handler watchers, or by sending events with an unhandled payload.
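For that kind of experiment a tiny helper can be handy. The sketch below is hypothetical (RestartCheck is not part of the code above): it kills whatever process is registered under a name and waits until its supervisor has registered a new pid under that name. It is demonstrated with an Agent so it runs on its own. In the application above the interesting names would be Cqrs.Bus and the watchers, which are registered under their handler names (Cqrs.EventHandler, Cqrs.CommandHandler):

```elixir
defmodule RestartCheck do
  # Hypothetical helper: kills the process registered as `name` and waits
  # until a *different* pid is registered under the same name, i.e. until
  # its supervisor has restarted it.
  def kill_and_await_restart(name, timeout \\ 1_000) do
    old = Process.whereis(name) || raise "#{inspect(name)} is not running"
    Process.exit(old, :kill)
    deadline = System.monotonic_time(:millisecond) + timeout
    await_new_pid(name, old, deadline)
  end

  defp await_new_pid(name, old, deadline) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old ->
        {:ok, pid}

      _ ->
        if System.monotonic_time(:millisecond) > deadline do
          {:error, :timeout}
        else
          Process.sleep(10)
          await_new_pid(name, old, deadline)
        end
    end
  end
end

# Stand-alone demo: an Agent under a one_for_one supervisor.
children = [
  %{id: :demo_agent, start: {Agent, :start_link, [fn -> 0 end, [name: :demo_agent]]}}
]
{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

old_pid = Process.whereis(:demo_agent)
{:ok, new_pid} = RestartCheck.kill_and_await_restart(:demo_agent)
```

Polling Process.whereis/1 keeps the helper independent of what kind of process is being restarted, at the cost of a short sleep loop.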