Compare commits

...

1 Commits

Author SHA1 Message Date
Guido D'Orsi
3fb90fc162 feat(jazz-run): add a --flaky option to the sync server to work on the network reliability 2024-10-28 22:05:30 +01:00

View File

@@ -1,6 +1,15 @@
/* istanbul ignore file -- @preserve */
import { Command, Options } from "@effect/cli";
import { ControlledAgent, LocalNode, WasmCrypto } from "cojson";
import {
ControlledAgent,
DisconnectedError,
LocalNode,
Peer,
PingTimeoutError,
SyncMessage,
WasmCrypto,
cojsonInternals,
} from "cojson";
import { WebSocketServer } from "ws";
import { createServer } from "http";
@@ -23,6 +32,10 @@ const inMemory = Options.boolean("in-memory").pipe(
Options.withDescription("Use an in-memory storage instead of file-based"),
);
const flaky = Options.boolean("flaky").pipe(
Options.withDescription("Randomly drop messages on the WS connection"),
);
const db = Options.file("db")
.pipe(
Options.withDescription(
@@ -33,8 +46,8 @@ const db = Options.file("db")
export const startSync = Command.make(
"sync",
{ port, inMemory, db },
({ port, inMemory, db }) => {
{ port, inMemory, db, flaky },
({ port, inMemory, db, flaky }) => {
return Effect.gen(function* () {
const crypto = yield* Effect.promise(() => WasmCrypto.create());
@@ -92,16 +105,19 @@ export const startSync = Command.make(
const clientId = clientAddress + "@" + new Date().toISOString();
localNode.syncManager.addPeer(
createWebSocketPeer({
id: clientId,
role: "client",
// eslint-disable-next-line @typescript-eslint/no-explicit-any
websocket: ws as any, // TODO: fix types
expectPings: false,
batchingByDefault: false
}),
);
let peer = createWebSocketPeer({
id: clientId,
role: "client",
websocket: ws,
expectPings: false,
batchingByDefault: false,
});
if (flaky) {
peer = makeTheWsPeerFlaky(peer);
}
localNode.syncManager.addPeer(peer);
ws.on("error", (e) =>
console.error(`Error on connection ${clientId}:`, e),
@@ -123,3 +139,24 @@ export const startSync = Command.make(
});
},
);
function makeTheWsPeerFlaky(peer: Peer) {
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
async function handleIncoming() {
for await (const msg of peer.incoming) {
if (Math.random() > 0.8) {
void incoming.push(msg);
}
}
}
void handleIncoming();
return {
...peer,
incoming,
};
}