awesome-everything EN
↑ Обратно к восхождению

Data engineering

Event sourcing: чтение кода

Суть Чтение реальных сниппетов event sourcing — пересборка projection, загрузка через snapshot и upcaster — предскажи поведение и выбери сеньорский фикс.
Высота — путь к senior
НольJuniorMiddleSenior
Ты на senior-высоте — в орбите
◷ 14 min

Паттерн живёт или умирает в цикле свёртки, обработчике projection, загрузке через snapshot и upcaster. Прочитай каждый сниппет, предскажи, что он делает в реальных условиях, потом выбери фикс, который сеньор делает первым.

Цель

Потренируйся читать четыре куска кода, что есть в любой event-sourced системе: пересборку read-модели через replay, идемпотентность projection, загрузку через snapshot плюс хвост, и upcasting старой формы события без переписывания истории.

Сниппет 1 — пересборка projection

// пересборка "drop and replay" read-модели балансов
async function rebuildBalances(store: EventStore, db: ReadDb) {
  await db.exec("TRUNCATE balances");
  for await (const e of store.readAll({ from: 0 })) {  // все события, по порядку
    if (e.type === "MoneyDeposited") {
      await db.exec(
        "UPDATE balances SET cents = cents + $1 WHERE acct = $2",
        e.data.cents, e.data.acct,
      );
    }
    // обработчик MoneyWithdrawn опущен
  }
}
Викторина

Таблица balances пуста после TRUNCATE. Для счёта, чьё первое событие MoneyDeposited, UPDATE затрагивает ноль строк и баланс не появляется. Каков правильный фикс?

Сниппет 2 — обработчик projection при ретраях

async function onEvent(e: Event, db: ReadDb) {
  // доставка at-least-once: может быть вызвана дважды для одного события
  await db.exec(
    "UPDATE counters SET total = total + 1 WHERE name = $1",
    e.aggregateId,
  );
}
Викторина

Доставка at-least-once, так что обработчик может выполниться дважды для одного события. В чём баг и каков стандартный фикс?

Сниппет 3 — загрузка агрегата через snapshot

async function loadAccount(id: string, store: EventStore): Promise<Account> {
  const snap = await store.latestSnapshot(id);          // может быть null
  let state = snap ? snap.state : Account.empty(id);
  const from = snap ? snap.version : 0;
  for await (const e of store.read(id, { from })) {      // БАГ здесь
    state = apply(state, e);
  }
  return state;
}
Викторина

Snapshot захватил состояние до версии N включительно (snap.version === N). Чтение использует from = N. Что идёт не так?

Сниппет 4 — upcasting эволюционировавшего события

// v1: { type: "OrderPlaced", total: 1299 }            // центы, валюта подразумевается USD
// v2: { type: "OrderPlaced", amount: { cents, currency } }
function upcastOrderPlaced(raw: any): OrderPlacedV2 {
  if (raw.version === 2) return raw;                    // уже текущая
  // raw это v1
  return {
    type: "OrderPlaced",
    version: 2,
    amount: { cents: raw.total, currency: "USD" },
  };
}
Викторина

Какова роль этой функции и какое свойство существенно, чтобы её было безопасно нести бесконечно?

Итог

Любая event-sourced система читается в этих четырёх кусках кода. Пересборка начинается с пустоты и должна реконструировать строки из лога, поэтому обработчики делают UPSERT или чтят события создания, а не предполагают строки. Projection сталкиваются с доставкой at-least-once, поэтому продвигают version-checkpoint на поток в той же транзакции, что и запись, и игнорируют уже виденные версии. Загрузка через snapshot использует исключающую нижнюю границу (версия + 1), чтобы заснапшоченное событие не свернулось повторно. А upcaster это чистые протестированные преобразования, поднимающие старые формы к текущей при чтении, никогда не переписывая сохранённый лог. Читай код, предскажи свёртку, потом чини размещение истины — но не сам лог.

Продолжить восхождение ↑Event sourcing: построй аудируемый ledger
хоткеи развернуть
поиск
K
пред. пьеса
k
след. пьеса
j
тиры
t
это меню
?
sources2
expand
  1. 01
  2. 02

Trademarks belong to their respective owners. Editorial reference only.