Using EventStore with NestJS

7 minute read Published: 2020-10-12

Since writing this post my views have changed a lot, I describe my current way of using EventStore with NestJS in a v2 of this post here

Event sourcing is a pattern that gained traction recently. It can be boiled down to:

Every change to application state is an event. Instead of storing current state, state is stored a sequence of events

As software developers, we've been trained to think in terms of current state since the 1960's. It makes sense because memory was very expensive back then. Adding a megabyte of storage was a $1,000 decision, so we got good at optimizing storage. Storage simply isn't a major issue anymore and keeping a ledger of events represents a more natural way to think about systems.

NestJS is a relatively new backend javascript framework. It builds on a solid foundation of patterns that enable you to write well factored code that's easy to refactor. It provides modules for working with common libraries like TypeORM, Sequelize, Passport, and more. On top of that there's a hidden gem of a package called @nestjs/cqrs that provides a lightweight CQRS framework. In this post, we're going to look at the command side of the CQRS equation using EventStore to persist our events.

The high level conceptual overview of what we want to achieve is this:

Command Architecture Figure 1. Command Architecture

The NestJS CQRS module has utilities to help us with Commands, CommandHandlers, and the domain concept of AggregateRoots. We'll be using my pacakge @nordfjord/nestjs-cqrs-es to provide the Repository layer.

Getting Started

Let's start by creating a new Nest project with some wiring

$ npx @nestjs/cli new cqrs-app
$ cd cqrs-app
$ yarn add @nestjs/cqrs @nordfjord/nestjs-cqrs-es @nestjs/cli bcrypt
$ yarn nest g module user

Now that we have the essential boilerplate up, let's start by wiring up the event store

// app.module.ts
@Module({
  imports: [
    CqrsModule,
    EventStoreModule.forRoot({
      connection: {
        defaultUserCredentials: { username: 'admin', password: 'changeit' },
      },
      tcpEndpoint: 'tcp://127.0.0.1:1113',
    }),
    UserModule,
  ],
  controllers: [AppController],
  providers: [AppService],
})
export class AppModule implements OnModuleInit {
  constructor(
    private readonly eventStore: EventStore,
    private readonly eventBus: EventBus<Event>,
  ) {}

  async onModuleInit() {
    this.eventBus.publisher = this.eventStore
  }
}

Here we're adding a tcp connection to eventstore on localhost. If you don't have eventstore already running on your machine, here's a docker-compose file you can use.

The User Module

Next up we can start writing our user module. We begin by creating a command. The command should create a user, with an email and a password.

// user/dto/register-user.dto.ts
export class RegisterUserDto {
  email: string
  password: string
}
// user/command/impl/register-user.command.ts
export class RegisterUser {
  constructor(public readonly data: RegisterUserDto) {}
}

Now that we have our command defined we need a way to handle it.

// user/command/handler/register-user.handler.ts
@CommandHandler(RegisterUser)
export class RegisterUserHandler implements ICommandHandler<RegisterUser> {
  constructor() {}

  async execute(command: RegisterUser) {}
}

The job of a command handler according to our diagram above is to pass data on to the domain layer. Since we don't have a domain layer yet, we should pause development of the handler and create the User AggregateRoot first.

User Aggregate Root

Let's start by defining what on earth an Aggregate Root is.

Aggregate
Constituting or amounting to a whole; total.
Root
Something that is an origin or source

An aggregate root guarantees the consistency of changes being made within the aggregate by forbidding external objects from holding references to its members.

For example when driving a car, you are just driving a car. You're not moving the wheels in circular motion to move forward. You're just driving a car!

Our User aggregate root has the job of enforcing our domain logic. For instance, a user cannot register twice.

// user/model/user.ts

export class User extends AggregateRoot<Event> {
  private isRegistered = false
  private email: string
  private passwordHash: string
  constructor(private id: string) {
    super()
  }

  async register(email: string, password: string) {
    if (this.isRegistered) throw new ConflictException()

    this.apply(
      new UserRegistered({
        email,
        passwordHash: await hash(password, await genSalt()),
      }),
    )
  }

  onUserRegistered(event: UserRegistered) {
    this.isRegistered = true
    this.email = event.data.email
    this.passwordHash = event.data.passwordHash
  }
}

NestJS aggregate roots will automatically call on${EventType} when we apply events. This is useful when hydrating our aggregates.

With this AggregateRoot in place it's clear that the CommandHandler will instantiate the User AggregateRoot, and call the register method. The register method will then apply a UserRegistered event or throw an exception.

Let's create the UserRegistered event.

// user/dto/user-registered.dto.ts
export class UserRegisteredDto {
  email: string
  passwordHash: string
}
// user/events/user-registered.event.ts
export class UserRegistered extends Event<UserRegisteredDto> {
  constructor(data: UserRegisteredDto) {
    super(`User-${data.email}`, data, ExpectedVersion.noStream)
  }
}

The Event constructor takes a few arguments.

ArgumentTypeDescription
eventStreamIdstringThe stream id the event belongs to
dataanyThe payload of the event
expectedVersionnumberFor optimistic concurrency checks. Here we're using noStream as we don't expect the stream to exist when the user registers.
eventIdUUIDId of the event, auto generated uuid v4 by default
correlationIdUUIDCorrelationId of the event
causationIdUUIDCausationId of the event

Now that our event is defined, we wire up our User Module.

// user/user.module.ts
@Module({
  imports: [
    CqrsModule,
    EventStoreModule.forFeature([User], {
      UserRegistered: event => new UserRegistered(event.data),
    }),
  ],
  providers: [RegisterUserHandler],
})
export class UserModule {}

The EventStoreModule.forFeature takes 2 arguments.

ArgumentTypeDescription
aggregateRootsAggregateRootConstructor[]A list of aggregate roots that will be made available as repositories
eventTransformersRecord<string, (event: EventStoreEvent)=> Event>A map of transfomers to map event store events to NestJS events

Now when reading events from the event store they'll be transformed into classes that NestJS can deal with.

Before returning to the command handler, this is a summary of what we've done.

Command Handler

@CommandHandler(RegisterUser)
export class RegisterUserHandler implements ICommandHandler<RegisterUser> {
  constructor(
    @InjectAggregateRepository(User)
    private readonly userRepository: AggregateRepository<User>,
  ) {}

  async execute(command: RegisterUser) {
    const user = await this.userRepository.findOne(command.data.email)

    await user.register(command.data.email, command.data.password)

    await this.userRepository.save(user)

    return { success: true }
  }
}

@InjectAggregateRepository(User) tells nest to inject the provider for the User AggregateRepository.

The execute method calls userRepository.findOne(id). Behind the scenes findOne fetches all events belonging to the user with that id and applies them to a newly instantiated User AggregateRoot.

then calling user.register applies a UserRegistered event or throws.

If the register call didn't throw userRepository.save(user) will get all the uncommited events from the aggregate and persist them to the event store.

To finish this up we will create a NestJS controller to invoke the command handler.

$ yarn nest g controller user
// user/user.controller.ts
@Controller('user')
export class UserController {
  constructor(private readonly commandBus: CommandBus) {}

  @Post('register')
  async register(@Body() data: RegisterUserDto) {
    return await this.commandBus.execute(new RegisterUser(data))
  }
}

Now we can run our code and test it out.

$ yarn start:dev

In another terminal let's use curl to try it out

curl -X POST \
-H 'Content-Type: application/json' \
-H 'Accept: application/json' \
--data '{"email": "test@example.com", "password": "12345"}' \
http://localhost:3000/user/register

We should see a

{ "success": true }

If we then open our browser at http://localhost:2113/web/index.html#/streams/User-test@example.com, we should see the event having been persisted to EventStoreDB.

If we try issuing the same request again though, we should see

{ "statusCode": 409, "message": "Conflict" }

Conclusion

In this post we've created a Command Side module in NestJS that can register users. You can view the full example on GitHub.

The Query side is left as an excercise to the reader.