The Equinox Programming Model

15 minute read Published: 2022-12-05

My team at birdie recently moved from a home-rolled event-sourcing library to Equinox in one of our services. Equinox describes itself as a set of low-dependency libraries allowing event-sourced processing against stream-based stores. While this description is technically accurate, a cursory look at the codebase reveals that a programming model has formed around the library over time. This article seeks to explore that programming model.

In this article, I will show you the building blocks at play in the Equinox model and how they fit together. We'll go through Events, Folds, Decisions, Domain Services, how to test them, and how to wire them up. A base-level knowledge of event-sourcing is assumed.

Terminology

As usual, it is best to get some terminology out of the way before diving in.

Stream
A sequence of ordered events, typically for a single process or entity e.g. User-1
Category
A grouping of related streams e.g. User-1 belongs to the User category
Optimistic Concurrency Control
The ability to request that a write be rejected if a specified precondition no longer holds. In a message stream, this is achieved by checking, at the time of writing, that the last event considered when deriving the state is still the last event in the stream
DU
Discriminated Union. A data type that represents exactly one of a fixed set of choices. Also called a tagged union, variant, choice type, disjoint union, sum type, or coproduct
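
To make the Optimistic Concurrency Control entry concrete, here is a minimal sketch. It assumes a stream is just an immutable list of event names and that the version is the number of events; `AppendResult` and `tryAppend` are hypothetical names for illustration, not part of Equinox or any store.

```fsharp
// Outcome of an append attempt (hypothetical type, for illustration only)
type AppendResult =
  | Appended of newVersion: int
  | Conflict of actualVersion: int

// Appends only if the stream still has the number of events the writer saw
let tryAppend expectedVersion newEvents stream =
  let actualVersion = List.length stream
  if actualVersion = expectedVersion then
    let updated = stream @ newEvents
    updated, Appended (List.length updated)
  else
    stream, Conflict actualVersion

let stream0 = [ "UserRegistered" ]

// Writer A and writer B both read the stream when it held one event
let stream1, resultA = tryAppend 1 [ "EmailChanged" ] stream0
// Writer B's precondition no longer holds, so its write is rejected
let _, resultB = tryAppend 1 [ "NameChanged" ] stream1
```

Writer B would then be expected to re-read the stream and decide again against the newer state.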

The Domain

Because this article's purpose is pedagogical, we must choose a simple domain. The goal is to explain the different parts that make up an event-sourced application under Equinox, not to learn the intricacies of a domain. The process we'll be modeling is a simplified invoicing process for which we'll use four events.

  1. InvoiceRaised - the invoice was raised
  2. InvoiceEmailed - the invoice was emailed to the payer
  3. PaymentReceived - the payer has posted payment for the invoice
  4. InvoiceFinalized - the invoice has been finalised

The Events

In an event-sourced system, a Category can be considered analogous to a class. A key insight of Equinox is that all relevant events for a category can be united as a single Event DU. By convention, we place the Event DU into an Events module.

module Events =
  open System

  type InvoiceRaised =
    { InvoiceNumber: int
      Payer: string
      Amount: decimal }
  // PaymentId lets the fold and the decisions detect duplicate payments
  type Payment =
    { PaymentId: string
      Amount: decimal }
  type EmailReceipt =
    { IdempotencyKey: string
      Recipient: string
      SentAt: DateTimeOffset }

  type Event =
    | InvoiceRaised of InvoiceRaised
    | InvoiceEmailed of EmailReceipt
    | PaymentReceived of Payment
    | InvoiceFinalized

Using a DU to represent a given category's set of events is beneficial because the F# compiler warns us whenever a match expression does not cover all of its possible inputs, so we cannot silently forget to handle an event. While a sensible choice, it is not without drawbacks. With this model, the common practice of using a global TypeMap containing a mapping from an event type to a class is a non-starter. A follow-on consequence is that we must handle encoding and decoding for each category separately.
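
To illustrate the compiler warning mentioned above, here is a small sketch with a payload-free version of the Event DU. The `describe` function is a hypothetical example and is incomplete on purpose.

```fsharp
type Event =
  | InvoiceRaised
  | InvoiceEmailed
  | PaymentReceived
  | InvoiceFinalized

// Incomplete on purpose: compiling this produces warning FS0025
// ("Incomplete pattern matches on this expression") because
// InvoiceFinalized is not handled
let describe event =
  match event with
  | InvoiceRaised -> "raised"
  | InvoiceEmailed -> "emailed"
  | PaymentReceived -> "paid"
```

Adding the missing `InvoiceFinalized` case makes the warning disappear. The same check fires in every match over the DU when a new event is added later.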

Many practitioners choose to hand-roll serialisation and deserialisation functions.

module Serialization =
  open System
  open System.Text.Json
  open Events

  let serialize x = JsonSerializer.SerializeToUtf8Bytes(x) |> ReadOnlyMemory
  let encode event =
    match event with
    | InvoiceRaised   data -> "InvoiceRaised",    serialize data
    | InvoiceEmailed  data -> "InvoiceEmailed",   serialize data
    | PaymentReceived data -> "PaymentReceived",  serialize data
    | InvoiceFinalized     -> "InvoiceFinalized", serialize null

  // The annotation on data is needed so that data.Span can be resolved
  let decode (eventType, data: ReadOnlyMemory<byte>) =
    match eventType with
    | "InvoiceRaised"    -> InvoiceRaised    (JsonSerializer.Deserialize(data.Span))
    | "InvoiceEmailed"   -> InvoiceEmailed   (JsonSerializer.Deserialize(data.Span))
    | "PaymentReceived"  -> PaymentReceived  (JsonSerializer.Deserialize(data.Span))
    | "InvoiceFinalized" -> InvoiceFinalized
    // Strings cannot be matched exhaustively, so fail explicitly on unknown event types
    | other -> failwithf "Unknown event type %s" other

Hand-rolled functions have the advantage of giving complete control over the serialisation format, which can be necessary in cases where you're migrating to F#. On the other hand, the typical method in Equinox is to use FsCodec. FsCodec can generate codecs for events as long as you constrain yourself to use types that work with System.Text.Json. This codec typically lives in the same module as the events.

module Events =
  type Event =
    | InvoiceRaised of InvoiceRaised
    | InvoiceEmailed of EmailReceipt
    | PaymentReceived of Payment
    | InvoiceFinalized
    // TypeShape's UnionEncoder, which does the codec generation, insists on this marker interface being present
    // to remind us that this is a long term storage contract that needs to be versionable
    interface TypeShape.UnionContract.IUnionContract

  let codec = FsCodec.SystemTextJson.Codec.Create<Event>()

The Fold

Current state is a left fold of previous events

What exactly does the above quote mean? The concept of folding has different names in different languages. You may have heard a JavaScripter talk about reduce and reducers. A C# practitioner might talk about calling Aggregate on an IEnumerable. In F#, we call it fold.
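
As a minimal sketch of the idea, independent of Equinox, here is a fold over a toy list of numbers, followed by the same shape applied to a hypothetical pair of events (`Deposited` and `Withdrawn` are made up for this illustration).

```fsharp
// A running balance derived from deposits (+) and withdrawals (-)
let transactions = [ 100m; -30m; 45m ]

// List.fold takes a folder (state -> item -> state), an initial state, and the items
let balance = List.fold (fun total amount -> total + amount) 0m transactions
// balance is 115m

// The same shape with events and a named step function
type Event =
  | Deposited of decimal
  | Withdrawn of decimal

let evolve balance event =
  match event with
  | Deposited amount -> balance + amount
  | Withdrawn amount -> balance - amount

// Current state is a left fold of previous events
let current = List.fold evolve 0m [ Deposited 100m; Withdrawn 30m; Deposited 45m ]
// current is 115m
```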

In Equinox parlance, the Fold is a module responsible for defining the state and how it evolves in response to events.

module Fold =
  open Events

  type InvoiceState =
    { Amount: decimal
      InvoiceNumber: int
      Payer: string
      EmailedTo: Set<string>
      Payments: Set<string>
      AmountPaid: decimal }

  type State =
    | Initial
    | Raised of InvoiceState
    | Finalized of InvoiceState

  let initial = Initial

  let evolve state event =
    match state with
    | Initial ->
      match event with
      | InvoiceRaised data ->
        Raised
          { Amount = data.Amount
            InvoiceNumber = data.InvoiceNumber
            Payer = data.Payer
            EmailedTo = Set.empty
            Payments = Set.empty
            AmountPaid = 0m }
      // We're guaranteed to not have two InvoiceRaised events and that it is the first event in the stream
      | e -> failwithf "Unexpected %A" e
    | Raised state ->
      match event with
      | InvoiceRaised _ as e -> failwithf "Unexpected %A" e
      | InvoiceEmailed r -> Raised { state with EmailedTo = state.EmailedTo |> Set.add r.Recipient }
      | PaymentReceived p ->
        Raised
          { state with
              AmountPaid = state.AmountPaid + p.Amount
              Payments = state.Payments |> Set.add p.PaymentId }
      | InvoiceFinalized -> Finalized state
    // A Finalized invoice is terminal. No further events should be appended
    | Finalized _ -> failwithf "Unexpected %A" event

  let fold: State -> Event seq -> State = Seq.fold evolve

The Decisions

The Command pattern can be considered the standard way to implement decisions: you create a Command DU and a decide : Command -> State -> Event list function. The pattern has the benefit of forcing you to enumerate, in a single place, all the actions you'd like to take on the state. In practice, I've found the pattern more of a hindrance than a help and usually end up refactoring it away in favour of a Decisions module.
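
For comparison, a minimal sketch of the Command pattern might look like the following. The types are cut-down stand-ins for the article's Events and Fold modules, assumed here only to keep the example self-contained.

```fsharp
type Event =
  | InvoiceRaised of amount: decimal
  | InvoiceFinalized

type State =
  | Initial
  | Raised of amount: decimal
  | Finalized

// Every action on the aggregate is enumerated in one place
type Command =
  | RaiseInvoice of amount: decimal
  | Finalize

// decide : Command -> State -> Event list
let decide command state =
  match command, state with
  | RaiseInvoice amount, Initial -> [ InvoiceRaised amount ]
  // Same command replayed against the same data: idempotent no-op
  | RaiseInvoice amount, Raised existing when amount = existing -> []
  | RaiseInvoice _, _ -> failwith "Invoice is already raised"
  | Finalize, Raised _ -> [ InvoiceFinalized ]
  | Finalize, Finalized -> []
  | Finalize, Initial -> failwith "Invoice not found"
```

By contrast, the Decisions module used in the rest of this article exposes one function per action: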

module Decisions =
  let raiseInvoice data state =
    match state with
    | Fold.Initial -> [ Events.InvoiceRaised data ]
    // This is known as an idempotency check. We could be receiving the same
    // command due to a retry, in which case it is not considered a failure
    // since the Fold will already be in the state that this command should put it in
    | Fold.Raised state when state.Amount = data.Amount && state.Payer = data.Payer -> []
    | Fold.Raised _ -> failwith "Invoice is already raised"
    | Fold.Finalized _ -> failwith "Invoice is finalized"

  let private hasSentEmailToRecipient recipient (state: Fold.InvoiceState) =
    state.EmailedTo |> Set.contains recipient

  let recordEmailReceipt (data: Events.EmailReceipt) state =
    match state with
    | Fold.Raised state when not (hasSentEmailToRecipient data.Recipient state) ->
      [ Events.InvoiceEmailed data ]
    | Fold.Raised _ -> []
    | Fold.Initial -> failwith "Invoice not found"
    | Fold.Finalized _ -> failwith "Invoice is finalized"

  let recordPayment (data: Events.Payment) state =
    match state with
    | Fold.Raised state when state.Payments |> Set.contains data.PaymentId -> []
    | Fold.Raised _ -> [ Events.PaymentReceived data ]
    | Fold.Finalized _ -> failwith "Invoice is finalized"
    | Fold.Initial -> failwith "Invoice not found"

  let finalize state =
    match state with
    | Fold.Finalized _ -> []
    | Fold.Raised _ -> [ Events.InvoiceFinalized ]
    | Fold.Initial -> failwith "Invoice not found"

Testing

Everything we've written so far is pure and easily testable. For an exceptional testing experience, I recommend using FsCheck, FsCheck.Xunit, and Unquote.

An important principle when testing event-sourced applications is to not assert against the state of a decider. Doing so leads to brittle tests and hampers your ability to evolve the state as time goes on. It is generally advisable to write tests in the form of

Given these events have occurred
When I interpret this command
Then I expect these new events

I commonly write an operator (=>) to encapsulate this such that the tests read as

previous events => command = new events

module Tests

open Invoice
open Swensen.Unquote
open FsCheck.Xunit
open FsCodec.Core

[<Property>]
let ``The event codec round-trips cleanly`` event =
  let encoded = Events.codec.Encode((), event)
  let saved = TimelineEvent.Create(0L, encoded.EventType, encoded.Data)
  let decoded = Events.codec.TryDecode(saved)
  test <@ ValueSome event = decoded @>

let (=>) events interpret =
  Fold.fold Fold.initial events |> interpret

open Events

[<Property>]
let ``Raising an invoice`` data =
  test <@ [] => Decisions.raiseInvoice data = [ InvoiceRaised data ] @>
  // test the idempotency
  test <@ [ InvoiceRaised data ] => Decisions.raiseInvoice data = [] @>
  // A finalized invoice will throw
  raises <@ [ InvoiceRaised data; InvoiceFinalized ] => Decisions.raiseInvoice data @>

[<Property>]
let ``Recording payments`` raised data =
  raises <@ [] => Decisions.recordPayment data @>
  test <@ [ InvoiceRaised raised ] => Decisions.recordPayment data = [ PaymentReceived data ] @>
  test <@ [ InvoiceRaised raised; PaymentReceived data ] => Decisions.recordPayment data = [] @>
  raises <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.recordPayment data @>

[<Property>]
let ``Recording email receipts`` raised data =
  raises <@ [] => Decisions.recordEmailReceipt data @>
  test <@ [ InvoiceRaised raised ] => Decisions.recordEmailReceipt data = [ InvoiceEmailed data ] @>
  test <@ [ InvoiceRaised raised; InvoiceEmailed data ] => Decisions.recordEmailReceipt data = [] @>
  raises <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.recordEmailReceipt data @>

[<Property>]
let ``Finalizing`` raised =
  raises <@ [] => Decisions.finalize @>
  test <@ [ InvoiceRaised raised ] => Decisions.finalize = [ InvoiceFinalized ] @>
  test <@ [ InvoiceRaised raised; InvoiceFinalized ] => Decisions.finalize = [] @>

Identity

It is important to note that we have not mentioned identity until this point. There is no InvoiceId on any of the events! The reason for this is that the aggregate should not be concerned with its own identity. Identity in the Equinox programming model should be considered an infrastructural routing concern. In an event-sourced system, we store the events in streams, and these streams have names of the format {Category}-{streamId}. The identity of the aggregate lives exclusively within the stream name.

Events without identity may sound strange, but it does make sense. A functional programming idiom is to "make illegal states unrepresentable." Imagine for a second a stream with these events:

StreamName: Invoice-1234
1: InvoiceRaised { InvoiceId = 5678 }

The above stream is an absurdity! To make the illegal state unrepresentable, we must apply the DRY principle.

Every piece of knowledge must have a single, unambiguous, authoritative representation within a system.

That single, unambiguous, authoritative representation is the stream name. Treating identity this way has numerous benefits. Imagine if you later wanted to run a multi-tenanted version of your system. You could spin up an event store per tenant or include the tenant's ID in the stream name so they can all share one database. Either way, we would not force the tenant's ID on any event, and the domain logic and schema remain unchanged.

Though identity exists exclusively in the stream name, we would like to avoid passing around unbranded strings and Guids. To that end, it is common practice to use FSharp.UMX to create a type-safe, branded identifier type.

open FSharp.UMX
open System

type InvoiceId = Guid<invoiceId>
and [<Measure>] invoiceId

module InvoiceId =
  let inline ofGuid (g: Guid) : InvoiceId = %g
  let inline parse (s: string) = Guid.Parse s |> ofGuid
  let inline toGuid (id: InvoiceId) : Guid = %id
  // We choose the dashless N format to make the distinct parts of the stream's ID
  // easier for humans to read
  let inline toString (id: InvoiceId) = (toGuid id).ToString("N")

Equinox exposes a helper to create well-formed stream IDs that, when used with branded identifiers, can ensure we don't resolve the wrong stream at runtime.

[<Literal>]
let Category = "Invoice"
let streamId = Equinox.StreamId.gen InvoiceId.toString // InvoiceId -> StreamId
// The example above of adding a tenant to the stream id would look like so:
// let streamId = Equinox.StreamId.gen2 TenantId.toString InvoiceId.toString

The Service

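To recap: we have defined the events and their codec, a fold that derives the current state from them, a set of decisions that produce new events, and a way to derive a stream ID from an invoice's identity.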

One way to expose the behaviour we've programmed to the rest of the application would be for consumers to wire it up themselves, similar to how we did in the tests. That would ultimately be a bad idea. Alternatively, we can represent the operations relevant to this aggregate as a cohesive Domain Service. The convention in Equinox is to call this type Service.

type Service internal (resolve: InvoiceId -> Equinox.Decider<Events.Event, Fold.State>) =
  member _.Raise(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.raiseInvoice data)

  member _.RecordEmailReceipt(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.recordEmailReceipt data)

  member _.RecordPayment(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.recordPayment data)

  member _.Finalize(id) =
    let decider = resolve id
    decider.Transact(Decisions.finalize)

There are a couple of things to call out here.

First, we've made the service's constructor internal because we'll expose it via a factory later. Second, this is our first encounter with Equinox's Decider concept. This concept encapsulates the retrieving of events, their fold, and writing to storage. The main API is Transact, which performs the actions necessary to get the current state of the aggregate, calls the supplied function, and appends the resulting events to the underlying storage. It does this with multiple layers of retries and can, in addition, do caching and snapshotting transparently.

A simplified version of what Transact does would look something like this:

let! version, events = store.ReadStream(streamName)
let newEvents = events |> Seq.choose tryDecode |> fold initial |> decide
do! store.AppendEvents(streamName, newEvents |> Seq.map encode, version)
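
To show where the retries fit, here is a runnable, synchronous sketch of the same load, fold, decide, append cycle. It runs against a hypothetical in-memory dictionary rather than a real store, treats events as plain strings, and has none of Equinox's caching or snapshotting. `read`, `tryAppend` and `transact` are illustrative names, not Equinox APIs.

```fsharp
open System.Collections.Generic

// Hypothetical in-memory stand-in for a store (not thread-safe)
let streams = Dictionary<string, ResizeArray<string>>()

// Returns the current version (event count) and the events of a stream
let read streamName =
  match streams.TryGetValue streamName with
  | true, events -> events.Count, List.ofSeq events
  | false, _ -> 0, []

// Appends only when the stream is still at expectedVersion
let tryAppend streamName expectedVersion (newEvents: string list) =
  let version, _ = read streamName
  if version <> expectedVersion then
    false
  else
    if not (streams.ContainsKey streamName) then
      streams.[streamName] <- ResizeArray()
    streams.[streamName].AddRange(newEvents)
    true

// Load, fold, decide, append. On a version conflict, reload and decide again
let rec transact maxAttempts fold initial decide streamName =
  let version, events = read streamName
  let state = fold initial events
  match decide state with
  | [] -> () // nothing to write, e.g. an idempotent retry
  | newEvents ->
    if tryAppend streamName version newEvents then ()
    elif maxAttempts > 1 then transact (maxAttempts - 1) fold initial decide streamName
    else failwith "Too many conflicting writes"

// Usage: the state is simply the number of events seen so far
let countEvents state events = List.fold (fun count _ -> count + 1) state events
let raiseOnce count = if count = 0 then [ "InvoiceRaised" ] else []

transact 3 countEvents 0 raiseOnce "Invoice-1"
transact 3 countEvents 0 raiseOnce "Invoice-1" // second call writes nothing
```

The key point is that the decision function runs again against freshly loaded state after a conflict, which is why decisions need to be pure.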

There's a second API called Query. This method will load the current state, call the supplied transformation function and return the result. It is considered a bad idea to expose anything inside the Fold module to the outside. Therefore, for querying purposes, we'll add an InvoiceModel type.

type InvoiceModel =
  { InvoiceNumber: int
    Amount: decimal
    Payer: string
    EmailedTo: string array
    Finalized: bool }

module InvoiceModel =
  let fromState finalized (state: Fold.InvoiceState) =
    { InvoiceNumber = state.InvoiceNumber
      Amount = state.Amount
      Payer = state.Payer
      EmailedTo = state.EmailedTo |> Set.toArray
      Finalized = finalized }

module Queries =
  let summary =
    function
    | Fold.Initial -> None
    | Fold.Raised invoice -> Some(InvoiceModel.fromState false invoice)
    | Fold.Finalized invoice -> Some(InvoiceModel.fromState true invoice)

With this in place, we can update our service to expose the current state of the aggregate.

type Service internal (resolve: InvoiceId -> Equinox.Decider<Events.Event, Fold.State>) =
  member _.Raise(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.raiseInvoice data)

  member _.RecordEmailReceipt(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.recordEmailReceipt data)

  member _.RecordPayment(id, data) =
    let decider = resolve id
    decider.Transact(Decisions.recordPayment data)

  member _.Finalize(id) =
    let decider = resolve id
    decider.Transact(Decisions.finalize)

  member _.ReadInvoice(id) =
    let decider = resolve id
    decider.Query(Queries.summary)

Because of the aforementioned internal constructor, we'll need to create a factory to expose our service to the outside.

let create resolve = Service(streamId >> resolve Category)

This code looks confusing at first glance, so I think it's worth spelling out what's going on.

  1. The create function receives a resolve: string -> StreamId -> Decider<_, _> where the first argument is the Category
  2. The Service expects a resolve function of type InvoiceId -> Decider<_, _>

To get from the first to the second type, we must create a third function

let resolveInvoiceDecider resolve invoiceId =
  let id = streamId invoiceId
  resolve Category id

// this is the same as

let resolveInvoiceDecider resolve invoiceId =
  streamId invoiceId |> resolve Category

// which in turn is the same as

let resolveInvoiceDecider resolve =
  streamId >> resolve Category

// we could use it as such
let create resolve = Service(resolveInvoiceDecider resolve)

// but the preference is to inline it
let create resolve = Service(streamId >> resolve Category)

The API

To use the Service, we'll finally need to wire it up to one of Equinox's many available stores. For this illustration, we'll use Equinox.MessageDb as the backing store.

module Program

open System
open Equinox.MessageDb
open Serilog

module Environment =
  let tryGetEnv = Environment.GetEnvironmentVariable >> Option.ofObj

let log = LoggerConfiguration().WriteTo.Console().CreateLogger()

let cache = Equinox.Cache("test", sizeMb = 50)

let defaultConnString =
  "Host=localhost; Database=message_store; Username=message_store"

let writeUrl =
  Environment.tryGetEnv "MESSAGE_DB_URL" |> Option.defaultValue defaultConnString

let readUrl =
  Environment.tryGetEnv "MESSAGE_DB_REPLICA_URL" |> Option.defaultValue writeUrl

let connection = MessageDbConnector(writeUrl, readUrl).Establish()
let context = MessageDbContext(connection)
let caching = CachingStrategy.SlidingWindow(cache, TimeSpan.FromMinutes(20))

let service =
  MessageDbCategory(context, Invoice.Events.codec, Invoice.Fold.fold, Invoice.Fold.initial, caching)
  |> Equinox.Decider.resolve log
  |> Invoice.create

With the service in place, we can finally expose it over the wire. To achieve this, we'll use a Minimal API.

open Microsoft.AspNetCore.Builder
open Microsoft.Extensions.Hosting

let builder = WebApplication.CreateBuilder()
let app = builder.Build()

let raiseInvoice body =
  task {
    let id = Guid.NewGuid() |> Invoice.InvoiceId.ofGuid
    do! service.Raise(id, body)
    return id
  }

app.MapPost("/", Func<_, _>(raiseInvoice)) |> ignore

let finalizeInvoice id =
  task {
    do! service.Finalize(id)
    return "OK"
  }

app.MapPost("/{id}/finalize", Func<_, _>(finalizeInvoice)) |> ignore

let recordPayment id payment =
  task {
    do! service.RecordPayment(id, payment)
    return "OK"
  }

app.MapPost("/{id}/record-payment", Func<_, _, _>(recordPayment)) |> ignore

let readInvoice id =
  task { return! service.ReadInvoice(id) }

app.MapGet("/{id}", Func<_, _>(readInvoice)) |> ignore

app.Run()

You should now be able to run the API and exercise it with curl:

$ curl -XPOST localhost:5244/ -H 'Content-Type: application/json' --data '{"InvoiceNumber": 1, "Payer": "1", "amount": 1230}'
"932c5ca2-3870-468d-85c9-5e5f406d5c7d"
$ curl localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d
{"invoiceNumber":1,"amount":1230,"payer":"1","emailedTo":[],"finalized":false}

$ curl -XPOST localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d/finalize
OK

$ curl localhost:5244/932c5ca2-3870-468d-85c9-5e5f406d5c7d
{"invoiceNumber":1,"amount":1230,"payer":"1","emailedTo":[],"finalized":true}

Conclusion

We've reached the end of our journey. The programming model described in this article is one with a heavy emphasis on composition. We compose larger behaviours from smaller ones. I hope that through this post, I've demonstrated its merits.

Part two of this series will focus on Reactions. We'll set up a reactor that, when an invoice is raised, will reserve a unique invoice number for it and another reactor that, when an invoice is raised and numbered, will send it to the payer via email and record the receipt.

If you'd like to see the code in a runnable form, I've pushed it to a GitHub repository.