Event Driven Architecture in practice:

a case study of one project

Szymon Kulec @Scooletz http://blog.scooletz.com

Tomasz Frydrychewicz @frydrychewicz

Outline

Intro

Project background

  • There was a given framework (hammer), but we didn't want to be another nail.
  • The given idea for integration - shared database.
  • Some well separated contexts/domains visible up front.

Thinking

  • Build a good client using some kind of API (WebAPI probably).
  • Don't use framework, build the client first, decide on the server services later.
  • If no db for integration then what?
  • Microservices a'la CORBA it's not the way you should design systems.
  • Is there a common denominator for the client and the server?

Event Driven Architecture

is an an architectural pattern for building loosely coupled systems and services in a reactive way


and that's the way we've done it.

Client

Make Love Not .Net

A.K.A why did we choose to have just JavaScript

JavaScript

AngularJS

NodeJS

Bower

Gulp

Karma

ASP.NET MVC   :-)

...

Events

Because web is asynchronous

Evolution

From callback-driven asynchrony

To events-driven asynchrony

Flux

The Dispatcher

Black Box

Controller

View

Router

Action

Store

...

Actions Events
Reactions Events
Queries NOT Events

The Code

Actions

angular.module("app.someModule")
       .factory('someModuleActions', someModuleActions);
    
function someModuleActions(bcDispatcher) {
    var service = {};

    service.action1 = action1;
    service.action2 = action2;  

    return service;
    ///////////////

    function action1(a, b) {
        bcDispatcher.dispatch('someModule:action1', {
            param1: a,
            param2: b
        });
    }

    function action2() {
        bcDispatcher.dispatch('someModule:action2');
    }        
}
					    
Store

angular.module("app.someModule")
       .factory("$$someModuleService", $$someModuleService)
       .factory("someModuleStore", someModuleStore)
       .run(registersomeModuleStore);
                    
function someModuleStore($$someModuleService) {
    var service = {
        dispatcherToken: null
    };
    
    service.getSomeData = $$someModuleService.getSomeData;
    
    return service;				                      					        					        
}              	
                    
function $$someModuleService(bcDispatcher) {
    var service = {
        someData: []
    };              				        					        				        					        

    service.getSomeData = getSomeData;
    service.setSomeData = setSomeData;

    return service;
    ////////////////

    function getSomeData() {
        return service.someData;     
    }

    function setSomeData(a, b) { 
        someData.push(a + b);
        bcDispatcher.dispatch('someModule:someDataSet', { value: a + b });
    }	             
} 

function registersomeModuleStore(
   bcDispatcher, 
   someModuleStore, 
   $$someModuleService) {
   
    someModuleStore.dispatcherToken = 
        bcDispatcher.register(function(action, attrs) {
            switch(action) {
                case "someModule:action1":
                    $$someModuleService.setSomeData(attrs.a, attrs.b);
                    break;
            }
        });
}             				        					        				        					        
					    
Controller

angular.module("app.someModule")
       .factory("someController", SomeController);

function SomeController(someModuleActions, someModuleStore, bcDispatcher, $scope) {
    var vm = this;

    vm.action1 = someModuleActions.action1;
    vm.getSomeData = someModuleStore.getSomeData;

    var dispatchToken = bcDispatcher.register(function (action, attrs) {
        switch (action) {
            case "someModule:someDataSet":
                alert("New value: " + attrs.value);
                break;
        }
    });

    $scope.$on('$destroy', function () {
        bcDispatcher.unregister(dispatchToken);
    });
}
					    

CQRS

Server

DDD recap - Aggregate

  • Aggregate is a transaction boundary.
  • Big enough to cover transactions.
  • Small enough to keep contention low.
  • All parts of the aggregate (entities) are accessed by the aggregate root.

Event Sourcing recap

  • Don't store an aggregate's state.
  • Store the business events containing all the needed values.
  • Apply the events on initial state to get the current.

Event Sourcing example

Version Event The state
1RegistrationCompleted{"CandidateName":"Tomasz", "OtherImportantValue":"Value"}
2Paid{"Paid":"true", CandidateName":"Tomasz", "OtherImportantValue":"Value"}

Event Sourcing log

What if we add a sequence to all the events in the system... we could use it as a append only log.

Sequence Aggregate id Version Event The state
1A11RegCompleted{"CandidateName":"Tomasz", ...}
2A12Paid{"Paid":"true", CandidateName":"Tomasz", ...}
3A21RegCompleted{"CandidateName":"Szymon", ...}

Event Sourcing advantages

  • Append only model, data immutable.
  • The state for a given can be cached forever, it doesn't change.
  • Audit out of the box.
  • One can easily read the log and publish it to some ESB or other fancy enterprise solution.

Implementation details

  • SQL server
    • Log, Events, EventsMetadata
    • Using GUIDs v1 (clustered index performance!)
  • protobuf-net
  • custom library based somehow on Lokad CQRS
    • batching SqlCommands (300 lines of ADO. Less than number of your EF attributes:P)
    • reusing memory chunks for serialization (no MemoryStream, no allocs)
    • state cache (off heap)

Implementation details

  • WebAPI
    • ~/api/events/describe
    • ~/api/events/describe/E38784FA-96E5-4392-AC73-76DA532E63D4 - proto
    • ~/api/events/... - for querying the log
  • BDD tests for aggregates
    
    Given
    	-- no events--
    When
    	On ( Register {
    			"Address": "http://test.yourwork.com/",
    			"Name": "test" } )
    Then
    	ModuleRegistered {
    			"Address": "test.yourwork.com",
    			"Name": "test" }
    

I want to react!

  • Events are useless if none can react to them.
  • Process managers to the rescue
  • Small TPL-based runner provided.
  • Can consume event from other services.

public class MarkAsPaid : 
	IProcessEvent<PaymentGateway.PaymentReceived>
{
    public void On(PaymentReceived e, DispatchEnvelope env)
    {
    	// get registration aggregate
    	// registration.MarkAsPaid();
    }
}

I want to query!

  • Sometimes you need the view.
  • The very same PMs are used for building views with EF.
  • For simple, small features sometimes it's a bit redundant.

I want to integrate!

A.K.A. Partying partying yeah!

  • Description provided by proto schemas of all events.
  • Mono.Cecil to retrieve info from the compiled code.
  • Simple WebAPI and polling. It works for now!
  • Catalogue - The Chosen One. (Yes, we're aware of DISCO).

Cherry

SignalR

Flux + SignalR

The Hub

namespace MyNamespace.Hubs
{
    using Microsoft.AspNet.SignalR;

    public class SomeHub : Hub
    {
        public void Login(string userName)
        {
            Groups.Add(Context.ConnectionId, userName);            
        }

        public void SendTo(string userName, string @event, object attr)
        {
            Clients.Group(userName).Raise(@event, attr);            
        }

        public void SendToAll(string @event, object attr)
        {
            Clients.All.Raise(@event, attr);
        }
    }
};					        
					    
SignalR Router Process Manager

namespace MyNamespace.SomeModule
{
    using EventSourcing;

    using Events = MyNamespaced.SomeModule.Events;

    public class SignalREventsRouter : SignalREventsRouterBase,
        IWantEventsWithMetadata,
        IProcessEvent<Events.SomethingHappened>,
        IProcessEvent<Events.SomethingElseHappened>,       
    {
        public SignalREventsRouter(ISignalRMessenger messenger)
            : base(messenger)
        {
            Messenger.UseHubFrom(SystemName.B2B);
        }

        public void On(Events.SomethingHappened e, DispatchEnvelope env)
        {
            Messenger.SendToAll(
                "someModule:somethingHappened", 
                new { env.LogInfo.AggregateId });
        }

        public void On(Events.SomethingElseHappened e, DispatchEnvelope env)
        {
            Messenger.Send(
                env.GetMetadata(this).Audit.UserName, 
                "someModule:somethingElseHappened",
                new { env.LogInfo.AggregateId });
        }       
    }
}					    
					    
Flux.SignalR

angular.module("app.core")
       .config(configure);

function configure(fluxSignalRRegistryProvider) {

    // SignalR Hubs configuration
    fluxSignalRRegistryProvider.register('someHub');

}					    
					     

Summary

Questions?

Szymon Kulec @Scooletz http://blog.scooletz.com

Tomasz Frydrychewicz @frydrychewicz