Data Science with Python - Configuration of Development Environment

The best way to learn is by doing. I decided to document my journey with Data Science and Machine Learning in Python.

Installation of Anaconda

To download Anaconda, go to the Anaconda website. For this tutorial we will use Anaconda for Python 3.7. GUI installers and compressed packages are available for Windows, Linux, and macOS. If you choose the GUI installer for macOS, after installation you may need to modify your PATH environment variable, which contains a colon-delimited list of directories searched for executables.

In my case I added the following line to my ~/.zshrc file (I'm using Zsh; most likely you use Bash, in which case add it to ~/.bashrc):

export PATH="$HOME/anaconda3/bin:$PATH"

To verify that the PATH was set correctly, execute the following command in a newly opened terminal (to be sure the PATH changes were picked up):

$ conda --version
conda 4.5.7

Updating the conda CLI

Your version may differ from mine; to make sure you have the most recent one, trigger an update of the conda CLI:

$ conda update conda
Solving environment: done

## Package Plan ##

  environment location: /Users/andrzejsliwa/anaconda3

  added / updated specs:
    - conda


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    conda-4.5.11               |           py36_0         1.0 MB

The following packages will be UPDATED:

    conda: 4.5.4-py36_0 --> 4.5.11-py36_0

Proceed ([y]/n)? y


Downloading and Extracting Packages
conda-4.5.11         |  1.0 MB | ########## | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done

We can verify that the conda CLI was updated:

$ conda --version
conda 4.5.11

OK, we have just verified that conda is installed properly and that we have the most recent version.

Creating and Managing multiple environments

When you are working on multiple projects, you often want to be able to run different versions of Python or of your libraries. Fortunately, conda has built-in environment management (similar to virtualenv), which lets us keep Python environments separate for each project.

To create our first environment, execute the following command:

$ conda create --name example_env numpy
Solving environment: done

## Package Plan ##

  environment location: /Users/andrzejsliwa/anaconda3/envs/example_env

  added / updated specs:
    - numpy


The following packages will be downloaded:

    package                    |            build
    ---------------------------|-----------------
    python-3.7.0               |       hc167b69_0        16.3 MB
    mkl_fft-1.0.6              |   py37hb8a8100_0         137 KB
    sqlite-3.25.2              |       ha441bb4_0         2.3 MB
    numpy-1.15.2               |   py37h6a91979_1          48 KB
    intel-openmp-2019.0        |              118         1.0 MB
    mkl_random-1.0.1           |   py37h5d10147_1         335 KB
    setuptools-40.4.3          |           py37_0         555 KB
    openssl-1.0.2p             |       h1de35cc_0         3.4 MB
    readline-7.0               |       h1de35cc_5         393 KB
    tk-8.6.8                   |       ha441bb4_0         3.2 MB
    numpy-base-1.15.2          |   py37h8a80b8c_1         4.0 MB
    pip-10.0.1                 |           py37_0         1.7 MB
    certifi-2018.8.24          |           py37_1         139 KB
    mkl-2019.0                 |              118       154.4 MB
    ca-certificates-2018.03.07 |                0         124 KB
    wheel-0.32.1               |           py37_0          35 KB
    ------------------------------------------------------------
                                           Total:       188.1 MB

The following NEW packages will be INSTALLED:

    blas:            1.0-mkl
    ca-certificates: 2018.03.07-0
    certifi:         2018.8.24-py37_1
    intel-openmp:    2019.0-118
    libcxx:          4.0.1-h579ed51_0
    libcxxabi:       4.0.1-hebd6815_0
    libedit:         3.1.20170329-hb402a30_2
    libffi:          3.2.1-h475c297_4
    libgfortran:     3.0.1-h93005f0_2
    mkl:             2019.0-118
    mkl_fft:         1.0.6-py37hb8a8100_0
    mkl_random:      1.0.1-py37h5d10147_1
    ncurses:         6.1-h0a44026_0
    numpy:           1.15.2-py37h6a91979_1
    numpy-base:      1.15.2-py37h8a80b8c_1
    openssl:         1.0.2p-h1de35cc_0
    pip:             10.0.1-py37_0
    python:          3.7.0-hc167b69_0
    readline:        7.0-h1de35cc_5
    setuptools:      40.4.3-py37_0
    sqlite:          3.25.2-ha441bb4_0
    tk:              8.6.8-ha441bb4_0
    wheel:           0.32.1-py37_0
    xz:              5.2.4-h1de35cc_4
    zlib:            1.2.11-hf3cbc9b_2

Proceed ([y]/n)? y


Downloading and Extracting Packages
python-3.7.0         | 16.3 MB   | ########## | 100%
mkl_fft-1.0.6        | 137 KB    | ########## | 100%
sqlite-3.25.2        | 2.3 MB    | ########## | 100%
numpy-1.15.2         | 48 KB     | ########## | 100%
intel-openmp-2019.0  | 1.0 MB    | ########## | 100%
mkl_random-1.0.1     | 335 KB    | ########## | 100%
setuptools-40.4.3    | 555 KB    | ########## | 100%
openssl-1.0.2p       | 3.4 MB    | ########## | 100%
readline-7.0         | 393 KB    | ########## | 100%
tk-8.6.8             | 3.2 MB    | ########## | 100%
numpy-base-1.15.2    | 4.0 MB    | ########## | 100%
pip-10.0.1           | 1.7 MB    | ########## | 100%
certifi-2018.8.24    | 139 KB    | ########## | 100%
mkl-2019.0           | 154.4 MB  | ########## | 100%
ca-certificates-2018 | 124 KB    | ########## | 100%
wheel-0.32.1         | 35 KB     | ########## | 100%
Preparing transaction: done
Verifying transaction: done
Executing transaction: done
#
# To activate this environment, use:
# > source activate example_env
#
# To deactivate an active environment, use:
# > source deactivate
#

example_env is the name of our environment, and numpy is the name of a package installed into the new environment. By default, conda installs the Python version you downloaded with Anaconda; in my case it is 3.7.

If you wish to change the Python version when creating a new environment, you can do so by providing the python parameter:

$ conda create --name example_env python=3.5 numpy

As we can see in the output, we are now able to activate and deactivate our newly created environment.

Let’s check it by activating the environment and verifying that NumPy is properly installed:

$ source activate example_env
$ which python
/Users/andrzejsliwa/anaconda3/envs/example_env/bin/python

As you can see, after activation we are using Python directly from our environment. To explain: activating the environment also changes the search PATH.

Let’s check if NumPy is installed and available:

$ python
Python 3.7.0 (default, Jun 28 2018, 07:39:16)
[Clang 4.0.1 (tags/RELEASE_401/final)] :: Anaconda, Inc. on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import numpy as np
>>>

There was no error. We are on our way.
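Beyond a bare import, we can run a tiny computation as a stronger smoke test (the array values here are arbitrary, just for illustration):

```python
import numpy as np

# Build a small array and compute two aggregates to confirm
# that NumPy works end to end in this environment.
a = np.arange(1, 6)    # array([1, 2, 3, 4, 5])
print(a.sum())         # 15
print(a.mean())        # 3.0
```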

Interactive Environment - Jupyter

The next step is installing an amazing interactive environment: Jupyter

$ conda install -c anaconda jupyter
Solving environment: done
...

Let’s add another package to our active environment (make sure you activated it first):

$ conda install matplotlib
Solving environment: done
...

We will use matplotlib in Jupyter to show the benefits of the interactive environment.

Installation and configuration of the Jupyter plugin in Visual Studio Code

As a next step, I strongly recommend installing Visual Studio Code and the dedicated Jupyter plugin:

After reloading the editor, you will be able to run Jupyter directly from Visual Studio Code.

We need to set up the editor environment.

Running jupyter in Visual Studio Code

Select our environment (the same one we activated before).

You can paste this example into your Python file:

#%%
print('hello')

#%%
import matplotlib.pyplot as plt
import matplotlib as mpl
import numpy as np

x = np.linspace(0, 20, 100)
plt.plot(x, np.sin(x))
plt.show()
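If you want to sanity-check the plotted data itself, the same arrays can be inspected directly; this is a minimal, self-contained illustration of what np.linspace produces:

```python
import numpy as np

# 100 evenly spaced points over [0, 20], and their sines.
x = np.linspace(0, 20, 100)
y = np.sin(x)

print(x.shape, y.shape)  # (100,) (100,)
print(x[0], x[-1])       # 0.0 20.0
```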

We are ready to start a new notebook from VS Code.

Now we can run cells directly from Python code.

As you can see, on the right side we have a properly running Jupyter notebook. Notice that #%% is a placeholder marking runnable cells.

That’s it for today. I hope all steps were well explained; in case of any issues, feel free to contact me over Twitter.

Pairing Challenges

Pair programming is a popular technique in which two software engineers work together at the same workstation, either sitting in front of the same computer or sharing screen/keyboard/codebase in real time. I have been practicing pairing for a few years now, with excellent outcomes but also with some trade-offs. Today I would like to focus on the trade-offs. Recently I had feedback talks with my team members about trade-offs related to pairing. In those discussions they provided lots of insights from the last few weeks of pairing together. Based on these discussions and my own experience, I formed a few open questions:

  • how can we maximize pairing benefits while avoiding issues like exhaustion and mental fatigue?
  • how can we let a third person (in the same feature team) jump in and understand the technical issues of the project?

Trade-offs

Based on these questions, we can identify a few trade-offs related to constant pairing:

  • it is super exhausting
  • it is not flexible (working hours, breaking the daily rhythm, blocking)
  • knowledge transfer happens only between the two pairing mates

Ideas

In discussions with my team members I collected a set of ideas which address some of the tradeoffs listed above:

Pairing only on kickoff - pairing on structure/architecture, then splitting

Cons:

  • none

Pros:

  • lets us decide together on the architecture and structure of the project, and on serious blockers coming up during development
  • we can then split into particular tickets based on the agreed structure & contracts
  • reduces stress levels and exhaustion
  • lets us work in a more flexible way while staying in sync
  • provides better performance and velocity

Live reviews - without specific time constraints or rules, an extended version of GitHub PR reviews

Cons:

  • requires more time
  • requires context switching

Pros:

  • gives a chance to discuss technical decisions with a full understanding of their context
  • provides better-quality reviews (going beyond code style, tips & tricks, etc.)

Small posts - where you can show pros/cons and share your experiences and lessons learned

Cons:

  • will require some time to prepare it

Pros:

  • let us present our point of view and lessons learned without long, live and unstructured debates
  • structure our thoughts
  • form a perfect basis for retrospection

Wrap-up meetings - workshops/presentations/live coding for all technical teams, once every 1-2 weeks, to share architecture and technical shifts and lessons learned

Cons:

  • will require some time to prepare for it
  • will need some time to participate
  • will be difficult to avoid unstructured debates or discussions

Pros:

  • enables technical knowledge transfer between tech teams
  • gives us a chance for discussion

Monthly Technical All Hands - for the whole business department/company, to present our work and share outcomes with non-technical team members, with a focus on business value

Cons:

  • requires some time (we are already doing it)
  • I’m still not sure how to structure discussions or open questions with a large audience

Pros:

  • increases transparency
  • gives more context
  • lets us focus more on business value than on technical aspects
  • allows us to collect feedback

More Async way - Async Remote

Cons (and Pros)

  • requires more communication on dedicated Slack channels
  • needs more structured discussion focused on a well-defined outcome (when the outcome is difficult to achieve, we can write a post about the subject)
  • persists one-to-one discussion outcomes for the rest of the team (as a summary similar to “meeting minutes”)
  • increases the number of channels, but they will be more structured and oriented on the subject (vs. discussion on the Engineering channel)

Conclusion

Pairing is an individual experience, shaped by personal preferences, working style, and personality. Finding a good pairing mate isn’t easy and requires building a trustful and respectful relationship; after years of working together, some mates can almost read your mind. Like every technique, it has its own trade-offs. The ideas provided here are not ultimate answers; they are rather a set of ideas about how to make pairing a more enjoyable experience, and more beneficial for all of us.

Event Sourced Aggregates and Error/Exception flows

Recently my team had a discussion about different ways of handling negative flows in aggregate roots, and about handling them in presentation or error-handling logic. To present my point of view I wrote this post, which compares the different approaches:

Aggregate which is ignoring negative flows

class Order
  include AggregateRoot

  def initialize
    self.state = :new
    # any other code here
  end

  def submit
    return if state == :submitted
    return if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_accessor :state

  def apply_order_submitted(event)
    self.state = :submitted
  end

  def apply_order_expired(event)
    self.state = :expired
  end
end

class OrderController < ApplicationController
  def create
    dispatch_command SubmitOrder.new
    render :edit
  end
end

Cons:

  • we lose the information that somebody already submitted the same order
  • we lose the information that somebody tried to submit an order which has already expired
  • we are not able to handle such cases in the UI; we silently ignore them, which leaves the user confused

Pros:

  • none

Aggregate which is using exceptions to handle negative flows

class Order
  include AggregateRoot
  HasBeenAlreadySubmitted = Class.new(StandardError)
  HasExpired              = Class.new(StandardError)

  def initialize
    self.state = :new
    # any other code here
  end

  def submit
    raise HasBeenAlreadySubmitted if state == :submitted
    raise HasExpired if state == :expired
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_accessor :state

  def apply_order_submitted(event)
    self.state = :submitted
  end

  def apply_order_expired(event)
    self.state = :expired
  end
end
class OrderController < ApplicationController
  def create
    dispatch_command SubmitOrder.new
  rescue Order::HasBeenAlreadySubmitted
    render :edit, notice: 'Order already submitted!'
  rescue Order::HasExpired
    render :edit, notice: 'Order has expired!'
  end
end
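The guard-clause behaviour of this variant can be exercised outside Rails with a minimal stand-in (a hypothetical harness: it drops AggregateRoot and event application, keeping only the state checks):

```ruby
# Hypothetical harness: no Rails, no AggregateRoot - only the guards.
class Order
  HasBeenAlreadySubmitted = Class.new(StandardError)
  HasExpired              = Class.new(StandardError)

  def initialize
    @state = :new
  end

  def submit
    raise HasBeenAlreadySubmitted if @state == :submitted
    raise HasExpired if @state == :expired
    @state = :submitted   # stands in for applying OrderSubmitted
  end
end

order = Order.new
order.submit                 # first submit succeeds
begin
  order.submit               # second submit hits the guard
rescue Order::HasBeenAlreadySubmitted
  puts "Order already submitted!"
end
```

The controller’s rescue clauses map these exceptions onto user-facing notices in exactly the same way.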

Cons:

  • we lose the information that somebody already submitted the same order
  • we lose the information that somebody tried to submit an order which has already expired

Pros:

  • we can handle errors/negative flows/exceptions in the UI
  • in case of an exception not handled in the controller, we will get a nice Rollbar error

Aggregate which is using Events to handle negative flows

class HasBeenAlreadySubmitted < Event; end
class HasExpired < Event; end

class Order
  include AggregateRoot

  def initialize
    self.state = :new
    # any other code here
  end

  def submit
    if state == :submitted
      apply HasBeenAlreadySubmitted.new
      return
    end
    if state == :expired
      apply HasExpired.new
      return
    end
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_accessor :state

  def apply_order_submitted(event)
    self.state = :submitted
  end

  def apply_order_expired(event)
    self.state = :expired
  end
end
class OrderController < ApplicationController
  def create
    event_store.within do
      dispatch_command SubmitOrder.new
    end.subscribe(to: [Order::OrderSubmitted]) do
      redirect_to success_path
    end.subscribe(to: [Order::HasBeenAlreadySubmitted]) do
      render :edit, notice: 'Order already submitted!'
    end.subscribe(to: [Order::HasExpired]) do
      render :edit, notice: 'Order has expired!'
    end.call
  end
end

Cons:

  • none (having more business specific events is a pro)

Pros:

  • we know that a specific user (meta) already submitted the same order; we can analyze this data or troubleshoot it for customer-service purposes
  • we know that a specific user (meta) tried to submit an order which has already expired; we can analyze this data or troubleshoot it for customer-service purposes
  • we can handle errors in the UI
  • we can react (in event handlers) to specific events like HasExpired, e.g. by releasing booked tickets/products back to the pool/warehouse
  • we have more business-specific events related to the workflow (so exceptions become truly exceptional - like network errors)
  • additionally, we can apply multiple events in the same action, whereas an exception can be raised only once

Aggregate which is using the idea of an Infra::IdempotentEventApplied event to handle negative flows

class Order
  include AggregateRoot

  def initialize
    self.state = :new
    # any other code here
  end

  def submit
    if state == :submitted
      apply Infra::IdempotentEventApplied.new(
        event_name: "HasBeenAlreadySubmitted",
        payload:    {}
      )
      return
    end
    if state == :expired
      apply Infra::IdempotentEventApplied.new(
        event_name: "HasExpired",
        payload:    {}
      )
      return
    end
    apply OrderSubmitted.new(data: {delivery_date: Time.now + 24.hours})
  end

  def expire
    apply OrderExpired.new
  end

  private
  attr_accessor :state

  def apply_order_submitted(event)
    self.state = :submitted
  end

  def apply_order_expired(event)
    self.state = :expired
  end
end
class OrderController < ApplicationController
  def create
    event_store.within do
      dispatch_command SubmitOrder.new
    end.subscribe(to: [Order::OrderSubmitted]) do
      redirect_to success_path
    end.subscribe(to: [Infra::IdempotentEventApplied]) do |event|
      case event.event_name
      when 'HasExpired'
        render :edit, notice: 'Order has expired!'
      when 'HasBeenAlreadySubmitted'
        render :edit, notice: 'Order already submitted!'
      end
    end.call
  end
end

Cons:

  • we introduce an artificial event which does not belong to the domain (it will not be in the events directory)
  • we are not using types, which are the best way to describe our business process (negative flows are still business flows)
  • there is no easy way to find such events for a specific use case
  • we are forced to dispatch based on strings
  • we have fewer business events defined (so some important parts of the business process are ignored)

Pros:

  • we know that a specific user (meta) already submitted the same order; we can analyze this data or troubleshoot it for customer-service purposes
  • we know that a specific user (meta) tried to submit an order which has already expired; we can analyze this data or troubleshoot it for customer-service purposes
  • we can handle errors in the UI
  • we can react (in event handlers) to specific events like HasExpired, e.g. by releasing booked tickets/products back to the pool/warehouse

Summary

  • Exceptions which are not system-based (like network errors) are the best candidates for events
  • An exception like PaymentGatewayFailed can be converted into a valuable event which can be handled by our infrastructure (sending emails/Slack messages to the ops team, scheduling the operation for retry, etc.)
  • Negative flows are still business use cases
  • Ignoring negative flows damages the user flow, messes with the user’s expectations, leads to bad design (maybe your aggregates are too CRUD-like, but even in CRUD you want to react to Active Record validations and display them to the user), and limits our ways of handling them (event handlers, sagas)
  • If we want to be data-driven, there is no better way than tracking events in an event-oriented architecture
  • How can having more events (and more insight into user and system behavior) be a bad thing? Such events are not useless lines of code; they add more meaning to the domain.

Frameworks, strong conventions and fixed structures are hurting your business

For every young engineer, frameworks and strong conventions are a source of peace, a “safe harbour”, the ultimate answer he or she desires to know. They give a stable environment: building blocks where one matches another, where each piece has its own place where it fits best.

Frameworks and conventions promise to bring order to the chaotic world surrounding us. Well defined structures, everywhere.

If you leave well-defined frames, borders, and chains of conventions and structures behind you, you’ll see that the only problem you should focus on is the domain problem you’re working on. It is the only important problem to solve, once you neglect frameworks and conventions for a second.

If you’re able to step away from trying to force your system design into REST conventions or directory structures required by your framework, then there’s a chance to come back to the essence of the problem you’re trying to solve.

The key points are here:

  • What does the business process look like, exactly?
  • How is the business working?
  • What are the workflows between participants?
  • Which events happen when and where?
  • What is the source of these events?

What are we doing instead? We try to fit every problem into the shape of our frames. We often do this very forcefully, using our muscle memory. We form code like Play-Doh® in our hands, and we do the same with business requirements, trying to fit them into the shapes required by conventions and frameworks.

Somebody once wrote: “A framework is code where, if you removed all the business domain specifics, you would be left with all the assumptions.”

By using strong conventions or well defined shapes, you’re damaging your perception of the problem or the business domain. If something doesn’t fit, you’re applying “force” to make it fit. This way some pieces fall apart, some pieces are lost. These pieces are important. These pieces will make a difference.

Over the years, business people and product owners have been demotivated by trying to explain their domain problems to engineers. They tried and failed, because engineers only saw technical challenges, parts, CRUDs, RESTful operations, and all the building blocks provided by the framework of their choice.

Today, business people and product owners are trying to use the same technical jargon to be able to communicate with engineers. But the real context, the real meaning is lost in the translation.

In many software communities I see a positive development. Small, well-defined libraries or packages are preferred as building blocks over frameworks with strong conventions. These libraries or packages don’t provide opinionated DSLs. They provide just a simple API, giving you full freedom in how you use them.

This kind of building block reminds me of LEGO®. Sure, you can build one model with every set (or even two, following the provided instructions). But these blocks have a simple, universal shape which can be combined into more complex shapes. They are small, each playing a very small role. You can use them to build anything you can imagine. There’s not a single place where they fit; they fit everywhere you need them.

Somebody will tell me: “You know, thanks to the standardisation and formalisation done by frameworks, we can deliver faster and more often; we are more productive.”

This reminds me of one tweet:

Maybe, just maybe, you should slow down. Stop this racing machine and think about whether you really understand what you’ve been asked to build, and whether your perception of the requirements matches that of your product owner.

After all, it’s not only a matter of performance or speed. You can own a $2M race car and drive it at 420 km/h, but what if I tell you it’s going in the wrong direction? We’re not doing software development for the art of it, or for the number of tickets delivered weekly. We’re doing it to solve real problems, to deliver business value.

I’m pretty sure that if you chose 5 software developers at random and gave them the same ticket, they would build 5 totally different implementations. Because it’s a matter of perception.

What would happen if they chose 5 different frameworks? You can guess!

But why does this happen? Probably all of them know the same rules, paradigms, coding standards, methodologies, architectures, design patterns.

We’re all human. And as with frameworks and conventions, we like it when decisions are made for us, because that’s exactly what happens when you use them. Very often you don’t even know the people who made those decisions, or their values.

If this sounds familiar, please stop for a moment. Take some time to reflect. Take a look at the different options you have. Maybe take a closer look at the approach Domain-Driven Design takes. Read or watch more about the Event Storming process.

Go back to the roots, where the real value is a deep understanding of the problem domain. As a start, stop using technical jargon at work; it makes your vocabulary much poorer. Focus on the words used by the business and try to understand their meaning. You have the full right not to understand them at first. You are responsible for asking for an explanation as many times as needed, until you and your domain experts agree and all of you talk about the same thing. Keep these words and their meaning, and use them in your software. Ubiquitous Language is your treasure.

Thoughts after Rails + Domain Driven Design workshop

Today I came back from the Rails + Domain-Driven Design workshop organised and run by Arkency. To be honest, I wasn’t the best target audience for such a workshop; the main reason is that I can’t consider myself a beginner in the topics of Domain-Driven Design, CQRS or Event Sourcing. Even so, I took part in this workshop to validate some of my experience and knowledge of the mentioned concepts.

For the price of the workshop I got 3 strong practitioners (Andrzej Krzywda, Paweł Pacana, Robert Pankowiecki) in the same room. We discussed a number of different topics. Some of them were related to the RailsEventStore ecosystem; we also talked about the direction and roadmap of changes coming to it. Some discussions were more related to conceptual scoping and storming. The outcome was a set of ideas about the scope of the workshop and about potential next steps in the development of RailsEventStore. In particular, I proposed a few potential improvements:

The New Hope (or The Empire Strikes Back ;)

The most promising discussion was about an idea for removing conditional logic from aggregates, which slowly become similar to state machines. Andrzej Krzywda proposed the concept: instead of keeping just one aggregate class, you can model the aggregate as multiple classes, each equivalent to a specific state of the aggregate. This means that on each apply of an event to the aggregate we promote / upgrade the aggregate by replacing one instance with a new instance of the class representing the next state. This lets us keep only the methods that may be called in a specific state (and class) of the aggregate. Promoting aggregate classes can look like this:

Order -> OrderPlaced -> OrderPaid -> OrderCanceled

Internal state should then be passed to the new instance once it is promoted. The concept is really interesting, but I have a few concerns about it:

  • How to implement / model the promotion process while keeping testing simple?
  • How to handle dependency injection in the promotion process?
  • How to pass internal state from one instance to the next?
  • How to handle the case when a method is no longer available (because of promotion)? So far I see two solutions: check whether the instance responds to the method, or let it fail. In the latter case, monitoring will be important.

The idea definitely deserves some time invested in it. I would like to verify the thesis: the code will be cleaner and more explicit than maintaining internal state and using an aggregate which acts as a state machine (raising / throwing custom exceptions on wrong aggregate states).

I would like to verify this thesis in the near future by prototyping such a solution.
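A very rough sketch of the promotion idea (the class names and the hand-over mechanism are my own assumptions, not Arkency’s design):

```ruby
# Each state is a separate class; a transition returns an instance
# of the next state's class, handing over the internal state.
class PaidOrder
  def initialize(items)
    @items = items
  end
end

class PlacedOrder
  def initialize(items)
    @items = items
  end

  def pay
    PaidOrder.new(@items)    # promote to the next state's class
  end
end

class Order
  def initialize(items = [])
    @items = items
  end

  def place
    PlacedOrder.new(@items)  # only PlacedOrder responds to #pay
  end
end

order = Order.new([:ticket]).place.pay
puts order.class             # PaidOrder
```

Calling a method in the wrong state now fails with NoMethodError instead of passing a conditional check, which is exactly the “respond to the method or let it fail” question raised above.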

Thanks

Going back to the workshop I would like to thank Andrzej Krzywda, Paweł Pacana, Robert Pankowiecki for being patient and for giving their time.

Retrospection

From my observations, many people at the workshop weren’t afraid to ask questions and used the time to gain knowledge, while others didn’t ask and didn’t always come away with a clear vision and understanding of the presented concepts. My advice for the latter is clear: make better use of the time offered by the experts. You paid for it. I don’t know a better way of learning than asking questions. Reading books will not answer your questions in case of doubt ;)

Call For Action

Dear readers, please let me know what you think about the idea of promoting aggregates proposed by Andrzej Krzywda. Maybe you have already tried such an approach? Maybe you have other ideas to share? Please share your opinion in the comments. If you are interested in Event Sourcing and the RailsEventStore ecosystem, please visit the documentation and GitHub. Feel free to drop an idea or pull request there.

More generic supervised GenEvent in Elixir

Some Elixir developers wonder how to supervise GenEvent properly. Some of them are waiting for the upcoming GenBroker ;). An important fact about the GenEvent implementation is that handlers are not separate processes, which leads to problems with supervision. In the Elixir documentation you can find a really useful function (available only in Elixir, as an extension of the standard OTP behaviour):

add_mon_handler(manager, handler, term)

According to the documentation, it adds a monitored event handler to the event manager. In case of failure, the event handler will be deleted and the event manager will send a message to the calling process:

{:gen_event_EXIT, handler, reason} 

The documentation also describes an important fact: this message is not guaranteed to be delivered if the manager itself crashes. So if you want to guarantee that the message is delivered, you have two options:

  • monitor the event manager
  • link to the event manager and then set Process.flag(:trap_exit, true) in your handler callback

Let’s take a closer look at the first approach. But before we do so, let’s define two simple (and, for the purpose of the example, identical) event handlers:

    defmodule Cqrs.CommandHandler do
      use GenEvent

      def handle_event(:ok, state) do
        {:ok, state}
      end
    end

and

    defmodule Cqrs.EventHandler do
      use GenEvent

      def handle_event(:ok, state) do 
        {:ok, state}
      end
    end

Let’s assume that both handlers, based on their names, have different purposes, but both subscribe/listen for events from the same event manager. I’m skipping the implementation of those handlers for simplicity. For the purpose of this example, let’s also introduce a simple (and almost empty) event manager:

    defmodule Cqrs.Bus do
      def start_link() do
        GenEvent.start_link(name: __MODULE__)
      end
    end

As you can see, the Event Manager is just a named process.
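Before building a watcher, the pieces so far can already be exercised by hand. A minimal sketch (a hypothetical IEx session; recall that `Cqrs.EventHandler` only handles the `:ok` event defined above):

```elixir
# Start the named event manager and attach a handler manually.
{:ok, _pid} = Cqrs.Bus.start_link()

# add_mon_handler/3 makes the calling process receive
# {:gen_event_EXIT, handler, reason} if the handler dies.
:ok = GenEvent.add_mon_handler(Cqrs.Bus, Cqrs.EventHandler, [])

# Dispatch an event; Cqrs.EventHandler.handle_event(:ok, state) matches it.
:ok = GenEvent.notify(Cqrs.Bus, :ok)
```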

The first step for the monitored Event Manager approach is implementing a simple Watcher process. But wait, let's go a little bit further and try to implement it in a generic way.

I strongly follow the TDD approach, so let's first write a very simple test case for it:

    defmodule Cqrs.HandlerWatcherTest do
      use ExUnit.Case

      alias Cqrs.HandlerWatcher

      defmodule ExampleHandler do
        use GenEvent

        def init(%{callback_pid: pid} = state) do
          send pid, :init_called
          {:ok, state}
        end
     
        def handle_event({:expected}, %{callback_pid: pid} = state) do
          send pid, :handle_event_called
          {:ok, state}
        end
      end

      setup do
        name = __MODULE__
        {:ok, _pid} = GenEvent.start_link(name: name)
        {:ok, name: name}
      end
    end

As you can see above, I defined a simple ExampleHandler for testing purposes, which I will use to verify the implementation.

Here are the basic test cases for my HandlerWatcher:

    test "forwards event to named process and handles it",
      %{name: name} do
      assert {:ok, _pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:expected})
      assert_receive :handle_event_called
    end

    @tag capture_log: true
    test "handler is re-added automatically",
      %{name: name} do
      assert {:ok, _pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:wrong})
      assert_receive :init_called
      assert :ok = GenEvent.notify(name, {:expected})
      assert_receive :handle_event_called
    end

    test "handler is removed and the watcher process stopped",
      %{name: name} do
      Process.flag(:trap_exit, true)
      assert {:ok, pid} =
        HandlerWatcher.start_link(name, ExampleHandler, %{callback_pid: self()})

      assert_receive :init_called
      assert :ok =
        HandlerWatcher.remove_handler(ExampleHandler, %{callback_pid: self()})
      assert_receive {:EXIT, ^pid, :normal}
    end

As you can see in those examples, I'm using message passing to verify my implementation. Also worth mentioning is capture_log, which lets you hide the expected error output of a specific test function. If you have trouble understanding how these tests work, I refer you to the ExUnit documentation.

Ok, let's now take a look at our implementation of HandlerWatcher:

    defmodule Cqrs.HandlerWatcher do
      use GenServer

      defmodule State do
        defstruct handler: nil, args: [], event_manager: nil, monitor_ref: nil
      end

      def start_link(event_manager, handler, args \\ []) do
        GenServer.start_link(__MODULE__,
          [event_manager, handler, args], name: handler)
      end

      def remove_handler(handler, args) do
        GenServer.cast(handler, {:remove_handler, handler, args})
      end

      def init([event_manager, handler, args]) do
        monitor_ref = Process.monitor(event_manager)
        state = %State{event_manager: event_manager,
                       handler: handler,
                       args: args}
        {:ok, ^event_manager} =
          start_handler(state)
        {:ok, %State{state|monitor_ref: monitor_ref}}
      end

      def handle_cast({:remove_handler, handler, args},
        %State{event_manager: event_manager,
               monitor_ref: monitor_ref,
               handler: handler,
               args: args} = state) do
        :ok = GenEvent.remove_handler(event_manager, handler, args)
        Process.demonitor(monitor_ref)
        {:stop, :normal, state}
      end

      def handle_info({:DOWN, _ref, :process, {_event_manager, _node}, _reason}, state) do
        {:stop, :event_manager_down, state}
      end

      def handle_info({:gen_event_EXIT, handler, _reason},
        %State{event_manager: event_manager, handler: state_handler} = state) do
        ^state_handler = handler
        {:ok, ^event_manager} = start_handler(state)
        {:noreply, state}
      end

      defp start_handler(
        %State{event_manager: event_manager,
               handler: handler,
               args: args}) do
        case GenEvent.add_mon_handler(event_manager, handler, args) do
          :ok -> {:ok, event_manager}
          {:error, reason} -> {:stop, reason}
        end
      end
    end

The purpose of this process is:

  • monitoring the event manager
  • adding the monitored handler from within this new process
  • reacting to {:gen_event_EXIT, handler, _reason} by “restarting” the handler
  • reacting to {:DOWN, _ref, :process, {_event_manager, _node}, _reason} by stopping the process.

In this case I’m using defstruct to keep the server state. In Erlang I always used records for this purpose. A problem with structs is that I can’t reuse a single pattern match for both the whole payload and particular fields of the defstruct. For anyone interested, I recommend taking a look at the __struct__ field and the Kernel.struct/2 function.
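For instance, Kernel.struct/2 builds a struct dynamically from a module name and an enumerable, and the __struct__ field makes generic matches on “any struct” possible. A quick sketch (the module and field values below are illustrative):

```elixir
defmodule ServerState do
  defstruct handler: nil, args: [], event_manager: nil, monitor_ref: nil
end

# Kernel.struct/2 builds a struct dynamically from a module and an enumerable:
state = struct(ServerState, event_manager: Cqrs.Bus, handler: SomeHandler)

# Every struct is a map carrying its module in the __struct__ field:
ServerState = state.__struct__

# ...which allows a generic match on "any struct" plus particular fields:
%{__struct__: _module, event_manager: _manager} = state
```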

So far, so simple. Now it’s time for the supervisor:

    defmodule Cqrs.HandlerWatcher.Supervisor do
      use Supervisor

      defmodule Spec do
        import Supervisor.Spec, warn: false

        def gen_event_supervisor(name, event_handlers \\ []) do
          supervisor(Cqrs.HandlerWatcher.Supervisor, [name, event_handlers])
        end

        def event_handler(name, args \\ []) do
          {name, args}
        end
      end

      def start_link(event_manager, handlers) do
        Supervisor.start_link(__MODULE__, [event_manager, handlers], name: __MODULE__)
      end

      def init([event_manager, handlers]) do
        handlers = for {handler, args} <- handlers do
                 worker(Cqrs.HandlerWatcher, [event_manager, handler, args],
                   id: handler, restart: :transient)
                   end

        children = [worker(event_manager, [])|handlers]
        supervise(children, [strategy: :one_for_one])
      end
    end

The purpose of this supervisor is simple: supervise the event manager and all handlers. I also took a few steps forward here to make it more generic. Take a look at the simple helpers in the Spec module. I skipped all type definitions so as not to overload the examples and to keep the intent visible.

To see the benefit of this approach, let me show you the root application supervisor:

    defmodule Cqrs do
      use Application

      def start(_type, _args) do
        import Supervisor.Spec, warn: false
        import Cqrs.HandlerWatcher.Supervisor.Spec

        children = [
          gen_event_supervisor(Cqrs.Bus,
            [event_handler(Cqrs.EventHandler, []),
             event_handler(Cqrs.CommandHandler, [])]),

          # ...
          supervisor(Cqrs.Repo, [])
        ]

        opts = [strategy: :one_for_one, name: Cqrs.Supervisor]
        Supervisor.start_link(children, opts)
      end

      # ...
    end 

Running this supervisor should result in the following supervision tree:
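Sketched in text, the tree reconstructed from the code above looks roughly like this (the watcher workers are registered under their handler names):

```
Cqrs.Supervisor (one_for_one)
├── Cqrs.HandlerWatcher.Supervisor (one_for_one)
│   ├── Cqrs.Bus                  # GenEvent manager
│   ├── Cqrs.EventHandler         # Cqrs.HandlerWatcher worker
│   └── Cqrs.CommandHandler       # Cqrs.HandlerWatcher worker
└── Cqrs.Repo
```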

The main point here is reusing the HandlerWatcher implementation for different handlers in order to avoid code duplication. Some Erlang/Elixir developers just re-implement the same watcher functionality from scratch for each handler. I strongly believe that current OTP is not a closed set of patterns; there is a lot of space to extend it. (Please take a look at erlangpatterns.com, introduced by Garret Smith.)

I encourage you to look for common patterns in your code, but only at the refactoring step. The first implementation should be simple and readable, which is what lets you see those repetitive patterns.

I leave the evaluation of this solution to you, especially by killing/crashing the Cqrs.Bus event manager or the GenEvent handler watchers, or by sending events with unhandled payloads.