Since writing this post, my views have changed a lot. I describe my current way of using EventStore with NestJS in a v2 of this post.
Event sourcing is a pattern that gained traction recently. It can be boiled down to:
Every change to application state is an event. Instead of storing current state, state is stored as a sequence of events.
As software developers, we've been trained to think in terms of current state since the 1960s. It made sense because memory was very expensive back then. Adding a megabyte of storage was a $1,000 decision, so we got good at optimizing storage. Storage simply isn't a major issue anymore, and keeping a ledger of events represents a more natural way to think about systems.
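To make the "ledger of events" idea concrete, here is a minimal sketch in plain TypeScript. The bank account domain and the names `AccountEvent`, `evolve`, and `replay` are illustrative assumptions, not part of this post's code. It only shows that current state can be derived by replaying events instead of being stored.

```typescript
// Hypothetical event types for a bank account ledger (not from the post).
type AccountEvent =
  | { type: 'Deposited'; amount: number }
  | { type: 'Withdrawn'; amount: number }

interface AccountState {
  balance: number
}

const initialState: AccountState = { balance: 0 }

// Apply one event to the previous state and return the next state.
function evolve(state: AccountState, event: AccountEvent): AccountState {
  switch (event.type) {
    case 'Deposited':
      return { balance: state.balance + event.amount }
    case 'Withdrawn':
      return { balance: state.balance - event.amount }
  }
}

// Current state is never stored; it is derived by replaying the ledger.
function replay(events: AccountEvent[]): AccountState {
  return events.reduce(evolve, initialState)
}

const ledger: AccountEvent[] = [
  { type: 'Deposited', amount: 100 },
  { type: 'Withdrawn', amount: 30 },
  { type: 'Deposited', amount: 5 },
]

console.log(replay(ledger)) // { balance: 75 }
```

Because the ledger is append-only, the same list of events can later be replayed into a different shape of state without migrating stored data.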
NestJS is a relatively new backend JavaScript framework. It builds on a solid foundation of patterns that enable you to write well-factored code that's easy to refactor. It provides modules for working with common libraries like TypeORM, Sequelize, Passport, and more. On top of that, there's a hidden gem of a package called @nestjs/cqrs that provides a lightweight CQRS framework.
In this post, we're going to look at the command side of the CQRS equation using EventStore to persist our events.
The high-level conceptual overview of what we want to achieve is this:
The NestJS CQRS module has utilities to help us with Commands, CommandHandlers, and the domain concept of AggregateRoots. We'll be using my package @nordfjord/nestjs-cqrs-es to provide the Repository layer.
Getting Started
Let's start by creating a new Nest project with some wiring:
$ npx @nestjs/cli new cqrs-app
$ cd cqrs-app
$ yarn add @nestjs/cqrs @nordfjord/nestjs-cqrs-es @nestjs/cli bcrypt
$ yarn nest g module user
Now that we have the essential boilerplate up, let's start by wiring up the event store:
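// app.module.ts
import { Module, OnModuleInit } from '@nestjs/common'
import { CqrsModule, EventBus } from '@nestjs/cqrs'
// Assumed export location; the original post omits imports.
import { Event, EventStore, EventStoreModule } from '@nordfjord/nestjs-cqrs-es'
import { AppController } from './app.controller'
import { AppService } from './app.service'
import { UserModule } from './user/user.module'

@Module({
  imports: [
    CqrsModule,
    EventStoreModule.forRoot({
      connection: {
        defaultUserCredentials: { username: 'admin', password: 'changeit' },
      },
      tcpEndpoint: 'tcp://127.0.0.1:1113',
    }),
    UserModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule implements OnModuleInit {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventBus: EventBus<Event>,
  ) {}

  async onModuleInit() {
    // Publish events through the event store instead of the default in-memory publisher.
    this.eventBus.publisher = this.eventStore
  }
}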
Here we're adding a TCP connection to EventStore on localhost. If you don't have EventStore already running on your machine, here's a docker-compose file you can use.
The User Module
Next up we can start writing our user module. We begin by creating a command. The command should create a user, with an email and a password.
// user/dto/register-user.dto.ts
export class RegisterUserDto {
  email: string
  password: string
}

// user/command/impl/register-user.command.ts
import { RegisterUserDto } from '../../dto/register-user.dto'

export class RegisterUser {
  constructor(public readonly data: RegisterUserDto) {}
}
Now that we have our command defined we need a way to handle it.
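// user/command/handler/register-user.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'
import { RegisterUser } from '../impl/register-user.command'

@CommandHandler(RegisterUser)
export class RegisterUserHandler implements ICommandHandler<RegisterUser> {
  constructor() {}

  // Left empty for now; filled in once the User aggregate exists.
  async execute(command: RegisterUser) {}
}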
The job of a command handler according to our diagram above is to pass data on to the domain layer. Since we don't have a domain layer yet, we should pause development of the handler and create the User AggregateRoot first.
User Aggregate Root
Let's start by defining what on earth an Aggregate Root is.
- Aggregate: constituting or amounting to a whole; total.
- Root: something that is an origin or source.
An aggregate root guarantees the consistency of changes being made within the aggregate by forbidding external objects from holding references to its members.
For example, when driving a car, you are just driving a car. You're not moving the wheels in a circular motion to move forward. You're just driving a car!
Our User aggregate root has the job of enforcing our domain logic. For instance, a user cannot register twice.
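// user/model/user.ts
import { ConflictException } from '@nestjs/common'
import { AggregateRoot } from '@nestjs/cqrs'
import { genSalt, hash } from 'bcrypt'
// Assumed export location; the original post omits imports.
import { Event } from '@nordfjord/nestjs-cqrs-es'
import { UserRegistered } from '../events/user-registered.event'

export class User extends AggregateRoot<Event> {
  private isRegistered = false
  private email: string
  private passwordHash: string

  constructor(private id: string) {
    super()
  }

  async register(email: string, password: string) {
    // Domain rule: a user cannot register twice.
    if (this.isRegistered) throw new ConflictException()
    this.apply(
      new UserRegistered({
        email,
        passwordHash: await hash(password, await genSalt()),
      }),
    )
  }

  // Called when a UserRegistered event is applied or replayed.
  onUserRegistered(event: UserRegistered) {
    this.isRegistered = true
    this.email = event.data.email
    this.passwordHash = event.data.passwordHash
  }
}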
NestJS aggregate roots will automatically call on${EventType} when we apply events. This is useful when hydrating our aggregates.
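The dispatch-by-name convention can be sketched without any framework. The base class below is a simplified stand-in written for illustration, not the @nestjs/cqrs source; `SketchAggregate`, `SketchUser`, and the pared-down `UserRegistered` class are hypothetical names. It shows why replaying history updates state without producing new events.

```typescript
// Simplified stand-in for an aggregate base class; NOT the @nestjs/cqrs source.
abstract class SketchAggregate<E extends object> {
  private readonly uncommitted: E[] = []

  // Record a new event (unless replaying) and route it to on<ClassName>.
  apply(event: E, isFromHistory = false): void {
    if (!isFromHistory) this.uncommitted.push(event)
    const handlerName = `on${event.constructor.name}`
    const handler = (this as unknown as Record<string, unknown>)[handlerName]
    if (typeof handler === 'function') handler.call(this, event)
  }

  // Hydrate: replay stored events without marking them as new.
  loadFromHistory(history: E[]): void {
    for (const event of history) this.apply(event, true)
  }

  getUncommittedEvents(): E[] {
    return [...this.uncommitted]
  }
}

// Pared-down event class; the class name drives the handler lookup.
class UserRegistered {
  constructor(public readonly email: string) {}
}

class SketchUser extends SketchAggregate<UserRegistered> {
  isRegistered = false

  register(email: string): void {
    if (this.isRegistered) throw new Error('already registered')
    this.apply(new UserRegistered(email))
  }

  onUserRegistered(_event: UserRegistered): void {
    this.isRegistered = true
  }
}

// Hydrating from history flips the flag but records nothing new.
const hydrated = new SketchUser()
hydrated.loadFromHistory([new UserRegistered('test@example.com')])
console.log(hydrated.isRegistered, hydrated.getUncommittedEvents().length) // true 0
```

Keeping the state change in the `on...` handler rather than in `register` is what lets the same code path serve both new events and replayed ones.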
With this AggregateRoot in place, it's clear that the CommandHandler will instantiate the User AggregateRoot and call the register method.
The register method will then apply a UserRegistered event or throw an exception.
Let's create the UserRegistered event.
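// user/dto/user-registered.dto.ts
export class UserRegisteredDto {
  email: string
  passwordHash: string
}

// user/events/user-registered.event.ts
// Assumed export location for Event and ExpectedVersion; the original post omits imports.
import { Event, ExpectedVersion } from '@nordfjord/nestjs-cqrs-es'
import { UserRegisteredDto } from '../dto/user-registered.dto'

export class UserRegistered extends Event<UserRegisteredDto> {
  constructor(data: UserRegisteredDto) {
    // Stream id, payload, and the expected version for the concurrency check.
    super(`User-${data.email}`, data, ExpectedVersion.noStream)
  }
}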
The Event constructor takes a few arguments.
| Argument | Type | Description |
|---|---|---|
| eventStreamId | string | The stream id the event belongs to |
| data | any | The payload of the event |
| expectedVersion | number | For optimistic concurrency checks. Here we're using noStream as we don't expect the stream to exist when the user registers. |
| eventId | UUID | Id of the event, auto generated uuid v4 by default |
| correlationId | UUID | CorrelationId of the event |
| causationId | UUID | CausationId of the event |
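The expectedVersion check is what turns a duplicate registration into a rejected write. The sketch below is an in-memory stand-in for illustration only: `InMemoryStreams`, `NO_STREAM`, and `ANY` are hypothetical names and values chosen for this example, so check your client library for the real constants and error types.

```typescript
// Hypothetical sentinel values for this sketch only.
const NO_STREAM = -1
const ANY = -2

// In-memory stand-in for an event store: stream id -> list of payloads.
class InMemoryStreams {
  private readonly streams = new Map<string, unknown[]>()

  // Appends a payload and returns its version, or throws on a version mismatch.
  append(streamId: string, expectedVersion: number, payload: unknown): number {
    const stream = this.streams.get(streamId)
    // Version of the last event, or NO_STREAM when nothing was written yet.
    const currentVersion = stream ? stream.length - 1 : NO_STREAM
    if (expectedVersion !== ANY && expectedVersion !== currentVersion) {
      throw new Error(
        `Wrong expected version: expected ${expectedVersion}, found ${currentVersion}`,
      )
    }
    const next = stream ?? []
    next.push(payload)
    this.streams.set(streamId, next)
    return next.length - 1
  }
}

// Runs a write and reports whether it was rejected.
function isRejected(write: () => void): boolean {
  try {
    write()
    return false
  } catch {
    return true
  }
}

const store = new InMemoryStreams()
const streamId = 'User-test@example.com'

const firstVersion = store.append(streamId, NO_STREAM, { email: 'test@example.com' })
console.log(firstVersion) // 0

// The stream now exists, so a second "no stream" write is refused.
const secondRejected = isRejected(() =>
  store.append(streamId, NO_STREAM, { email: 'test@example.com' }),
)
console.log(secondRejected) // true
```

Because the check happens at write time, it also protects against two concurrent registrations that both loaded an empty stream.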
Now that our event is defined, we wire up our User Module.
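// user/user.module.ts
import { Module } from '@nestjs/common'
import { CqrsModule } from '@nestjs/cqrs'
// Assumed export location; the original post omits imports.
import { EventStoreModule } from '@nordfjord/nestjs-cqrs-es'
import { RegisterUserHandler } from './command/handler/register-user.handler'
import { UserRegistered } from './events/user-registered.event'
import { User } from './model/user'

@Module({
  imports: [
    CqrsModule,
    EventStoreModule.forFeature([User], {
      UserRegistered: event => new UserRegistered(event.data),
    }),
  ],
  providers: [RegisterUserHandler],
})
export class UserModule {}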
The EventStoreModule.forFeature method takes 2 arguments.
| Argument | Type | Description |
|---|---|---|
| aggregateRoots | AggregateRootConstructor[] | A list of aggregate roots that will be made available as repositories |
| eventTransformers | Record<string, (event: EventStoreEvent) => Event> | A map of transformers to map event store events to NestJS events |
Now when reading events from the event store they'll be transformed into classes that NestJS can deal with.
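A rough sketch of what a transformer map does, in plain TypeScript. The `RawEvent` shape and the `toDomainEvents` helper are assumptions made for this example, and skipping unknown event types is a choice of the sketch rather than documented behaviour of the package.

```typescript
// Hypothetical shape of a raw record read from the store (assumption).
interface RawEvent {
  eventType: string
  data: unknown
}

interface UserRegisteredData {
  email: string
  passwordHash: string
}

// Pared-down event class standing in for the one defined in the post.
class UserRegistered {
  constructor(public readonly data: UserRegisteredData) {}
}

type Transformer = (raw: RawEvent) => object

// Keyed by event type name, like the second forFeature argument.
const transformers: Record<string, Transformer> = {
  UserRegistered: raw => new UserRegistered(raw.data as UserRegisteredData),
}

// Turn raw records into class instances, skipping unknown event types.
function toDomainEvents(raws: RawEvent[]): object[] {
  const result: object[] = []
  for (const raw of raws) {
    const transform = transformers[raw.eventType]
    if (transform) result.push(transform(raw))
  }
  return result
}

const domainEvents = toDomainEvents([
  {
    eventType: 'UserRegistered',
    data: { email: 'test@example.com', passwordHash: 'not-a-real-hash' },
  },
  { eventType: 'SomethingElse', data: {} },
])

console.log(domainEvents.length) // 1
console.log(domainEvents[0] instanceof UserRegistered) // true
```

Getting real class instances back matters because handler lookup on the aggregate depends on the event's class name, not on the raw JSON.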
Before returning to the command handler, here is a summary of what we've done so far.
- Commands are executed by Command Handlers
- Command Handlers instantiate AggregateRoots
- AggregateRoots apply Events
Command Handler
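// user/command/handler/register-user.handler.ts
import { CommandHandler, ICommandHandler } from '@nestjs/cqrs'
// Assumed export location; the original post omits imports.
import { AggregateRepository, InjectAggregateRepository } from '@nordfjord/nestjs-cqrs-es'
import { User } from '../../model/user'
import { RegisterUser } from '../impl/register-user.command'

@CommandHandler(RegisterUser)
export class RegisterUserHandler implements ICommandHandler<RegisterUser> {
  constructor(
    @InjectAggregateRepository(User)
    private readonly userRepository: AggregateRepository<User>,
  ) {}

  async execute(command: RegisterUser) {
    // Load (or start) the aggregate, run the domain logic, then persist new events.
    const user = await this.userRepository.findOne(command.data.email)
    await user.register(command.data.email, command.data.password)
    await this.userRepository.save(user)
    return { success: true }
  }
}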
@InjectAggregateRepository(User) tells Nest to inject the provider for the User AggregateRepository.
The execute method calls userRepository.findOne(id). Behind the scenes, findOne fetches all events belonging to the user with that id and applies them to a newly instantiated User AggregateRoot. Then, calling user.register applies a UserRegistered event or throws.
If the register call didn't throw, userRepository.save(user) will get all the uncommitted events from the aggregate and persist them to the event store.
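The load, decide, save cycle described above can be sketched with an in-memory repository. Everything here (`SketchUser`, `SketchUserRepository`, `StoredEvent`, `registerTwice`) is a hypothetical stand-in for illustration; the real repository talks to EventStore and its internals may differ.

```typescript
interface StoredEvent {
  type: string
  data: { email: string }
}

// Minimal aggregate for the sketch: tracks new events and replays old ones.
class SketchUser {
  private registered = false
  private readonly uncommitted: StoredEvent[] = []

  constructor(readonly id: string) {}

  register(email: string): void {
    if (this.registered) throw new Error('Conflict')
    const event: StoredEvent = { type: 'UserRegistered', data: { email } }
    this.uncommitted.push(event)
    this.when(event)
  }

  // State transition shared by new events and replayed history.
  when(event: StoredEvent): void {
    if (event.type === 'UserRegistered') this.registered = true
  }

  // Hands over the pending events and clears the internal list.
  takeUncommitted(): StoredEvent[] {
    return this.uncommitted.splice(0)
  }
}

// In-memory stand-in for the repository; stream names mirror `User-${email}`.
class SketchUserRepository {
  private readonly streams = new Map<string, StoredEvent[]>()

  async findOne(id: string): Promise<SketchUser> {
    const user = new SketchUser(id)
    for (const event of this.streams.get(`User-${id}`) ?? []) user.when(event)
    return user
  }

  async save(user: SketchUser): Promise<void> {
    const key = `User-${user.id}`
    const stream = this.streams.get(key) ?? []
    stream.push(...user.takeUncommitted())
    this.streams.set(key, stream)
  }
}

// Runs the same command twice against one repository.
async function registerTwice(): Promise<string[]> {
  const repo = new SketchUserRepository()
  const outcomes: string[] = []
  for (let attempt = 0; attempt < 2; attempt++) {
    const user = await repo.findOne('test@example.com')
    try {
      user.register('test@example.com')
      await repo.save(user)
      outcomes.push('registered')
    } catch {
      outcomes.push('conflict')
    }
  }
  return outcomes
}

registerTwice().then(outcomes => console.log(outcomes)) // [ 'registered', 'conflict' ]
```

The second attempt fails inside the aggregate, before anything is written, because the replayed history already marks the user as registered.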
To finish this up we will create a NestJS controller to invoke the command handler.
$ yarn nest g controller user
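// user/user.controller.ts
import { Body, Controller, Post } from '@nestjs/common'
import { CommandBus } from '@nestjs/cqrs'
import { RegisterUser } from './command/impl/register-user.command'
import { RegisterUserDto } from './dto/register-user.dto'

@Controller('user')
export class UserController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post('register')
  async register(@Body() data: RegisterUserDto) {
    // Dispatch the command; the bus routes it to RegisterUserHandler.
    return await this.commandBus.execute(new RegisterUser(data))
  }
}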
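For intuition about what happens when the controller hands a command to the bus, here is a toy command bus in plain TypeScript. It is a sketch under simplifying assumptions, not how @nestjs/cqrs is implemented; `SketchCommandBus`, its `register` method, and the inline handler are hypothetical.

```typescript
interface Handler<C, R> {
  execute(command: C): Promise<R>
}

type CommandClass<C> = new (...args: any[]) => C

// Toy command bus: looks up the handler registered for a command's class.
class SketchCommandBus {
  private readonly handlers = new Map<Function, Handler<any, any>>()

  register<C, R>(commandClass: CommandClass<C>, handler: Handler<C, R>): void {
    this.handlers.set(commandClass, handler)
  }

  async execute<R>(command: object): Promise<R> {
    const handler = this.handlers.get(command.constructor)
    if (!handler) throw new Error(`No handler for ${command.constructor.name}`)
    return handler.execute(command)
  }
}

// Pared-down command class standing in for the one defined in the post.
class RegisterUser {
  constructor(public readonly data: { email: string; password: string }) {}
}

const bus = new SketchCommandBus()
bus.register(RegisterUser, {
  // Stand-in for RegisterUserHandler; a real one would call the aggregate.
  async execute(command: RegisterUser) {
    return { success: true, email: command.data.email }
  },
})

const pending = bus.execute<{ success: boolean; email: string }>(
  new RegisterUser({ email: 'test@example.com', password: '12345' }),
)
pending.then(result => console.log(result.success)) // true
```

The controller only needs to know the command class; which handler runs is decided by the registration, which keeps HTTP concerns out of the domain code.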
Now we can run our code and test it out.
$ yarn start:dev
In another terminal, let's use curl to try it out:
curl -X POST \
  -H 'Content-Type: application/json' \
  -H 'Accept: application/json' \
  --data '{"email": "test@example.com", "password": "12345"}' \
  http://localhost:3000/user/register
We should see the following response:
{ "success": true }
If we then open our browser at http://localhost:2113/web/index.html#/streams/User-test@example.com, we should see that the event has been persisted to EventStoreDB.
If we try issuing the same request again, though, we should see:
{ "statusCode": 409, "message": "Conflict" }
Conclusion
In this post we've created a Command Side module in NestJS that can register users. You can view the full example on GitHub.
The Query side is left as an exercise to the reader.