SQL Event Store. Maybe Not.

8 minute read Published: 2021-03-21

Scenario: You just convinced management that event sourcing is the way to go. One thing they were adamant about was that you should not use a purpose-built event store.

There's broad knowledge of PostgreSQL in the organization that we would be fools not to leverage!

Let's run through this hypothetical yet typical scenario together.

The first step is usually adding an "events" table to your schema.

CREATE TABLE events(
    id uuid PRIMARY KEY NOT NULL,
    data jsonb NOT NULL
);

That should do the trick! You've got a primary key for that sweet, sweet indexing for when you just need to grab that single message, and since Postgres has jsonb, you can query your events six ways to Sunday.
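
For instance, a hypothetical ad-hoc read (using a node-postgres style client, with an event shape that's only an assumption at this point) can filter on any field inside the payload:

// hypothetical ad-hoc query: find every event of a given type by reaching
// into the payload with jsonb's ->> operator
const confirmations = await client.query(
  `SELECT * FROM events WHERE data ->> 'type' = $1`,
  ['UserConfirmedEmail'],
)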

With this infrastructure in place you decide to dabble with a single entity at first, so why not the users?

To make matters even simpler you decide to do away with eventual consistency by writing your events and projections in a single transaction.

Eventual Consistency: The notion that if left alone to do its job for once, your system will become consistent... eventually

// we assume the client we're handed has started a transaction already
async function confirmUserEmail(command, client) {
  // write an event
  await client.query(`INSERT INTO events(id, data) VALUES ($1, $2)`, [
    uuid.v4(),
    {
      type: 'UserConfirmedEmail',
      email: command.email,
      userId: command.userId,
    },
  ])

  // grab all events relating to the user
  const userEvents = await client.query(
    `SELECT * FROM events WHERE data ->> 'userId' = $1`,
    [command.userId],
  )

  // current state is a left fold of previous events
  const userState = userEvents.reduce(userReducer, {})

  // update the projection
  await client.query(
    `INSERT INTO users(id, email, email_confirmed)
     VALUES ($1, $2, $3)
     ON CONFLICT (id) DO UPDATE
     SET email = $2, email_confirmed = $3;`,
    [userState.id, userState.email, userState.emailConfirmed],
  )
}
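
The userReducer used above isn't shown anywhere in this post; a minimal sketch, assuming the event shape from the insert plus a hypothetical UserRegistered event, could look like this:

// minimal sketch of the left fold: apply one stored event row to the state
// ('UserRegistered' and the field names are assumptions, not code from the post)
function userReducer(state, event) {
  switch (event.data.type) {
    case 'UserRegistered':
      return { ...state, id: event.data.userId, email: event.data.email, emailConfirmed: false }
    case 'UserConfirmedEmail':
      return { ...state, emailConfirmed: true }
    default:
      return state
  }
}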

This will work fine for a long time, but it turns out that it's really important that when you read events, you always get them in the same order. Always!

Your table however doesn't guarantee this. It's time for an upgrade!

ALTER TABLE events DROP CONSTRAINT events_pkey;
ALTER TABLE events ADD COLUMN position bigserial NOT NULL;
ALTER TABLE events ADD PRIMARY KEY (position);
CREATE UNIQUE INDEX events_id ON events (id);

async function confirmUserEmail(command, client) {
  await client.query(`INSERT INTO events(id, data) VALUES ($1, $2)`, [
    uuid.v4(),
    {
      type: 'UserConfirmedEmail',
      email: command.email,
      userId: command.userId,
    },
  ])
  // grab the user's events in a deterministic order this time
  const userEvents = await client.query(
    `SELECT * FROM events WHERE data ->> 'userId' = $1 ORDER BY position ASC`,
    [command.userId],
  )
  // current state is a left fold of previous events
  const userState = userEvents.reduce(userReducer, {})
  await client.query(
    `INSERT INTO users(id, email, email_confirmed)
     VALUES ($1, $2, $3)
     ON CONFLICT (id) DO UPDATE
     SET email = $2, email_confirmed = $3;`,
    [userState.id, userState.email, userState.emailConfirmed],
  )
}

Great! You now have a repeatable order. A few (tens of) thousand events later, your projections are slowing down. You leverage your broad knowledge of relational databases and add an index!

Index All The Things

CREATE INDEX idx_events_user_id ON events ((data ->> 'userId'));

CPU usage and disk reads suddenly drop by orders of magnitude. This is going great! When the next projection slows down, you add another index to compensate.
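
Say the next slow projection filters on the event type; a hypothetical follow-up migration (the index name and the field it targets are assumptions) adds a matching expression index:

// run once as a migration: index the event type stored inside the jsonb payload
await client.query(`CREATE INDEX idx_events_type ON events ((data ->> 'type'))`)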

At some point you'll run into two users trying to update the same resource without knowing about each other. A common way to maintain invariants is partitioning your events by something and using optimistic concurrency checks on that something.

Let's call that something a "stream".

ALTER TABLE events ADD COLUMN stream_name text NOT NULL;
ALTER TABLE events ADD COLUMN version bigint NOT NULL;
CREATE UNIQUE INDEX events_stream ON events (stream_name, version);

To implement the optimistic concurrency check, you'll supply an "expected version" to the function that writes events.

It's a good idea to push the actual checking down to the database, so you implement a function that all writes should go through.

-- simplified version of message-db's write_message function
CREATE FUNCTION write_event(
  id varchar,
  stream_name varchar,
  data jsonb,
  expected_version bigint DEFAULT NULL
)
RETURNS bigint
AS $$
DECLARE
  _event_id uuid;
  _stream_version bigint;
  _next_version bigint;
BEGIN
  -- Lock the stream
  PERFORM pg_advisory_xact_lock(hashtextextended(write_event.stream_name, 0));

  _stream_version := (SELECT MAX(version) FROM events WHERE events.stream_name = write_event.stream_name);

  IF _stream_version IS NULL THEN
    _stream_version := -1;
  END IF;

  IF write_event.expected_version IS NOT NULL THEN
    IF write_event.expected_version != _stream_version THEN
      RAISE EXCEPTION 'Wrong expected version';
    END IF;
  END IF;

  _next_version := _stream_version + 1;

  _event_id := uuid(write_event.id);

  INSERT INTO events
    (
      id,
      stream_name,
      version,
      data
    )
  VALUES
    (
      _event_id,
      write_event.stream_name,
      _next_version,
      write_event.data
    )
  ;

  RETURN _next_version;
END;
$$ LANGUAGE plpgsql
VOLATILE;

You've now partitioned your events into streams and can prevent concurrent updates to the same stream! Good on you!
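
On the application side, calling it might look roughly like this; the helper name, the error matching, and what you do on a conflict are assumptions rather than code from the post:

// hypothetical caller: pass the version your decision was based on, and treat
// the 'Wrong expected version' error as a concurrency conflict
async function appendWithConcurrencyCheck(client, streamName, event, expectedVersion) {
  try {
    const [{ write_event: version }] = await client.query(
      `SELECT write_event($1, $2, $3, $4)`,
      [event.id, streamName, event, expectedVersion],
    )
    return version
  } catch (err) {
    if (/Wrong expected version/.test(err.message)) {
      // someone else got there first: reload the stream and decide again
      throw new Error(`concurrency conflict on ${streamName} at version ${expectedVersion}`)
    }
    throw err
  }
}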

Your system keeps growing, and you've been adding new projections as it does. You've probably got 20-40 at this point.

Eventually you'll release a buggy projection. The decision to avoid eventual consistency now means no events can be written because a single projection has an error.

You frantically tell your customer:

Actually no! You didn't confirm your email because there was an error in our document upload system.

Please do not turn off the light switch. It also operates the elevator.

The more projections you have, the more likely it is that you'll have a bug in one of them.

In order to isolate your system from single component failures, you decide to embrace eventual consistency.

You already have that global position on the table, so it should be easy to just poll the events table for changes. You probably also want to store a checkpoint somewhere, so you don't process the same event multiple times.

ALTER TABLE users ADD COLUMN checkpoint bigint NOT NULL DEFAULT -1;

async function appendEvent(client, streamId, event, expectedVersion = null) {
  // write_event(id, stream_name, data, expected_version)
  await client.query(`SELECT write_event($1, $2, $3, $4)`, [
    event.id,
    streamId,
    event,
    expectedVersion,
  ])
}
async function getEvents(client, streamId) {
  return await client.query(
    `SELECT * FROM events WHERE stream_name = $1 ORDER BY position`,
    [streamId],
  )
}
async function confirmUserEmail(command, client) {
  const userEvents = await getEvents(client, `User-${command.userId}`)
  await appendEvent(
    client,
    `User-${command.userId}`,
    {
      id: uuid.v4(),
      type: 'UserConfirmedEmail',
      email: command.email,
      userId: command.userId,
    },
    // versions start at 0, so the stream's current version is length - 1
    userEvents.length - 1,
  )
}

async function userProjector(client) {
  let [{ current_checkpoint }] = await client.query(
    `SELECT max(checkpoint) as current_checkpoint FROM users;`,
  )
  if (current_checkpoint == null) current_checkpoint = -1n
  while (true) {
    const eventBatch = await client.query(
      `SELECT id, data, position FROM events
       WHERE position > $1
       ORDER BY position
       LIMIT 100;`,
      [current_checkpoint],
    )
    // No events. We're caught up. Let's wait a bit and continue
    if (!eventBatch.length) {
      await new Promise(res => setTimeout(res, 1000))
      continue
    }
    // start a transaction and handle the events
    try {
      await client.query('BEGIN')
      await userProjection(eventBatch, client)
      await client.query('COMMIT')
      current_checkpoint = eventBatch[eventBatch.length - 1].position
    } catch (err) {
      await client.query('ROLLBACK')
      // a buggy projection now only stalls itself; writes keep flowing.
      // back off before retrying the same batch
      await new Promise(res => setTimeout(res, 1000))
    }
  }
}
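
The userProjection handler itself isn't shown in the post; a minimal sketch, assuming it both applies events and records the checkpoint on the rows it touches (which is what the max(checkpoint) query above relies on), might be:

// minimal sketch: upsert the projection row per event and stamp it with the
// event's global position (event types and field names are assumptions)
async function userProjection(eventBatch, client) {
  for (const event of eventBatch) {
    if (event.data.type !== 'UserConfirmedEmail') continue
    await client.query(
      `INSERT INTO users(id, email, email_confirmed, checkpoint)
       VALUES ($1, $2, true, $3)
       ON CONFLICT (id) DO UPDATE
       SET email = $2, email_confirmed = true, checkpoint = $3;`,
      [event.data.userId, event.data.email, event.position],
    )
  }
}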

With this change you should be able to drop every index that reaches inside the jsonb column.

DROP INDEX idx_events_user_id;

With those gone, your write performance should go up considerably, and you'll devise clever ways to hide the eventual consistency of this approach.
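
One such trick, sketched under assumptions (the helper name, the timings, and the idea of reusing the projection's checkpoint column are mine, not the post's): after a write, look up the event's global position and make the request handler wait until the users projection has caught up.

// hypothetical "read your own writes" helper: block until the users projection's
// checkpoint has passed the global position of the event we just wrote
async function waitForProjection(client, eventId, timeoutMs = 5000) {
  const [{ position }] = await client.query(
    `SELECT position FROM events WHERE id = $1`,
    [eventId],
  )
  const deadline = Date.now() + timeoutMs
  while (Date.now() < deadline) {
    const [{ checkpoint }] = await client.query(
      `SELECT max(checkpoint) AS checkpoint FROM users`,
    )
    if (checkpoint != null && BigInt(checkpoint) >= BigInt(position)) return
    await new Promise(res => setTimeout(res, 100))
  }
  throw new Error(`users projection did not catch up to position ${position}`)
}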

You're pretty happy for a while.

With more demand on the system, some hard-to-debug issues crop up. It looks like your projections are skipping some events.

-- Transaction A                     -- Transaction B
BEGIN TRANSACTION
write_event() -- position 1
                                     BEGIN TRANSACTION
                                     write_event() -- position 2
                                     COMMIT
COMMIT
In the above case there is a brief window where position 2 is visible but position 1 isn't, because the transaction that wrote position 1 hasn't committed yet. If your projector happens to poll during that window, it records a checkpoint of 2 and never goes back for position 1. This is because sequences aren't transactional: positions are handed out at insert time, but events only become visible in commit order. At low volume this rarely bites, but as your user base grows and writes overlap more often, the likelihood of skipped events increases. There are plenty of strategies to solve this:

  1. Use a separate singleton process to enumerate the event position
  2. Perform gap detection and compensation (see the sketch after this list)
  3. Write 1 event at a time (with the scale you're at, this isn't really an option)
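
A rough sketch of option 2 (the helper name and how it slots into the polling loop above are assumptions): only hand the projection events up to the first hole in the sequence and re-poll; a hole that never fills in was most likely a rolled-back transaction, and you can move past it after a timeout of your choosing.

// take events from a polled batch only up to the first gap in the global
// position sequence; the caller re-polls until the gap fills in (or gives up)
function takeUntilGap(eventBatch, checkpoint) {
  let previous = BigInt(checkpoint)
  for (let i = 0; i < eventBatch.length; i++) {
    const position = BigInt(eventBatch[i].position)
    // a fresh projection (checkpoint of -1) has nothing to compare against yet
    if (previous > 0n && position !== previous + 1n) {
      return eventBatch.slice(0, i) // stop just before the gap
    }
    previous = position
  }
  return eventBatch
}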

There are more options, and they all make sense in different contexts.

I think now is a good time to stop this hypothetical and ask

Why didn't we just use a purpose-built event store from the start?

Using a purpose-built event store avoids every one of these problems by design, and spares us all of the headaches and extra work along the way.

Addendum

Further "fun" issues

Auto vacuum

PostgreSQL's anti-wraparound autovacuum runs once the oldest unfrozen table row is more than 200 million transactions old. For an append-only table (like our events table) this is normally the first time it's vacuumed at all.

If the anti-wraparound autovacuum doesn't finish in time, Postgres will stop accepting any new transactions once it gets within 1 million transactions of the point where data corruption would occur.
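
A cheap way to keep an eye on this (a hypothetical check, run through the same client for consistency with the rest of the post):

// how close is each database to the 200 million transaction freeze threshold?
// age(datfrozenxid) is roughly how many transactions old the oldest unfrozen rows are
const wraparound = await client.query(
  `SELECT datname, age(datfrozenxid) AS xid_age
   FROM pg_database
   ORDER BY xid_age DESC`,
)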