One of the things I like about CQRS is that many of the infrastructure components become simpler, at least compared to the classical ORM approach. However, some of these components have not seen widespread use in existing enterprise applications and will be new to most people. One such component is the Event Store that is used for persistence of the (transactional) domain in CQRS.

Conceptually, an event store is very simple. For persisting an aggregate, all new state change events need to be stored. When loading an aggregate back into memory, each event is replayed against your domain so that the aggregate is put back into the right state. The original interface of the EventStore reflected this in a very straightforward manner:

public interface EventStore<EventType> {
    void storeEvents(UUID aggregateId, List<EventType> events);
    List<EventType> loadEvents(UUID aggregateId);

The storeEvents method handles both storing the first events for a newly persisted aggregate as well as adding additional events after further transactions. The loadEvents method simply returns all events stored for the aggregate.

This interface works pretty well as a starting point, but is hard to extend to support some features you will likely need in a real system:

  • Polymorphism
  • Snapshots
  • Versioning and conflict resolution

So for the training example the interface was modified to support polymorphism and versioning:

public interface EventStore<EventType> {
    void storeEventSource(EventSource<EventType> source);
    T loadEventSource(Class<T> expectedType, UUID eventSourceId);
    T loadEventSource(Class<T> expectedType, VersionedId eventSourceId);

The EventSource is a new interface that must be implemented by your aggregate root:

public interface EventSource<EventType> {
    VersionedId getVersionedId();
    List<EventType> getUnsavedEvents();
    void loadFromHistory(List<EventType> history);

The getVersionedId and getUnsavedEvents methods are called when persisting an aggregate. When loading the aggregate the event store instantiates your aggregate with its current id and version and invokes loadFromHistory.

Although this change added support for polymorphism and versioning it is not a very clean interface:

  • The aggregate root must implement the EventSource interface, coupling it to the event store
  • The EventSource interface is used both as a source and a sink
  • The event store instantiates your aggregate, passing in the VersionedId. So your aggregate root must also have a one-argument constructor

Finally, the implementation of the event store was not very clean either, mixing JDBC, reflection code, and version management.

So last weekend I decided to start with a clean slate and re-implement the event store. Now one of my favorite ways to design and implement an API is to start with unit tests and a plain Java implementation, without trying to tackle the actual JDBC code at the same time. Maps and Lists are great for storing data while doing exploring the various designs. Furthermore, the tests are completely reusable when it is time to write the actual JDBC implementation.

Here's one of the first test cases:

public void should_create_event_stream_with_initial_version_and_events() {
    FakeEventSource source = new FakeEventSource("type", 0, T1, EVENTS);
    subject.createEventStream(ID_1, source);

    FakeEventSink sink = new FakeEventSink("type", 0, T1, EVENTS);
    subject.loadEventsFromLatestStreamVersion(ID_1, sink);

As you can see from the test case the roles of event source and event sink have been made explicit. Creation of an event stream is now also separate from storing additional events into an existing event stream. Furthermore, the aggregate no longer needs to implement these interfaces. The Repository implementation will need to do the mapping between your domain classes and the event store. Here are the new event store interfaces with various methods added to support loading historical data:

public interface EventSource<EventType> {
    String getType();
    long getVersion();
    long getTimestamp();
    List<EventType> getEvents();

public interface EventSink<EventType> {
    void setType(String type);
    void setVersion(long version);
    void setTimestamp(long timestamp);
    void setEvents(List<EventType> events);

public interface EventStore<EventType> {
    void createEventStream(UUID streamId, EventSource<EventType> source);
    void storeEventsIntoStream(UUID streamId, long expectedVersion, EventSource<EventType> source);

    void loadEventsFromLatestStreamVersion(UUID streamId, EventSink<EventType> sink);
    void loadEventsFromExpectedStreamVersion(UUID streamId, long expectedVersion, EventSink<EventType> sink);

    void loadEventsFromStreamUptoVersion(UUID streamId, long version, EventSink<EventType> sink);
    void loadEventsFromStreamUptoTimestamp(UUID streamId, long timestamp, EventSink<EventType> sink);

As you can see the EventSink and EventSource interfaces are very much opposites and easily extensible with supporting new features like loading from a snapshot or supporting conflict resolution.

Finally, after having a full test suite and an in-memory implementation of the new event store (which took most of the day) adding a new JDBC based implementation was just a couple of hours of work, requiring only minimal changes to the tests and interfaces, mostly related to exception handling. The current JDBC event store implementation is about 250 lines of Java, very modest for a domain persistence mechanism.