Compare commits

...

1 Commits

Author SHA1 Message Date
Guido D'Orsi
83ca084d32 feat: add a websocket peer that handles reconnections internally 2025-05-13 22:30:44 +02:00
9 changed files with 292 additions and 310 deletions

View File

@@ -6,9 +6,10 @@ import {
cojsonInternals,
logger,
} from "cojson";
import { BatchedOutgoingMessages } from "./BatchedOutgoingMessages.js";
import { deserializeMessages } from "./serialization.js";
import { createPingTimeoutListener } from "./tests/utils.js";
import type { AnyWebSocket } from "./types.js";
import { createOutgoingMessagesManager } from "./utils.js";
export const BUFFER_LIMIT = 100_000;
export const BUFFER_LIMIT_POLLING_INTERVAL = 10;
@@ -25,97 +26,6 @@ export type CreateWebSocketPeerOpts = {
onSuccess?: () => void;
};
function createPingTimeoutListener(
enabled: boolean,
timeout: number,
callback: () => void,
) {
if (!enabled) {
return {
reset() {},
clear() {},
};
}
let pingTimeout: ReturnType<typeof setTimeout> | null = null;
return {
reset() {
pingTimeout && clearTimeout(pingTimeout);
pingTimeout = setTimeout(() => {
callback();
}, timeout);
},
clear() {
pingTimeout && clearTimeout(pingTimeout);
},
};
}
function waitForWebSocketOpen(websocket: AnyWebSocket) {
return new Promise<void>((resolve) => {
if (websocket.readyState === 1) {
resolve();
} else {
websocket.addEventListener("open", () => resolve(), { once: true });
}
});
}
function createOutgoingMessagesManager(
websocket: AnyWebSocket,
batchingByDefault: boolean,
) {
let closed = false;
const outgoingMessages = new BatchedOutgoingMessages((messages) => {
if (websocket.readyState === 1) {
websocket.send(messages);
}
});
let batchingEnabled = batchingByDefault;
async function sendMessage(msg: SyncMessage) {
if (closed) {
return Promise.reject(new Error("WebSocket closed"));
}
if (websocket.readyState !== 1) {
await waitForWebSocketOpen(websocket);
}
while (
websocket.bufferedAmount > BUFFER_LIMIT &&
websocket.readyState === 1
) {
await new Promise<void>((resolve) =>
setTimeout(resolve, BUFFER_LIMIT_POLLING_INTERVAL),
);
}
if (websocket.readyState !== 1) {
return;
}
if (!batchingEnabled) {
websocket.send(JSON.stringify(msg));
} else {
outgoingMessages.push(msg);
}
}
return {
sendMessage,
setBatchingEnabled(enabled: boolean) {
batchingEnabled = enabled;
},
close() {
closed = true;
outgoingMessages.close();
},
};
}
function createClosedEventEmitter(callback = () => {}) {
let disconnected = false;
@@ -136,7 +46,7 @@ export function createWebSocketPeer({
pingTimeout = 10_000,
onSuccess,
onClose,
}: CreateWebSocketPeerOpts): Peer {
}: CreateWebSocketPeerOpts) {
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
@@ -251,5 +161,5 @@ export function createWebSocketPeer({
role,
crashOnClose: false,
deletePeerStateOnClose,
};
} satisfies Peer;
}

View File

@@ -0,0 +1,162 @@
import {
type DisconnectedError,
type Peer,
type PingTimeoutError,
type SyncMessage,
cojsonInternals,
logger,
} from "cojson";
import { deserializeMessages } from "./serialization.js";
import { createPingTimeoutListener } from "./tests/utils.js";
import type { AnyWebSocketConstructor } from "./types.js";
import { createOutgoingMessagesManager } from "./utils.js";
export type CreateWebSocketPeerWithReconnectionOpts = {
id: string;
WebSocketConstructor?: AnyWebSocketConstructor;
role: Peer["role"];
expectPings?: boolean;
batchingByDefault?: boolean;
deletePeerStateOnClose?: boolean;
pingTimeout?: number;
url: string;
reconnectionTimeout?: number | undefined;
};
export function createWebSocketPeerWithReconnection({
id,
WebSocketConstructor = WebSocket,
role,
expectPings = true,
batchingByDefault = true,
deletePeerStateOnClose = false,
pingTimeout = 5_000,
url,
reconnectionTimeout = 500,
}: CreateWebSocketPeerWithReconnectionOpts) {
let isClosed = false;
let reconnecting = false;
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
function handleClose() {
if (isClosed) return;
if (reconnecting) return;
websocket.removeEventListener("message", handleIncomingMsg);
websocket.removeEventListener("close", handleClose);
if (websocket.readyState === 1) {
websocket.close();
} else if (websocket.readyState === 0) {
const thisWebSocket = websocket;
websocket.addEventListener(
"open",
() => {
websocket.close();
},
{ once: true },
);
}
reconnecting = true;
setTimeout(() => {
if (isClosed) return;
reconnecting = false;
websocket = createWebSocket();
}, reconnectionTimeout);
}
function createWebSocket() {
const websocket = new WebSocketConstructor(url);
websocket.addEventListener("close", handleClose);
// biome-ignore lint/suspicious/noExplicitAny: WebSocket error event type is not standardized
websocket.addEventListener("error" as any, (err) => {
if (err.message) {
logger.warn(`WebSocket error: ${err.message}`, { err });
}
handleClose();
});
websocket.addEventListener("message", handleIncomingMsg);
return websocket;
}
const pingTimeoutListener = createPingTimeoutListener(
expectPings,
pingTimeout,
() => {
handleClose();
},
);
let websocket = createWebSocket();
const outgoingMessages = createOutgoingMessagesManager(
websocket,
batchingByDefault,
);
function handleIncomingMsg(event: { data: unknown }) {
pingTimeoutListener.reset();
if (event.data === "") {
return;
}
const result = deserializeMessages(event.data);
if (!result.ok) {
logger.warn("Error while deserializing messages", { err: result.error });
return;
}
const { messages } = result;
if (messages.length > 1) {
// If more than one message is received, the other peer supports batching
outgoingMessages.setBatchingEnabled(true);
}
for (const msg of messages) {
if (msg && "action" in msg) {
incoming
.push(msg)
.catch((e) =>
logger.error("Error while pushing incoming msg", { err: e }),
);
}
}
}
return {
id,
incoming,
outgoing: {
push: outgoingMessages.sendMessage,
close() {
isClosed = true;
outgoingMessages.close();
websocket.close();
pingTimeoutListener.clear();
incoming
.push("Disconnected")
.catch((e) =>
logger.error("Error while pushing disconnect msg", { err: e }),
);
},
},
role,
crashOnClose: false,
deletePeerStateOnClose,
} satisfies Peer;
}

View File

@@ -1,3 +1,4 @@
export * from "./createWebSocketPeer.js";
export * from "./WebSocketPeerWithReconnection.js";
export { AnyWebSocketConstructor } from "./types.js";
export * from "./createWebSocketPeerWithReconnection.js";

View File

@@ -26,3 +26,29 @@ export function waitFor(callback: () => boolean | void) {
}, 100);
});
}
export function createPingTimeoutListener(
enabled: boolean,
timeout: number,
callback: () => void,
) {
if (!enabled) {
return {
reset() {},
clear() {},
};
}
let pingTimeout: ReturnType<typeof setTimeout> | null = null;
return {
reset() {
pingTimeout && clearTimeout(pingTimeout);
pingTimeout = setTimeout(() => {
callback();
}, timeout);
},
clear() {
pingTimeout && clearTimeout(pingTimeout);
},
};
}

View File

@@ -0,0 +1,70 @@
import type { SyncMessage } from "cojson";
import { BatchedOutgoingMessages } from "./BatchedOutgoingMessages.js";
import {
BUFFER_LIMIT,
BUFFER_LIMIT_POLLING_INTERVAL,
} from "./createWebSocketPeer.js";
import type { AnyWebSocket } from "./types.js";
export function waitForWebSocketOpen(websocket: AnyWebSocket) {
return new Promise<void>((resolve) => {
if (websocket.readyState === 1) {
resolve();
} else {
websocket.addEventListener("open", () => resolve(), { once: true });
}
});
}
export function createOutgoingMessagesManager(
websocket: AnyWebSocket,
batchingByDefault: boolean,
) {
let closed = false;
const outgoingMessages = new BatchedOutgoingMessages((messages) => {
if (websocket.readyState === 1) {
websocket.send(messages);
}
});
let batchingEnabled = batchingByDefault;
async function sendMessage(msg: SyncMessage) {
if (closed) {
return Promise.reject(new Error("WebSocket closed"));
}
if (websocket.readyState !== 1) {
await waitForWebSocketOpen(websocket);
}
while (
websocket.bufferedAmount > BUFFER_LIMIT &&
websocket.readyState === 1
) {
await new Promise<void>((resolve) =>
setTimeout(resolve, BUFFER_LIMIT_POLLING_INTERVAL),
);
}
if (websocket.readyState !== 1) {
return;
}
if (!batchingEnabled) {
websocket.send(JSON.stringify(msg));
} else {
outgoingMessages.push(msg);
}
}
return {
sendMessage,
setBatchingEnabled(enabled: boolean) {
batchingEnabled = enabled;
},
close() {
closed = true;
outgoingMessages.close();
},
};
}

View File

@@ -1,6 +1,6 @@
import { LocalNode, Peer, RawAccountID } from "cojson";
import { IDBStorage } from "cojson-storage-indexeddb";
import { WebSocketPeerWithReconnection } from "cojson-transport-ws";
import { createWebSocketPeerWithReconnection } from "cojson-transport-ws";
import { WasmCrypto } from "cojson/crypto/WasmCrypto";
import {
Account,
@@ -32,19 +32,6 @@ export type BaseBrowserContextOptions = {
authSecretStorage: AuthSecretStorage;
};
class BrowserWebSocketPeerWithReconnection extends WebSocketPeerWithReconnection {
onNetworkChange(callback: (connected: boolean) => void): () => void {
const handler = () => callback(navigator.onLine);
window.addEventListener("online", handler);
window.addEventListener("offline", handler);
return () => {
window.removeEventListener("online", handler);
window.removeEventListener("offline", handler);
};
}
}
async function setupPeers(options: BaseBrowserContextOptions) {
const crypto = options.crypto || (await WasmCrypto.create());
let node: LocalNode | undefined = undefined;
@@ -66,26 +53,25 @@ async function setupPeers(options: BaseBrowserContextOptions) {
};
}
const wsPeer = new BrowserWebSocketPeerWithReconnection({
peer: options.sync.peer,
reconnectionTimeout: options.reconnectionTimeout,
addPeer: (peer) => {
if (node) {
node.syncManager.addPeer(peer);
} else {
peersToLoadFrom.push(peer);
}
},
removePeer: (peer) => {
peersToLoadFrom.splice(peersToLoadFrom.indexOf(peer), 1);
},
const url = options.sync.peer;
let wsPeer: Peer | undefined = createWebSocketPeerWithReconnection({
url,
role: "server",
id: "upstream",
});
function toggleNetwork(enabled: boolean) {
if (enabled) {
wsPeer.enable();
} else {
wsPeer.disable();
if (enabled && !wsPeer) {
wsPeer = createWebSocketPeerWithReconnection({
url,
role: "server",
id: "upstream",
});
node?.syncManager.addPeer(wsPeer);
} else if (!enabled && wsPeer) {
wsPeer.outgoing.close();
wsPeer = undefined;
}
}

View File

@@ -1,5 +1,8 @@
import { AgentSecret, CryptoProvider, LocalNode } from "cojson";
import { type AnyWebSocketConstructor } from "cojson-transport-ws";
import {
type AnyWebSocketConstructor,
createWebSocketPeerWithReconnection,
} from "cojson-transport-ws";
import { WasmCrypto } from "cojson/crypto/WasmCrypto";
import {
Account,
@@ -9,7 +12,6 @@ import {
createJazzContextFromExistingCredentials,
randomSessionProvider,
} from "jazz-tools";
import { webSocketWithReconnection } from "./webSocketWithReconnection.js";
type WorkerOptions<Acc extends Account> = {
accountID?: string;
@@ -32,13 +34,13 @@ export async function startWorker<Acc extends Account>(
} = options;
let node: LocalNode | undefined = undefined;
const wsPeer = webSocketWithReconnection(
syncServer,
(peer) => {
node?.syncManager.addPeer(peer);
},
options.WebSocket,
);
const wsPeer = createWebSocketPeerWithReconnection({
url: syncServer,
role: "server",
id: "upstream",
WebSocketConstructor: options.WebSocket,
});
if (!accountID) {
throw new Error("No accountID provided");
@@ -61,7 +63,7 @@ export async function startWorker<Acc extends Account>(
AccountSchema,
// TODO: locked sessions similar to browser
sessionProvider: randomSessionProvider,
peersToLoadFrom: [wsPeer.peer],
peersToLoadFrom: [wsPeer],
crypto: options.crypto ?? (await WasmCrypto.create()),
});
@@ -77,7 +79,6 @@ export async function startWorker<Acc extends Account>(
async function done() {
await context.account.waitForAllCoValuesSync();
wsPeer.done();
context.done();
}

View File

@@ -1,127 +0,0 @@
import { createWebSocketPeer } from "cojson-transport-ws";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import { webSocketWithReconnection } from "../webSocketWithReconnection.js";
// Mock dependencies
vi.mock("cojson-transport-ws", () => ({
createWebSocketPeer: vi.fn().mockImplementation(({ onClose }) => ({
id: "upstream",
incoming: { push: vi.fn() },
outgoing: { push: vi.fn(), close: vi.fn() },
onClose,
})),
}));
const WebSocketMock = vi.fn().mockImplementation(() => ({
addEventListener: vi.fn(),
removeEventListener: vi.fn(),
close: vi.fn(),
readyState: 1,
})) as unknown as typeof WebSocket;
describe("webSocketWithReconnection", () => {
beforeEach(() => {
vi.clearAllMocks();
vi.useFakeTimers();
});
afterEach(() => {
vi.useRealTimers();
});
test("should create initial websocket connection", () => {
const addPeerMock = vi.fn();
const { peer } = webSocketWithReconnection(
"ws://localhost:8080",
addPeerMock,
WebSocketMock,
);
expect(WebSocketMock).toHaveBeenCalledWith("ws://localhost:8080");
expect(createWebSocketPeer).toHaveBeenCalledWith(
expect.objectContaining({
id: "upstream",
role: "server",
}),
);
expect(peer).toBeDefined();
});
test("should attempt reconnection when websocket closes", async () => {
const addPeerMock = vi.fn();
webSocketWithReconnection(
"ws://localhost:8080",
addPeerMock,
WebSocketMock,
);
// Get the onClose handler from the first createWebSocketPeer call
const initialPeer = vi.mocked(createWebSocketPeer).mock.results[0]!.value;
// Simulate websocket close
initialPeer.onClose();
// Fast-forward timer to trigger reconnection
await vi.advanceTimersByTimeAsync(1000);
expect(WebSocketMock).toHaveBeenCalledTimes(2);
expect(createWebSocketPeer).toHaveBeenCalledTimes(2);
expect(addPeerMock).toHaveBeenCalledWith(
expect.objectContaining({
id: "upstream",
}),
);
});
test("should clean up when done is called", () => {
const addPeerMock = vi.fn();
const { done } = webSocketWithReconnection(
"ws://localhost:8080",
addPeerMock,
WebSocketMock,
);
// Get the onClose handler
const initialPeer = vi.mocked(createWebSocketPeer).mock.results[0]!.value;
done();
// Simulate websocket close
initialPeer.onClose();
// Fast-forward timer
vi.advanceTimersByTime(1000);
// Should not attempt reconnection
expect(WebSocketMock).toHaveBeenCalledTimes(1);
expect(createWebSocketPeer).toHaveBeenCalledTimes(1);
});
test("should not attempt reconnection after done is called", async () => {
const addPeerMock = vi.fn();
const { done } = webSocketWithReconnection(
"ws://localhost:8080",
addPeerMock,
WebSocketMock,
);
// Get the onClose handler
const initialPeer = vi.mocked(createWebSocketPeer).mock.results[0]!.value;
// Simulate first close and reconnection
initialPeer.onClose();
await vi.advanceTimersByTimeAsync(1000);
expect(WebSocketMock).toHaveBeenCalledTimes(2);
// Call done
done();
// Simulate another close
vi.mocked(createWebSocketPeer).mock.results[1]!.value.onClose();
await vi.advanceTimersByTimeAsync(1000);
// Should not create another connection
expect(WebSocketMock).toHaveBeenCalledTimes(2);
});
});

View File

@@ -1,47 +0,0 @@
import { Peer } from "cojson";
import {
AnyWebSocketConstructor,
createWebSocketPeer,
} from "cojson-transport-ws";
export function webSocketWithReconnection(
peer: string,
addPeer: (peer: Peer) => void,
ws?: AnyWebSocketConstructor,
) {
let done = false;
const WebSocketConstructor = ws ?? WebSocket;
const wsPeer = createWebSocketPeer({
websocket: new WebSocketConstructor(peer),
id: "upstream",
role: "server",
onClose: handleClose,
});
let timer: ReturnType<typeof setTimeout>;
function handleClose() {
if (done) return;
clearTimeout(timer);
timer = setTimeout(() => {
const wsPeer: Peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocketConstructor(peer),
role: "server",
onClose: handleClose,
});
addPeer(wsPeer);
}, 1000);
}
return {
peer: wsPeer,
done: () => {
done = true;
clearTimeout(timer);
},
};
}