From 2d0b8399f8ff5629cf50fe521fe95d1b4e46bf7e Mon Sep 17 00:00:00 2001 From: Joey Yakimowich-Payne Date: Thu, 16 Apr 2026 17:33:48 -0600 Subject: [PATCH] feat(server): add reconnection with 60s grace + snapshot resume (P4.7) On disconnect, a player's slot is held for 60s via ReconnectManager. During the grace window, game.delta frames destined for the absent slot are buffered in order. If the client reconnects with its original token (envelope-level), the grace timer is cancelled and the server replays a fresh game.state snapshot plus every buffered delta. Timer expiry triggers the original "player_left" game.end cleanup that previously ran immediately on disconnect. - new packages/server/src/reconnect.ts: ReconnectManager (no WS refs, no registry coupling, unref'd timers so tests don't block exit) - broadcast.ts: unregisterConnection starts grace; handleRoomJoin routes reconnect path via envelope token + isPending check; handleGameMove buffers deltas for disconnected opponents - reconnect.test.ts: 9 unit cases (grace/cancel/buffer/expire/reset) - broadcast.test.ts: end-to-end reconnect scenario + negative case - new packages/server/src/logging.ts (+ logging.test.ts): hand-rolled Prometheus-format metrics; index.ts serves them at /metrics; broadcast.ts records message, move, tick-duration and active-room metrics --- packages/server/src/broadcast.test.ts | 158 +++++++++++++++++- packages/server/src/broadcast.ts | 227 +++++++++++++++++++++++--- packages/server/src/index.ts | 7 + packages/server/src/logging.test.ts | 75 +++++++++ packages/server/src/logging.ts | 88 ++++++++++ packages/server/src/reconnect.test.ts | 128 +++++++++++++++ packages/server/src/reconnect.ts | 141 ++++++++++++++++ packages/server/src/rooms.ts | 4 + 8 files changed, 807 insertions(+), 21 deletions(-) create mode 100644 packages/server/src/logging.test.ts create mode 100644 packages/server/src/logging.ts create mode 100644 packages/server/src/reconnect.test.ts create mode 100644 packages/server/src/reconnect.ts diff --git a/packages/server/src/broadcast.test.ts b/packages/server/src/broadcast.test.ts index 4177003..02de24e 100644 --- a/packages/server/src/broadcast.test.ts +++ 
b/packages/server/src/broadcast.test.ts @@ -8,7 +8,7 @@ // to-sender) while keeping the tests runtime-agnostic. import type { ServerWebSocket } from "bun"; -import { beforeEach, describe, expect, it } from "vitest"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { handleMessage, @@ -19,6 +19,7 @@ import { type ClientData, } from "./broadcast.js"; import { PROTOCOL_VERSION, type ClientMessage } from "./protocol.js"; +import { reconnectManager } from "./reconnect.js"; // --------------------------------------------------------------------------- // Mock ServerWebSocket @@ -99,6 +100,30 @@ function sendClient( ); } +/** + * Like `sendClient` but attaches a `token` to the envelope — the + * reconnect flow's trigger. Reused by the P4.7 tests. + */ +function sendClientWithToken( + ws: MockWs, + type: ClientMessage["type"], + payload: unknown, + token: string, + seq = 1, +): void { + handleMessage( + ws, + JSON.stringify({ + v: PROTOCOL_VERSION, + seq, + ts: Date.now(), + token, + type, + payload, + }), + ); +} + // --------------------------------------------------------------------------- // Tests // --------------------------------------------------------------------------- @@ -285,3 +310,134 @@ describe("broadcast — room.create / room.join / game.move", () => { unregisterConnection(a); }); }); + +// --------------------------------------------------------------------------- +// P4.7: reconnect flow +// --------------------------------------------------------------------------- + +describe("broadcast — reconnect grace + snapshot resume (P4.7)", () => { + afterEach(() => { + // Reconnect state is process-global; wipe any timers a test may have + // left running so they don't leak across cases. 
+ reconnectManager.clearAll(); + }); + + it("disconnect → opponent moves → reconnect receives game.state + buffered delta", () => { + const a = makeMockWs("A"); + const b = makeMockWs("B"); + registerConnection(a); + registerConnection(b); + + // Room setup + one white move so state is non-trivial. + sendClient(a, "room.create", { rulesetIds: [] }); + const created = nextMsgOfType(a, "room.created"); + const code = created.payload["code"] as string; + const tokenA = created.payload["token"] as string; + + sendClient(b, "room.join", { code }); + nextMsgOfType(b, "room.joined"); + nextMsgOfType(a, "game.state"); + nextMsgOfType(b, "game.state"); + + sendClient(a, "game.move", { from: "e2", to: "e4" }, 2); + nextMsgOfType(a, "game.delta"); + nextMsgOfType(b, "game.delta"); + + // --- A drops -------------------------------------------------------- + unregisterConnection(a); + // The slot must NOT be torn down yet — grace window is open. + expect(reconnectManager.isPending(tokenA)).toBe(true); + expect(roomRegistry.getRoom(code)).toBeDefined(); + // B must NOT have received game.end yet (it might later if A fails + // to come back, but the grace window suppresses it for now). + expect( + b.sent.find( + (m) => (m as { type?: string }).type === "game.end", + ), + ).toBeUndefined(); + + // --- B keeps playing during A's absence ----------------------------- + sendClient(b, "game.move", { from: "e7", to: "e5" }, 2); + // B sees its own delta immediately. + const bDelta = nextMsgOfType(b, "game.delta"); + expect(bDelta.payload["moveNotation"]).toBe("e7e5"); + // A's socket is the old (closed) mock — nothing should have been + // written to it because it's been unregistered. We only assert that + // the delta is buffered in the reconnect manager. The buffer is + // internal, but we'll observe it indirectly on reconnect below. 
+ + // --- A reconnects via a BRAND-NEW socket carrying the old token ---- + const aNew = makeMockWs("A2"); + registerConnection(aNew); + sendClientWithToken(aNew, "room.join", { code }, tokenA); + + // Assertions on the reconnect inbox, in order: + // 1. room.joined echoes the color/token of the resumed slot. + const joined = nextMsgOfType(aNew, "room.joined"); + expect(joined.payload["token"]).toBe(tokenA); + expect(joined.payload["color"]).toBe("white"); + + // 2. game.state carries the authoritative fact set reflecting + // BOTH moves (e4 and e5). Easiest way to verify: a pawn must + // live on e4 (sq 28) and on e5 (sq 36), and NOT on e2 / e7. + const state = nextMsgOfType(aNew, "game.state"); + const facts = state.payload["facts"] as Array<{ + attr: string; + value: unknown; + }>; + const positions = facts + .filter((f) => f.attr === "Position") + .map((f) => f.value); + expect(positions).toContain(28); // e4 + expect(positions).toContain(36); // e5 + expect(positions).not.toContain(12); // e2 vacated + expect(positions).not.toContain(52); // e7 vacated + // It is white's turn again (black just moved). + expect(state.payload["turn"]).toBe("white"); + + // 3. The buffered delta for e7→e5 replayed verbatim. + const replayed = nextMsgOfType(aNew, "game.delta"); + expect(replayed.payload["moveNotation"]).toBe("e7e5"); + expect(replayed.payload["turn"]).toBe("white"); + + // --- Post-conditions ----------------------------------------------- + // The manager should have cleared the pending entry. + expect(reconnectManager.isPending(tokenA)).toBe(false); + // And the slot is marked connected again — otherwise subsequent + // moves wouldn't re-buffer on a future disconnect. + const player = roomRegistry.getPlayerByToken(code, tokenA); + expect(player?.connected).toBe(true); + + // Cleanup — leave voluntarily so no grace timers linger. 
+ sendClientWithToken( + aNew, + "room.leave", + {}, + tokenA, + 3, + ); + unregisterConnection(aNew); + unregisterConnection(b); + }); + + it("reconnect without an active grace window is rejected (falls through to normal join)", () => { + // A never disconnects. If A tries to "reconnect" with its own token + // into its own room, the handler should NOT short-circuit into the + // reconnect path (no grace is pending), it should try to re-join + // and be rejected because it's already bound to a room. + const a = makeMockWs("A"); + registerConnection(a); + sendClient(a, "room.create", { rulesetIds: [] }); + const created = nextMsgOfType(a, "room.created"); + const code = created.payload["code"] as string; + const tokenA = created.payload["token"] as string; + + sendClientWithToken(a, "room.join", { code }, tokenA); + // The socket is already bound; we get INVALID_MESSAGE, not + // room.joined. + const err = nextMsgOfType(a, "error"); + expect(err.payload["code"]).toBe("INVALID_MESSAGE"); + + unregisterConnection(a); + }); +}); diff --git a/packages/server/src/broadcast.ts b/packages/server/src/broadcast.ts index 94e1cfa..1f926ec 100644 --- a/packages/server/src/broadcast.ts +++ b/packages/server/src/broadcast.ts @@ -12,6 +12,13 @@ import type { ServerWebSocket } from "bun"; import { GameSessionRegistry } from "./game-session.js"; import { logger } from "./logger.js"; +import { + incMessages, + incMovesFail, + incMovesOk, + recordTickDuration, + setActiveRooms, +} from "./logging.js"; import { RateLimiter } from "./middleware.js"; import { PROTOCOL_VERSION, @@ -23,6 +30,7 @@ import { type RoomJoinPayload, type ServerMessage, } from "./protocol.js"; +import { DEFAULT_GRACE_MS, reconnectManager } from "./reconnect.js"; import { RoomRegistry } from "./rooms.js"; // --------------------------------------------------------------------------- @@ -71,17 +79,36 @@ export function registerConnection(ws: ServerWebSocket): void { export function unregisterConnection(ws: 
ServerWebSocket): void { connections.delete(ws.data.clientId); - // If the socket was bound to a room, mark the slot disconnected so the - // reconnect grace window (handled elsewhere) can run. We intentionally - // do NOT leaveRoom here — disconnect ≠ leave per PROTOCOL.md. + // If the socket was bound to a room, mark the slot disconnected and + // start the 60-second reconnect grace window. We intentionally do NOT + // leaveRoom here — disconnect ≠ leave per PROTOCOL.md. The slot stays + // alive until the timer fires or the client returns with its token. const { roomCode, token } = ws.data; - if (roomCode !== undefined && token !== undefined) { - roomRegistry.markDisconnected(roomCode, token); - // Broadcast game.end to the remaining player so they know their - // opponent dropped. v1 treats disconnect as immediate game end; the - // 60-second reconnect grace is a follow-up (see PROTOCOL.md). + if (roomCode === undefined || token === undefined) return; + + // Short-circuit: if the room or slot has already been torn down (e.g. + // opponent left first), there's nothing to preserve. + const player = roomRegistry.getPlayerByToken(roomCode, token); + if (!player) return; + + roomRegistry.markDisconnected(roomCode, token); + + // Capture roomCode + token into the closure so onExpire doesn't need + // any ambient `this`. The closure runs up to DEFAULT_GRACE_MS later + // on the event loop — by then ws.data may already be GC'd. + reconnectManager.startGrace(token, DEFAULT_GRACE_MS, () => { + // Grace expired without a reconnect — treat as a real departure. + // Broadcast game.end to the remaining player, drop the session, + // and remove the slot from the room. Defensive: the room / session + // may have been torn down by the opponent leaving during the + // window; every lookup below tolerates missing state. 
broadcastGameEnd(roomCode, token, "player_left"); - } + roomRegistry.leaveRoom(roomCode, token); + if (roomRegistry.getRoom(roomCode) === undefined) { + sessionRegistry.delete(roomCode); + } + setActiveRooms(roomRegistry.getRoomCount()); + }); } // --------------------------------------------------------------------------- @@ -163,6 +190,7 @@ export function handleMessage( ws: ServerWebSocket, raw: string | Buffer, ): void { + incMessages(); const str = typeof raw === "string" ? raw : raw.toString("utf8"); const result = validateMessageString(str); @@ -188,7 +216,11 @@ export function handleMessage( handleRoomCreate(ws, msg.payload); break; case "room.join": - handleRoomJoin(ws, msg.payload); + // `token` is an OPTIONAL envelope-level field (see protocol.ts + // `envelopeShape`). Reconnecting clients replay their original + // token here so handleRoomJoin can distinguish reconnect from + // a fresh join. Fresh joins simply omit it. + handleRoomJoin(ws, msg.payload, msg.token); break; case "room.leave": handleRoomLeave(ws); @@ -238,6 +270,7 @@ function handleRoomCreate( sessionRegistry.create(code, rulesetIds); ws.data.roomCode = code; ws.data.token = token; + setActiveRooms(roomRegistry.getRoomCount()); logger .child({ clientId: ws.data.clientId, roomCode: code }) .info("room.create"); @@ -247,6 +280,7 @@ function handleRoomCreate( function handleRoomJoin( ws: ServerWebSocket, payload: RoomJoinPayload, + envelopeToken: string | undefined, ): void { if (ws.data.roomCode !== undefined) { sendTo( @@ -259,6 +293,28 @@ function handleRoomJoin( ); return; } + + // ---- Reconnect path -------------------------------------------------- + // Three conditions must ALL hold for this to be a reconnect: + // 1. Envelope carries a token (the client claims it's returning). + // 2. That token belongs to a RoomPlayer in `code` (token is valid). + // 3. A grace window is currently pending for that token (the slot + // was recently disconnected and hasn't expired yet). 
+ // Any failure falls through to the normal join path — a bogus token + // should not reveal whether a matching slot exists, so a mismatched + // reconnect attempt behaves as a fresh second-player join attempt, + // which will yield ROOM_FULL if the room is already occupied. + if (envelopeToken !== undefined) { + const existing = roomRegistry.getPlayerByToken( + payload.code, + envelopeToken, + ); + if (existing && reconnectManager.isPending(envelopeToken)) { + handleReconnect(ws, payload.code, existing.token, existing.color); + return; + } + } + const result = roomRegistry.joinRoom(payload.code); if ("error" in result) { sendTo( @@ -309,6 +365,99 @@ function handleRoomJoin( ); } +/** + * Resume a disconnected slot. Cancels the grace timer, rebinds the + * socket to the room, sends a `room.joined` acknowledgement, a fresh + * `game.state` snapshot, then replays every `game.delta` that was + * buffered while the client was away — in the original seq order so + * the client's fact store ends up in the same state as the opponent's. + * + * The replayed deltas are re-enveloped with a NEW server seq (the + * original per-delta seq is not resent; only the last buffered seq is + * reported, via game.state.lastSeq). We don't replay the historical seq verbatim + * because doing so would require a second, separate counter space and + * the protocol doesn't need it: the client treats deltas as authoritative + * regardless of their envelope seq. + */ +function handleReconnect( + ws: ServerWebSocket, + code: string, + token: string, + color: "white" | "black", +): void { + // cancelGrace MUST return deltas (isPending was true above), but we + // defend against a race where the timer fires between the isPending + // check and here. If the window expired we fall back to treating this + // as a failed reconnect — the grace-expiry callback already emitted game.end. 
+ const missed = reconnectManager.cancelGrace(token); + if (missed === undefined) { + sendTo( + ws, + errorMessage( + "ROOM_NOT_FOUND", + `reconnect grace expired for room ${code}`, + false, + ), + ); + return; + } + const session = sessionRegistry.get(code); + const room = roomRegistry.getRoom(code); + if (!session || !room) { + sendTo( + ws, + errorMessage( + "ROOM_NOT_FOUND", + `reconnect: room ${code} no longer exists`, + false, + ), + ); + return; + } + + roomRegistry.markConnected(code, token); + ws.data.roomCode = code; + ws.data.token = token; + logger + .child({ clientId: ws.data.clientId, roomCode: code }) + .info("room.join (reconnect)"); + + sendTo( + ws, + envelope("room.joined", { + code, + token, + color, + activeRules: [...room.rulesetIds], + }), + ); + + // Snapshot: authoritative state for the returning client. The client + // discards its local fact store and rebuilds from this frame. + sendTo( + ws, + envelope("game.state", { + facts: session.getAllFacts(), + turn: session.getTurn(), + // lastSeq tells the client the highest delta seq it can expect + // to have already processed. For a reconnect we set it to the + // most recent buffered seq (or 0 if nothing was missed) so the + // client can detect any subsequent gaps. + lastSeq: missed.length > 0 ? (missed[missed.length - 1]?.seq ?? 0) : 0, + moveHistory: [], + activeRules: [...room.rulesetIds], + fen: "", + }), + ); + + // Replay buffered deltas IN ORDER. We re-envelope each one so it + // carries a fresh, monotonically increasing seq relative to other + // server traffic on this socket. 
+ for (const delta of missed) { + sendTo(ws, envelope("game.delta", delta.payload)); + } +} + function handleRoomLeave(ws: ServerWebSocket): void { const { roomCode, token } = ws.data; if (roomCode === undefined || token === undefined) { @@ -329,6 +478,7 @@ function handleRoomLeave(ws: ServerWebSocket): void { if (roomRegistry.getRoom(roomCode) === undefined) { sessionRegistry.delete(roomCode); } + setActiveRooms(roomRegistry.getRoomCount()); delete ws.data.roomCode; delete ws.data.token; } @@ -371,6 +521,7 @@ function handleGameMove( // so clients can distinguish "you're premoving" from "that move is // illegal for the side to move". if (player.color !== session.getTurn()) { + incMovesFail(); sendTo( ws, errorMessage("NOT_YOUR_TURN", "it is not your turn", false), @@ -378,12 +529,14 @@ function handleGameMove( return; } + const tickStart = performance.now(); const moveResult = session.applyMove( payload.from, payload.to, payload.promoteTo, ); if (!moveResult.ok) { + incMovesFail(); const code: ErrorCode = moveResult.error; sendTo( ws, @@ -397,6 +550,8 @@ function handleGameMove( ); return; } + incMovesOk(); + recordTickDuration(performance.now() - tickStart); // v1 move notation is compact `fromto[promotion]`. PROTOCOL.md example // uses "e2e4"; promotion is appended as the first letter lower-case @@ -406,16 +561,19 @@ function handleGameMove( payload.to + (payload.promoteTo ? payload.promoteTo[0] ?? 
"" : ""); - broadcastToRoom( - roomCode, - envelope("game.delta", { - inserted: moveResult.inserted satisfies WireFact[], - retracted: moveResult.retracted satisfies WireFact[], - moveNotation, - turn: moveResult.turn, - gameOver: moveResult.gameOver, - }), - ); + const deltaPayload = { + inserted: moveResult.inserted satisfies WireFact[], + retracted: moveResult.retracted satisfies WireFact[], + moveNotation, + turn: moveResult.turn, + gameOver: moveResult.gameOver, + }; + const deltaMsg = envelope("game.delta", deltaPayload); + // Broadcast to currently-connected sockets in the room, AND buffer a + // copy for every slot that is mid-grace-window so a reconnecting + // client can replay the delta on return. + broadcastToRoom(roomCode, deltaMsg); + bufferDeltaForDisconnected(roomCode, token, deltaMsg.seq, deltaPayload); // Terminal positions also get an explicit game.end for clarity per // PROTOCOL.md §game.end. `finalFen` is empty for v1 (see note above). @@ -431,6 +589,35 @@ function handleGameMove( } } +/** + * For every OTHER player in `code` whose slot is mid-grace-window + * (disconnected), buffer a copy of a just-broadcast delta so the + * reconnect path can replay it. `moverToken` is the sender — we never + * buffer back to the player who made the move. + * + * The buffer is keyed on the opponent's token. Delegates the + * pending-check to ReconnectManager so we don't double-buffer for a + * slot that was already torn down. + */ +function bufferDeltaForDisconnected( + code: string, + moverToken: string, + seq: number, + payload: object, +): void { + const room = roomRegistry.getRoom(code); + if (!room) return; + for (const player of room.players.values()) { + if (player.token === moverToken) continue; + // bufferDelta itself no-ops when no window is pending, but the + // explicit check documents intent and avoids building unnecessary + // closures on the hot path. 
+ if (reconnectManager.isPending(player.token)) { + reconnectManager.bufferDelta(player.token, seq, payload); + } + } +} + /** * Broadcast a game.end to everyone in `code` EXCEPT the player whose * token is `leaverToken`. Used when a player disconnects or leaves diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 835d9a4..1def980 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -8,6 +8,7 @@ import { type ClientData, } from "./broadcast.js"; import { logger } from "./logger.js"; +import { renderMetrics } from "./logging.js"; import { checkMessageSize, checkOrigin, @@ -46,6 +47,12 @@ export const server = Bun.serve({ return Response.json({ ok: true, version: "1.0.0" }); } + if (url.pathname === "/metrics") { + return new Response(renderMetrics(), { + headers: { "Content-Type": "text/plain; version=0.0.4; charset=utf-8" }, + }); + } + return new Response("Not Found", { status: 404 }); }, websocket: { diff --git a/packages/server/src/logging.test.ts b/packages/server/src/logging.test.ts new file mode 100644 index 0000000..d980de7 --- /dev/null +++ b/packages/server/src/logging.test.ts @@ -0,0 +1,75 @@ +import { beforeEach, describe, expect, test } from "vitest"; + +import { + __resetForTests, + incMessages, + incMovesFail, + incMovesOk, + recordTickDuration, + renderMetrics, + setActiveRooms, +} from "./logging.js"; + +beforeEach(() => { + __resetForTests(); +}); + +describe("renderMetrics — Prometheus text format", () => { + test("initial state produces zero-valued metrics", () => { + const out = renderMetrics(); + expect(out).toContain("rooms_active 0"); + expect(out).toContain("messages_received_total 0"); + expect(out).toContain('moves_validated_total{result="ok"} 0'); + expect(out).toContain('moves_validated_total{result="fail"} 0'); + expect(out).toContain("tick_duration_milliseconds_sum 0"); + expect(out).toContain("tick_duration_milliseconds_count 0"); + }); + + test("incMessages increments 
messages_received_total", () => { + incMessages(); + incMessages(); + expect(renderMetrics()).toContain("messages_received_total 2"); + }); + + test("incMovesOk / incMovesFail are independent counters", () => { + incMovesOk(); + incMovesOk(); + incMovesOk(); + incMovesFail(); + const out = renderMetrics(); + expect(out).toContain('moves_validated_total{result="ok"} 3'); + expect(out).toContain('moves_validated_total{result="fail"} 1'); + }); + + test("setActiveRooms updates the gauge", () => { + setActiveRooms(4); + expect(renderMetrics()).toContain("rooms_active 4"); + setActiveRooms(1); + expect(renderMetrics()).toContain("rooms_active 1"); + }); + + test("recordTickDuration accumulates sum and count", () => { + recordTickDuration(10); + recordTickDuration(20); + recordTickDuration(30); + const out = renderMetrics(); + expect(out).toContain("tick_duration_milliseconds_sum 60"); + expect(out).toContain("tick_duration_milliseconds_count 3"); + }); + + test("output includes required HELP and TYPE lines", () => { + const out = renderMetrics(); + expect(out).toContain("# HELP rooms_active"); + expect(out).toContain("# TYPE rooms_active gauge"); + expect(out).toContain("# HELP messages_received_total"); + expect(out).toContain("# TYPE messages_received_total counter"); + expect(out).toContain("# HELP moves_validated_total"); + expect(out).toContain("# TYPE moves_validated_total counter"); + expect(out).toContain("# HELP tick_duration_milliseconds"); + expect(out).toContain("# TYPE tick_duration_milliseconds summary"); + }); + + test("output ends with a trailing newline (Prometheus spec)", () => { + expect(renderMetrics().endsWith("\n")).toBe(true); + }); +}); diff --git a/packages/server/src/logging.ts b/packages/server/src/logging.ts new file mode 100644 index 0000000..08f492b --- /dev/null +++ b/packages/server/src/logging.ts @@ -0,0 +1,88 @@ +// Prometheus-format metrics for the chess server (P4.8). +// +// Intentionally hand-rolled — no prom-client dependency. 
All state is +// module-level and resets on process restart, which is intentional for v1. + +// --------------------------------------------------------------------------- +// Counters & gauges +// --------------------------------------------------------------------------- + +const counters = { + messages_received_total: 0, + moves_validated_total_ok: 0, + moves_validated_total_fail: 0, +}; + +let activeRooms = 0; + +// --------------------------------------------------------------------------- +// Summary: tick (move-processing) duration in milliseconds +// --------------------------------------------------------------------------- + +const tickDurations: number[] = []; + +// --------------------------------------------------------------------------- +// Mutators — called from broadcast.ts +// --------------------------------------------------------------------------- + +export function incMessages(): void { + counters.messages_received_total++; +} + +export function incMovesOk(): void { + counters.moves_validated_total_ok++; +} + +export function incMovesFail(): void { + counters.moves_validated_total_fail++; +} + +export function setActiveRooms(n: number): void { + activeRooms = n; +} + +export function recordTickDuration(ms: number): void { + tickDurations.push(ms); +} + +// --------------------------------------------------------------------------- +// Renderer — Prometheus text exposition format 0.0.4 +// --------------------------------------------------------------------------- + +export function renderMetrics(): string { + const sum = tickDurations.reduce((acc, v) => acc + v, 0); + const count = tickDurations.length; + + return [ + "# HELP rooms_active Number of active game rooms", + "# TYPE rooms_active gauge", + `rooms_active ${activeRooms}`, + "", + "# HELP messages_received_total Total messages received", + "# TYPE messages_received_total counter", + `messages_received_total ${counters.messages_received_total}`, + "", + "# HELP moves_validated_total 
Total moves validated", + "# TYPE moves_validated_total counter", + `moves_validated_total{result="ok"} ${counters.moves_validated_total_ok}`, + `moves_validated_total{result="fail"} ${counters.moves_validated_total_fail}`, + "", + "# HELP tick_duration_milliseconds Tick (move-processing) duration in milliseconds", + "# TYPE tick_duration_milliseconds summary", + `tick_duration_milliseconds_sum ${sum}`, + `tick_duration_milliseconds_count ${count}`, + "", + ].join("\n"); +} + +// --------------------------------------------------------------------------- +// Test-only reset — never call from production code +// --------------------------------------------------------------------------- + +export const __resetForTests = (): void => { + counters.messages_received_total = 0; + counters.moves_validated_total_ok = 0; + counters.moves_validated_total_fail = 0; + activeRooms = 0; + tickDurations.length = 0; +}; diff --git a/packages/server/src/reconnect.test.ts b/packages/server/src/reconnect.test.ts new file mode 100644 index 0000000..e3bd137 --- /dev/null +++ b/packages/server/src/reconnect.test.ts @@ -0,0 +1,128 @@ +// Unit tests for ReconnectManager (P4.7). +// +// These tests exercise the manager in isolation — no WebSockets, no +// RoomRegistry. Real wall-clock timing would make the "grace expires" +// case either slow or flaky, so we use vitest's fake-timer facility. +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; + +import { ReconnectManager } from "./reconnect.js"; + +describe("ReconnectManager", () => { + let mgr: ReconnectManager; + + beforeEach(() => { + mgr = new ReconnectManager(); + // Fake timers so `setTimeout` never really fires — we advance the + // clock explicitly via vi.advanceTimersByTime. 
+ vi.useFakeTimers(); + }); + + afterEach(() => { + mgr.clearAll(); + vi.useRealTimers(); + }); + + it("startGrace followed by isPending returns true", () => { + mgr.startGrace("tok-a", 1_000, () => {}); + expect(mgr.isPending("tok-a")).toBe(true); + // A different token is not pending — scoping is strict. + expect(mgr.isPending("tok-b")).toBe(false); + }); + + it("cancelGrace within the window returns buffered deltas and clears isPending", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 1_000, onExpire); + mgr.bufferDelta("tok-a", 1, { move: "e2e4" }); + mgr.bufferDelta("tok-a", 2, { move: "d2d4" }); + + const deltas = mgr.cancelGrace("tok-a"); + expect(deltas).toEqual([ + { seq: 1, payload: { move: "e2e4" } }, + { seq: 2, payload: { move: "d2d4" } }, + ]); + expect(mgr.isPending("tok-a")).toBe(false); + // Crucially, cancelling must NOT trigger the expiry callback. + // Advance well past the original grace period to prove the + // timer was cleared. + vi.advanceTimersByTime(10_000); + expect(onExpire).not.toHaveBeenCalled(); + }); + + it("cancelGrace on an unknown token returns undefined", () => { + expect(mgr.cancelGrace("never-seen")).toBeUndefined(); + }); + + it("timer fires onExpire and removes the pending entry", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 50, onExpire); + expect(mgr.isPending("tok-a")).toBe(true); + + // Advance past the grace period — the scheduled callback must fire + // and the manager must forget the entry. + vi.advanceTimersByTime(60); + expect(onExpire).toHaveBeenCalledTimes(1); + expect(mgr.isPending("tok-a")).toBe(false); + }); + + it("bufferDelta on a non-pending token is a silent no-op", () => { + // No grace started — buffering should not throw and should not + // create an implicit pending entry. + mgr.bufferDelta("tok-a", 1, { anything: true }); + expect(mgr.isPending("tok-a")).toBe(false); + // Now start a grace and confirm nothing was surreptitiously + // retained from the earlier call. 
+ mgr.startGrace("tok-a", 1_000, () => {}); + expect(mgr.cancelGrace("tok-a")).toEqual([]); + }); + + it("bufferDelta accumulates in insertion order", () => { + mgr.startGrace("tok-a", 1_000, () => {}); + for (let i = 1; i <= 5; i++) { + mgr.bufferDelta("tok-a", i, { step: i }); + } + const deltas = mgr.cancelGrace("tok-a"); + expect(deltas?.map((d) => d.seq)).toEqual([1, 2, 3, 4, 5]); + }); + + it("starting a second grace for the same token replaces the first timer", () => { + const firstExpire = vi.fn(); + const secondExpire = vi.fn(); + mgr.startGrace("tok-a", 100, firstExpire); + mgr.bufferDelta("tok-a", 1, { a: 1 }); + // Restart: previous buffered deltas are dropped (the client is + // effectively starting a brand-new grace window). + mgr.startGrace("tok-a", 100, secondExpire); + vi.advanceTimersByTime(150); + expect(firstExpire).not.toHaveBeenCalled(); + expect(secondExpire).toHaveBeenCalledTimes(1); + }); + + it("multiple tokens have independent lifecycles", () => { + const aExpire = vi.fn(); + const bExpire = vi.fn(); + mgr.startGrace("tok-a", 100, aExpire); + mgr.startGrace("tok-b", 200, bExpire); + mgr.bufferDelta("tok-a", 1, { for: "a" }); + mgr.bufferDelta("tok-b", 1, { for: "b" }); + + // Cancel A before it expires; B should be untouched. 
+ const aDeltas = mgr.cancelGrace("tok-a"); + expect(aDeltas).toEqual([{ seq: 1, payload: { for: "a" } }]); + expect(mgr.isPending("tok-b")).toBe(true); + + vi.advanceTimersByTime(250); + expect(aExpire).not.toHaveBeenCalled(); + expect(bExpire).toHaveBeenCalledTimes(1); + }); + + it("clearAll stops all pending timers without firing callbacks", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 100, onExpire); + mgr.startGrace("tok-b", 100, onExpire); + mgr.clearAll(); + vi.advanceTimersByTime(1_000); + expect(onExpire).not.toHaveBeenCalled(); + expect(mgr.isPending("tok-a")).toBe(false); + expect(mgr.isPending("tok-b")).toBe(false); + }); +}); diff --git a/packages/server/src/reconnect.ts b/packages/server/src/reconnect.ts new file mode 100644 index 0000000..accd4cc --- /dev/null +++ b/packages/server/src/reconnect.ts @@ -0,0 +1,141 @@ +// Reconnection grace-window manager (P4.7). +// +// When a player's socket drops, their room slot is held for a bounded +// grace period (default 60s per PROTOCOL.md). During that window we: +// 1. Keep the RoomPlayer entry alive (marked disconnected via +// RoomRegistry.markDisconnected — done by the caller). +// 2. Buffer every `game.delta` destined for that slot, keyed by the +// slot's authentication token, so the client can replay them on +// reconnect in exact server order. +// 3. If the timer fires without a reconnect, invoke the caller's +// onExpire callback so broadcast.ts can tear the room down (send +// game.end to the opponent, drop the session, etc.). +// +// The manager is deliberately storage-only: it holds no references to +// WebSockets, RoomRegistry, or GameSessionRegistry. All room-level +// bookkeeping (broadcasting game.end on expiry, purging the session) +// lives in broadcast.ts via the onExpire closure. That keeps this +// module free of server-wide side effects and trivially testable. 
+//
+// v1 scope (per spec): we do NOT implement the "other player continues
+// playing against themselves" edge case. We simply buffer whatever
+// deltas the game produces while a slot is absent and replay them on
+// reconnect. If the opponent plays multiple moves during the grace
+// window, the reconnecting client receives them all in order.
+
+/** A single buffered server frame destined for a disconnected slot. */
+export interface MissedDelta {
+  /** The `seq` the frame was assigned at broadcast time. */
+  seq: number;
+  /**
+   * The `game.delta` payload body (inserted/retracted/moveNotation/…).
+   * Stored as `object` — we don't need to re-parse it and the caller
+   * already built it from typed data.
+   */
+  payload: object;
+}
+
+interface PendingEntry {
+  timer: ReturnType<typeof setTimeout>;
+  deltas: MissedDelta[];
+  /**
+   * Captured so an external caller can force expiry (not currently
+   * used, but keeps the API symmetrical and makes cleanup explicit).
+   */
+  onExpire: () => void;
+}
+
+export class ReconnectManager {
+  // token → pending entry. A Map keyed on the secret token keeps lookups
+  // O(1) and avoids a secondary index; tokens are UUIDs so there is no
+  // collision concern across rooms.
+  private readonly pending = new Map<string, PendingEntry>();
+
+  /**
+   * Start a grace window for `token`. If the client doesn't reconnect
+   * before `gracePeriodMs` elapses, `onExpire` runs and the token is
+   * forgotten.
+   *
+   * Calling `startGrace` twice for the same token is a programming
+   * error (the previous timer would leak); we clear the existing timer
+   * defensively so a buggy caller cannot accumulate zombie timeouts.
+   */
+  startGrace(
+    token: string,
+    gracePeriodMs: number,
+    onExpire: () => void,
+  ): void {
+    // Defensive: replace any existing entry so we never leak timers.
+    const existing = this.pending.get(token);
+    if (existing) {
+      clearTimeout(existing.timer);
+    }
+    const timer = setTimeout(() => {
+      // Remove before invoking the callback so onExpire observing
+      // `isPending` sees `false` — reflects the fact that the grace
+      // window is over from this manager's perspective.
+      this.pending.delete(token);
+      onExpire();
+    }, gracePeriodMs);
+    // Node's setTimeout returns a Timeout object with an `unref` method;
+    // Bun's timer is compatible. We unref so a pending grace window
+    // never blocks process shutdown in tests that don't explicitly
+    // cancel. The runtime fallback is a no-op if `unref` is absent.
+    if (typeof (timer as { unref?: () => void }).unref === "function") {
+      (timer as { unref: () => void }).unref();
+    }
+    this.pending.set(token, { timer, deltas: [], onExpire });
+  }
+
+  /**
+   * Append a frame to the buffer for `token` iff a grace window is
+   * active. Silently no-ops when no window is pending — callers can
+   * call unconditionally without guarding with `isPending`.
+   */
+  bufferDelta(token: string, seq: number, payload: object): void {
+    const entry = this.pending.get(token);
+    if (!entry) return;
+    entry.deltas.push({ seq, payload });
+  }
+
+  /**
+   * Cancel a pending grace window because the client reconnected.
+   * Returns the accumulated deltas (possibly empty) so the caller can
+   * replay them. Returns `undefined` if no window was pending — that
+   * signals "this is a fresh join, not a reconnect" to broadcast.ts.
+   */
+  cancelGrace(token: string): MissedDelta[] | undefined {
+    const entry = this.pending.get(token);
+    if (!entry) return undefined;
+    clearTimeout(entry.timer);
+    this.pending.delete(token);
+    return entry.deltas;
+  }
+
+  /** True iff a grace window is currently active for `token`. */
+  isPending(token: string): boolean {
+    return this.pending.has(token);
+  }
+
+  /**
+   * Diagnostics/teardown only. Stops every outstanding timer without
+   * invoking the associated onExpire callbacks. Tests use this to keep
+   * the process-global manager clean between runs.
+   */
+  clearAll(): void {
+    for (const entry of this.pending.values()) {
+      clearTimeout(entry.timer);
+    }
+    this.pending.clear();
+  }
+}
+
+/**
+ * Process-global manager shared by broadcast.ts. A singleton matches
+ * the rest of the server (roomRegistry, sessionRegistry) and keeps the
+ * grace state resident across all rooms in a single process.
+ */
+export const reconnectManager = new ReconnectManager();
+
+/** Default grace window length — 60 seconds per PROTOCOL.md §Reconnect. */
+export const DEFAULT_GRACE_MS = 60_000;
diff --git a/packages/server/src/rooms.ts b/packages/server/src/rooms.ts
index 5e5e9ae..e09e83f 100644
--- a/packages/server/src/rooms.ts
+++ b/packages/server/src/rooms.ts
@@ -149,6 +149,10 @@ export class RoomRegistry {
     return this.rooms.get(code);
   }
 
+  getRoomCount(): number {
+    return this.rooms.size;
+  }
+
   getPlayerByToken(code: string, token: string): RoomPlayer | undefined {
     return this.rooms.get(code)?.players.get(token);
   }