
This blog has relocated to bling.github.io.

Saturday, December 4, 2010

CQRS: Building a “Transactional” Event Store with MongoDB

As you already know if you’re familiar with MongoDB, it does not support transactions.  The closest thing it offers is atomic modification of a single document.

The Event Store in a CQRS architecture has the important responsibility of detecting concurrency violations, where two different sources try to update the same version of an aggregate.  The one that arrives late should be denied its changes with an exception thrown.  This ensures the integrity of the data.

Here is a very simple typical implementation of appending events into the event store:

public void Append(Guid id, long expectedVersion, IEnumerable<IEvent> events)
{
  try
  {
    _events.Insert(events.Select(x => ...)); // convert each event to its storage type
  }
  catch (...)
  {
    if (E11000 duplicate key) // MongoDB's duplicate key error code
      throw new ConcurrencyException(...);
    throw; // anything else is a real failure; let it bubble up
  }
}

The syntax is a mix of C# and pseudo-code, but the basic concepts are the same.  This assumes you’ve set up a unique compound index on the collection over the aggregate ID and the version.  Thus, when you insert something that already has a matching ID/version pair, Mongo will report a duplicate key violation, and all is good.

But wait!  Operations are atomic only per document!  So what happens if you append 100 events, and the insert fails on the 43rd one?  Events 1 through 42 will remain in the data store, which is bad news.
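To see that failure mode concretely, here’s a toy Python sketch where an in-memory class stands in for a MongoDB collection with a unique compound index (all names here are illustrative, not real driver APIs):

```python
class DuplicateKeyError(Exception):
    pass

class ToyCollection:
    """In-memory stand-in for a MongoDB collection with a unique
    compound index on (aggregate_id, version).  Inserts are atomic
    per document, so a failing batch leaves earlier documents behind."""
    def __init__(self):
        self.docs = []
        self._index = set()  # the unique compound index

    def insert_many(self, docs):
        for doc in docs:
            key = (doc["aggregate_id"], doc["version"])
            if key in self._index:
                # stop at the first violation -- documents already
                # inserted in this batch stay in the collection
                raise DuplicateKeyError("E11000 duplicate key %r" % (key,))
            self._index.add(key)
            self.docs.append(doc)
```

If a rival writer already stored version 2, our three-event batch fails midway and strands its version-1 event:

```python
col = ToyCollection()
col.insert_many([{"aggregate_id": 1, "version": 2, "event": "rival"}])
try:
    col.insert_many([{"aggregate_id": 1, "version": v, "event": "mine"}
                     for v in (1, 2, 3)])
except DuplicateKeyError:
    pass
len(col.docs)  # 2: the rival's event plus our stranded version-1 event
```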

Obviously, this solution is not going to work.  The next step was to do something like this:

catch (...)
{
  if (E11000 duplicate key)
  {
    // compensate: delete whatever part of the batch made it in
    foreach (var e in events)
      _events.Delete(new { _id = e._id });

    throw new ConcurrencyException(...);
  }
}

So, before inserting into the collection, each event gets a pre-generated ObjectId, so that if the insert fails, the catch block can simply tell the data store to delete everything that was just written.

At first glance this seems to fix everything, except for one glaring problem: what happens if you lose the connection to the database before, or midway through, sending the deletes?  Now you have to guarantee that those deletes eventually happen, which raises the question of where to store them.  A local file?  Another database?  Worse, until they are applied, any other process that queries the events for the same aggregate will see invalid data.

So, we’re back to square one.  We need to simulate a transaction through a single insert.

The secret is in the schema design.  Initially, we started out with a straightforward row-per-event schema.  But since we’re operating with documents, we can model it as a batch of events.

Thus, instead of versioning every event individually, we version a batch of events.  For example, originally we would insert 3 events, and the data saved would look like this:

{ _id = 1, aggregate_id = 1, version = 1, event = { … } }
{ _id = 2, aggregate_id = 1, version = 2, event = { … } }
{ _id = 3, aggregate_id = 1, version = 3, event = { … } }

In the new schema, it would look like this:

{ _id = 1, aggregate_id = 1, version = 1, events = [ { … }, { … }, { … }, { … } ] }

Now, a downside to this approach is that you lose some granularity of stored events, since multiple events are grouped under a single version.  However, I don’t see this as a huge loss: the main reason to use event sourcing in the first place is to be able to restore an aggregate to any state in its history, and we still retain that ability.
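To make the batched append concrete, here’s a toy Python sketch with an in-memory dict standing in for the MongoDB collection and its unique index (class and method names are illustrative, not from the original code):

```python
class ConcurrencyError(Exception):
    pass

class ToyEventStore:
    """Sketch of the batched schema: one document per command,
    uniquely keyed by (aggregate_id, version)."""
    def __init__(self):
        self._docs = {}  # (aggregate_id, version) -> document

    def append_batch(self, aggregate_id, expected_version, events):
        key = (aggregate_id, expected_version + 1)  # one version per batch
        if key in self._docs:
            # another writer already committed this version
            raise ConcurrencyError("version %d already exists" % key[1])
        # a single insert: the whole batch commits, or none of it does
        self._docs[key] = {
            "aggregate_id": aggregate_id,
            "version": key[1],
            "events": list(events),
        }

    def load(self, aggregate_id):
        """Replay: yield events batch by batch, in version order."""
        for (agg, _), doc in sorted(self._docs.items()):
            if agg == aggregate_id:
                for e in doc["events"]:
                    yield e
```

Usage looks like this; a stale writer passing an old expected version gets a ConcurrencyError instead of corrupting the stream:

```python
store = ToyEventStore()
store.append_batch(1, 0, ["created", "renamed"])
store.append_batch(1, 1, ["closed"])
list(store.load(1))  # ["created", "renamed", "closed"]
```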

In our case, this is working very well for us.  When a command gets handled, it generates a batch of events that get applied and then saved to MongoDB.  I can’t think of any scenario where I’d want to replay to the middle of a half-processed command (though it’s possible anyway: just replay part of a batch).  But that’s asking for trouble; it’s most likely easier to just re-process the command.

Now, you may be asking why go through the trouble of batching events per command when you could store one document per aggregate and put all of its events in that single document.  Yes, that would solve the problem very effectively…until you hit the 4MB per-document limit ;-)

Tuesday, November 23, 2010

CQRS: Auto register event handlers

I’m not going to go into detail about event handlers in a CQRS architecture, since a quick Google/Bing search will turn up plenty of very good information.  What this post is about is a solution to the “how do I quickly register something to handle a bunch of events” problem without copy-pasting code all over the place.  There are other solutions out there, like this one.  Here’s something I came up with (it borrows some concepts from my post on Weak Events).

public class Aggregate
{
  private delegate void OpenEventHandler<in TTarget, in TEvt>(TTarget target, TEvt @event);
  private static readonly IDictionary<Type, OpenEventHandler<Aggregate, IEvent>> _evtHandlers = new Dictionary<Type, OpenEventHandler<Aggregate, IEvent>>();

  private object _something;

  static Aggregate()
  {
    // find every non-public instance method named "ApplyEvent" taking a single IEvent-derived parameter
    var methods = from m in typeof(Aggregate).GetMethods(BindingFlags.NonPublic | BindingFlags.Instance)
                  let p = m.GetParameters()
                  where m.Name == "ApplyEvent" && p.Length == 1 && typeof(IEvent).IsAssignableFrom(p[0].ParameterType)
                  select m;

    var registerForwarder = typeof(Aggregate).GetMethod("RegisterForwarder", BindingFlags.NonPublic | BindingFlags.Static);
    foreach (var m in methods)
    {
      Type eventType = m.GetParameters()[0].ParameterType;
      var forwarder = registerForwarder.MakeGenericMethod(eventType).Invoke(null, new object[] { m });
      _evtHandlers[eventType] = (OpenEventHandler<Aggregate, IEvent>)forwarder;
    }
  }

  private static OpenEventHandler<Aggregate, IEvent> RegisterForwarder<TEvt>(MethodInfo method)
  {
    // create an open instance delegate: the target instance is passed as the first argument
    var invoker = typeof(OpenEventHandler<,>).MakeGenericType(typeof(Aggregate), typeof(TEvt));
    var forwarder = (OpenEventHandler<Aggregate, TEvt>)Delegate.CreateDelegate(invoker, null, method);
    return (a, e) => forwarder(a, (TEvt)e);
  }

  private void ApplyEvent(EventHappened e)
  {
    _something = e.Something;
  }

  public void ApplyChanges(IEnumerable<IEvent> events)
  {
    foreach (var e in events)
    {
      _evtHandlers[e.GetType()](this, e);
    }
  }
}

A couple things:

  • The registration happens in the static constructor.  This is important, because the relatively heavy cost of reflection is paid only once per aggregate type.
  • The filtering of methods is arbitrary.  I chose “ApplyEvent” here as the convention, but of course you can choose whatever you like.
  • ApplyChanges simply invokes the event handlers dictionary directly.  Assuming you’re being a good citizen with the code, accessing _evtHandlers doesn’t need a lock, because once built it is never modified.

So in summary, it finds all methods named ApplyEvent in the current class and generates an “open delegate” for each one, which takes an extra parameter: the instance itself.  In this case, the instance is the aggregate, as shown in the ApplyChanges method.
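The same convention-based wiring can be sketched in Python, where a one-time per-class scan plays the role of the static constructor (a toy sketch; the on_&lt;EventType&gt; naming and class names are my own, not from the post):

```python
class Aggregate:
    """Dispatches events to handler methods found by naming convention."""
    _handlers = None  # per-class cache, built once (like the static ctor)

    @classmethod
    def _build_handlers(cls):
        # scan once for methods named on_<EventType>; getattr on the class
        # yields plain functions, i.e. "open" handlers taking (self, event)
        return {name[len("on_"):]: getattr(cls, name)
                for name in dir(cls) if name.startswith("on_")}

    def apply_changes(self, events):
        cls = type(self)
        if cls._handlers is None:
            cls._handlers = cls._build_handlers()
        for event in events:
            # no reflection on the hot path: a dict lookup and a call,
            # passing the instance explicitly like the open delegate does
            cls._handlers[type(event).__name__](self, event)

class EventHappened:
    def __init__(self, something):
        self.something = something

class Game(Aggregate):
    def on_EventHappened(self, e):
        self._something = e.something
```

After `g = Game()` and `g.apply_changes([EventHappened(42)])`, the handler has run and `g._something` is 42.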

So there you have it!  Excluding the lengthy LINQ query, roughly 10 lines of code to find and register all event handlers in the aggregate.  And if you’re wondering, the performance cost is negligible because there’s no reflection involved in the invocation of the handlers.  Awesome!