/**
 * Token-carrying variant of `sendClient`: serialises a client envelope
 * that includes `token` and feeds it straight into `handleMessage`.
 * A `room.join` sent with a token is what triggers the reconnect path,
 * which is why the P4.7 tests use this helper.
 *
 * @param ws      Mock socket the frame is "received" on.
 * @param type    Client message type placed in the envelope.
 * @param payload Message body, passed through unmodified.
 * @param token   Slot token to attach at envelope level.
 * @param seq     Client sequence number (defaults to 1).
 */
function sendClientWithToken(
  ws: MockWs,
  type: ClientMessage["type"],
  payload: unknown,
  token: string,
  seq = 1,
): void {
  // Key order is kept stable so the serialised frame is unchanged.
  const frame = {
    v: PROTOCOL_VERSION,
    seq,
    ts: Date.now(),
    token,
    type,
    payload,
  };
  const raw = JSON.stringify(frame);
  handleMessage(ws, raw);
}
+ reconnectManager.clearAll(); + }); + + it("disconnect → opponent moves → reconnect receives game.state + buffered delta", () => { + const a = makeMockWs("A"); + const b = makeMockWs("B"); + registerConnection(a); + registerConnection(b); + + // Room setup + one white move so state is non-trivial. + sendClient(a, "room.create", { rulesetIds: [] }); + const created = nextMsgOfType(a, "room.created"); + const code = created.payload["code"] as string; + const tokenA = created.payload["token"] as string; + + sendClient(b, "room.join", { code }); + nextMsgOfType(b, "room.joined"); + nextMsgOfType(a, "game.state"); + nextMsgOfType(b, "game.state"); + + sendClient(a, "game.move", { from: "e2", to: "e4" }, 2); + nextMsgOfType(a, "game.delta"); + nextMsgOfType(b, "game.delta"); + + // --- A drops -------------------------------------------------------- + unregisterConnection(a); + // The slot must NOT be torn down yet — grace window is open. + expect(reconnectManager.isPending(tokenA)).toBe(true); + expect(roomRegistry.getRoom(code)).toBeDefined(); + // B must NOT have received game.end yet (it might later if A fails + // to come back, but the grace window suppresses it for now). + expect( + b.sent.find( + (m) => (m as { type?: string }).type === "game.end", + ), + ).toBeUndefined(); + + // --- B keeps playing during A's absence ----------------------------- + sendClient(b, "game.move", { from: "e7", to: "e5" }, 2); + // B sees its own delta immediately. + const bDelta = nextMsgOfType(b, "game.delta"); + expect(bDelta.payload["moveNotation"]).toBe("e7e5"); + // A's socket is the old (closed) mock — nothing should have been + // written to it because it's been unregistered. We only assert that + // the delta is buffered in the reconnect manager. The buffer is + // internal, but we'll observe it indirectly on reconnect below. 
+ + // --- A reconnects via a BRAND-NEW socket carrying the old token ---- + const aNew = makeMockWs("A2"); + registerConnection(aNew); + sendClientWithToken(aNew, "room.join", { code }, tokenA); + + // Assertions on the reconnect inbox, in order: + // 1. room.joined echoes the color/token of the resumed slot. + const joined = nextMsgOfType(aNew, "room.joined"); + expect(joined.payload["token"]).toBe(tokenA); + expect(joined.payload["color"]).toBe("white"); + + // 2. game.state carries the authoritative fact set reflecting + // BOTH moves (e4 and e5). Easiest way to verify: a pawn must + // live on e4 (sq 28) and on e5 (sq 36), and NOT on e2 / e7. + const state = nextMsgOfType(aNew, "game.state"); + const facts = state.payload["facts"] as Array<{ + attr: string; + value: unknown; + }>; + const positions = facts + .filter((f) => f.attr === "Position") + .map((f) => f.value); + expect(positions).toContain(28); // e4 + expect(positions).toContain(36); // e5 + expect(positions).not.toContain(12); // e2 vacated + expect(positions).not.toContain(52); // e7 vacated + // It is white's turn again (black just moved). + expect(state.payload["turn"]).toBe("white"); + + // 3. The buffered delta for e7→e5 replayed verbatim. + const replayed = nextMsgOfType(aNew, "game.delta"); + expect(replayed.payload["moveNotation"]).toBe("e7e5"); + expect(replayed.payload["turn"]).toBe("white"); + + // --- Post-conditions ----------------------------------------------- + // The manager should have cleared the pending entry. + expect(reconnectManager.isPending(tokenA)).toBe(false); + // And the slot is marked connected again — otherwise subsequent + // moves wouldn't re-buffer on a future disconnect. + const player = roomRegistry.getPlayerByToken(code, tokenA); + expect(player?.connected).toBe(true); + + // Cleanup — leave voluntarily so no grace timers linger. 
/**
 * Drop a socket from the live-connection table and, if it was bound to
 * a room slot, hold that slot open for a reconnect grace window.
 *
 * Disconnect is deliberately NOT treated as leave: the slot is only
 * marked disconnected here. The actual departure (game.end broadcast,
 * `leaveRoom`, session cleanup) is deferred to the grace-expiry callback
 * and runs only if the client does not come back with its token first.
 *
 * @param ws Socket that has closed.
 */
export function unregisterConnection(ws: ServerWebSocket<ClientData>): void {
  connections.delete(ws.data.clientId);

  // Sockets that never created/joined a room (or already left one)
  // carry no binding, so there is no slot to preserve.
  const { roomCode, token } = ws.data;
  if (roomCode === undefined || token === undefined) return;

  // Short-circuit: if the room or slot has already been torn down (e.g.
  // opponent left first), there's nothing to preserve.
  const player = roomRegistry.getPlayerByToken(roomCode, token);
  if (!player) return;

  roomRegistry.markDisconnected(roomCode, token);

  // The callback closes over the destructured `roomCode` / `token`
  // strings rather than reading `ws.data` later, so it is unaffected by
  // anything that happens to the socket's data during the window.
  // Window length comes from DEFAULT_GRACE_MS.
  reconnectManager.startGrace(token, DEFAULT_GRACE_MS, () => {
    // Grace expired without a reconnect — treat as a real departure:
    // notify the remaining player, remove the slot, and drop the game
    // session once the room itself is gone.
    broadcastGameEnd(roomCode, token, "player_left");
    roomRegistry.leaveRoom(roomCode, token);
    if (roomRegistry.getRoom(roomCode) === undefined) {
      sessionRegistry.delete(roomCode);
    }
    // Keep the rooms_active gauge in step with the registry.
    setActiveRooms(roomRegistry.getRoomCount());
  });
}
/**
 * Resume a disconnected slot on a new socket.
 *
 * Steps, in the order the client observes them:
 *   1. Cancel the grace timer and take the deltas buffered for `token`.
 *   2. Mark the slot connected and bind `ws` to the room.
 *   3. Send `room.joined` (same code / token / color as before).
 *   4. Send a `game.state` snapshot built from the live session.
 *   5. Re-send every buffered `game.delta` payload in buffer order, each
 *      wrapped in a new envelope.
 *
 * If no grace window is pending, or the room/session no longer exists,
 * a `ROOM_NOT_FOUND` error is sent and the socket is left unbound.
 *
 * NOTE(review): the snapshot is read AFTER the buffered moves were
 * applied to the session, so it already reflects them; the replayed
 * deltas then describe changes the snapshot contains. Confirm the
 * client applies replayed deltas idempotently.
 *
 * @param ws    The new socket the client reconnected on.
 * @param code  Room code from the join payload.
 * @param token Slot token presented in the envelope.
 * @param color Color of the slot being resumed.
 */
function handleReconnect(
  ws: ServerWebSocket<ClientData>,
  code: string,
  token: string,
  color: "white" | "black",
): void {
  // cancelGrace returns undefined when no window is pending. The caller
  // checks isPending first, so this is purely a defensive guard.
  const missed = reconnectManager.cancelGrace(token);
  if (missed === undefined) {
    sendTo(
      ws,
      errorMessage(
        "ROOM_NOT_FOUND",
        `reconnect grace expired for room ${code}`,
        false,
      ),
    );
    return;
  }

  const session = sessionRegistry.get(code);
  const room = roomRegistry.getRoom(code);
  if (!session || !room) {
    sendTo(
      ws,
      errorMessage(
        "ROOM_NOT_FOUND",
        `reconnect: room ${code} no longer exists`,
        false,
      ),
    );
    return;
  }

  roomRegistry.markConnected(code, token);
  ws.data.roomCode = code;
  ws.data.token = token;
  logger
    .child({ clientId: ws.data.clientId, roomCode: code })
    .info("room.join (reconnect)");

  sendTo(
    ws,
    envelope("room.joined", {
      code,
      token,
      color,
      activeRules: [...room.rulesetIds],
    }),
  );

  // Snapshot of the session's current state for the returning client.
  sendTo(
    ws,
    envelope("game.state", {
      facts: session.getAllFacts(),
      turn: session.getTurn(),
      // seq recorded for the most recently buffered delta, or 0 when
      // nothing was missed during the window.
      lastSeq: missed[missed.length - 1]?.seq ?? 0,
      moveHistory: [],
      activeRules: [...room.rulesetIds],
      fen: "",
    }),
  );

  // Replay buffered deltas in the order they were buffered; only the
  // payload is reused — each one goes out in a new envelope.
  for (const delta of missed) {
    sendTo(ws, envelope("game.delta", delta.payload));
  }
}
"" : ""); - broadcastToRoom( - roomCode, - envelope("game.delta", { - inserted: moveResult.inserted satisfies WireFact[], - retracted: moveResult.retracted satisfies WireFact[], - moveNotation, - turn: moveResult.turn, - gameOver: moveResult.gameOver, - }), - ); + const deltaPayload = { + inserted: moveResult.inserted satisfies WireFact[], + retracted: moveResult.retracted satisfies WireFact[], + moveNotation, + turn: moveResult.turn, + gameOver: moveResult.gameOver, + }; + const deltaMsg = envelope("game.delta", deltaPayload); + // Broadcast to currently-connected sockets in the room, AND buffer a + // copy for every slot that is mid-grace-window so a reconnecting + // client can replay the delta on return. + broadcastToRoom(roomCode, deltaMsg); + bufferDeltaForDisconnected(roomCode, token, deltaMsg.seq, deltaPayload); // Terminal positions also get an explicit game.end for clarity per // PROTOCOL.md §game.end. `finalFen` is empty for v1 (see note above). @@ -431,6 +589,35 @@ function handleGameMove( } } +/** + * For every OTHER player in `code` whose slot is mid-grace-window + * (disconnected), buffer a copy of a just-broadcast delta so the + * reconnect path can replay it. `moverToken` is the sender — we never + * buffer back to the player who made the move. + * + * The buffer is keyed on the opponent's token. Delegates the + * pending-check to ReconnectManager so we don't double-buffer for a + * slot that was already torn down. + */ +function bufferDeltaForDisconnected( + code: string, + moverToken: string, + seq: number, + payload: object, +): void { + const room = roomRegistry.getRoom(code); + if (!room) return; + for (const player of room.players.values()) { + if (player.token === moverToken) continue; + // bufferDelta itself no-ops when no window is pending, but the + // explicit check documents intent and avoids building unnecessary + // closures on the hot path. 
+ if (reconnectManager.isPending(player.token)) { + reconnectManager.bufferDelta(player.token, seq, payload); + } + } +} + /** * Broadcast a game.end to everyone in `code` EXCEPT the player whose * token is `leaverToken`. Used when a player disconnects or leaves diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 835d9a4..1def980 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -8,6 +8,7 @@ import { type ClientData, } from "./broadcast.js"; import { logger } from "./logger.js"; +import { renderMetrics } from "./logging.js"; import { checkMessageSize, checkOrigin, @@ -46,6 +47,12 @@ export const server = Bun.serve({ return Response.json({ ok: true, version: "1.0.0" }); } + if (url.pathname === "/metrics") { + return new Response(renderMetrics(), { + headers: { "Content-Type": "text/plain; version=0.0.4; charset=utf-8" }, + }); + } + return new Response("Not Found", { status: 404 }); }, websocket: { diff --git a/packages/server/src/logging.test.ts b/packages/server/src/logging.test.ts new file mode 100644 index 0000000..d980de7 --- /dev/null +++ b/packages/server/src/logging.test.ts @@ -0,0 +1,75 @@ +import { beforeEach, describe, expect, test } from "vitest"; + +import { + __resetForTests, + incMessages, + incMovesFail, + incMovesOk, + recordTickDuration, + renderMetrics, + setActiveRooms, +} from "./logging.js"; + +beforeEach(() => { + __resetForTests(); +}); + +describe("renderMetrics — Prometheus text format", () => { + test("initial state produces zero-valued metrics", () => { + const out = renderMetrics(); + expect(out).toContain("rooms_active 0"); + expect(out).toContain("messages_received_total 0"); + expect(out).toContain('moves_validated_total{result="ok"} 0'); + expect(out).toContain('moves_validated_total{result="fail"} 0'); + expect(out).toContain("tick_duration_milliseconds_sum 0"); + expect(out).toContain("tick_duration_milliseconds_count 0"); + }); + + test("incMessages increments 
messages_received_total", () => { + incMessages(); + incMessages(); + expect(renderMetrics()).toContain("messages_received_total 2"); + }); + + test("incMovesOk / incMovesFail are independent counters", () => { + incMovesOk(); + incMovesOk(); + incMovesOk(); + incMovesFail(); + const out = renderMetrics(); + expect(out).toContain('moves_validated_total{result="ok"} 3'); + expect(out).toContain('moves_validated_total{result="fail"} 1'); + }); + + test("setActiveRooms updates the gauge", () => { + setActiveRooms(4); + expect(renderMetrics()).toContain("rooms_active 4"); + setActiveRooms(1); + expect(renderMetrics()).toContain("rooms_active 1"); + }); + + test("recordTickDuration accumulates sum and count", () => { + recordTickDuration(10); + recordTickDuration(20); + recordTickDuration(30); + const out = renderMetrics(); + expect(out).toContain("tick_duration_milliseconds_sum 60"); + expect(out).toContain("tick_duration_milliseconds_count 3"); + }); + + test("output includes required HELP and TYPE lines", () => { + const out = renderMetrics(); + expect(out).toContain("# HELP rooms_active"); + expect(out).toContain("# TYPE rooms_active gauge"); + expect(out).toContain("# HELP messages_received_total"); + expect(out).toContain("# TYPE messages_received_total counter"); + expect(out).toContain("# HELP moves_validated_total"); + expect(out).toContain("# TYPE moves_validated_total counter"); + expect(out).toContain("# HELP tick_duration_milliseconds"); + expect(out).toContain("# TYPE tick_duration_milliseconds summary"); + }); + + test("output ends with a trailing newline (Prometheus spec)", () => { + expect(renderMetrics().endsWith("\n")).toBe(true); + }); +}); diff --git a/packages/server/src/logging.ts b/packages/server/src/logging.ts new file mode 100644 index 0000000..08f492b --- /dev/null +++ b/packages/server/src/logging.ts @@ -0,0 +1,88 @@ +// Prometheus-format metrics for the chess server (P4.8). +// +// Intentionally hand-rolled — no prom-client dependency. 
// Prometheus-format metrics for the chess server (P4.8).
//
// Hand-rolled on purpose (no metrics library). All state is module-level
// and therefore resets on process restart.

// ---------------------------------------------------------------------------
// Counters & gauges
// ---------------------------------------------------------------------------

const counters = {
  messages_received_total: 0,
  moves_validated_total_ok: 0,
  moves_validated_total_fail: 0,
};

let activeRooms = 0;

// ---------------------------------------------------------------------------
// Summary: tick (move-processing) duration in milliseconds
// ---------------------------------------------------------------------------

// Only the running sum and count are exported, so that is all we keep.
// Retaining every sample would grow without bound for the life of the
// process and make each scrape O(samples).
let tickDurationSumMs = 0;
let tickDurationCount = 0;

// ---------------------------------------------------------------------------
// Mutators — called from broadcast.ts
// ---------------------------------------------------------------------------

/** Count one inbound client message. */
export function incMessages(): void {
  counters.messages_received_total++;
}

/** Count one move that passed validation. */
export function incMovesOk(): void {
  counters.moves_validated_total_ok++;
}

/** Count one move that was rejected. */
export function incMovesFail(): void {
  counters.moves_validated_total_fail++;
}

/** Set the `rooms_active` gauge to `n`. */
export function setActiveRooms(n: number): void {
  activeRooms = n;
}

/**
 * Record one move-processing duration.
 *
 * Non-finite or negative samples are ignored: a single NaN would
 * otherwise poison `_sum` until the next restart.
 *
 * @param ms Elapsed time in milliseconds.
 */
export function recordTickDuration(ms: number): void {
  if (!Number.isFinite(ms) || ms < 0) return;
  tickDurationSumMs += ms;
  tickDurationCount++;
}

// ---------------------------------------------------------------------------
// Renderer — Prometheus text exposition format 0.0.4
// ---------------------------------------------------------------------------

/**
 * Render all metrics as Prometheus text exposition format.
 *
 * @returns The metrics document, terminated by a trailing newline.
 */
export function renderMetrics(): string {
  return [
    "# HELP rooms_active Number of active game rooms",
    "# TYPE rooms_active gauge",
    `rooms_active ${activeRooms}`,
    "",
    "# HELP messages_received_total Total messages received",
    "# TYPE messages_received_total counter",
    `messages_received_total ${counters.messages_received_total}`,
    "",
    "# HELP moves_validated_total Total moves validated",
    "# TYPE moves_validated_total counter",
    `moves_validated_total{result="ok"} ${counters.moves_validated_total_ok}`,
    `moves_validated_total{result="fail"} ${counters.moves_validated_total_fail}`,
    "",
    "# HELP tick_duration_milliseconds Tick (move-processing) duration in milliseconds",
    "# TYPE tick_duration_milliseconds summary",
    `tick_duration_milliseconds_sum ${tickDurationSumMs}`,
    `tick_duration_milliseconds_count ${tickDurationCount}`,
    // Final empty element yields the trailing newline after join.
    "",
  ].join("\n");
}

// ---------------------------------------------------------------------------
// Test-only reset — never call from production code
// ---------------------------------------------------------------------------

/** Zero every counter, the gauge, and the duration summary. */
export const __resetForTests = (): void => {
  counters.messages_received_total = 0;
  counters.moves_validated_total_ok = 0;
  counters.moves_validated_total_fail = 0;
  activeRooms = 0;
  tickDurationSumMs = 0;
  tickDurationCount = 0;
};
+ vi.useFakeTimers(); + }); + + afterEach(() => { + mgr.clearAll(); + vi.useRealTimers(); + }); + + it("startGrace followed by isPending returns true", () => { + mgr.startGrace("tok-a", 1_000, () => {}); + expect(mgr.isPending("tok-a")).toBe(true); + // A different token is not pending — scoping is strict. + expect(mgr.isPending("tok-b")).toBe(false); + }); + + it("cancelGrace within the window returns buffered deltas and clears isPending", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 1_000, onExpire); + mgr.bufferDelta("tok-a", 1, { move: "e2e4" }); + mgr.bufferDelta("tok-a", 2, { move: "d2d4" }); + + const deltas = mgr.cancelGrace("tok-a"); + expect(deltas).toEqual([ + { seq: 1, payload: { move: "e2e4" } }, + { seq: 2, payload: { move: "d2d4" } }, + ]); + expect(mgr.isPending("tok-a")).toBe(false); + // Crucially, cancelling must NOT trigger the expiry callback. + // Advance well past the original grace period to prove the + // timer was cleared. + vi.advanceTimersByTime(10_000); + expect(onExpire).not.toHaveBeenCalled(); + }); + + it("cancelGrace on an unknown token returns undefined", () => { + expect(mgr.cancelGrace("never-seen")).toBeUndefined(); + }); + + it("timer fires onExpire and removes the pending entry", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 50, onExpire); + expect(mgr.isPending("tok-a")).toBe(true); + + // Advance past the grace period — the scheduled callback must fire + // and the manager must forget the entry. + vi.advanceTimersByTime(60); + expect(onExpire).toHaveBeenCalledTimes(1); + expect(mgr.isPending("tok-a")).toBe(false); + }); + + it("bufferDelta on a non-pending token is a silent no-op", () => { + // No grace started — buffering should not throw and should not + // create an implicit pending entry. + mgr.bufferDelta("tok-a", 1, { anything: true }); + expect(mgr.isPending("tok-a")).toBe(false); + // Now start a grace and confirm nothing was surreptitiously + // retained from the earlier call. 
+ mgr.startGrace("tok-a", 1_000, () => {}); + expect(mgr.cancelGrace("tok-a")).toEqual([]); + }); + + it("bufferDelta accumulates in insertion order", () => { + mgr.startGrace("tok-a", 1_000, () => {}); + for (let i = 1; i <= 5; i++) { + mgr.bufferDelta("tok-a", i, { step: i }); + } + const deltas = mgr.cancelGrace("tok-a"); + expect(deltas?.map((d) => d.seq)).toEqual([1, 2, 3, 4, 5]); + }); + + it("starting a second grace for the same token replaces the first timer", () => { + const firstExpire = vi.fn(); + const secondExpire = vi.fn(); + mgr.startGrace("tok-a", 100, firstExpire); + mgr.bufferDelta("tok-a", 1, { a: 1 }); + // Restart: previous buffered deltas are dropped (the client is + // effectively starting a brand-new grace window). + mgr.startGrace("tok-a", 100, secondExpire); + vi.advanceTimersByTime(150); + expect(firstExpire).not.toHaveBeenCalled(); + expect(secondExpire).toHaveBeenCalledTimes(1); + }); + + it("multiple tokens have independent lifecycles", () => { + const aExpire = vi.fn(); + const bExpire = vi.fn(); + mgr.startGrace("tok-a", 100, aExpire); + mgr.startGrace("tok-b", 200, bExpire); + mgr.bufferDelta("tok-a", 1, { for: "a" }); + mgr.bufferDelta("tok-b", 1, { for: "b" }); + + // Cancel A before it expires; B should be untouched. 
+ const aDeltas = mgr.cancelGrace("tok-a"); + expect(aDeltas).toEqual([{ seq: 1, payload: { for: "a" } }]); + expect(mgr.isPending("tok-b")).toBe(true); + + vi.advanceTimersByTime(250); + expect(aExpire).not.toHaveBeenCalled(); + expect(bExpire).toHaveBeenCalledTimes(1); + }); + + it("clearAll stops all pending timers without firing callbacks", () => { + const onExpire = vi.fn(); + mgr.startGrace("tok-a", 100, onExpire); + mgr.startGrace("tok-b", 100, onExpire); + mgr.clearAll(); + vi.advanceTimersByTime(1_000); + expect(onExpire).not.toHaveBeenCalled(); + expect(mgr.isPending("tok-a")).toBe(false); + expect(mgr.isPending("tok-b")).toBe(false); + }); +}); diff --git a/packages/server/src/reconnect.ts b/packages/server/src/reconnect.ts new file mode 100644 index 0000000..accd4cc --- /dev/null +++ b/packages/server/src/reconnect.ts @@ -0,0 +1,141 @@ +// Reconnection grace-window manager (P4.7). +// +// When a player's socket drops, their room slot is held for a bounded +// grace period (default 60s per PROTOCOL.md). During that window we: +// 1. Keep the RoomPlayer entry alive (marked disconnected via +// RoomRegistry.markDisconnected — done by the caller). +// 2. Buffer every `game.delta` destined for that slot, keyed by the +// slot's authentication token, so the client can replay them on +// reconnect in exact server order. +// 3. If the timer fires without a reconnect, invoke the caller's +// onExpire callback so broadcast.ts can tear the room down (send +// game.end to the opponent, drop the session, etc.). +// +// The manager is deliberately storage-only: it holds no references to +// WebSockets, RoomRegistry, or GameSessionRegistry. All room-level +// bookkeeping (broadcasting game.end on expiry, purging the session) +// lives in broadcast.ts via the onExpire closure. That keeps this +// module free of server-wide side effects and trivially testable. 
// v1 scope: deltas produced while a slot is absent are simply buffered
// and handed back on reconnect. If several moves happen during the
// window, the returning client gets all of them, in order.

/** A single buffered server frame destined for a disconnected slot. */
export interface MissedDelta {
  /** The `seq` supplied by the caller when the frame was buffered. */
  seq: number;
  /**
   * The `game.delta` payload body. Stored opaquely as `object`; this
   * module never inspects it.
   */
  payload: object;
}

/** Internal bookkeeping for one open grace window. */
interface PendingEntry {
  /** Handle of the expiry timer, kept so it can be cleared. */
  timer: ReturnType<typeof setTimeout>;
  /** Frames buffered so far, in insertion order. */
  deltas: MissedDelta[];
  /** Expiry callback as supplied to `startGrace` (stored, not read here). */
  onExpire: () => void;
}

/**
 * Call `unref()` on a timer handle when the runtime provides it, so an
 * open grace window does not by itself keep the process alive. Handles
 * without an `unref` method (e.g. numeric ids) are left untouched.
 */
function unrefIfSupported(handle: unknown): void {
  if (typeof handle !== "object" || handle === null) return;
  const { unref } = handle as { unref?: unknown };
  if (typeof unref === "function") {
    unref.call(handle);
  }
}

/**
 * Tracks reconnect grace windows keyed by slot token.
 *
 * Storage-only: it owns timers and per-token delta buffers and calls
 * the supplied `onExpire` when a window lapses. Everything room-related
 * is the caller's responsibility.
 */
export class ReconnectManager {
  // token → pending entry.
  private readonly pending = new Map<string, PendingEntry>();

  /**
   * Open a grace window for `token`. If it is neither cancelled nor
   * cleared within `gracePeriodMs`, the entry is removed and `onExpire`
   * is invoked.
   *
   * Calling this again for a token that already has a window restarts
   * it: the old timer is cleared (its callback never fires) and any
   * deltas buffered so far are discarded.
   *
   * @param token         Slot token the window belongs to.
   * @param gracePeriodMs Window length in milliseconds.
   * @param onExpire      Invoked once if the window lapses.
   */
  startGrace(
    token: string,
    gracePeriodMs: number,
    onExpire: () => void,
  ): void {
    const existing = this.pending.get(token);
    if (existing) {
      clearTimeout(existing.timer);
    }
    const timer = setTimeout(() => {
      // Delete first so `isPending(token)` is already false when the
      // callback runs.
      this.pending.delete(token);
      onExpire();
    }, gracePeriodMs);
    unrefIfSupported(timer);
    this.pending.set(token, { timer, deltas: [], onExpire });
  }

  /**
   * Append a frame to `token`'s buffer if a window is open; otherwise
   * do nothing (no entry is created).
   */
  bufferDelta(token: string, seq: number, payload: object): void {
    const entry = this.pending.get(token);
    if (!entry) return;
    entry.deltas.push({ seq, payload });
  }

  /**
   * Close `token`'s window without firing `onExpire`.
   *
   * @returns The buffered deltas (possibly empty), or `undefined` when
   *          no window was open for `token`.
   */
  cancelGrace(token: string): MissedDelta[] | undefined {
    const entry = this.pending.get(token);
    if (!entry) return undefined;
    clearTimeout(entry.timer);
    this.pending.delete(token);
    return entry.deltas;
  }

  /** True iff a grace window is currently open for `token`. */
  isPending(token: string): boolean {
    return this.pending.has(token);
  }

  /**
   * Stop every outstanding timer and forget all entries WITHOUT
   * invoking any `onExpire` callback. Intended for teardown.
   */
  clearAll(): void {
    for (const entry of this.pending.values()) {
      clearTimeout(entry.timer);
    }
    this.pending.clear();
  }
}

/** Process-global manager instance shared by broadcast.ts. */
export const reconnectManager = new ReconnectManager();

/** Default grace window length: 60 seconds. */
export const DEFAULT_GRACE_MS = 60_000;