As you already know if you’re familiar with MongoDB, it does not support transactions. The closest thing we have is atomic modification of a single document.
The Event Store in a CQRS architecture has the important responsibility of detecting concurrency violations, where two different sources try to update the same version of the aggregate. The one that arrives late should have its changes rejected by the store, with an exception thrown. This ensures the integrity of the data.
Here is a simple, typical implementation of appending events to the event store:
public void Append(Guid id, long expectedVersion, IEnumerable<IEvent> events)
{
    try
    {
        _events.Insert(events.Select(x => ...)); // convert to storage type
    }
    catch (...)
    {
        if (E11000 duplicate key)
            throw new ConcurrencyException(...);
    }
}
The syntax is a mix of C# and pseudocode, but the basic concepts are the same. This assumes that you’ve set up a unique compound index on the collection over the ID and the version. Thus, when you insert something whose ID/version pair already exists, Mongo will report a duplicate key violation, and all is good.
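To make the mechanism concrete, here is a minimal Python sketch of the same idea. It does not talk to MongoDB: `InMemoryEvents` is a hypothetical in-memory stand-in for a collection with a unique index on (aggregate_id, version), and `DuplicateKeyError` stands in for the E11000 error. The names and the rule that the first new event gets `expected_version + 1` are assumptions for illustration.

```python
class DuplicateKeyError(Exception):
    """Stand-in for MongoDB's E11000 duplicate key error."""


class ConcurrencyException(Exception):
    """Raised when another writer already stored the version we expected to write."""


class InMemoryEvents:
    """Hypothetical stand-in for a collection with a unique (aggregate_id, version) index."""

    def __init__(self):
        self.docs = []
        self._keys = set()

    def insert(self, doc):
        key = (doc["aggregate_id"], doc["version"])
        if key in self._keys:
            raise DuplicateKeyError(key)
        self._keys.add(key)
        self.docs.append(doc)


def append(store, aggregate_id, expected_version, events):
    """Insert one document per event, numbering them after expected_version."""
    try:
        for offset, event in enumerate(events, start=1):
            store.insert({
                "aggregate_id": aggregate_id,
                "version": expected_version + offset,
                "event": event,
            })
    except DuplicateKeyError as exc:
        raise ConcurrencyException(f"version conflict on {exc.args[0]}") from exc


store = InMemoryEvents()
append(store, "a1", 0, ["Created"])

try:
    append(store, "a1", 0, ["Renamed"])  # a second writer that also expected version 0
    conflict = False
except ConcurrencyException:
    conflict = True
```

The second writer loses because its first event maps to a key that is already taken, which is exactly the signal the event store needs.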
But wait! Operations are atomic per document! So what happens if you append 100 events, and it fails on the 43rd one? Events 1 through 42 will continue to exist in the data store, which is bad news.
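The failure mode can be reproduced with a small Python simulation. Again this is only a sketch: `InMemoryEvents` is a hypothetical in-memory stand-in for the collection and its unique index, and the scenario (a competing writer that already owns version 43) is contrived to match the numbers above.

```python
class DuplicateKeyError(Exception):
    """Stand-in for MongoDB's E11000 duplicate key error."""


class InMemoryEvents:
    """Hypothetical stand-in for a collection with a unique (aggregate_id, version) index."""

    def __init__(self):
        self.docs = []
        self._keys = set()

    def insert(self, doc):
        key = (doc["aggregate_id"], doc["version"])
        if key in self._keys:
            raise DuplicateKeyError(key)
        self._keys.add(key)
        self.docs.append(doc)


store = InMemoryEvents()
# A competing writer has already stored version 43 of aggregate "a1".
store.insert({"aggregate_id": "a1", "version": 43, "event": "Competing"})

inserted = 0
try:
    # Each insert is atomic on its own; the run of 100 as a whole is not.
    for version in range(1, 101):
        store.insert({"aggregate_id": "a1", "version": version, "event": f"E{version}"})
        inserted += 1
except DuplicateKeyError:
    pass  # the append is abandoned here, with no rollback

# Everything written before the conflict is still in the store.
leftovers = [d for d in store.docs if d["event"].startswith("E")]
```

After the conflict, `leftovers` holds the events that were written before the failure, even though the append as a whole was rejected.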
Obviously, this solution is not going to work. The next step was to do something like this:
catch (...)
{
    if (E11000 duplicate keys)
    {
        foreach (var e in events)
            _events.Delete(new { _id = e._id });
        throw new ConcurrencyException(...);
    }
}
So, before inserting into the collection, each event gets a generated ObjectId, so that if the insert fails, the catch block can simply tell the data store to delete everything.
At first glance this seems to fix everything, except for one glaring problem. What happens if you lose the connection to the database before, or midway through, sending the deletes? Now you have to guarantee that those deletes eventually run, which raises the question of where you would store the pending work. A local file? Another database? Worse, in the meantime, any other process in the system that queries the events for the same aggregate will get invalid data.
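A rough Python simulation of that window, under stated assumptions: `FlakyEvents` is a hypothetical in-memory collection whose connection "drops" after a fixed number of deletes, and `ConnectionLost` is a made-up exception standing in for whatever the real driver would raise.

```python
class ConnectionLost(Exception):
    """Hypothetical stand-in for a dropped database connection."""


class FlakyEvents:
    """In-memory stand-in for a collection whose connection drops after N deletes."""

    def __init__(self, deletes_before_drop):
        self.docs = {}
        self._deletes_left = deletes_before_drop

    def insert(self, doc):
        self.docs[doc["_id"]] = doc

    def delete(self, doc_id):
        if self._deletes_left == 0:
            raise ConnectionLost()
        self._deletes_left -= 1
        del self.docs[doc_id]


store = FlakyEvents(deletes_before_drop=2)

# Pretend these five events landed before the duplicate key error was raised.
partial = [{"_id": i, "aggregate_id": "a1", "version": i} for i in range(1, 6)]
for doc in partial:
    store.insert(doc)

try:
    for doc in partial:
        store.delete(doc["_id"])  # compensating deletes
    cleaned_up = True
except ConnectionLost:
    cleaned_up = False

# Events that any other reader of aggregate "a1" would still see.
orphans = sorted(store.docs)
```

The cleanup stops partway, and the remaining ids in `orphans` are events from a rejected append that are still visible to readers.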
So, we’re back to square one. We need to simulate a transaction through a single insert.
The secret is in the schema design. Initially, we started out with a straightforward row-per-event schema. But since we’re operating with documents, we can model a whole batch of events as a single document.
Thus, instead of versioning every event individually, we version a batch of events. For example, originally we would insert 3 events, and the data saved would look like this:
{ _id = 1, aggregate_id = 1, version = 1, event = { … } }
{ _id = 2, aggregate_id = 1, version = 2, event = { … } }
{ _id = 3, aggregate_id = 1, version = 3, event = { … } }
In the new schema, it would look like this:
{ _id = 1, aggregate_id = 1, version = 1, events = [ { … }, { … }, { … } ] }
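Here is a minimal Python sketch of an append under the batch schema. As before, `InMemoryBatches` is a hypothetical in-memory stand-in for a collection with a unique index on (aggregate_id, version), not the MongoDB driver, and numbering the new batch `expected_version + 1` is an assumption for illustration.

```python
class DuplicateKeyError(Exception):
    """Stand-in for MongoDB's E11000 duplicate key error."""


class ConcurrencyException(Exception):
    """Raised when another writer already stored the batch version we expected to write."""


class InMemoryBatches:
    """Hypothetical stand-in for a collection with a unique (aggregate_id, version) index."""

    def __init__(self):
        self.docs = []
        self._keys = set()

    def insert(self, doc):
        key = (doc["aggregate_id"], doc["version"])
        if key in self._keys:
            raise DuplicateKeyError(key)
        self._keys.add(key)
        self.docs.append(doc)


def append_batch(store, aggregate_id, expected_version, events):
    """Write all events of one command as a single document, so it lands whole or not at all."""
    doc = {
        "aggregate_id": aggregate_id,
        "version": expected_version + 1,
        "events": list(events),
    }
    try:
        store.insert(doc)  # one insert, one document
    except DuplicateKeyError as exc:
        raise ConcurrencyException(f"version conflict on {exc.args[0]}") from exc


store = InMemoryBatches()
append_batch(store, "a1", 0, ["A", "B", "C"])

try:
    append_batch(store, "a1", 0, ["X", "Y"])  # competing writer, same expected version
    rejected = False
except ConcurrencyException:
    rejected = True
```

Because the whole batch is one document, a rejected append leaves nothing behind to clean up.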
Now, a downside to this approach is you lose a bit of granularity of stored events, since you are grouping multiple events under a single version. However, I don’t see this as a huge loss since the main reason you want to use event sourcing in the first place is to be able to restore an aggregate to any state in its history, and we still retain that functionality.
In our case, this is working very well for us. When a command gets handled, it generates a bunch of events that get applied and then saved to MongoDB. I can’t think of any scenario where we’d want to replay to the middle of a half-processed command (though of course it’s still possible: just replay half of a batch of events). But that’s just asking for trouble. It’s most likely easier to just re-process the command.
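Restoring an aggregate from batches can be sketched in a few lines of Python. The account-style aggregate, the `Deposited` and `Withdrawn` event types, and the `restore` helper are all hypothetical, chosen only to show that replaying up to a batch version still works.

```python
def apply(balance, event):
    """Apply a single (hypothetical) event to the aggregate state."""
    if event["type"] == "Deposited":
        return balance + event["amount"]
    if event["type"] == "Withdrawn":
        return balance - event["amount"]
    raise ValueError(f"unknown event type: {event['type']}")


def restore(batches, up_to_version):
    """Replay whole batches in version order, stopping after up_to_version."""
    balance = 0
    for batch in sorted(batches, key=lambda b: b["version"]):
        if batch["version"] > up_to_version:
            break
        for event in batch["events"]:
            balance = apply(balance, event)
    return balance


# Batches as they might come back from a query, in no particular order.
batches = [
    {"aggregate_id": "a1", "version": 2,
     "events": [{"type": "Withdrawn", "amount": 30}]},
    {"aggregate_id": "a1", "version": 1,
     "events": [{"type": "Deposited", "amount": 100},
                {"type": "Deposited", "amount": 5}]},
]

balance_v1 = restore(batches, 1)
balance_v2 = restore(batches, 2)
```

The granularity of `up_to_version` is now one command's worth of events rather than a single event, which is the trade-off described above.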
Now, you may be asking why go through the trouble of batching events when you could just store one document per aggregate and put all of its events in there. Yes, that would solve the problem very effectively…until you hit the 4MB per document limit ;-)