diff --git a/src/multilog.ts b/src/multilog.ts index 99cf430d0..a7282aa76 100644 --- a/src/multilog.ts +++ b/src/multilog.ts @@ -501,6 +501,10 @@ export function getAgentID(agent: Agent): AgentID { )}`; } +export function agentIDasMultiLogID(agentID: AgentID): MultiLogID { + return `coval_${agentID.substring("agent_".length)}`; +} + export type AgentCredential = { signatorySecret: SignatorySecret; recipientSecret: RecipientSecret; diff --git a/src/node.ts b/src/node.ts index 5cc179978..c824702c5 100644 --- a/src/node.ts +++ b/src/node.ts @@ -12,6 +12,7 @@ import { getAgentMultilogHeader, MultiLogHeader, agentIDfromSessionID, + agentIDasMultiLogID, } from "./multilog"; import { Team, expectTeamContent } from "./permissions"; import { @@ -61,10 +62,40 @@ export class LocalNode { return multilog; } + loadMultiLog(id: MultiLogID): Promise { + let entry = this.multilogs[id]; + if (!entry) { + entry = newLoadingState(); + + this.multilogs[id] = entry; + + for (const peer of Object.values(this.peers)) { + peer.outgoing + .write({ + action: "subscribe", + knownState: { + multilogID: id, + header: false, + sessions: {}, + }, + }) + .catch((e) => { + console.error("Error writing to peer", e); + }); + } + } + if (entry.state === "loaded") { + return Promise.resolve(entry.multilog); + } + return entry.done; + } + expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog { const entry = this.multilogs[id]; if (!entry) { - throw new Error(`Unknown multilog ${id}`); + throw new Error( + `${expectation ? expectation + ": " : ""}Unknown multilog ${id}` + ); } if (entry.state === "loading") { throw new Error( @@ -183,22 +214,16 @@ export class LocalNode { } } - handleSubscribe(msg: SubscribeMessage, peer: PeerState): SyncMessage[] { + handleSubscribe( + msg: SubscribeMessage, + peer: PeerState, + asDependencyOf?: MultiLogID + ): SyncMessage[] { const entry = this.multilogs[msg.knownState.multilogID]; if (!entry || entry.state === "loading") { if (!entry) { - let resolve: (multilog: MultiLog) => void; - - const promise = new Promise((r) => { - resolve = r; - }); - - this.multilogs[msg.knownState.multilogID] = { - state: "loading", - done: promise, - resolve: resolve!, - }; + this.multilogs[msg.knownState.multilogID] = newLoadingState(); } return [ @@ -218,10 +243,35 @@ export class LocalNode { const newContent = entry.multilog.newContentSince(msg.knownState); + const dependedOnMultilogs = + entry.multilog.header.ruleset.type === "team" + ? expectTeamContent(entry.multilog.getCurrentContent()) + .keys() + .filter((k): k is AgentID => k.startsWith("agent_")) + .map((agent) => agentIDasMultiLogID(agent)) + : entry.multilog.header.ruleset.type === "ownedByTeam" + ? [entry.multilog.header.ruleset.team] + : []; + return [ + ...dependedOnMultilogs.flatMap((multilogID) => + this.handleSubscribe( + { + action: "subscribe", + knownState: { + multilogID, + header: false, + sessions: {}, + }, + }, + peer, + asDependencyOf || msg.knownState.multilogID + ) + ), { action: "subscribeResponse", knownState: entry.multilog.knownState(), + asDependencyOf, }, ...(newContent ? [newContent] : []), ]; @@ -231,12 +281,26 @@ export class LocalNode { msg: SubscribeResponseMessage, peer: PeerState ): SyncMessage[] { - const entry = this.multilogs[msg.knownState.multilogID]; + let entry = this.multilogs[msg.knownState.multilogID]; - if (!entry || entry.state === "loading") { - throw new Error( - "Expected multilog entry to be created, missing subscribe?" - ); + if (!entry) { + if (msg.asDependencyOf) { + if (this.multilogs[msg.asDependencyOf]) { + entry = newLoadingState(); + + this.multilogs[msg.knownState.multilogID] = entry; + } + } else { + throw new Error( + "Expected multilog entry to be created, missing subscribe?" + ); + } + } + + if (entry.state === "loading") { + peer.optimisticKnownStates[msg.knownState.multilogID] = + msg.knownState; + return []; } const newContent = entry.multilog.newContentSince(msg.knownState); @@ -415,3 +479,17 @@ type MultilogState = resolve: (multilog: MultiLog) => void; } | { state: "loaded"; multilog: MultiLog }; + +function newLoadingState(): MultilogState { + let resolve: (multilog: MultiLog) => void; + + const promise = new Promise((r) => { + resolve = r; + }); + + return { + state: "loading", + done: promise, + resolve: resolve!, + }; +} diff --git a/src/sync.test.ts b/src/sync.test.ts index f386f87ad..1c7edbea3 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -6,7 +6,7 @@ import { newRandomSessionID, } from "./multilog"; import { LocalNode } from "./node"; -import { SyncMessage } from "./sync"; +import { Peer, SyncMessage } from "./sync"; import { MapOpPayload, expectMap } from "./coValue"; test( @@ -48,16 +48,21 @@ test( const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(firstMessage.value).toEqual({ + const subscribeResponseMsg = await reader.read(); + + expect(subscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const secondMessage = await reader.read(); + const newContentMsg = await reader.read(); - expect(secondMessage.value).toEqual({ + expect(newContentMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: { @@ -133,16 +138,21 @@ test("Node replies with only new tx to subscribe with some known state", async ( const reader = outRx.getReader(); - const msg1 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(msg1.value).toEqual({ + const mapSubscribeResponseMsg = await reader.read(); + + expect(mapSubscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const msg2 = await reader.read(); + const mapNewContentMsg = await reader.read(); - expect(msg2.value).toEqual({ + expect(mapNewContentMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: undefined, @@ -171,6 +181,8 @@ test("Node replies with only new tx to subscribe with some known state", async ( } satisfies SyncMessage); }); +test.skip("TODO: node only replies with new tx to subscribe with some known state, even in the depended on multilogs", () => {}); + test("After subscribing, node sends own known state and new txs to peer", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -206,16 +218,21 @@ test("After subscribing, node sends own known state and new txs to peer", async const reader = outRx.getReader(); - const msg1 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(msg1.value).toEqual({ + const mapSubscribeResponseMsg = await reader.read(); + + expect(mapSubscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const msg2 = await reader.read(); + const mapNewContentHeaderOnlyMsg = await reader.read(); - expect(msg2.value).toEqual({ + expect(mapNewContentHeaderOnlyMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: map.multiLog.header, @@ -226,9 +243,9 @@ test("After subscribing, node sends own known state and new txs to peer", async editable.set("hello", "world", "trusting"); }); - const msg3 = await reader.read(); + const mapEditMsg1 = await reader.read(); - expect(msg3.value).toEqual({ + expect(mapEditMsg1.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, newContent: { @@ -259,9 +276,9 @@ test("After subscribing, node sends own known state and new txs to peer", async editable.set("goodbye", "world", "trusting"); }); - const msg4 = await reader.read(); + const mapEditMsg2 = await reader.read(); - expect(msg4.value).toEqual({ + expect(mapEditMsg2.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, newContent: { @@ -394,8 +411,12 @@ test("No matter the optimistic known state, node respects invalid known state me const reader = outRx.getReader(); - const _msg1 = await reader.read(); - const _msg2 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); + const _mapSubscribeResponseMsg = await reader.read(); + const _mapNewContentHeaderOnlyMsg = await reader.read(); map.edit((editable) => { editable.set("hello", "world", "trusting"); @@ -405,8 +426,8 @@ test("No matter the optimistic known state, node respects invalid known state me editable.set("goodbye", "world", "trusting"); }); - const _msg3 = await reader.read(); - const _msg4 = await reader.read(); + const _mapEditMsg1 = await reader.read(); + const _mapEditMsg2 = await reader.read(); await writer.write({ action: "wrongAssumedKnownState", @@ -419,9 +440,9 @@ test("No matter the optimistic known state, node respects invalid known state me }, } satisfies SyncMessage); - const msg5 = await reader.read(); + const newContentAfterWrongAssumedState = await reader.read(); - expect(msg5.value).toEqual({ + expect(newContentAfterWrongAssumedState.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: undefined, @@ -667,9 +688,13 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); + const mapSubscribeResponse = await reader.read(); - expect(firstMessage.value).toEqual({ + expect(mapSubscribeResponse.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); @@ -753,6 +778,35 @@ test("When replaying creation and transactions of a multilog as new content, the ).toEqual("world"); }); +test("When loading a multilog on one node, the server node it is requested from replies with all the necessary depended on multilogs to make it work", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node1 = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node1.createTeam(); + + const map = team.createMap(); + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const node2 = new LocalNode(admin, newRandomSessionID(adminID)); + + const [node2asPeer, node1asPeer] = connectedPeers(); + + node1.addPeer(node2asPeer); + node2.addPeer(node1asPeer); + + await node2.loadMultiLog(map.multiLog.id); + + expect( + expectMap( + node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent() + ).get("hello") + ).toEqual("world"); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; @@ -803,3 +857,49 @@ function shouldNotResolve(promise: Promise, ops: { timeout: number }) { setTimeout(resolve, ops.timeout); }); } + +function connectedPeers(trace?: boolean): [Peer, Peer] { + const [inRx1, inTx1] = newStreamPair(); + const [outRx1, outTx1] = newStreamPair(); + + const [inRx2, inTx2] = newStreamPair(); + const [outRx2, outTx2] = newStreamPair(); + + outRx2 + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + trace && console.log("peer 2 -> peer 1", chunk); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx1); + + outRx1 + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + trace && console.log("peer 1 -> peer 2", chunk); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx2); + + const peer2AsPeer: Peer = { + id: "test2", + incoming: inRx1, + outgoing: outTx1, + role: "peer", + }; + + const peer1AsPeer: Peer = { + id: "test1", + incoming: inRx2, + outgoing: outTx2, + role: "peer", + }; + + return [peer2AsPeer, peer1AsPeer]; +} diff --git a/src/sync.ts b/src/sync.ts index cf8367f01..276d84bbe 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -22,6 +22,7 @@ export type SubscribeMessage = { export type SubscribeResponseMessage = { action: "subscribeResponse"; knownState: MultiLogKnownState; + asDependencyOf?: MultiLogID; }; export type NewContentMessage = {