Skip to content

Commit c0e4d39

Browse files
feat(api): real AppendLog implementation in fs + memory drivers
The third driver primitive moves from stub to real impl. Per-project monotonic seq log; FS driver writes to web/projects/<id>/.meta/history.jsonl, memory driver holds an array. Both implementations honor: - append(entries) returns the new lastSeq; seqs are monotonic from 1 even across daemon restarts (FS reloads from disk on first call). - read({ sinceSeq, limit, reverse }) for resumption, replay, paginated reads. - subscribe(undefined, fn) for live-only event flow; subscribe({sinceSeq}, fn) replays past entries then attaches for live, with a buffer that drains on the same fn call so consumers see no gaps and no dupes — what an SSE handler with Last-Event-ID needs verbatim. - truncateBefore(seq) compaction, atomic on FS via tmp + rename. FS impl serializes appends via a per-channel promise chain so concurrent callers can't corrupt the seq counter or interleave lines. Filesystem events for history.jsonl don't fire BlobStore events (the blob watcher filters .meta/) and don't appear in JsonKv.list (which only lists *.json). 5 new test scenarios in driver-swap.test.ts run via describe.each against both drivers (10 new test instances): seq monotonicity, read filters, live subscribe, replay-then-live with sinceSeq, truncateBefore. Total api suite: 35 → 45 tests, all green. No new consumer wired in this commit — interface ready for the Last-Event-ID-resumable run-events SSE (issue #12) when that feature lands. Snapshots stay on JsonKv (keyed by turnId, not sequential, so not an AppendLog workload). Co-Authored-By: Claude Opus 4.7 <[email protected]>
1 parent 4d063f7 commit c0e4d39

4 files changed

Lines changed: 353 additions & 26 deletions

File tree

api/src/storage/driver-swap.test.ts

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,4 +189,92 @@ describe.each(drivers)("StorageDriver:$name", ({ make }) => {
189189
await driver.deleteProject("p_dead");
190190
expect(await driver.listProjectIds()).not.toContain("p_dead");
191191
});
192+
193+
it("AppendLog: append assigns monotonic seqs and read returns them in order", async () => {
194+
const driver = make();
195+
await driver.createProject("p_log");
196+
const log = driver.project("p_log").history;
197+
198+
const r1 = await log.append([{ kind: "tweak", file: "a" }, { kind: "tweak", file: "b" }]);
199+
expect(r1.lastSeq).toBe(2);
200+
const r2 = await log.append([{ kind: "comment-edit", turn: "x" }]);
201+
expect(r2.lastSeq).toBe(3);
202+
203+
const all = await log.read();
204+
expect(all.map((e) => e.seq)).toEqual([1, 2, 3]);
205+
expect(all.map((e) => (e.data as { kind: string }).kind)).toEqual([
206+
"tweak", "tweak", "comment-edit",
207+
]);
208+
});
209+
210+
it("AppendLog: read({ sinceSeq, limit, reverse })", async () => {
211+
const driver = make();
212+
await driver.createProject("p_log");
213+
const log = driver.project("p_log").history;
214+
await log.append([1, 2, 3, 4, 5]);
215+
216+
const since2 = await log.read<number>({ sinceSeq: 2 });
217+
expect(since2.map((e) => e.data)).toEqual([3, 4, 5]);
218+
219+
const limited = await log.read<number>({ limit: 2 });
220+
expect(limited.map((e) => e.data)).toEqual([1, 2]);
221+
222+
const reversed = await log.read<number>({ reverse: true, limit: 2 });
223+
expect(reversed.map((e) => e.data)).toEqual([5, 4]);
224+
});
225+
226+
it("AppendLog: subscribe (live-only) fires on append", async () => {
227+
const driver = make();
228+
await driver.createProject("p_log");
229+
const log = driver.project("p_log").history;
230+
231+
const seen: number[] = [];
232+
const unsub = log.subscribe<number>(undefined, (e) => seen.push(e.data));
233+
await log.append([10, 20, 30]);
234+
// Memory dispatches synchronously inside append; FS goes through
235+
// the EventEmitter immediately too. Both should be visible by now.
236+
await new Promise((r) => setTimeout(r, 5));
237+
unsub();
238+
expect(seen).toEqual([10, 20, 30]);
239+
});
240+
241+
it("AppendLog: subscribe with sinceSeq replays past entries then live-streams new ones", async () => {
242+
const driver = make();
243+
await driver.createProject("p_log");
244+
const log = driver.project("p_log").history;
245+
await log.append(["a", "b", "c"]);
246+
247+
const seen: { seq: number; data: string }[] = [];
248+
const unsub = log.subscribe<string>({ sinceSeq: 1 }, (e) => {
249+
seen.push({ seq: e.seq, data: e.data });
250+
});
251+
252+
// Wait for the replay microtask, then append more.
253+
await new Promise((r) => setTimeout(r, 10));
254+
await log.append(["d"]);
255+
await new Promise((r) => setTimeout(r, 10));
256+
unsub();
257+
258+
// Replay returns seq>1 (b, c), then live append delivers d.
259+
expect(seen.map((s) => s.data)).toEqual(["b", "c", "d"]);
260+
expect(seen.map((s) => s.seq)).toEqual([2, 3, 4]);
261+
});
262+
263+
it("AppendLog: truncateBefore drops old entries and keeps the rest", async () => {
264+
const driver = make();
265+
await driver.createProject("p_log");
266+
const log = driver.project("p_log").history;
267+
await log.append([1, 2, 3, 4, 5]);
268+
269+
if (!log.truncateBefore) {
270+
// Drivers may not implement truncate; both of ours do.
271+
throw new Error("expected truncateBefore to be defined");
272+
}
273+
const r = await log.truncateBefore(3);
274+
expect(r.removed).toBe(3);
275+
276+
const after = await log.read<number>();
277+
expect(after.map((e) => e.seq)).toEqual([4, 5]);
278+
expect(after.map((e) => e.data)).toEqual([4, 5]);
279+
});
192280
});

api/src/storage/driver.ts

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,11 +91,11 @@ export interface BlobStore {
9191

9292
/* ─── AppendLog: append-only event sequence ─────────────────────────
9393
*
94-
* Sketched here so PR 2 (snapshots → AppendLog, fixes the in-memory
95-
* comment-undo bug) doesn't perturb the interface. Not implemented in
96-
* either driver yet; methods throw if called. The shape is what
97-
* `/api/comment-undo` and a future Last-Event-ID-resumable run-events
98-
* SSE will both need.
94+
* Per-project monotonic seq log. FS driver writes one JSON entry per
95+
* line to `<projectDir>/.meta/history.jsonl`; memory driver holds an
96+
* array. Subscribe with `sinceSeq` replays past entries then attaches
97+
* for live ones — the shape a Last-Event-ID-resumable run-events SSE
98+
* (issue #12) will plug into directly.
9999
*/
100100

101101
export type LogEntry<T = unknown> = {
@@ -130,8 +130,9 @@ export type ProjectScope = {
130130
meta: JsonKv;
131131
/** Project source files and uploads. Excludes `.meta/`. */
132132
files: BlobStore;
133-
/** Append-only history (.meta/history.jsonl in PR 2). Defined but
134-
* not implemented in either driver yet — calls throw. */
133+
/** Append-only history. FS impl: `.meta/history.jsonl`. Used by
134+
* the Last-Event-ID-resumable run-events SSE (issue #12) and any
135+
* future replayable edit log. */
135136
history: AppendLog;
136137
};
137138

api/src/storage/fs-driver.ts

Lines changed: 167 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import { EventEmitter } from "node:events";
1515
import { watch } from "node:fs";
1616
import {
17+
appendFile,
1718
mkdir,
1819
readFile,
1920
readdir,
@@ -38,6 +39,7 @@ import type {
3839
KvDeleteResult,
3940
KvGetResult,
4041
KvPutResult,
42+
LogEntry,
4143
ProjectScope,
4244
SharedScope,
4345
StorageDriver,
@@ -346,16 +348,172 @@ function createFsBlobStore(rootDir: string, opts: { reloadDebounceMs: number }):
346348
};
347349
}
348350

349-
/* ─── AppendLog (stub — implemented in PR 2) ─────────────────────── */
351+
/* ─── AppendLog (filesystem) ─────────────────────────────────────── */
350352

351-
function createStubAppendLog(): AppendLog {
352-
function notImpl(): never {
353-
throw new Error("AppendLog: filesystem driver does not implement this primitive yet (lands in PR 2).");
353+
/** Per-project append-only log backed by `<projectDir>/.meta/history.jsonl`.
354+
*
355+
* • One entry per line: `{"seq":N,"ts":<ms>,"data":...}`.
356+
* • Append is serialized via a per-channel promise chain so concurrent
357+
* callers can't corrupt the seq counter.
358+
* • Subscribe with sinceSeq replays disk entries > seq, then attaches
359+
* for live events. Live entries arriving during replay are buffered
360+
* and drained in order — no dupes, no gaps.
361+
* • truncateBefore rewrites the file atomically (tmp + rename). */
362+
function createFsAppendLog(metaDir: string): AppendLog & { __destroy(): void } {
363+
const file = resolvePath(metaDir, "history.jsonl");
364+
const emitter = new EventEmitter();
365+
emitter.setMaxListeners(0);
366+
367+
// Cache of the highest seq written. Loaded from disk on first append/read.
368+
let lastSeq: number | null = null;
369+
// Promise chain to serialize append() calls.
370+
let appendChain: Promise<unknown> = Promise.resolve();
371+
372+
async function loadLastSeq(): Promise<number> {
373+
if (lastSeq !== null) return lastSeq;
374+
try {
375+
const text = await readFile(file, "utf8");
376+
const lines = text.trimEnd().split("\n");
377+
// Walk backward to find a valid JSON line (skip torn-tail edge cases).
378+
for (let i = lines.length - 1; i >= 0; i--) {
379+
const line = lines[i].trim();
380+
if (!line) continue;
381+
try {
382+
const entry = JSON.parse(line) as LogEntry;
383+
if (typeof entry.seq === "number") {
384+
lastSeq = entry.seq;
385+
return lastSeq;
386+
}
387+
} catch { /* malformed; keep looking */ }
388+
}
389+
lastSeq = 0;
390+
return 0;
391+
} catch {
392+
lastSeq = 0;
393+
return 0;
394+
}
354395
}
396+
397+
async function readAll<T>(): Promise<LogEntry<T>[]> {
398+
let text: string;
399+
try { text = await readFile(file, "utf8"); }
400+
catch { return []; }
401+
const out: LogEntry<T>[] = [];
402+
for (const line of text.split("\n")) {
403+
const trimmed = line.trim();
404+
if (!trimmed) continue;
405+
try { out.push(JSON.parse(trimmed) as LogEntry<T>); }
406+
catch { /* skip malformed */ }
407+
}
408+
return out;
409+
}
410+
355411
return {
356-
async append() { notImpl(); },
357-
async read() { notImpl(); },
358-
subscribe() { notImpl(); },
412+
async append<T = unknown>(entries: T[]): Promise<{ lastSeq: number }> {
413+
const turn = appendChain.then(async () => {
414+
let seq = await loadLastSeq();
415+
const ts = Date.now();
416+
const newEntries: LogEntry<T>[] = [];
417+
const lines: string[] = [];
418+
for (const data of entries) {
419+
seq++;
420+
const e: LogEntry<T> = { seq, ts, data };
421+
newEntries.push(e);
422+
lines.push(JSON.stringify(e));
423+
}
424+
await mkdir(dirname(file), { recursive: true });
425+
await appendFile(file, lines.length ? lines.join("\n") + "\n" : "", "utf8");
426+
lastSeq = seq;
427+
for (const e of newEntries) emitter.emit("entry", e);
428+
return { lastSeq: seq };
429+
});
430+
// Never let an append failure poison the chain.
431+
appendChain = turn.then(() => undefined, () => undefined);
432+
return turn;
433+
},
434+
435+
async read<T = unknown>(opts?: { sinceSeq?: number; limit?: number; reverse?: boolean }): Promise<LogEntry<T>[]> {
436+
let entries = await readAll<T>();
437+
if (opts?.sinceSeq != null) {
438+
const since = opts.sinceSeq;
439+
entries = entries.filter((e) => e.seq > since);
440+
}
441+
if (opts?.reverse) entries.reverse();
442+
if (opts?.limit != null) entries = entries.slice(0, opts.limit);
443+
return entries;
444+
},
445+
446+
subscribe<T = unknown>(
447+
opts: { sinceSeq?: number } | undefined,
448+
fn: (entry: LogEntry<T>) => void,
449+
): Unsubscribe {
450+
// Live-only fast path.
451+
if (opts?.sinceSeq == null) {
452+
const direct = (e: LogEntry<T>) => fn(e);
453+
emitter.on("entry", direct);
454+
return () => emitter.off("entry", direct);
455+
}
456+
457+
// Replay-then-live with buffer to avoid gaps or dupes.
458+
const since = opts.sinceSeq;
459+
let stopped = false;
460+
let replaying = true;
461+
const buffered: LogEntry<T>[] = [];
462+
const handler = (e: LogEntry<T>) => {
463+
if (stopped) return;
464+
if (replaying) buffered.push(e);
465+
else fn(e);
466+
};
467+
emitter.on("entry", handler);
468+
469+
void readAll<T>().then((all) => {
470+
if (stopped) return;
471+
const past = all.filter((e) => e.seq > since);
472+
let lastSent = since;
473+
for (const e of past) {
474+
fn(e);
475+
if (e.seq > lastSent) lastSent = e.seq;
476+
}
477+
for (const e of buffered) {
478+
if (e.seq > lastSent) fn(e);
479+
}
480+
buffered.length = 0;
481+
replaying = false;
482+
}).catch(() => {
483+
// If the read fails, fall through to live-only mode.
484+
replaying = false;
485+
buffered.length = 0;
486+
});
487+
488+
return () => {
489+
stopped = true;
490+
emitter.off("entry", handler);
491+
};
492+
},
493+
494+
async truncateBefore(seq: number): Promise<{ removed: number }> {
495+
// Serialize against appends so we don't drop concurrent writes.
496+
const turn = appendChain.then(async () => {
497+
const all = await readAll();
498+
const kept = all.filter((e) => e.seq > seq);
499+
const removed = all.length - kept.length;
500+
if (removed === 0) return { removed: 0 };
501+
await mkdir(dirname(file), { recursive: true });
502+
const tmp = file + ".tmp";
503+
const text = kept.length ? kept.map((e) => JSON.stringify(e)).join("\n") + "\n" : "";
504+
await writeFile(tmp, text, "utf8");
505+
await rename(tmp, file);
506+
// Recompute lastSeq from kept.
507+
lastSeq = kept.length ? kept[kept.length - 1].seq : 0;
508+
return { removed };
509+
});
510+
appendChain = turn.then(() => undefined, () => undefined);
511+
return turn;
512+
},
513+
514+
__destroy() {
515+
emitter.removeAllListeners();
516+
},
359517
};
360518
}
361519

@@ -383,12 +541,12 @@ export function createFsDriver(opts: FsDriverOptions): StorageDriver {
383541
const dir = projectDir(id);
384542
const meta = createFsJsonKv(resolvePath(dir, ".meta"));
385543
const files = createFsBlobStore(dir, { reloadDebounceMs });
386-
const history = createStubAppendLog();
544+
const history = createFsAppendLog(resolvePath(dir, ".meta"));
387545
return {
388546
meta,
389547
files,
390548
history,
391-
__destroy() { files.__destroy(); },
549+
__destroy() { files.__destroy(); history.__destroy(); },
392550
};
393551
}
394552

0 commit comments

Comments
 (0)