Projecting with Blobs
- not only aggregates
- views combining data
- processing & reacting to events
Projecting with Blobs (2)
Aggregate all payments by customer
public class PaymentsByCustomer :
ProjectionBase<PaymentsByCustomer.State>
{
public class State
{
public decimal TotalPayments;
}
protected override void Map(IEventMapping<State> m)
{
m.For((PaymentIssued e) => e.Event.UserId,
(e, s) => s.TotalPayments += e.Event.Amount);
}
}
Projecting with Blobs (3)
Top paying customer per day
public class TopCustomerPerDay :
ProjectionBase<TopCustomerPerDay.State>
{
public class State
{
public decimal MaximumAmountSoFar;
public UserId UserId;
}
protected override void Map(IEventMapping<State> m)
{
m.For(e => e.Metadata.DateTime.Date, Apply);
}
void Apply (PaymentIssued e, State s)
{
if (e.Event.Amount > s.MaximumAmountSoFar)
{
s.MaximumAmoundSoFar = e.Event.Amount;
s.UserId = e.Event.UserId;
}
}
}
Projecting with Blobs (4)
- making this projections/processes easy requires a log
- the log contains ordered pointers to all events in an account
- an index is required
- if we had an index entry that fits in 512 bytes (page)...
- we could also use smaller (64 bytes) and store multiple per page
Projecting with Blobs (5)
0-63 |
64-127 |
128-191 |
192-255 |
256-319 |
320-383 |
384-447 |
448-511 |
entry0 |
entry1 |
entry2 |
entry3 |
entry4 |
entry5 |
entry6 |
entry7 |
Projecting with Blobs (6)
[StructLayout(LayoutKind.Explicit, Size = 64)]
struct IndexEntry
{
public const int EntriesPerPage = 8
[FieldOffset(0) ] public Sha1 Stream
[FieldOffset(20)] public int Version
[FieldOffset(24)] public Guid EventTypeId
[FieldOffset(40)] public Guid MetadataId
}
Projecting with Blobs (7)
- a single writer per account, scanning changes (*advanced)
- using a lease to lock
- index file = page blob
- scale up - multiple blobs with ranges: 1-10000, 10001- 20001
- readers Get Blob content