awesome-everything RU
↑ Back to the climb

Data Engineering

Event sourcing: code reading

Crux Read real event-sourcing snippets — a projection rebuild, a snapshot-aware load, and an upcaster — predict the behaviour, and pick the senior fix.
Your altitude — climbing toward senior
ZeroJuniorMiddleSenior
You are at senior altitude — in orbit
◷ 14 min

The pattern lives or dies in the fold loop, the projection handler, the snapshot load, and the upcaster. Read each snippet, predict what it does under real conditions, then choose the fix a senior makes first.

Goal

Practise reading the four pieces of code every event-sourced system has: rebuilding a read model by replay, keeping a projection idempotent, loading via a snapshot plus tail, and upcasting an old event shape without rewriting history.

Snippet 1 — rebuilding a projection

// "drop and replay" rebuild of the balances read model
async function rebuildBalances(store: EventStore, db: ReadDb) {
  await db.exec("TRUNCATE balances");
  for await (const e of store.readAll({ from: 0 })) {  // every event, in order
    if (e.type === "MoneyDeposited") {
      await db.exec(
        "UPDATE balances SET cents = cents + $1 WHERE acct = $2",
        e.data.cents, e.data.acct,
      );
    }
    // MoneyWithdrawn handler omitted
  }
}
Quiz

The balances table starts empty after TRUNCATE. For an account whose first event is MoneyDeposited, the UPDATE touches zero rows and the balance never appears. What is the correct fix?

Snippet 2 — the projection handler under retries

async function onEvent(e: Event, db: ReadDb) {
  // at-least-once delivery: this can be called twice for the same event
  await db.exec(
    "UPDATE counters SET total = total + 1 WHERE name = $1",
    e.aggregateId,
  );
}
Quiz

Delivery is at-least-once, so this handler can run twice for the same event. What is the bug and the standard fix?

Snippet 3 — loading an aggregate with a snapshot

async function loadAccount(id: string, store: EventStore): Promise<Account> {
  const snap = await store.latestSnapshot(id);          // may be null
  let state = snap ? snap.state : Account.empty(id);
  const from = snap ? snap.version : 0;
  for await (const e of store.read(id, { from })) {      // BUG is here
    state = apply(state, e);
  }
  return state;
}
Quiz

The snapshot captured state up to and including version N (snap.version === N). The read uses from = N. What goes wrong?

Snippet 4 — upcasting an evolved event

// v1: { type: "OrderPlaced", total: 1299 }            // cents, currency implied USD
// v2: { type: "OrderPlaced", amount: { cents, currency } }
function upcastOrderPlaced(raw: any): OrderPlacedV2 {
  if (raw.version === 2) return raw;                    // already current
  // raw is v1
  return {
    type: "OrderPlaced",
    version: 2,
    amount: { cents: raw.total, currency: "USD" },
  };
}
Quiz

What is the role of this function, and which property is essential for it to be safe to carry indefinitely?

Recap

Every event-sourced system is read in these four pieces of code. A rebuild starts from empty and must reconstruct rows from the log, so handlers UPSERT or honour creation events rather than assuming rows exist. Projections face at-least-once delivery, so they advance a per-stream version checkpoint in the same transaction as the write and ignore already-seen versions. Snapshot loads use an exclusive lower bound (version + 1) so the snapshotted event is not re-folded. And upcasters are pure, tested transformations that lift old shapes to current at read time, never rewriting the persisted log. Read the code, predict the fold, then fix the allocation of truth — never the log itself.

Continue the climb ↑Event sourcing: build an auditable ledger
shortcuts expand
search
K
prev piece
k
next piece
j
cycle tier
t
this menu
?
sources2
expand
  1. 01
  2. 02

Trademarks belong to their respective owners. Editorial reference only.