My team at birdie recently moved from a home-rolled event-sourcing library to Equinox in one of our services. Equinox describes itself as a set of low-dependency libraries allowing event-sourced processing against stream-based stores. While this description is technically accurate, a cursory look at the codebase reveals that a programming model has formed around the library over time. This article seeks to explore that programming model.
In this article, I will show you the building blocks at play in the Equinox model and how they fit together. We'll go through Events, Folds, Decisions, Domain Services, how to test them, and how to wire them up. A base-level knowledge of event-sourcing is assumed.
Terminology
As usual, it is best to get some terminology out of the way before diving in.
- Stream: A sequence of ordered events, typically for a single process or entity, e.g. User-1
- Category: A grouping of related streams, e.g. User-1 belongs to the User category
- Optimistic Concurrency Control: The ability to request that a write be rejected if a specified precondition no longer holds. In a message stream, this is achieved by checking that the last event considered when deriving the state is still the last event
- DU: Discriminated Union. A data type that allows you to represent one of a fixed set of choices. Also called a tagged union, variant, choice type, disjoint union, sum type, or coproduct
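The optimistic concurrency check described above can be sketched in a few lines. This is only an illustration over an in-memory list, not how any particular store implements it; `AppendResult` and `tryAppend` are names made up for this example, and the stream's version is assumed to be its event count.

```fsharp
// A stream is modelled here as an immutable list of events; its version is its length.
type AppendResult<'e> =
    | Appended of 'e list
    | Conflict of actualVersion: int

/// Appends newEvents only if the stream is still at the version the writer last saw.
let tryAppend (expectedVersion: int) (newEvents: 'e list) (stream: 'e list) : AppendResult<'e> =
    let actualVersion = List.length stream

    if actualVersion = expectedVersion then
        Appended(stream @ newEvents)
    else
        Conflict actualVersion

let stream = [ "InvoiceRaised" ]

// This writer read one event before deciding, so it expects version 1: the write is accepted.
let accepted = tryAppend 1 [ "InvoiceEmailed" ] stream

// A stale writer that only ever saw an empty stream is rejected.
let rejected = tryAppend 0 [ "InvoiceRaised" ] stream
```

A rejected writer would normally reload the stream, re-derive the state, and decide again.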
The Domain
Because this article's purpose is pedagogical, we must choose a simple domain. The goal is to explain the different parts that make up an event-sourced application under Equinox, not to learn the intricacies of a domain. The process we'll be modeling is a simplified invoicing process for which we'll use four events.
- InvoiceRaised: the invoice was raised
- InvoiceEmailed: the invoice was emailed to the payer
- PaymentReceived: the payer has posted payment for the invoice
- InvoiceFinalized: the invoice has been finalised
The Events
In an event-sourced system, a Category can be considered analogous to a class. A key insight of Equinox is that all relevant events for a category can be united as a single Event DU. By convention, we place the Event DU into an Events module.
module Events =
    open System

    type InvoiceRaised =
        { InvoiceNumber: int
          Payer: string
          Amount: decimal }

    // PaymentId identifies a payment so that retries can be detected later on
    type Payment = { PaymentId: string; Amount: decimal }

    type EmailReceipt =
        { IdempotencyKey: string
          Recipient: string
          SentAt: DateTimeOffset }

    type Event =
        | InvoiceRaised of InvoiceRaised
        | InvoiceEmailed of EmailReceipt
        | PaymentReceived of Payment
        | InvoiceFinalized
Using a DU to represent a given category's set of events is beneficial because the F# compiler can warn us when we forget to handle an event: it warns whenever a match expression does not cover all of its possible inputs. While a sensible choice, it is not without drawbacks. With this model, the common practice of using a global TypeMap containing a mapping from an event type to a class is a non-starter. A follow-on consequence is that we must handle encoding and decoding for each category separately.
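To make the compiler's role concrete, here is a small sketch using a cut-down event type (not the article's full Events module; the payloads are simplified to a bare amount for illustration).

```fsharp
type Event =
    | InvoiceRaised of amount: decimal
    | PaymentReceived of amount: decimal
    | InvoiceFinalized

// Every case is handled, so this compiles without warnings.
let describe event =
    match event with
    | InvoiceRaised amount -> sprintf "Raised for %M" amount
    | PaymentReceived amount -> sprintf "Received %M" amount
    | InvoiceFinalized -> "Finalized"

// Deleting any one of the branches above makes the compiler emit warning FS0025
// ("Incomplete pattern matches on this expression") and name a case that is not covered.
```

Adding a new case to the DU has the same effect: every match that has not been updated is flagged at compile time.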
Many practitioners choose to hand-roll serialisation and deserialisation functions.
module Serialization =
    open System
    open System.Text.Json
    open Events

    let serialize x = JsonSerializer.SerializeToUtf8Bytes(x) |> ReadOnlyMemory

    let encode event =
        match event with
        | InvoiceRaised data -> "InvoiceRaised", serialize data
        | InvoiceEmailed data -> "InvoiceEmailed", serialize data
        | PaymentReceived data -> "PaymentReceived", serialize data
        | InvoiceFinalized -> "InvoiceFinalized", serialize null

    let decode (eventType: string, data: ReadOnlyMemory<byte>) =
        match eventType with
        | "InvoiceRaised" -> InvoiceRaised(JsonSerializer.Deserialize<InvoiceRaised>(data.Span))
        | "InvoiceEmailed" -> InvoiceEmailed(JsonSerializer.Deserialize<EmailReceipt>(data.Span))
        | "PaymentReceived" -> PaymentReceived(JsonSerializer.Deserialize<Payment>(data.Span))
        | "InvoiceFinalized" -> InvoiceFinalized
        // Fail loudly on an event type we don't recognise instead of leaving the match incomplete
        | other -> failwithf "Unknown event type %s" other
Hand-rolled functions have the advantage of giving complete control over the serialisation format, which can be necessary when you're migrating an existing system to F# and must keep reading the events it has already stored. On the other hand, the typical method in Equinox is to use FsCodec. FsCodec can generate codecs for events as long as you constrain yourself to use types that work with System.Text.Json. This codec typically lives in the same module as the events.
module Events =
    // The InvoiceRaised, Payment and EmailReceipt records are defined as shown earlier
    type Event =
        | InvoiceRaised of InvoiceRaised
        | InvoiceEmailed of EmailReceipt
        | PaymentReceived of Payment
        | InvoiceFinalized
        // TypeShape's UnionEncoder, which does the codec generation, insists on this marker interface being present
        // to remind us that this is a long term storage contract that needs to be versionable
        interface TypeShape.UnionContract.IUnionContract

    let codec = FsCodec.SystemTextJson.Codec.Create<Event>()
The Fold
Current state is a left fold of previous events
What exactly does the above quote mean? The concept of folding has different names in different languages. You may have heard a JavaScripter talk about reduce and reducers. A C# practitioner might talk about calling Aggregate on an IEnumerable. In F#, we call it fold.
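As a warm-up before the real thing, here is a minimal sketch of a fold over plain numbers. The payment amounts are made-up sample data; the state is simply a running total.

```fsharp
// Events: the amounts paid so far. State: the running total.
let payments = [ 100m; 250m; 50m ]

// The folder takes the state so far and the next event, and returns the new state.
let applyPayment (total: decimal) (payment: decimal) = total + payment

// Seq.fold threads the state through the events from left to right.
let totalPaid = Seq.fold applyPayment 0m payments // 400m

// Seq.scan exposes every intermediate state, which makes the "left" in "left fold" visible.
let history = Seq.scan applyPayment 0m payments |> List.ofSeq // [0m; 100m; 350m; 400m]
```

Replace the decimal with a richer state type and the amounts with domain events, and you have the shape used throughout the rest of this article.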
In Equinox parlance, the Fold is a module responsible for defining the state and how it evolves in response to events.
module Fold =
    open Events

    type InvoiceState =
        { Amount: decimal
          InvoiceNumber: int
          Payer: string
          EmailedTo: Set<string>
          Payments: Set<string>
          AmountPaid: decimal }

    type State =
        | Initial
        | Raised of InvoiceState
        | Finalized of InvoiceState

    let initial = Initial

    let evolve state event =
        match state with
        | Initial ->
            match event with
            | InvoiceRaised data ->
                Raised
                    { Amount = data.Amount
                      InvoiceNumber = data.InvoiceNumber
                      Payer = data.Payer
                      EmailedTo = Set.empty
                      Payments = Set.empty
                      AmountPaid = 0m }
            // We're guaranteed to not have two InvoiceRaised events and that it is the first event in the stream
            | e -> failwithf "Unexpected %A" e
        | Raised state ->
            match event with
            | InvoiceRaised _ as e -> failwithf "Unexpected %A" e
            | InvoiceEmailed r -> Raised { state with EmailedTo = state.EmailedTo |> Set.add r.Recipient }
            | PaymentReceived p ->
                Raised
                    { state with
                        AmountPaid = state.AmountPaid + p.Amount
                        Payments = state.Payments |> Set.add p.PaymentId }
            | InvoiceFinalized -> Finalized state
        // A Finalized invoice is terminal. No further events should be appended
        | Finalized _ -> failwithf "Unexpected %A" event

    let fold: State -> Event seq -> State = Seq.fold evolve
The Decisions
The Command pattern can be considered the standard way to implement decisions. Create a Command DU, and a decide : Command -> State -> Event list function. This pattern has benefits in that it forces you to enumerate all the different actions you'd like to take on the state in a single place.
In practice, I've found the pattern more of a hindrance than a help and usually end up refactoring it away in favour of a Decisions module.
module Decisions =
    let raiseInvoice data state =
        match state with
        | Fold.Initial -> [ Events.InvoiceRaised data ]
        // This is known as an idempotency check. We could be receiving the same
        // command due to a retry, in which case it is not considered a failure
        // since the Fold will already be in the state that this command should put it in
        | Fold.Raised state when state.Amount = data.Amount && state.Payer = data.Payer -> []
        | Fold.Raised _ -> failwith "Invoice is already raised"
        | Fold.Finalized _ -> failwith "Invoice is finalized"

    let private hasSentEmailToRecipient recipient (state: Fold.InvoiceState) =
        state.EmailedTo |> Set.contains recipient

    let recordEmailReceipt (data: Events.EmailReceipt) state =
        match state with
        | Fold.Raised state when not (hasSentEmailToRecipient data.Recipient state) ->
            [ Events.InvoiceEmailed data ]
        | Fold.Raised _ -> []
        | Fold.Initial -> failwith "Invoice not found"
        | Fold.Finalized _ -> failwith "Invoice is finalized"

    let recordPayment (data: Events.Payment) state =
        match state with
        | Fold.Raised state when state.Payments |> Set.contains data.PaymentId -> []
        | Fold.Raised _ -> [ Events.PaymentReceived data ]
        | Fold.Finalized _ -> failwith "Invoice is finalized"
        | Fold.Initial -> failwith "Invoice not found"

    let finalize state =
        match state with
        | Fold.Finalized _ -> []
        | Fold.Raised _ -> [ Events.InvoiceFinalized ]
        | Fold.Initial -> failwith "Invoice not found"
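For comparison, the Command pattern mentioned at the start of this section might look roughly like the sketch below. To keep it self-contained it uses simplified stand-ins for the Events and Fold modules (a bare amount instead of the full records) and covers only two of the four actions; it is an illustration of the shape, not a drop-in replacement for the module above.

```fsharp
// Simplified stand-ins for the article's Events and Fold modules.
type Event =
    | InvoiceRaised of amount: decimal
    | InvoiceFinalized

type State =
    | Initial
    | Raised of amount: decimal
    | Finalized

// One case per action that can be taken on an invoice.
type Command =
    | RaiseInvoice of amount: decimal
    | Finalize

// A single entry point that dispatches on the command and the current state.
let decide (command: Command) (state: State) : Event list =
    match command, state with
    | RaiseInvoice amount, Initial -> [ InvoiceRaised amount ]
    // Idempotent retry: the invoice was already raised with the same data
    | RaiseInvoice amount, Raised existing when existing = amount -> []
    | RaiseInvoice _, Raised _ -> failwith "Invoice is already raised"
    | RaiseInvoice _, Finalized -> failwith "Invoice is finalized"
    | Finalize, Raised _ -> [ InvoiceFinalized ]
    | Finalize, Finalized -> []
    | Finalize, Initial -> failwith "Invoice not found"
```

The logic is the same as in the Decisions module; the difference is that every caller has to construct a Command case first, and every new action grows both the DU and the central match.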
Testing
Everything we've written so far is pure and easily testable. For an exceptional testing experience, I recommend using FsCheck, FsCheck.Xunit, and Unquote.
An important principle when testing event-sourced applications is to not assert against the state of a decider. Doing so leads to brittle tests and hampers your ability to evolve the state as time goes on. It is generally advisable to write tests in the form of
Given these events have occurred
When I interpret this command
Then I expect these new events
I commonly write an operator (=>) to encapsulate this such that the tests read as
previous events => command = new events
module Tests

open Invoice
open Swensen.Unquote
open FsCheck.Xunit
open FsCodec.Core

[<Property>]
let ``The event codec round-trips cleanly`` event =
    let encoded = Events.codec.Encode((), event)
    let saved = TimelineEvent.Create(0L, encoded.EventType, encoded.Data)
    let decoded = Events.codec.TryDecode(saved)
    test <@ ValueSome event = decoded @>

let (=>) events interpret =
    Fold.fold Fold.initial events |> interpret

open Events

[<Property>]
let ``Raising an invoice`` data =
    test <@ [] => Decisions.raiseInvoice data = [ InvoiceRaised data ] @>
    // test the idempotency
    test <@ [ InvoiceRaised data ] => Decisions.raiseInvoice data = [] @>
    // A finalized invoice will throw
    raises <@ [ InvoiceRaised data; InvoiceFinalized ] => Decisions.raiseInvoice data @>

[<Property>]
let ``Recording payments`` raised data =
    raises <@ [] => Decisions.recordPayment data @>
    test <@ [ InvoiceRaised raised ] => Decisions.recordPayment data = [ PaymentReceived data ] @>
    test <@ [ InvoiceRaised raised; PaymentReceived data ] => Decisions.recordPayment data = [] @>
    raises <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.recordPayment data @>

[<Property>]
let ``Recording email receipts`` raised data =
    raises <@ [] => Decisions.recordEmailReceipt data @>
    test <@ [ InvoiceRaised raised ] => Decisions.recordEmailReceipt data = [ InvoiceEmailed data ] @>
    test <@ [ InvoiceRaised raised; InvoiceEmailed data ] => Decisions.recordEmailReceipt data = [] @>
    raises <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.recordEmailReceipt data @>

[<Property>]
let ``Finalizing`` raised =
    raises <@ [] => Decisions.finalize @>
    test <@ [ InvoiceRaised raised ] => Decisions.finalize = [ InvoiceFinalized ] @>
    test <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.finalize = [] @>
Identity
It is important to note that we have not mentioned identity until this point. There is no InvoiceId on any of the events! The reason for this is that the aggregate should not be concerned with its own identity. Identity in the Equinox programming model should be considered an infrastructural routing concern. In an event-sourced system, we store the events in streams, and these streams have names of the format {Category}-{streamId}.
The identity of the aggregate lives exclusively within the stream name.
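To make the {Category}-{streamId} format tangible, here is a small sketch of composing and splitting such a name by hand. The `streamName` and `parseStreamName` helpers are hypothetical and exist only for this illustration; Equinox ships its own types for this purpose.

```fsharp
open System

// Hypothetical helper: joins a category and a stream ID with a dash.
let streamName (category: string) (streamId: string) = sprintf "%s-%s" category streamId

// Splits on the first dash only, because the stream ID may itself contain dashes.
let parseStreamName (name: string) =
    match name.IndexOf('-') with
    | -1 -> None
    | i -> Some(name.Substring(0, i), name.Substring(i + 1))

let invoiceId = Guid.Parse "932c5ca2-3870-468d-85c9-5e5f406d5c7d"

// "N" renders the Guid as 32 hex digits without dashes
let name = streamName "Invoice" (invoiceId.ToString("N"))
// name = "Invoice-932c5ca23870468d85c95e5f406d5c7d"
```

Everything a consumer needs in order to route to the right aggregate is recoverable from the name alone, which is why the events themselves can stay free of identifiers.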
Events without identity may sound strange, but it does make sense. A functional programming idiom is to "make illegal states unrepresentable." Imagine for a second a stream with these events:
StreamName: Invoice-1234
1: InvoiceRaised { InvoiceId = 5678 }
The above stream is an absurdity! To make the illegal state unrepresentable, we must apply the DRY principle.
Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.
That single, unambiguous, authoritative representation is the stream name. Treating identity this way has numerous benefits. Imagine if you later wanted to run a multi-tenanted version of your system. You could spin up an event store per tenant or include the tenant's ID in the stream name so they can all share one database. Either way, we would not force the tenant's ID on any event, and the domain logic and schema remain unchanged.
Though identity exists exclusively in the stream name, we would like to avoid passing around unbranded strings and Guids.
To that end, it is common practice to use FSharp.UMX to create a type-safe, branded identifier type.
open FSharp.UMX
open System

type InvoiceId = Guid<invoiceId>
and [<Measure>] invoiceId

module InvoiceId =
    let inline ofGuid (g: Guid) : InvoiceId = %g
    let inline parse (s: string) = Guid.Parse s |> ofGuid
    let inline toGuid (id: InvoiceId) : Guid = %id
    // We choose the dashless N format to make the distinct parts of the stream's ID
    // easier for humans to read
    let inline toString (id: InvoiceId) = (toGuid id).ToString("N")
Equinox exposes a helper to create well-formed stream IDs that, when used with branded identifiers, can ensure we don't resolve the wrong stream at runtime.
[<Literal>]
let Category = "Invoice"
let streamId = Equinox.StreamId.gen InvoiceId.toString // InvoiceId -> StreamId
// The example above of adding a tenant to the stream id would look like so:
// let streamId = Equinox.StreamId.gen2 TenantId.toString InvoiceId.toString
The Service
To recap:
- We've created an Events module defining the events in play and their storage format.
- We've created a Fold representing the state and how it evolves in response to events.
- We've created a Decisions module exposing the actions we can take on a given invoice.
- We've written tests that combine all three into a cohesive whole.
- We've created a branded InvoiceId type.
- We've created a streamId helper and defined the Category.
One way to expose the behaviour we've programmed to the rest of the application would be for consumers to wire it up, similar to how we did in the tests. That would ultimately be a bad idea, since every consumer would have to repeat the loading, folding, and appending themselves. Alternatively, we can represent the operations relevant to this aggregate as a cohesive Domain Service. The convention in Equinox is to call this type Service.
type Service internal (resolve: InvoiceId -> Equinox.Decider<Events.Event, Fold.State>) =
    member _.Raise(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.raiseInvoice data)

    member _.RecordEmailReceipt(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.recordEmailReceipt data)

    member _.RecordPayment(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.recordPayment data)

    member _.Finalize(id) =
        let decider = resolve id
        decider.Transact(Decisions.finalize)
There are a couple of things to call out here.
First, we've made the service's constructor internal because we'll expose it via a factory later.
Second, this is our first encounter with Equinox's Decider concept. This concept encapsulates the retrieving of events, their fold, and writing to storage. The main API is Transact, which performs the actions necessary to get the current state of the aggregate, calls the supplied function, and appends the resulting events to the underlying storage. It does this with multiple layers of retries and can, in addition, do caching and snapshotting transparently.
A simplified version of what Transact does would look something like this:
let! version, events = store.ReadStream(streamName)
let newEvents = events |> Seq.choose tryDecode |> fold initial |> decide
do! store.AppendEvents(streamName, newEvents |> Seq.map encode, version)
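The three lines above leave out the retry on a concurrency conflict. The following is a runnable sketch of that loop over a made-up in-memory store; `InMemoryStore`, `transact`, and the attempt counter are assumptions for illustration and bear no relation to Equinox's actual internals, which also handle encoding, caching, and snapshots.

```fsharp
open System.Collections.Generic

// Hypothetical in-memory store: stream name -> events. A stream's version is its event count.
type InMemoryStore<'e>() =
    let streams = Dictionary<string, 'e list>()

    member this.Read(streamName: string) : int * 'e list =
        match streams.TryGetValue streamName with
        | true, events -> List.length events, events
        | false, _ -> 0, []

    /// Appends only if the stream is still at expectedVersion; returns false on a conflict.
    member this.TryAppend(streamName: string, expectedVersion: int, newEvents: 'e list) : bool =
        let version, events = this.Read streamName

        if version = expectedVersion then
            streams.[streamName] <- events @ newEvents
            true
        else
            false

/// Load, fold, decide, append. On a version conflict, reload and decide again.
let rec transact (store: InMemoryStore<'e>) fold initial decide streamName attemptsLeft =
    let version, events = store.Read streamName
    let newEvents = events |> fold initial |> decide

    if List.isEmpty newEvents || store.TryAppend(streamName, version, newEvents) then
        newEvents
    elif attemptsLeft > 1 then
        transact store fold initial decide streamName (attemptsLeft - 1)
    else
        failwith "Too many concurrency conflicts"

// Usage: the state is just an event count, and we only ever want one event in the stream.
let store = InMemoryStore<string>()
let countFold (initial: int) (events: string list) = initial + List.length events
let decideOnce count = if count = 0 then [ "InvoiceRaised" ] else []

let first = transact store countFold 0 decideOnce "Invoice-1" 3 // [ "InvoiceRaised" ]
let second = transact store countFold 0 decideOnce "Invoice-1" 3 // []
```

Because the decision function is re-run against freshly loaded state after a conflict, it has to be pure, which is exactly what the Decisions module gives us.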
There's a second API called Query. This method will load the current state, call the supplied transformation function and return the result. It is considered a bad idea to expose anything inside the Fold module to the outside. Therefore, for querying purposes, we'll add an InvoiceModel type.
type InvoiceModel =
    { InvoiceNumber: int
      Amount: decimal
      Payer: string
      EmailedTo: string array
      Finalized: bool }

module InvoiceModel =
    let fromState finalized (state: Fold.InvoiceState) =
        { InvoiceNumber = state.InvoiceNumber
          Amount = state.Amount
          Payer = state.Payer
          EmailedTo = state.EmailedTo |> Set.toArray
          Finalized = finalized }

module Queries =
    let summary =
        function
        | Fold.Initial -> None
        | Fold.Raised invoice -> Some(InvoiceModel.fromState false invoice)
        | Fold.Finalized invoice -> Some(InvoiceModel.fromState true invoice)
With this in place, we can update our service to expose the current state of the aggregate.
type Service internal (resolve: InvoiceId -> Equinox.Decider<Events.Event, Fold.State>) =
    member _.Raise(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.raiseInvoice data)

    member _.RecordEmailReceipt(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.recordEmailReceipt data)

    member _.RecordPayment(id, data) =
        let decider = resolve id
        decider.Transact(Decisions.recordPayment data)

    member _.Finalize(id) =
        let decider = resolve id
        decider.Transact(Decisions.finalize)

    member _.ReadInvoice(id) =
        let decider = resolve id
        decider.Query(Queries.summary)
Because of the aforementioned internal constructor, we'll need to create a factory to expose our service to the outside.
let create resolve = Service(streamId >> resolve Category)
This code looks confusing at first glance, so I think it's worth spelling out what's going on.
- The create function receives a resolve: string -> StreamId -> Decider<_, _> where the first argument is the Category
- The Service expects a resolve function of type InvoiceId -> Decider<_, _>
To get from the first to the second type, we must create a third function
let resolveInvoiceDecider resolve invoiceId =
    let id = streamId invoiceId
    resolve Category id

// this is the same as
let resolveInvoiceDecider resolve invoiceId =
    streamId invoiceId |> resolve Category

// which in turn is the same as
let resolveInvoiceDecider resolve =
    streamId >> resolve Category

// we could use it as such
let create resolve = Service(resolveInvoiceDecider resolve)

// but the preference is to inline it
let create resolve = Service(streamId >> resolve Category)
The API
To use the Service, we'll finally need to wire it up to one of Equinox's many available stores. For this illustration, we'll use Equinox.MessageDb as the backing store.
module Program

open System
open Equinox.MessageDb
open Serilog

module Environment =
    let tryGetEnv = Environment.GetEnvironmentVariable >> Option.ofObj

let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let cache = Equinox.Cache("test", sizeMb = 50)

let defaultConnString =
    "Host=localhost; Database=message_store; Username=message_store"

let writeUrl =
    Environment.tryGetEnv "MESSAGE_DB_URL" |> Option.defaultValue defaultConnString

let readUrl =
    Environment.tryGetEnv "MESSAGE_DB_REPLICA_URL" |> Option.defaultValue writeUrl

let connection = MessageDbConnector(writeUrl, readUrl).Establish()
let context = MessageDbContext(connection)
let caching = CachingStrategy.SlidingWindow(cache, TimeSpan.FromMinutes(20))

let service =
    MessageDbCategory(context, Invoice.Events.codec, Invoice.Fold.fold, Invoice.Fold.initial, caching)
    |> Equinox.Decider.resolve log
    |> Invoice.create
With the service in place, we can finally expose it over the wire. To achieve this, we'll use a Minimal API.
open Microsoft.AspNetCore.Builder
open Microsoft.Extensions.Hosting

let builder = WebApplication.CreateBuilder()
let app = builder.Build()

let raiseInvoice body =
    task {
        let id = Guid.NewGuid() |> Invoice.InvoiceId.ofGuid
        do! service.Raise(id, body)
        return id
    }

app.MapPost("/", Func<_, _>(raiseInvoice)) |> ignore

let finalizeInvoice id =
    task {
        do! service.Finalize(id)
        return "OK"
    }

app.MapPost("/{id}/finalize", Func<_, _>(finalizeInvoice)) |> ignore

let recordPayment id payment =
    task {
        do! service.RecordPayment(id, payment)
        return "OK"
    }

app.MapPost("/{id}/record-payment", Func<_, _, _>(recordPayment)) |> ignore

let readInvoice id =
    task { return! service.ReadInvoice(id) }

app.MapGet("/{id}", Func<_, _>(readInvoice)) |> ignore
app.Run()
You should now be able to run the API
$ curl -XPOST localhost:5244/ -H 'Content-Type: application/json' --data '{"InvoiceNumber": 1, "Payer": "1", "amount": 1230}'
"932c5ca2-3870-468d-85c9-5e5f406d5c7d"
$ curl localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d
{"invoiceNumber":1,"amount":1230,"payer":"1","emailedTo":[],"finalized":false}
$ curl -XPOST localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d/finalize
OK
$ curl localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d
{"amount":1230,"invoiceNumber":1,"emailedTo":[],"amountPaid":0,"finalized":true}
Conclusion
We've reached the end of our journey. The programming model described in this article is one with a heavy emphasis on composition. We compose larger behaviours from smaller ones. I hope that through this post, I've demonstrated its merits.
Part two of this series will focus on Reactions. We'll set up a reactor that, when an invoice is raised, will reserve a unique invoice number for it and another reactor that, when an invoice is raised and numbered, will send it to the payer via email and record the receipt.
If you'd like to see the code in a runnable form, I've pushed it to a GitHub repository.