A while ago I wrote a blog post about using EventStore with NestJS. I've regretted that post for a while because the pattern I advocated for was overly complex and introduced abstractions for no good reason. I ended up de-listing that post but I'm sure you'll be able to find it if you look. This is meant as an updated version.
Event sourcing
Event sourcing is a very simple notion:
Every change to application state is an event. Instead of storing current state, state is stored as a sequence of events.
As software developers, we've been trained to think in terms of current state since the 1960s. That made sense because storage was very expensive back then. Adding a megabyte of storage was a $1,000 decision, so we got good at optimizing storage. Storage simply isn't a major issue anymore, and keeping a ledger of events represents a more natural way to think about systems.
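To make the ledger idea concrete, here is a minimal sketch of deriving current state from a sequence of events. The bank-account domain and every name in it (`AccountEvent`, `applyEvent`, `ledger`) are made up for illustration and are not part of the project we build below.

```typescript
// Hypothetical bank-account example; none of these names come from the project.
type AccountEvent =
  | { type: 'Deposited'; amount: number }
  | { type: 'Withdrawn'; amount: number }

// Current state is never stored; it is derived by replaying the ledger.
const applyEvent = (balance: number, event: AccountEvent): number => {
  switch (event.type) {
    case 'Deposited':
      return balance + event.amount
    case 'Withdrawn':
      return balance - event.amount
  }
}

// The ledger is the source of truth: an append-only list of what happened.
const ledger: AccountEvent[] = [
  { type: 'Deposited', amount: 100 },
  { type: 'Withdrawn', amount: 30 },
  { type: 'Deposited', amount: 5 },
]

// Folding the events from an initial balance of 0 yields the current balance.
const balance = ledger.reduce(applyEvent, 0)
console.log(balance) // 75
```

The same fold shows up later in this post as the `evolve` function of the decider.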
NestJS is a backend JavaScript framework. It builds on a solid foundation of patterns that enable you to write well-factored code that's easy to refactor. It provides modules for working with common libraries like TypeORM, Sequelize, Passport, and more. In this post, we're going to look at the command side of the CQRS equation, using EventStore to persist our events.
Getting Started
Let's start by creating a new Nest project with some wiring
$ npx @nestjs/cli new cqrs-app
$ cd cqrs-app
$ pnpm add @eventstore/db-client @nestjs/cli bcrypt
$ pnpm exec nest g module user
$ pnpm exec nest g module eventstore
Now that we have the essential boilerplate up, let's start by wiring up the event store
// eventstore/eventstore.module.ts
import { EventStoreDBClient } from '@eventstore/db-client'
import { Module, Global } from '@nestjs/common'
const EventStore = {
provide: EventStoreDBClient,
useFactory: () =>
EventStoreDBClient.connectionString(
process.env.ESDB_CONN_STRING ||
'esdb://admin:changeit@localhost:2113?tls=false',
),
}
@Global()
@Module({ providers: [EventStore], exports: [EventStore] })
export class EventStoreModule {}
We use a factory to instantiate our EventStore client when necessary and expose it in a Global module to ensure there's only one instance and that it is accessible from everywhere. Then we import it in the app module
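// app.module.ts
import { Module } from '@nestjs/common'
import { AppController } from './app.controller'
import { AppService } from './app.service'
import { EventStoreModule } from './eventstore/eventstore.module'
import { UserModule } from './user/user.module'
@Module({
imports: [UserModule, EventStoreModule],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}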
The User Module
Next up we can start writing our user module. We begin by creating a decider. You can read more about the decider pattern here
// user/user.decider.ts
import { ConflictException } from '@nestjs/common'
type DomainEvent<Type extends string, Data extends Record<string, any>> = {
type: Type
data: Data
}
export type UserRegistered = { email: string; passwordHash: string }
export type RegisterUser = { email: string; passwordHash: string }
export type Event = DomainEvent<'UserRegistered', UserRegistered>
export type Command = DomainEvent<'RegisterUser', RegisterUser>
export type State =
| { type: 'initial' }
| { type: 'registered'; email: string; passwordHash: string }
// annotate with State so the literal 'initial' is not widened to string
export const initialState: State = { type: 'initial' }
export const evolve = (state: State, event: Event): State => {
switch (event.type) {
case 'UserRegistered':
return {
type: 'registered',
email: event.data.email,
passwordHash: event.data.passwordHash,
}
}
}
export const decide = (state: State, command: Command): Event[] => {
switch (command.type) {
case 'RegisterUser':
if (state.type === 'initial')
return [{ type: 'UserRegistered', data: command.data }]
// the practice of using nestjs exceptions inside the decider
// can be debated. It's here for brevity. You may want to consider
// mapping domain exceptions to HTTP status codes instead
throw new ConflictException('User already registered')
}
return []
}
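Because the decider is just a pair of pure functions, we can exercise it without EventStoreDB or NestJS running. The following is a minimal sketch, not the exact file above: it is a simplified, self-contained copy of the decider, and it throws a plain `Error` where the real one throws `ConflictException` so that it has no NestJS dependency.

```typescript
// Simplified, self-contained copy of the decider; a plain Error stands in
// for ConflictException so the sketch has no NestJS dependency.
type UserData = { email: string; passwordHash: string }
type Event = { type: 'UserRegistered'; data: UserData }
type Command = { type: 'RegisterUser'; data: UserData }
type State = { type: 'initial' } | ({ type: 'registered' } & UserData)

const initialState: State = { type: 'initial' }

// evolve: fold one event into the state.
const evolve = (_state: State, event: Event): State => ({
  type: 'registered',
  ...event.data,
})

// decide: given the current state, turn a command into new events or reject it.
const decide = (state: State, command: Command): Event[] => {
  if (state.type === 'initial') {
    return [{ type: 'UserRegistered', data: command.data }]
  }
  throw new Error('User already registered')
}

const command: Command = {
  type: 'RegisterUser',
  data: { email: 'test@example.com', passwordHash: 'not-a-real-hash' },
}

// First attempt: no history, so the command is accepted.
const firstEvents = decide(initialState, command)

// Replay the resulting events to get the state a second attempt would see.
const stateAfter = firstEvents.reduce(evolve, initialState)

// Second attempt: the decider rejects the duplicate registration.
let rejected = false
try {
  decide(stateAfter, command)
} catch (_err) {
  rejected = true
}
console.log(firstEvents.length, stateAfter.type, rejected)
```

This decide, evolve, decide loop is what the command handler in the next section performs against a real stream.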
Command Handler
The decider is, and should be, framework agnostic; it's where your business logic lives, after all. There are multiple ways to wire this decider up. The following is a rudimentary event-sourced command handler based on the decider: it reads the stream to rebuild the current state, asks the decider for new events, and appends them using the last revision it read as the expected revision. Just keep in mind that you can wire it up to store state instead of events if you so choose.
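// eventstore/command-handler.ts
// note: depending on your client version, the ExpectedRevision type may be
// exported under a different name (e.g. AppendExpectedRevision)
import { ErrorType, EventStoreDBClient, ExpectedRevision, NO_STREAM, ResolvedEvent, jsonEvent } from '@eventstore/db-client'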
type Decider<S, E, C> = {
initialState: S
evolve: (state: S, event: E) => S
decide: (state: S, command: C) => E[]
}
async function* handleEmpty(eventStream: AsyncIterable<ResolvedEvent>) {
try {
for await (const resolved of eventStream) {
if (!resolved.event) continue
yield resolved.event
}
} catch (err) {
if (err.type === ErrorType.STREAM_NOT_FOUND) return
throw err
}
}
type Ctx = { $correlationId?: string; $causationId?: string; userId?: string }
type Event = { type: string; data: any }
export const createCommandHandler = <S, E extends Event, C>(
client: EventStoreDBClient,
getStreamName: (command: C) => string,
decider: Decider<S, E, C>,
) => async (command: C, context?: Ctx) => {
const streamName = getStreamName(command)
let state = decider.initialState
let revision: ExpectedRevision = NO_STREAM
for await (const event of handleEmpty(client.readStream(streamName))) {
state = decider.evolve(state, (event as any) as E)
revision = event.revision
}
const newEvents = decider.decide(state, command).map(event =>
jsonEvent({
type: event.type,
data: event.data,
metadata: context,
}),
)
await client.appendToStream(streamName, newEvents, {
expectedRevision: revision,
})
return { success: true }
}
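As mentioned above, the same decider can be wired up to store state instead of events. Here is a minimal sketch of what that could look like. `createStateStoredHandler` is a hypothetical name, the `Map` is a stand-in for a real database, and the counter decider exists only to keep the example self-contained. A real store would be asynchronous and would need its own concurrency check; both are left out here.

```typescript
type Decider<S, E, C> = {
  initialState: S
  evolve: (state: S, event: E) => S
  decide: (state: S, command: C) => E[]
}

// Hypothetical state-stored variant: instead of appending events, it folds
// them into the current state and saves only that. The Map stands in for
// whatever database would hold the state.
const createStateStoredHandler = <S, E, C>(
  store: Map<string, S>,
  getKey: (command: C) => string,
  decider: Decider<S, E, C>,
) => (command: C) => {
  const key = getKey(command)
  // Load the saved state, falling back to the decider's initial state.
  const current = store.get(key) ?? decider.initialState
  const events = decider.decide(current, command)
  // Apply the new events and persist the resulting state; the events are dropped.
  store.set(key, events.reduce(decider.evolve, current))
  return { success: true }
}

// A made-up counter decider, used only to demonstrate the handler.
type CounterEvent = { type: 'Incremented'; by: number }
type CounterCommand = { type: 'Increment'; id: string; by: number }

const counter: Decider<number, CounterEvent, CounterCommand> = {
  initialState: 0,
  evolve: (state, event) => state + event.by,
  decide: (_state, command) => [{ type: 'Incremented', by: command.by }],
}

const store = new Map<string, number>()
const handle = createStateStoredHandler(
  store,
  (cmd: CounterCommand) => `Counter-${cmd.id}`,
  counter,
)

handle({ type: 'Increment', id: 'a', by: 2 })
handle({ type: 'Increment', id: 'a', by: 3 })
console.log(store.get('Counter-a')) // 5
```

The decider itself does not change between the two variants, which is the main appeal of keeping it free of persistence concerns.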
To finish this up we will create a NestJS controller to invoke the command handler.
$ pnpm add class-validator class-transformer
$ pnpm exec nest g controller user
// user/user.controller.ts
import { Body, Controller, Post } from '@nestjs/common'
import { IsEmail, IsString, Matches, MinLength } from 'class-validator'
import { hash, genSalt } from 'bcrypt'
import * as Decider from './user.decider'
import { EventStoreDBClient } from '@eventstore/db-client'
import { createCommandHandler } from '../eventstore/command-handler'
class RegisterUserDto {
@IsEmail()
email: string
@IsString()
@MinLength(8)
@Matches(/((?=.*\d)|(?=.*\W+))(?![.\n])(?=.*[A-Z])(?=.*[a-z]).*$/, {
message: 'password too weak',
})
password: string
}
@Controller('user')
export class UserController {
// the command handler resolves to { success: true }, not an AppendResult
private readonly handle: (command: Decider.Command) => Promise<{ success: boolean }>
constructor(private readonly client: EventStoreDBClient) {
this.handle = createCommandHandler(
client,
// email is the primary ID of a user in our system
cmd => `User-${cmd.data.email}`,
Decider,
)
}
@Post('register')
async register(@Body() { email, password }: RegisterUserDto) {
const passwordHash = await hash(password, await genSalt())
return this.handle({ type: 'RegisterUser', data: { email, passwordHash } })
}
}
Finally, let's wire up the ValidationPipe in our main file
// main.ts
import { NestFactory } from '@nestjs/core'
import { AppModule } from './app.module'
import { ValidationPipe } from '@nestjs/common'
async function bootstrap() {
const app = await NestFactory.create(AppModule)
app.useGlobalPipes(new ValidationPipe())
await app.listen(8000)
}
bootstrap()
This should be all that is required to have a working system. Let's fire it up
$ pnpm run start:dev
In another terminal let's use curl to try it out
curl -X POST \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
--data '{"email": "test@example.com", "password": "deADBEef12"}' \
http://localhost:8000/user/register
We should see the following response
{ "success": true }
If we then open our browser at http://localhost:2113/web/index.html#/streams/User-test@example.com, we should see the event having been persisted to EventStoreDB.
If we try issuing the same request again though, we should see
{ "statusCode": 409, "message": "User already registered", "error": "Conflict" }
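The 409 above comes straight from the `ConflictException` thrown inside the decider. The comment in the decider suggests mapping domain exceptions to HTTP status codes instead, and one way that could look is sketched below. `UserAlreadyRegistered`, `statusByError`, and `toHttpStatus` are hypothetical names of mine, and the sketch deliberately avoids NestJS imports so it runs on its own. In a Nest app this lookup would probably live in an exception filter.

```typescript
// Hypothetical domain error; the decider would throw this instead of
// NestJS's ConflictException, keeping it framework agnostic.
class UserAlreadyRegistered extends Error {
  constructor(readonly email: string) {
    super(`User ${email} already registered`)
    this.name = 'UserAlreadyRegistered'
  }
}

// Hypothetical lookup from domain error name to HTTP status code.
const statusByError: Record<string, number> = {
  UserAlreadyRegistered: 409,
}

// Anything that is not a known domain error is treated as a server error.
const toHttpStatus = (err: unknown): number =>
  err instanceof Error ? statusByError[err.name] ?? 500 : 500

console.log(toHttpStatus(new UserAlreadyRegistered('test@example.com'))) // 409
console.log(toHttpStatus(new Error('boom'))) // 500
```

With something like this in place, the decider no longer needs to import anything from `@nestjs/common`.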
Conclusion
The previous iteration of this post used many concepts:
- AggregateRoot
- AggregateRepository
- EventBus
- CommandBus
- CommandHandler
I hope that this updated version shows that you might not need all of that abstraction.
You can view the full example on GitHub.
Addendum
Because deciders tend not to have any dependencies and function much like a state machine, they are a great candidate for a language like ReScript. The user decider, for example, could be written as:
type user_registered = {email: string, passwordHash: string}
type event = UserRegistered(user_registered)
type command = RegisterUser(user_registered)
type state = Initial | Registered(user_registered)
let initialState = Initial
let evolve = (_state, event) =>
switch event {
| UserRegistered(data) => Registered(data)
}
exception User_already_registered
let decide = (state, command) =>
switch (state, command) {
| (Initial, RegisterUser(data)) => [UserRegistered(data)]
| (Registered(_), RegisterUser(_)) => raise(User_already_registered)
}
I've found this easier to read and write than the TypeScript version. ReScript compiles to very readable JavaScript, and if you're comfortable making some assumptions you can very easily plug this version into the createCommandHandler function. A full example with the necessary wiring exists on a branch on GitHub