Skip to content
On this page

Custom Aggregations

Once in awhile users are hitting use cases or desired functionality for aggregation projections that just don't fit in well to our SingleStreamProjection<T> or MultiStreamProjection<TDoc, TId> models. Not to worry though, because Marten V5.0 introduces the new CustomAggregation<T> base type that will let you define aggregation projections with explicit user code while still taking advantage of some of the parallelization optimizations that were built for the previous aggregation types running in the async daemon;

Alright, let's jump right into an example. Two of the drivers for this feature were for aggregations to document types that were soft-deleted or aggregations where some events should only apply to the aggregate document if the document already existed. To illustrate this with a contrived example, let's say that we've got these event types:

cs
public class Start{}
public class End{}
public class Restart{}
public class Increment{}

snippet source | anchor

And a simple aggregate document type like this:

cs
public class StartAndStopAggregate : ISoftDeleted
{
    // These are Marten controlled
    public bool Deleted { get; set; }
    public DateTimeOffset? DeletedAt { get; set; }

    public int Count { get; set; }

    public Guid Id { get; set; }

    public void Increment()
    {
        Count++;
    }
}

snippet source | anchor

As you can see, StartAndStopAggregate as a Guid as its identity and is also soft-deleted when stored by Marten by virtue of implementing the ISoftDeleted interface.

With all that being done, here's a sample aggregation that inherits from the Marten Marten.Events.Aggregation.CustomAggregation<TDoc, TId> base class:

cs
public class StartAndStopProjection: CustomProjection<StartAndStopAggregate, Guid>
{
    public StartAndStopProjection()
    {
        // I'm telling Marten that events are assigned to the aggregate
        // document by the stream id
        AggregateByStream();

        // This is an optional, but potentially important optimization
        // for the async daemon so that it sets up an allow list
        // of the event types that will be run through this projection
        IncludeType<Start>();
        IncludeType<End>();
        IncludeType<Restart>();
        IncludeType<Increment>();
    }

    public override ValueTask ApplyChangesAsync(DocumentSessionBase session, EventSlice<StartAndStopAggregate, Guid> slice, CancellationToken cancellation,
        ProjectionLifecycle lifecycle = ProjectionLifecycle.Inline)
    {

        var aggregate = slice.Aggregate;

        foreach (var data in slice.AllData())
        {
            switch (data)
            {
                case Start:
                    aggregate = new StartAndStopAggregate
                    {
                        // Have to assign the identity ourselves
                        Id = slice.Id
                    };
                    break;
                case Increment when aggregate is { Deleted: false }:
                    // Use explicit code to only apply this event
                    // if the aggregate already exists
                    aggregate.Increment();
                    break;
                case End when aggregate is { Deleted: false }:
                    // This will be a "soft delete" because the aggregate type
                    // implements the IDeleted interface
                    session.Delete(aggregate);
                    aggregate.Deleted = true; // Got to help Marten out a little bit here
                    break;
                case Restart when (aggregate == null || aggregate.Deleted):
                    // Got to "undo" the soft delete status
                    session
                        .UndoDeleteWhere<StartAndStopAggregate>(x => x.Id == slice.Id);
                    break;
            }
        }

        // Apply any updates!
        if (aggregate != null)
        {
            session.Store(aggregate);
        }

        // We didn't do anything that required an asynchronous call
        return new ValueTask();
    }
}

snippet source | anchor

Custom Grouping

All aggregations in Marten come in two parts:

  1. Grouping incoming events into "slices" of events that should be applied to an aggregate by id
  2. Applying incoming events from each slice into the identified aggregate

CustomAggregate supports aggregating by the stream identity as shown above. You can also use all the same customizable grouping functionality as the older MultiStreamProjection subclass.

Released under the MIT License.