diff --git a/package.json b/package.json
index 0eeca5659..65796aee1 100644
--- a/package.json
+++ b/package.json
@@ -5,7 +5,7 @@
     "types": "src/index.ts",
     "type": "module",
     "license": "MIT",
-    "version": "0.0.12",
+    "version": "0.0.14",
     "devDependencies": {
         "@types/jest": "^29.5.3",
         "@typescript-eslint/eslint-plugin": "^6.2.1",
diff --git a/src/account.test.ts b/src/account.test.ts
index 12a1acc18..6298e7d27 100644
--- a/src/account.test.ts
+++ b/src/account.test.ts
@@ -1,6 +1,6 @@
 import { newRandomSessionID } from "./coValue.js";
 import { LocalNode } from "./node.js";
-import { connectedPeers } from "./testUtils.js";
+import { connectedPeers } from "./streamUtils.js";
 
 test("Can create a node while creating a new account with profile", async () => {
     const { node, accountID, accountSecret, sessionID } =
diff --git a/src/index.ts b/src/index.ts
index b8709ed02..d0b552a29 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -2,6 +2,7 @@ import { CoValue, newRandomSessionID } from "./coValue.js";
 import { LocalNode } from "./node.js";
 import { CoMap } from "./contentTypes/coMap.js";
 import { agentSecretFromBytes, agentSecretToBytes } from "./crypto.js";
+import { connectedPeers } from "./streamUtils.js";
 import type { SessionID } from "./ids.js";
 import type { CoID, ContentType } from "./contentType.js";
 
@@ -15,6 +16,7 @@ const internals = {
     agentSecretFromBytes,
     agentSecretToBytes,
     newRandomSessionID,
+    connectedPeers
 };
 
 export { LocalNode, CoValue, CoMap, internals };
diff --git a/src/streamUtils.ts b/src/streamUtils.ts
new file mode 100644
index 000000000..c3656d60f
--- /dev/null
+++ b/src/streamUtils.ts
@@ -0,0 +1,130 @@
+import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams";
+import { Peer, PeerID, SyncMessage } from "./sync.js";
+
+
+export function connectedPeers(
+    peer1id: PeerID,
+    peer2id: PeerID,
+    {
+        trace = false, peer1role = "peer", peer2role = "peer",
+    }: {
+        trace?: boolean;
+        peer1role?: Peer["role"];
+        peer2role?: Peer["role"];
+    } = {}
+): [Peer, Peer] {
+    const [inRx1, inTx1] = newStreamPair<SyncMessage>();
+    const [outRx1, outTx1] = newStreamPair<SyncMessage>();
+
+    const [inRx2, inTx2] = newStreamPair<SyncMessage>();
+    const [outRx2, outTx2] = newStreamPair<SyncMessage>();
+
+    void outRx2
+        .pipeThrough(
+            new TransformStream({
+                transform(
+                    chunk: SyncMessage,
+                    controller: { enqueue: (msg: SyncMessage) => void; }
+                ) {
+                    trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2));
+                    controller.enqueue(chunk);
+                },
+            })
+        )
+        .pipeTo(inTx1);
+
+    void outRx1
+        .pipeThrough(
+            new TransformStream({
+                transform(
+                    chunk: SyncMessage,
+                    controller: { enqueue: (msg: SyncMessage) => void; }
+                ) {
+                    trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2));
+                    controller.enqueue(chunk);
+                },
+            })
+        )
+        .pipeTo(inTx2);
+
+    const peer2AsPeer: Peer = {
+        id: peer2id,
+        incoming: inRx1,
+        outgoing: outTx1,
+        role: peer2role,
+    };
+
+    const peer1AsPeer: Peer = {
+        id: peer1id,
+        incoming: inRx2,
+        outgoing: outTx2,
+        role: peer1role,
+    };
+
+    return [peer1AsPeer, peer2AsPeer];
+}
+
+export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
+    const queue: T[] = [];
+    let resolveNextItemReady: () => void = () => { };
+    let nextItemReady: Promise<void> = new Promise((resolve) => {
+        resolveNextItemReady = resolve;
+    });
+
+    let writerClosed = false;
+    let readerClosed = false;
+
+    const readable = new ReadableStream<T>({
+        async pull(controller) {
+            let retriesLeft = 3;
+            while (retriesLeft > 0) {
+                if (writerClosed) {
+                    controller.close();
+                    return;
+                }
+                retriesLeft--;
+                if (queue.length > 0) {
+                    controller.enqueue(queue.shift()!);
+                    if (queue.length === 0) {
+                        nextItemReady = new Promise((resolve) => {
+                            resolveNextItemReady = resolve;
+                        });
+                    }
+                    return;
+                } else {
+                    await nextItemReady;
+                }
+            }
+            throw new Error(
+                "Should only use one retry to get next item in queue."
+            );
+        },
+
+        cancel(reason) {
+            console.log("Manually closing reader");
+            readerClosed = true;
+        },
+    });
+
+    const writable = new WritableStream<T>({
+        write(chunk, controller) {
+            if (readerClosed) {
+                console.log("Reader closed, not writing chunk", chunk);
+                throw new Error("Reader closed, not writing chunk");
+            }
+            queue.push(chunk);
+            if (queue.length === 1) {
+                // make sure that await write resolves before corresponding read
+                setTimeout(() => resolveNextItemReady());
+            }
+        },
+        abort(reason) {
+            console.log("Manually closing writer");
+            writerClosed = true;
+            resolveNextItemReady();
+            return Promise.resolve();
+        },
+    });
+
+    return [readable, writable];
+}
diff --git a/src/sync.test.ts b/src/sync.test.ts
index 880703cd2..543b80734 100644
--- a/src/sync.test.ts
+++ b/src/sync.test.ts
@@ -10,11 +10,13 @@ import {
     TransformStream,
 } from "isomorphic-streams";
 import {
-    connectedPeers,
-    newStreamPair,
     randomAnonymousAccountAndSessionID,
     shouldNotResolve,
 } from "./testUtils.js";
+import {
+    connectedPeers,
+    newStreamPair
+} from "./streamUtils.js";
 import { AccountID } from "./account.js";
 
 test("Node replies with initial tx and header to empty subscribe", async () => {
diff --git a/src/sync.ts b/src/sync.ts
index c99b01bc2..78d0385d6 100644
--- a/src/sync.ts
+++ b/src/sync.ts
@@ -285,7 +285,7 @@ export class SyncManager {
 
     trySendToPeer(peer: PeerState, msg: SyncMessage) {
         return peer.outgoing.write(msg).catch((e) => {
-            console.error("Error writing to peer, disconnecting", e);
+            console.error(new Error("Error writing to peer, disconnecting", {cause: e}));
             delete this.peers[peer.id];
         });
     }
diff --git a/src/testUtils.ts b/src/testUtils.ts
index 18c3f5e27..1785bb082 100644
--- a/src/testUtils.ts
+++ b/src/testUtils.ts
@@ -4,8 +4,6 @@ import { LocalNode } from "./node.js";
 import { expectTeamContent } from "./permissions.js";
 import { AnonymousControlledAccount } from "./account.js";
 import { SessionID } from "./ids.js";
-import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams";
-import { Peer, PeerID, SyncMessage } from "./sync.js";
 
 export function randomAnonymousAccountAndSessionID(): [AnonymousControlledAccount, SessionID] {
     const agentSecret = newRandomAgentSecret();
@@ -80,71 +78,6 @@ export function teamWithTwoAdminsHighLevel() {
     return { admin, node, team, otherAdmin };
 }
 
-export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
-    const queue: T[] = [];
-    let resolveNextItemReady: () => void = () => {};
-    let nextItemReady: Promise<void> = new Promise((resolve) => {
-        resolveNextItemReady = resolve;
-    });
-
-    let writerClosed = false;
-    let readerClosed = false;
-
-    const readable = new ReadableStream<T>({
-        async pull(controller) {
-            let retriesLeft = 3;
-            while (retriesLeft > 0) {
-                if (writerClosed) {
-                    controller.close();
-                    return;
-                }
-                retriesLeft--;
-                if (queue.length > 0) {
-                    controller.enqueue(queue.shift()!);
-                    if (queue.length === 0) {
-                        nextItemReady = new Promise((resolve) => {
-                            resolveNextItemReady = resolve;
-                        });
-                    }
-                    return;
-                } else {
-                    await nextItemReady;
-                }
-            }
-            throw new Error(
-                "Should only use one retry to get next item in queue."
-            );
-        },
-
-        cancel(reason) {
-            console.log("Manually closing reader");
-            readerClosed = true;
-        },
-    });
-
-    const writable = new WritableStream<T>({
-        write(chunk, controller) {
-            if (readerClosed) {
-                console.log("Reader closed, not writing chunk", chunk);
-                throw new Error("Reader closed, not writing chunk");
-            }
-            queue.push(chunk);
-            if (queue.length === 1) {
-                // make sure that await write resolves before corresponding read
-                process.nextTick(() => resolveNextItemReady());
-            }
-        },
-        abort(reason) {
-            console.log("Manually closing writer");
-            writerClosed = true;
-            resolveNextItemReady();
-            return Promise.resolve();
-        },
-    });
-
-    return [readable, writable];
-}
-
 export function shouldNotResolve<T>(
     promise: Promise<T>,
     ops: { timeout: number }
@@ -164,66 +97,3 @@ export function shouldNotResolve<T>(
     });
 }
 
-export function connectedPeers(
-    peer1id: PeerID,
-    peer2id: PeerID,
-    {
-        trace = false,
-        peer1role = "peer",
-        peer2role = "peer",
-    }: {
-        trace?: boolean;
-        peer1role?: Peer["role"];
-        peer2role?: Peer["role"];
-    } = {}
-): [Peer, Peer] {
-    const [inRx1, inTx1] = newStreamPair<SyncMessage>();
-    const [outRx1, outTx1] = newStreamPair<SyncMessage>();
-
-    const [inRx2, inTx2] = newStreamPair<SyncMessage>();
-    const [outRx2, outTx2] = newStreamPair<SyncMessage>();
-
-    void outRx2
-        .pipeThrough(
-            new TransformStream({
-                transform(
-                    chunk: SyncMessage,
-                    controller: { enqueue: (msg: SyncMessage) => void }
-                ) {
-                    trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2));
-                    controller.enqueue(chunk);
-                },
-            })
-        )
-        .pipeTo(inTx1);
-
-    void outRx1
-        .pipeThrough(
-            new TransformStream({
-                transform(
-                    chunk: SyncMessage,
-                    controller: { enqueue: (msg: SyncMessage) => void }
-                ) {
-                    trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2));
-                    controller.enqueue(chunk);
-                },
-            })
-        )
-        .pipeTo(inTx2);
-
-    const peer2AsPeer: Peer = {
-        id: peer2id,
-        incoming: inRx1,
-        outgoing: outTx1,
-        role: peer2role,
-    };
-
-    const peer1AsPeer: Peer = {
-        id: peer1id,
-        incoming: inRx2,
-        outgoing: outTx2,
-        role: peer1role,
-    };
-
-    return [peer1AsPeer, peer2AsPeer];
-}
\ No newline at end of file
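
For context, a minimal usage sketch of the two helpers introduced in src/streamUtils.ts. This is illustrative only and not part of the diff: the file name and peer IDs are made up, and it assumes PeerID accepts plain strings (as the test files suggest).

// streamUtils.example.ts (hypothetical, for illustration only)
import { connectedPeers, newStreamPair } from "./streamUtils.js";
import { SyncMessage } from "./sync.js";

// Two in-memory peers wired back to back: a message written to one returned
// Peer's outgoing stream becomes readable on the other Peer's incoming stream,
// and is logged to the console when trace is enabled.
const [peer1, peer2] = connectedPeers("test-peer-1", "test-peer-2", {
    trace: true,
});

// The underlying building block: a single queue-backed readable/writable pair.
const [readable, writable] = newStreamPair<SyncMessage>();
const writer = writable.getWriter();
const reader = readable.getReader();
// writer.write() resolves just before the matching reader.read() is served,
// because the write handler defers resolveNextItemReady() via setTimeout.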