diff --git a/src/node.ts b/src/node.ts index 8ff1f6cf7..326f80e22 100644 --- a/src/node.ts +++ b/src/node.ts @@ -139,7 +139,7 @@ export class LocalNode { case "newContent": return this.handleNewContent(msg); case "wrongAssumedKnownState": - return this.handleWrongAssumedKnownState(msg); + return this.handleWrongAssumedKnownState(msg, peer); case "unsubscribe": return this.handleUnsubscribe(msg); } @@ -161,9 +161,14 @@ export class LocalNode { } handleWrongAssumedKnownState( - msg: WrongAssumedKnownStateMessage + msg: WrongAssumedKnownStateMessage, + peer: PeerState ): SyncMessage | undefined { - return undefined; + const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); + + peer.optimisticKnownStates[msg.knownState.multilogID] = msg.knownState; + + return multilog.newContentSince(msg.knownState); } handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined { diff --git a/src/sync.test.ts b/src/sync.test.ts index 352a3affc..1ddf187d2 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -265,6 +265,95 @@ test("After subscribing, node sends new txs to peer", async () => { } satisfies SyncMessage); }); +test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + }); + + const writer = inTx.getWriter(); + + await writer.write({ + type: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: false, + sessions: { + [node.ownSessionID]: 0, + }, + }, + }); + + const reader = outRx.getReader(); + + const _firstMessage = await reader.read(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + map.edit((editable) => { + editable.set("goodbye", "world", "trusting"); + }); + + const _secondMessage = await reader.read(); + const _thirdMessage = await reader.read(); + + await writer.write({ + type: "wrongAssumedKnownState", + knownState: { + multilogID: map.multiLog.id, + header: true, + sessions: { + [node.ownSessionID]: 1, + }, + }, + } satisfies SyncMessage); + + const fourthMessage = await reader.read(); + + expect(fourthMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: undefined, + newContent: { + [node.ownSessionID]: { + after: 1, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[1].madeAt, + changes: [ + { + op: "insert", + key: "goodbye", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; @@ -274,14 +363,18 @@ function newStreamPair(): [ReadableStream, WritableStream] { const readable = new ReadableStream({ async pull(controller) { - if (queue.length > 0) { - controller.enqueue(queue.shift()); - } else { - await nextItemReady; - nextItemReady = new Promise((resolve) => { - resolveNextItemReady = resolve; - }); - controller.enqueue(queue.shift()); + while(true) { + if (queue.length > 0) { + controller.enqueue(queue.shift()); + if (queue.length === 0) { + nextItemReady = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + } + return; + } else { + await nextItemReady; + } } }, }); @@ -289,7 +382,9 @@ function newStreamPair(): [ReadableStream, WritableStream] { const writable = new WritableStream({ write(chunk) { queue.push(chunk); - resolveNextItemReady(); + if (queue.length === 1) { + resolveNextItemReady(); + } }, });