Thoughts after Rails + Domain Driven Design workshop

Today I come back from Rails + Domain Driven Design workshop organised and made by Arkency. To be honest I wasn’t the best target audience for such workshop. The main reason for it is that I can’t consider myself as a beginner in the topics of Domain Driven Design, CQRS or Event Sourcing. Even then I took part of this workshop to validate some of my experiences and knowledge about mentioned concepts.

For the price of the workshop I got 3 strong practitioners (Andrzej Krzywda, Paweł Pacana, Robert Pankowiecki) in the same room. We had discussed numbers of different topics. Some of them were related to RailsEventStore ecosystem. We also talked about directions and roadmap of changes which are coming to it. Some discussions were more related to conceptual scooping and storming. The outcome was some ideas about the scope of workshop and about potential next steps in development of RailsEventStore. In particular I proposed a few potential improvements such as:

The New Hope (or Empire Strike Back;)

The most promising discussion was about idea of how to remove conditional logic from aggregates which slowly become similar to state machines. I think Andrzej Krzywda proposed the concept of instead of keeping just one class of aggregate you can model aggregates as multiple classes equivalent to the specific state of the aggregate. This means that on each apply of the event to the aggregate we are promoting / upgrading aggregate by replacing one instance of class with new instance of class which represent next state. This will let us keep only methods possible to call on specific state (and class) of aggregate. Promoting of Aggregate classes can look like that:

Order -> OrderPlaced -> OrderPaid -> OrderCanceled

Internal state should then be passed to the new instance once it is promoted. The concept is really interesting. I have few concerns about it:

  • How to implement / model promotion process with keeping in mind that way of testing should stay simple?
  • How to handle dependency injection in promotion process?
  • How to pass internal state from one instance to the next one?
  • How handle use case when method will not be available? (because of promoting). I see so far two solutions for it: check if instance respond to method or let it fail. In such case monitoring will be important

The idea definitely deserves investing some time on it, I would like to verify the thesis: The code will be more clean and more explicit than maintaining internal state and using aggregate which act as state machine (with raising / throwing custom exceptions on wrong states of aggregate)

I would like to verify this thesis in the near future by prototyping such solution.

Thanks

Going back to the workshop I would like to thank Andrzej Krzywda, Paweł Pacana, Robert Pankowiecki for being patient and for giving their time.

Retrospection

From my observations I saw many people in the workshop who weren’t afraid to ask any questions and used this time to gain knowledge and also the people who didn’t ask and didnt always come back with clear vision and understanding of presented concepts. My advice for such people is clear: use better time offered by the experts. You paid for it. I don’t know better way of learning than asking questions. Reading books will not answer questions in case of doubt ;)

Call For Action

Dear readers please let me know what you think about idea of promoting aggregates proposed by Andrzej Krzywda. Maybe you already tried such approach? Maybe you have other ideas to share? Please share your opinion in the comments. If you are more interested about Event Sourcing and RailsEventStore ecosystem please visit documentation and github. Feel free to drop the idea or pull request there.

More generic supervised GenEvent in Elixir

Some Elixir developers wonder how to supervise the GenEvent properly. Some of them waiting for incoming GenBroker  ;). Important fact about GenEvent implementation is that handlers are not separate processes, which leads to problem with supervision. Following the elixir documentation you can find really useful function (available only in Elixir as extension of standard OTP behaviour):

add_mon_handler(manager, handler, term)

Based on documentation you will find that its adds a monitored event handler to the event manager. In case of failure event handler will be deleted and event manager will send a message to the calling process:

{:gen_event_EXIT, handler, reason} 

Documentation describe also important fact that mentioned message is not guaranteed to be delivered in case the manager crashes. So If you want to guarantee the message is delivered, you have two options:

  • monitor the event manager
  • link to the event manager and then set Process.flag(:trap_exit, true) in your handler callback

Lets take a look closer on first approach. But before we will do so I would like to focus on defining two simple (and identical for purpose of example) events handlers:

    defmodule Cqrs.CommandHandler do
      use GenEvent

      def handle_event(:ok, state) do
        {:ok, state}
      end
    end

and

    defmodule Cqrs.EventHandler do
      use GenEvent

      def handle_event(:ok, state) do 
        {:ok, state}
      end
    end

Lets assume that both handlers based on his names will have different purposes, but both will subscribe/listen for events from same event manager. I’m skipping implementation of those handlers for simplicity reason. For purpose of this example lets also introduce simple (and almost empty) Event Manager:

    defmodule Cqrs.Bus do
      def start_link() do
        GenEvent.start_link(name: __MODULE__)
      end
    end

Like you see Event Manager is just a named process.

The first step in case of monitored Event Manager is implementing simple Watcher process. But wait, lets go little bit further, and try to implement it in generic way.

I strongly following TDD approach so lets write first very simple test case for it:

    defmodule Cqrs.HandlerWatcherTest do
      use ExUnit.Case

      alias Cqrs.HandlerWatcher

      defmodule ExampleHandler do
        use GenEvent

        def init(%{callback_pid: pid} = state) do
          send pid, :init_called
          {:ok, state}
        end
     
        def handle_event({:expected}, %{callback_pid: pid} = state) do
          send pid, :handle_event_called
          {:ok, state}
        end
      end

      setup do
        name = __MODULE__
        {:ok, _pid} = GenEvent.start_link(name: name)
        {:ok, name: name}
      end
    end

Like you see above, I defined simple ExampleHandler for testing purpose which I will use to verify implementation.

Here you will find basic test cases for my HandlerWatcher:

     test Forwards event to named process and handle it,
       %{name: name} do
       assert {:ok, _pid} =
         HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self})

       assert_receive :init_called
       assert :ok = GenEvent.notify(name, {:expected})
       assert_receive :handle_event_called
     end

     @tag capture_log: true
     test handler was re-added automatically,
       %{name: name} do
       assert {:ok, _pid} =
         HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self})

       assert_receive :init_called
       assert :ok = GenEvent.notify(name, {:wrong})
       assert_receive :init_called
       assert :ok = GenEvent.notify(name, {:expected})
       assert_receive :handle_event_called
     end

    test handler was removed and watcher process stoped,
      %{name: name} do
      Process.flag(:trap_exit, true)
      assert {:ok, pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self})

      assert_receive :init_called
      assert :ok = 
        HandlerWatcher.remove_handler(ExampleHandler, %{callback_pid: self})
      assert_receive {:EXIT, ^pid, :normal}
    end 

Like you see in those examples I’m using message passing to verify my implementation. Worth of mentioning here is also using capture_log which lets you hide expected errors output of specific test function. In case of problem with understanding how tests are working I refering you to ExUnit documentation.

Ok, lets now take a look on our implementation of HandlerWatcher:

    defmodule Cqrs.HandlerWatcher do
      use GenServer

      defmodule State do
        defstruct handler: nil, args: [], event_manager: nil, monitor_ref: nil
      end

      def start_link(event_manager, handler, args \\ []) do
        GenServer.start_link(__MODULE__,
          [event_manager, handler, args], name: handler)
      end

      def remove_handler(handler, args) do
        GenServer.cast(handler, {:remove_handler, handler, args})
      end

      def init([event_manager, handler, args]) do
        monitor_ref = Process.monitor(event_manager)
        state = %State{event_manager: event_manager,
                       handler: handler,
                       args: args}
        {:ok, ^event_manager} =
          start_handler(state)
        {:ok, %State{state|monitor_ref: monitor_ref}}
      end

      def handle_cast({:remove_handler, handler, args},
        %State{event_manager: event_manager,
               monitor_ref: monitor_ref,
               handler: handler,
               args: args} = state) do
        :ok = GenEvent.remove_handler(event_manager, handler, args)
        Process.demonitor(monitor_ref)
        {:stop, :normal, state}
      end

      def handle_info({:DOWN, _ref, :process, {_event_manager, _node}, _reason}, state) do
        {:stop, :event_manager_down, state}
      end

      def handle_info({:gen_event_EXIT, handler, _reason},
        %State{event_manager: event_manager, handler: state_handler} =   state) do
        ^state_handler = handler
        {:ok, ^event_manager} = start_handler(state)
        {:noreply, state}
      end

      defp start_handler(
        %State{event_manager: event_manager,
               handler: handler,
               args: args}) do
        case GenEvent.add_mon_handler(event_manager, handler, args) do
          :ok -> {:ok, event_manager}
          {:error, reason} -> {:stop, reason}
        end
      end
    end

Purpose of this process is:

  • monitoring event manager
  • adding monitor handler in new process
  • reacting for {:gen_event_EXIT, handler, _reason} with “restarting” handler
  • reacting for {:DOWN, _ref, :process, {_event_manager, _node}, _reason} with stoping process.

In this case I’m using defstruct for keeping server state. In erlang I used always records for such purpose. Problem with structs is that I can’t reuse pattern matching of payload and particular fields of defstruct. For all people interested in I recomend to take a look on __struct__ field and Kernel.struct/2 function

So far, so simple. Now it’s time for supervisor:

    defmodule Cqrs.HandlerWatcher.Supervisor do
      use Supervisor

      defmodule Spec do
        import Supervisor.Spec, warn: false

        def gen_event_supervisor(name, event_handlers \\ []) do
          supervisor(Cqrs.HandlerWatcher.Supervisor, [name, event_handlers])
        end

        def event_handler(name, args \\ []) do
          {name, args}
        end
      end

      def start_link(event_manager, handlers) do
        Supervisor.start_link(__MODULE__, [event_manager, handlers], name: __MODULE__)
      end

      def init([event_manager, handlers]) do
        handlers = for {handler, args} <- handlers do
                     worker(Cqrs.HandlerWatcher, [event_manager, handler, []], id: handler, restart: :transient)
                   end

        children = [worker(event_manager, [])|handlers]
        supervise(children, [strategy: :one_for_one])
      end
    end

The purpose of this supervisor is simple, supervise event manager and all handlers. I made here also few steps forwards to make it more generic. Take a look on simple helpers of Spec module. I skipped all type definitions to not overload examples and to show my intent.

To see benefit of this approach lets me show you root application supervisor:

    defmodule Cqrs do
      use Application

      def start(_type, _args) do
        import Supervisor.Spec, warn: false
        import Cqrs.HandlerWatcher.Supervisor.Spec

        children = [
          gen_event_supervisor(Cqrs.Bus,
            [event_handler(Cqrs.EventHandler, []),
             event_handler(Cqrs.CommandHandler, [])]),

          # ...
          supervisor(Cqrs.Repo, [])
        ]

        opts = [strategy: :one_for_one, name: Cqrs.Supervisor]
        Supervisor.start_link(children, opts)
      end

      # ...
    end 

Running this supervisor should result with such supervisor tree:

Main point here is reusing HandlerWatcher implementation for different handlers in order to avoid code duplication. Some Erlang/Elixir developers just re-implementing from scratch same functionality of watcher for each handler. I strongly believe that current OTP is not closed subset of patterns, there is a lots of space to extend it. (please take a look on erlangpatterns.com introduced by Garret Smith)

I encourage you to search common patterns in your code, but on refactor step for sure. First implementation should be simple and readable, which lets you see those repetitive patterns.

I’m leaving to you evaluation of this solution. Especially by killing/crashing Cqrs.Bus event manager, GenEvent handlers watchers or by sending events with unhandled payload.