From 94d0012da03c8c11e6cb3f7303ffd292b164621c Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 15 Aug 2023 11:18:53 +0100 Subject: [PATCH 1/2] Basic account and profile creation --- src/account.test.ts | 32 +++++++++++++++ src/account.ts | 73 +++++++++++++++++++++++++--------- src/contentTypes/coMap.ts | 16 +++++++- src/node.ts | 84 +++++++++++++++++++++++++++++++++++---- src/permissions.ts | 16 ++++++-- 5 files changed, 192 insertions(+), 29 deletions(-) create mode 100644 src/account.test.ts diff --git a/src/account.test.ts b/src/account.test.ts new file mode 100644 index 000000000..640aacccd --- /dev/null +++ b/src/account.test.ts @@ -0,0 +1,32 @@ +import { LocalNode } from "./node.js"; + +test("Can create a node while creating a new account with profile", async () => { + const { node, account, accountSecret, sessionID } = + LocalNode.withNewlyCreatedAccount("Hermes Puggington"); + + expect(node).not.toBeNull(); + expect(account).not.toBeNull(); + expect(accountSecret).not.toBeNull(); + expect(sessionID).not.toBeNull(); + + expect((node.expectProfileLoaded(account)).get("name")).toEqual("Hermes Puggington"); + expect((await node.loadProfile(account)).get("name")).toEqual("Hermes Puggington"); +}); + +test("A node with an account can create teams and and objects within them", async () => { + const { node, account } = + LocalNode.withNewlyCreatedAccount("Hermes Puggington"); + + const team = await node.createTeam(); + expect(team).not.toBeNull(); + + let map = team.createMap(); + map = map.edit((edit) => { + edit.set("foo", "bar", "private"); + expect(edit.get("foo")).toEqual("bar"); + }); + + expect(map.get("foo")).toEqual("bar"); + + expect(map.getLastEditor("foo")).toEqual(account); +}); \ No newline at end of file diff --git a/src/account.ts b/src/account.ts index ab8eea08e..9eeb144ae 100644 --- a/src/account.ts +++ b/src/account.ts @@ -1,33 +1,50 @@ -import { CoValueHeader } from './coValue.js'; -import { CoID } from './contentType.js'; -import { AgentSecret, SealerID, SealerSecret, SignerID, SignerSecret, getAgentID, getAgentSealerID, getAgentSealerSecret, getAgentSignerID, getAgentSignerSecret } from './crypto.js'; -import { AgentID } from './ids.js'; -import { CoMap, LocalNode } from './index.js'; -import { Team, TeamContent } from './permissions.js'; +import { CoValueHeader } from "./coValue.js"; +import { CoID } from "./contentType.js"; +import { + AgentSecret, + SealerID, + SealerSecret, + SignerID, + SignerSecret, + getAgentID, + getAgentSealerID, + getAgentSealerSecret, + getAgentSignerID, + getAgentSignerSecret, +} from "./crypto.js"; +import { AgentID } from "./ids.js"; +import { CoMap, LocalNode } from "./index.js"; +import { Team, TeamContent } from "./permissions.js"; -export function accountHeaderForInitialAgentSecret(agentSecret: AgentSecret): CoValueHeader { +export function accountHeaderForInitialAgentSecret( + agentSecret: AgentSecret +): CoValueHeader { const agent = getAgentID(agentSecret); return { type: "comap", - ruleset: {type: "team", initialAdmin: agent}, + ruleset: { type: "team", initialAdmin: agent }, meta: { - type: "account" + type: "account", }, createdAt: null, uniqueness: null, - } + }; } export class Account extends Team { get id(): AccountID { - return this.teamMap.id; + return this.teamMap.id as AccountID; } getCurrentAgentID(): AgentID { - const agents = this.teamMap.keys().filter((k): k is AgentID => k.startsWith("sealer_")); + const agents = this.teamMap + .keys() + .filter((k): k is AgentID => k.startsWith("sealer_")); if (agents.length !== 1) { - throw new Error("Expected exactly one agent in account, got " + agents.length); + throw new Error( + "Expected exactly one agent in account, got " + agents.length + ); } return agents[0]!; @@ -45,10 +62,17 @@ export interface GeneralizedControlledAccount { currentSealerSecret: () => SealerSecret; } -export class ControlledAccount extends Account implements GeneralizedControlledAccount { +export class ControlledAccount + extends Account + implements GeneralizedControlledAccount +{ agentSecret: AgentSecret; - constructor(agentSecret: AgentSecret, teamMap: CoMap, node: LocalNode) { + constructor( + agentSecret: AgentSecret, + teamMap: CoMap, + node: LocalNode + ) { super(teamMap, node); this.agentSecret = agentSecret; @@ -75,7 +99,9 @@ export class ControlledAccount extends Account implements GeneralizedControlledA } } -export class AnonymousControlledAccount implements GeneralizedControlledAccount { +export class AnonymousControlledAccount + implements GeneralizedControlledAccount +{ agentSecret: AgentSecret; constructor(agentSecret: AgentSecret) { @@ -107,9 +133,20 @@ export class AnonymousControlledAccount implements GeneralizedControlledAccount } } -export type AccountMeta = {type: "account"}; -export type AccountID = CoID>; +export type AccountContent = TeamContent & { profile: CoID }; +export type AccountMeta = { type: "account" }; +export type AccountID = CoID>; export type AccountIDOrAgentID = AgentID | AccountID; export type AccountOrAgentID = AgentID | Account; export type AccountOrAgentSecret = AgentSecret | Account; + +export function isAccountID(id: AccountIDOrAgentID): id is AccountID { + return id.startsWith("co_"); +} + +export type ProfileContent = { + name: string; +}; +export type ProfileMeta = { type: "profile" }; +export type Profile = CoMap; diff --git a/src/contentTypes/coMap.ts b/src/contentTypes/coMap.ts index 254397745..8965c5f40 100644 --- a/src/contentTypes/coMap.ts +++ b/src/contentTypes/coMap.ts @@ -1,7 +1,8 @@ import { JsonObject, JsonValue } from '../jsonValue.js'; import { TransactionID } from '../ids.js'; import { CoID } from '../contentType.js'; -import { CoValue } from '../coValue.js'; +import { CoValue, accountOrAgentIDfromSessionID } from '../coValue.js'; +import { AccountID, isAccountID } from '../account.js'; type MapOp = { txID: TransactionID; @@ -106,6 +107,19 @@ export class CoMap< } } + getLastEditor>(key: K): AccountID | undefined { + const tx = this.getLastTxID(key); + if (!tx) { + return undefined; + } + const accountID = accountOrAgentIDfromSessionID(tx.sessionID); + if (isAccountID(accountID)) { + return accountID; + } else { + return undefined; + } + } + getLastTxID>(key: K): TransactionID | undefined { const ops = this.ops[key]; if (!ops) { diff --git a/src/node.ts b/src/node.ts index 69f5d22ac..1778fabbb 100644 --- a/src/node.ts +++ b/src/node.ts @@ -1,4 +1,5 @@ import { + AgentSecret, createdNowUnique, getAgentID, getAgentSealerID, @@ -20,6 +21,11 @@ import { GeneralizedControlledAccount, ControlledAccount, AnonymousControlledAccount, + AccountID, + Profile, + AccountContent, + ProfileContent, + ProfileMeta, } from "./account.js"; import { CoMap } from "./index.js"; @@ -37,6 +43,33 @@ export class LocalNode { this.ownSessionID = ownSessionID; } + static withNewlyCreatedAccount(name: string): { + node: LocalNode; + account: AccountID; + accountSecret: AgentSecret; + sessionID: SessionID; + } { + const throwawayAgent = newRandomAgentSecret(); + const setupNode = new LocalNode( + new AnonymousControlledAccount(throwawayAgent), + newRandomSessionID(getAgentID(throwawayAgent)) + ); + + const account = setupNode.createAccount(name); + + const nodeWithAccount = account.node.testWithDifferentAccount( + account, + newRandomSessionID(account.id) + ); + + return { + node: nodeWithAccount, + account: account.id, + accountSecret: account.agentSecret, + sessionID: nodeWithAccount.ownSessionID, + }; + } + createCoValue(header: CoValueHeader): CoValue { const coValue = new CoValue(header, this); this.coValues[coValue.id] = { state: "loaded", coValue: coValue }; @@ -65,6 +98,16 @@ export class LocalNode { return (await this.loadCoValue(id)).getCurrentContent() as T; } + async loadProfile(id: AccountID): Promise { + const account = await this.load>(id); + const profileID = account.get("profile"); + + if (!profileID) { + throw new Error(`Account ${id} has no profile`); + } + return (await this.loadCoValue(profileID)).getCurrentContent() as Profile; + } + expectCoValueLoaded(id: RawCoID, expectation?: string): CoValue { const entry = this.coValues[id]; if (!entry) { @@ -82,7 +125,20 @@ export class LocalNode { return entry.coValue; } - createAccount(_publicNickname: string): ControlledAccount { + expectProfileLoaded(id: AccountID, expectation?: string): Profile { + const account = this.expectCoValueLoaded(id, expectation); + const profileID = expectTeamContent(account.getCurrentContent()).get("profile"); + if (!profileID) { + throw new Error( + `${ + expectation ? expectation + ": " : "" + }Account ${id} has no profile` + ); + } + return this.expectCoValueLoaded(profileID, expectation).getCurrentContent() as Profile; + } + + createAccount(name: string): ControlledAccount { const agentSecret = newRandomAgentSecret(); const account = this.createCoValue( @@ -92,7 +148,9 @@ export class LocalNode { newRandomSessionID(getAgentID(agentSecret)) ); - expectTeamContent(account.getCurrentContent()).edit((editable) => { + const accountAsTeam = new Team(expectTeamContent(account.getCurrentContent()), account.node); + + accountAsTeam.teamMap.edit((editable) => { editable.set(getAgentID(agentSecret), "admin", "trusting"); const readKey = newRandomKeySecret(); @@ -111,14 +169,26 @@ export class LocalNode { "trusting" ); - editable.set('readKey', readKey.id, "trusting"); + editable.set("readKey", readKey.id, "trusting"); }); - return new ControlledAccount( + const controlledAccount = new ControlledAccount( agentSecret, - account.getCurrentContent() as CoMap, - this + account.getCurrentContent() as CoMap, + account.node ); + + const profile = accountAsTeam.createMap({ type: "profile" }); + + profile.edit((editable) => { + editable.set("name", name, "trusting"); + }); + + accountAsTeam.teamMap.edit((editable) => { + editable.set("profile", profile.id, "trusting"); + }); + + return controlledAccount; } resolveAccountAgent(id: AccountIDOrAgentID, expectation?: string): AgentID { @@ -177,7 +247,7 @@ export class LocalNode { "trusting" ); - editable.set('readKey', readKey.id, "trusting"); + editable.set("readKey", readKey.id, "trusting"); }); return new Team(teamContent, this); diff --git a/src/permissions.ts b/src/permissions.ts index 87085831c..cf61e3e30 100644 --- a/src/permissions.ts +++ b/src/permissions.ts @@ -20,7 +20,7 @@ import { } from "./coValue.js"; import { LocalNode } from "./node.js"; import { RawCoID, SessionID, TransactionID, isAgentID } from "./ids.js"; -import { AccountIDOrAgentID, GeneralizedControlledAccount } from "./account.js"; +import { AccountIDOrAgentID, GeneralizedControlledAccount, Profile } from "./account.js"; export type PermissionsDef = | { type: "team"; initialAdmin: AccountIDOrAgentID } @@ -77,7 +77,8 @@ export function determineValidTransactions( const change = tx.changes[0] as | MapOpPayload - | MapOpPayload<"readKey", JsonValue>; + | MapOpPayload<"readKey", JsonValue> + | MapOpPayload<"profile", CoID>; if (tx.changes.length !== 1) { console.warn("Team transaction must have exactly one change"); continue; @@ -94,6 +95,14 @@ export function determineValidTransactions( continue; } + validTransactions.push({ txID: { sessionID, txIndex }, tx }); + continue; + } else if (change.key === 'profile') { + if (memberState[transactor] !== "admin") { + console.warn("Only admins can set profile"); + continue; + } + validTransactions.push({ txID: { sessionID, txIndex }, tx }); continue; } else if (isKeyForKeyField(change.key) || isKeyForAccountField(change.key)) { @@ -205,6 +214,7 @@ export function determineValidTransactions( } export type TeamContent = { + profile: CoID | null; [key: AccountIDOrAgentID]: Role; readKey: KeyID; [revelationFor: `${KeyID}_for_${AccountIDOrAgentID}`]: Sealed; @@ -343,7 +353,7 @@ export class Team { } createMap( - meta?: M + meta?: Meta ): CoMap { return this.node .createCoValue({ From 4617b6a125f62ea793d21f9a6e037c94f66b1b9c Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 15 Aug 2023 12:20:13 +0100 Subject: [PATCH 2/2] Add and implement loading an account on node creation --- src/account.test.ts | 49 ++++++++++++--- src/node.ts | 25 +++++++- src/sync.test.ts | 148 +------------------------------------------ src/testUtils.ts | 150 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 215 insertions(+), 157 deletions(-) diff --git a/src/account.test.ts b/src/account.test.ts index 640aacccd..12a1acc18 100644 --- a/src/account.test.ts +++ b/src/account.test.ts @@ -1,20 +1,26 @@ +import { newRandomSessionID } from "./coValue.js"; import { LocalNode } from "./node.js"; +import { connectedPeers } from "./testUtils.js"; test("Can create a node while creating a new account with profile", async () => { - const { node, account, accountSecret, sessionID } = + const { node, accountID, accountSecret, sessionID } = LocalNode.withNewlyCreatedAccount("Hermes Puggington"); expect(node).not.toBeNull(); - expect(account).not.toBeNull(); + expect(accountID).not.toBeNull(); expect(accountSecret).not.toBeNull(); expect(sessionID).not.toBeNull(); - expect((node.expectProfileLoaded(account)).get("name")).toEqual("Hermes Puggington"); - expect((await node.loadProfile(account)).get("name")).toEqual("Hermes Puggington"); + expect(node.expectProfileLoaded(accountID).get("name")).toEqual( + "Hermes Puggington" + ); + expect((await node.loadProfile(accountID)).get("name")).toEqual( + "Hermes Puggington" + ); }); test("A node with an account can create teams and and objects within them", async () => { - const { node, account } = + const { node, accountID } = LocalNode.withNewlyCreatedAccount("Hermes Puggington"); const team = await node.createTeam(); @@ -28,5 +34,34 @@ test("A node with an account can create teams and and objects within them", asyn expect(map.get("foo")).toEqual("bar"); - expect(map.getLastEditor("foo")).toEqual(account); -}); \ No newline at end of file + expect(map.getLastEditor("foo")).toEqual(accountID); +}); + +test("Can create account with one node, and then load it on another", async () => { + const { node, accountID, accountSecret } = + LocalNode.withNewlyCreatedAccount("Hermes Puggington"); + + const team = await node.createTeam(); + expect(team).not.toBeNull(); + + let map = team.createMap(); + map = map.edit((edit) => { + edit.set("foo", "bar", "private"); + expect(edit.get("foo")).toEqual("bar"); + }); + + const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {trace: true, peer1role: "server", peer2role: "client"}); + + node.sync.addPeer(node2asPeer); + + const node2 = await LocalNode.withLoadedAccount( + accountID, + accountSecret, + newRandomSessionID(accountID), + [node1asPeer] + ); + + const map2 = await node2.load(map.id); + + expect(map2.get("foo")).toEqual("bar"); +}); diff --git a/src/node.ts b/src/node.ts index 1778fabbb..7279cc245 100644 --- a/src/node.ts +++ b/src/node.ts @@ -10,7 +10,7 @@ import { } from "./crypto.js"; import { CoValue, CoValueHeader, newRandomSessionID } from "./coValue.js"; import { Team, TeamContent, expectTeamContent } from "./permissions.js"; -import { SyncManager } from "./sync.js"; +import { Peer, SyncManager } from "./sync.js"; import { AgentID, RawCoID, SessionID, isAgentID } from "./ids.js"; import { CoID, ContentType } from "./contentType.js"; import { @@ -45,7 +45,7 @@ export class LocalNode { static withNewlyCreatedAccount(name: string): { node: LocalNode; - account: AccountID; + accountID: AccountID; accountSecret: AgentSecret; sessionID: SessionID; } { @@ -64,12 +64,31 @@ export class LocalNode { return { node: nodeWithAccount, - account: account.id, + accountID: account.id, accountSecret: account.agentSecret, sessionID: nodeWithAccount.ownSessionID, }; } + static async withLoadedAccount(accountID: AccountID, accountSecret: AgentSecret, sessionID: SessionID, peersToLoadFrom: Peer[]): Promise { + const loadingNode = new LocalNode(new AnonymousControlledAccount(accountSecret), newRandomSessionID(accountID)); + + const accountPromise = loadingNode.load(accountID); + + for (const peer of peersToLoadFrom) { + loadingNode.sync.addPeer(peer); + } + + const account = await accountPromise; + + // since this is all synchronous, we can just swap out nodes for the SyncManager + const node = loadingNode.testWithDifferentAccount(new ControlledAccount(accountSecret, account, loadingNode), sessionID); + node.sync = loadingNode.sync; + node.sync.local = node; + + return node; + } + createCoValue(header: CoValueHeader): CoValue { const coValue = new CoValue(header, this); this.coValues[coValue.id] = { state: "loaded", coValue: coValue }; diff --git a/src/sync.test.ts b/src/sync.test.ts index d85143d15..92edce4d1 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -9,7 +9,7 @@ import { WritableStream, TransformStream, } from "isomorphic-streams"; -import { randomAnonymousAccountAndSessionID } from "./testUtils.js"; +import { connectedPeers, newStreamPair, randomAnonymousAccountAndSessionID, shouldNotResolve } from "./testUtils.js"; import { AccountID } from "./account.js"; test("Node replies with initial tx and header to empty subscribe", async () => { @@ -1124,150 +1124,4 @@ function admStateEx(adminID: AccountID) { }; } -function newStreamPair(): [ReadableStream, WritableStream] { - const queue: T[] = []; - let resolveNextItemReady: () => void = () => {}; - let nextItemReady: Promise = new Promise((resolve) => { - resolveNextItemReady = resolve; - }); - let writerClosed = false; - let readerClosed = false; - - const readable = new ReadableStream({ - async pull(controller) { - let retriesLeft = 3; - while (retriesLeft > 0) { - if (writerClosed) { - controller.close(); - return; - } - retriesLeft--; - if (queue.length > 0) { - controller.enqueue(queue.shift()!); - if (queue.length === 0) { - nextItemReady = new Promise((resolve) => { - resolveNextItemReady = resolve; - }); - } - return; - } else { - await nextItemReady; - } - } - throw new Error( - "Should only use one retry to get next item in queue." - ); - }, - - cancel(reason) { - console.log("Manually closing reader"); - readerClosed = true; - }, - }); - - const writable = new WritableStream({ - write(chunk, controller) { - if (readerClosed) { - console.log("Reader closed, not writing chunk", chunk); - throw new Error("Reader closed, not writing chunk"); - } - queue.push(chunk); - if (queue.length === 1) { - // make sure that await write resolves before corresponding read - process.nextTick(() => resolveNextItemReady()); - } - }, - abort(reason) { - console.log("Manually closing writer"); - writerClosed = true; - resolveNextItemReady(); - return Promise.resolve(); - }, - }); - - return [readable, writable]; -} - -function shouldNotResolve( - promise: Promise, - ops: { timeout: number } -): Promise { - return new Promise((resolve, reject) => { - promise - .then((v) => - reject( - new Error( - "Should not have resolved, but resolved to " + - JSON.stringify(v) - ) - ) - ) - .catch(reject); - setTimeout(resolve, ops.timeout); - }); -} - -function connectedPeers( - peer1id: PeerID, - peer2id: PeerID, - { - trace = false, - peer1role = "peer", - peer2role = "peer", - }: { - trace?: boolean; - peer1role?: Peer["role"]; - peer2role?: Peer["role"]; - } = {} -): [Peer, Peer] { - const [inRx1, inTx1] = newStreamPair(); - const [outRx1, outTx1] = newStreamPair(); - - const [inRx2, inTx2] = newStreamPair(); - const [outRx2, outTx2] = newStreamPair(); - - void outRx2 - .pipeThrough( - new TransformStream({ - transform( - chunk: SyncMessage, - controller: { enqueue: (msg: SyncMessage) => void } - ) { - trace && console.log(`${peer2id} -> ${peer1id}`, chunk); - controller.enqueue(chunk); - }, - }) - ) - .pipeTo(inTx1); - - void outRx1 - .pipeThrough( - new TransformStream({ - transform( - chunk: SyncMessage, - controller: { enqueue: (msg: SyncMessage) => void } - ) { - trace && console.log(`${peer1id} -> ${peer2id}`, chunk); - controller.enqueue(chunk); - }, - }) - ) - .pipeTo(inTx2); - - const peer2AsPeer: Peer = { - id: peer2id, - incoming: inRx1, - outgoing: outTx1, - role: peer2role, - }; - - const peer1AsPeer: Peer = { - id: peer1id, - incoming: inRx2, - outgoing: outTx2, - role: peer1role, - }; - - return [peer1AsPeer, peer2AsPeer]; -} diff --git a/src/testUtils.ts b/src/testUtils.ts index 1264798a5..18c3f5e27 100644 --- a/src/testUtils.ts +++ b/src/testUtils.ts @@ -4,6 +4,8 @@ import { LocalNode } from "./node.js"; import { expectTeamContent } from "./permissions.js"; import { AnonymousControlledAccount } from "./account.js"; import { SessionID } from "./ids.js"; +import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams"; +import { Peer, PeerID, SyncMessage } from "./sync.js"; export function randomAnonymousAccountAndSessionID(): [AnonymousControlledAccount, SessionID] { const agentSecret = newRandomAgentSecret(); @@ -76,4 +78,152 @@ export function teamWithTwoAdminsHighLevel() { team.addMember(otherAdmin.id, "admin"); return { admin, node, team, otherAdmin }; +} + +export function newStreamPair(): [ReadableStream, WritableStream] { + const queue: T[] = []; + let resolveNextItemReady: () => void = () => {}; + let nextItemReady: Promise = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + + let writerClosed = false; + let readerClosed = false; + + const readable = new ReadableStream({ + async pull(controller) { + let retriesLeft = 3; + while (retriesLeft > 0) { + if (writerClosed) { + controller.close(); + return; + } + retriesLeft--; + if (queue.length > 0) { + controller.enqueue(queue.shift()!); + if (queue.length === 0) { + nextItemReady = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + } + return; + } else { + await nextItemReady; + } + } + throw new Error( + "Should only use one retry to get next item in queue." + ); + }, + + cancel(reason) { + console.log("Manually closing reader"); + readerClosed = true; + }, + }); + + const writable = new WritableStream({ + write(chunk, controller) { + if (readerClosed) { + console.log("Reader closed, not writing chunk", chunk); + throw new Error("Reader closed, not writing chunk"); + } + queue.push(chunk); + if (queue.length === 1) { + // make sure that await write resolves before corresponding read + process.nextTick(() => resolveNextItemReady()); + } + }, + abort(reason) { + console.log("Manually closing writer"); + writerClosed = true; + resolveNextItemReady(); + return Promise.resolve(); + }, + }); + + return [readable, writable]; +} + +export function shouldNotResolve( + promise: Promise, + ops: { timeout: number } +): Promise { + return new Promise((resolve, reject) => { + promise + .then((v) => + reject( + new Error( + "Should not have resolved, but resolved to " + + JSON.stringify(v) + ) + ) + ) + .catch(reject); + setTimeout(resolve, ops.timeout); + }); +} + +export function connectedPeers( + peer1id: PeerID, + peer2id: PeerID, + { + trace = false, + peer1role = "peer", + peer2role = "peer", + }: { + trace?: boolean; + peer1role?: Peer["role"]; + peer2role?: Peer["role"]; + } = {} +): [Peer, Peer] { + const [inRx1, inTx1] = newStreamPair(); + const [outRx1, outTx1] = newStreamPair(); + + const [inRx2, inTx2] = newStreamPair(); + const [outRx2, outTx2] = newStreamPair(); + + void outRx2 + .pipeThrough( + new TransformStream({ + transform( + chunk: SyncMessage, + controller: { enqueue: (msg: SyncMessage) => void } + ) { + trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2)); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx1); + + void outRx1 + .pipeThrough( + new TransformStream({ + transform( + chunk: SyncMessage, + controller: { enqueue: (msg: SyncMessage) => void } + ) { + trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2)); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx2); + + const peer2AsPeer: Peer = { + id: peer2id, + incoming: inRx1, + outgoing: outTx1, + role: peer2role, + }; + + const peer1AsPeer: Peer = { + id: peer1id, + incoming: inRx2, + outgoing: outTx2, + role: peer1role, + }; + + return [peer1AsPeer, peer2AsPeer]; } \ No newline at end of file