This blog has relocated to bling.github.io.

Saturday, December 4, 2010

CQRS: Building a “Transactional” Event Store with MongoDB

As you already know if you’re familiar with MongoDB, it does not support transactions.  The closest thing we have is atomic modification of a single document.

The Event Store in a CQRS architecture has the important responsibility of detecting concurrency violations, where two different sources try to update the same version of the aggregate.  The one that arrives late should have its changes rejected, with an exception thrown.  This ensures the integrity of the data.

Here is a simple, typical implementation of appending events to the event store:

public void Append(Guid id, long expectedVersion, IEnumerable<IEvent> events)
{
  try
  {
    _events.Insert(events.Select(x => ...)); // convert to storage type
  }
  catch (...)
  {
    if (E11000 duplicate key)
      throw new ConcurrencyException(...);
    throw; // anything other than a duplicate key violation is rethrown unchanged
  }
}

Syntax is a mix of C#/pseudo code, but the basic concepts are the same.  This assumes that you’ve set up a unique compound index on the collection over the ID and the version.  Thus, when you insert something that already has a matching ID/version, Mongo will report a duplicate key violation, and all is good.

But wait!  Operations are atomic per document!  So what happens if you append 100 events, and it fails on the 43rd one?  Events 1 through 42 will continue to exist in the data store, which is bad news.
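The partial failure described above can be reproduced without a database. The following is a minimal sketch, assuming a hypothetical in-memory `PerEventStore` that stands in for a collection with a unique (aggregate id, version) index; it is not the MongoDB driver API, and the class names, string payloads, and the use of `InvalidOperationException` for the duplicate key error are illustrative only.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for a collection with a unique
// (aggregate id, version) index. Not the MongoDB driver API.
public sealed class PerEventStore
{
    private readonly Dictionary<(int AggregateId, long Version), string> _docs =
        new Dictionary<(int AggregateId, long Version), string>();

    public int Count => _docs.Count;

    // Each insert is atomic on its own, like a single-document write.
    public void Insert(int aggregateId, long version, string payload)
    {
        if (!_docs.TryAdd((aggregateId, version), payload))
            throw new InvalidOperationException("E11000 duplicate key");
    }

    // Inserts one document per event; nothing is rolled back on failure.
    public void Append(int aggregateId, long expectedVersion, IEnumerable<string> events)
    {
        long version = expectedVersion;
        foreach (var e in events)
            Insert(aggregateId, ++version, e);
    }
}

public static class PartialFailureDemo
{
    // Returns the number of documents left behind after a failed 100-event append.
    public static int Run()
    {
        var store = new PerEventStore();

        // A competing writer has already stored version 43 of aggregate 1.
        store.Insert(1, 43, "other writer");

        try
        {
            store.Append(1, 0, Enumerable.Range(1, 100).Select(i => $"event {i}"));
        }
        catch (InvalidOperationException)
        {
            // The append failed on the 43rd event, but versions 1 through 42
            // were already written and stay in the store.
        }

        return store.Count;
    }
}
```

In this sketch `PartialFailureDemo.Run()` counts the competing writer's document plus the 42 orphaned events, which is exactly the inconsistent state the paragraph above warns about.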

Obviously, this solution is not going to work.  The next step was to do something like this:

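catch (...)
{
  if (E11000 duplicate key)
  {
    // undo the partial insert by deleting every event in the batch
    foreach (var e in events)
      _events.Delete(new { _id = e._id });

    throw new ConcurrencyException(...);
  }
  throw; // anything other than a duplicate key violation is rethrown unchanged
}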

So, before inserting into the collection, each event gets a generated ObjectId, so that if the insert fails, the catch block can simply tell the data store to delete every event in the batch.

At first glance this seems to fix everything, except for one glaring problem.  What happens if you lose the connection to the database before, or midway through, sending the deletes?  Now you have to guarantee that those deletes eventually happen, which raises the question of where you would store the pending deletes.  A local file?  Another database?  And in the meantime, if another process in the system queries all events for the same aggregate, it will get invalid data.

So, we’re back to square one.  We need to simulate a transaction through a single insert.

The secret is in the schema design.  Initially, we started out with a straightforward row-per-event schema.  But since we’re operating with documents, we can model it as a batch of events.

Thus, instead of versioning every event individually, we version a batch of events.  For example, originally we would insert 3 events, and the data saved would look like this:

{ _id = 1, aggregate_id = 1, version = 1, event = { … } }
{ _id = 2, aggregate_id = 1, version = 2, event = { … } }
{ _id = 3, aggregate_id = 1, version = 3, event = { … } }

In the new schema, it would look like this:

{ _id = 1, aggregate_id = 1, version = 1, events = [ { … }, { … }, { … } ] }

Now, a downside to this approach is you lose a bit of granularity of stored events, since you are grouping multiple events under a single version.  However, I don’t see this as a huge loss since the main reason you want to use event sourcing in the first place is to be able to restore an aggregate to any state in its history, and we still retain that functionality.
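To make the batch idea concrete, here is a minimal sketch, assuming a hypothetical in-memory `BatchEventStore` in which each dictionary entry plays the role of one batch document and the dictionary key plays the role of the unique (aggregate id, version) index. The type names, the string payloads, and the `Load` method are illustrative assumptions, not part of the original implementation or of the MongoDB driver.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public sealed class ConcurrencyException : Exception
{
    public ConcurrencyException(string message) : base(message) { }
}

// Hypothetical in-memory stand-in: one "document" per batch, with a
// unique key on (aggregate id, batch version).
public sealed class BatchEventStore
{
    private readonly Dictionary<(int AggregateId, long Version), string[]> _batches =
        new Dictionary<(int AggregateId, long Version), string[]>();

    // The whole batch goes in with one insert, so it is stored completely
    // or not at all. A taken version means another writer got there first.
    public void Append(int aggregateId, long expectedVersion, IEnumerable<string> events)
    {
        (int AggregateId, long Version) key = (aggregateId, expectedVersion + 1);
        if (!_batches.TryAdd(key, events.ToArray()))
            throw new ConcurrencyException(
                $"Aggregate {aggregateId} already has version {key.Version}.");
    }

    // Replays batches in version order, optionally stopping at a batch version.
    public IReadOnlyList<string> Load(int aggregateId, long upToVersion = long.MaxValue)
    {
        return _batches
            .Where(kv => kv.Key.AggregateId == aggregateId && kv.Key.Version <= upToVersion)
            .OrderBy(kv => kv.Key.Version)
            .SelectMany(kv => kv.Value)
            .ToList();
    }
}
```

With this shape, a writer that loses the race leaves nothing behind: its `Append` throws before any of its events are stored. Restoring history works at batch granularity, so `Load(1, 1)` returns only the events of the first batch of aggregate 1.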

In our case, this is working very well for us.  When a command gets handled, it generates a bunch of events that get applied and then saved to MongoDB.  I can’t think of any scenario where I’d want to replay to the middle of a half-processed command (but of course it’s possible anyway, just replay half of a batch of events).  But that’s just asking for trouble.  It’s most likely easier to just re-process the command.

Now, you may be asking why go through the trouble of batching events when you can just store one document per aggregate, and then put all events in one document?  Yes, that would solve the problem very effectively…until you hit the 4MB per document limit ;-)

6 comments:

Jonathan Oliver said...

Awesome post.

When I started writing version 2.0 of my EventStore library, I wanted to ensure that NoSQL databases such as MongoDB could be handled. Pushing everything up as a single batch is critical.

Here is the implementation for Mongo:
https://github.com/joliver/EventStore/tree/master/src/proj/EventStore.Persistence.MongoPersistence

Here is the design guide:
http://jonathan-oliver.blogspot.com/2010/12/cqrs-eventstore-v2-architectural.html

There are three quick things I wanted to mention regarding your implementation. First, because you're now pushing things up as a batch, you can no longer use the event version as an optimistic control technique. Instead, you'll want to number each batch that you push using a sequential, incrementing value.

Second, if you only push the event information, you may lose some context because there is oftentimes metadata associated with all of the events that you'll want to store.

Lastly, you'll want to consider what happens when a message is processed more than once, which causes a batch to be written again. NoSQL doesn't provide any guarantees related to de-duplication, so you'll need to handle that in your application code/event store code.

Unknown said...

Thanks for the comments!

You are absolutely right in all of your points.

Actually, each individual event in my system is meaningless unless it is part of a batch. The batch is the only thing that is versioned, and the batch version is required to save or get events from the store.

As for duplicate handling, since I wrote the post I implemented _id as a complex object like this:

{ "_id": { "aggregate": 1234, "version": 65 } }

_id is always indexed, and unique. If version 66 already exists, the 2nd command handler will simply fail, and the event store will continue to have good, consistent data.

bodrin said...

Hi, I want to ask how you dispatch the events. I guess the flow is something like this:

1. receive a command
2. load the related AR and process the command which produces some events - a batch
3. store the event batch in MongoDB, e.g. at
{ "_id": { "aggregate": 1234, "version": 65 } }
4. dispatch the events to some messaging system
5. mark the { "_id": { "aggregate": 1234, "version": 65 } } as dispatched

So if this is similar to what you are doing, I'm wondering: what if the system goes down just after step 3?
The events are not dispatched, but how do you find that out after a restart? Is there an efficient way with MongoDB and/or with other document / KV stores?

Unknown said...

Well, the easiest way is to cheat and use Mongo as your messaging bus ;-) There's a bunch of examples on the web. Basically, you create a direct connection against the database and read the oplog. This is the mechanism that Mongo uses to do replication, so you get near real-time performance. Since it's only one system to manage, rather than two in concert, it's a bit easier to maintain. You'll get the same guarantees as any other write to a Mongo node.

If you must go to another messaging system it'll depend on the guarantees of that implementation. It's not the end of the world to tell the user to "try again later", assuming that's acceptable for the 0.01% of the time.

Also, if the messaging system is down, you can still read from the event store to get the latest information (and is required for stale nodes joining the system).

bodrin said...

Nice, thanks for answering :)
I should definitely take a closer look at the MongoDB! Sounds very interesting :)

On the other hand I'm still wondering if there is a general approach for this situation - you have done a state transition into the KV/doc DB and you have some events there. Then your system goes down just before it publishes these events on to the wire (some external messaging system). Then when you start again is there an efficient way/query to be done against the KV/doc DB so that you get all the events that haven't been dispatched yet?

Unknown said...

It wouldn't be any different from processing a new command. You will need to load all snapshots/events to get the aggregate root to the latest version. Events should typically be "fire and forget". If you require them to be guaranteed delivery it's better to restructure them as commands.