From 362a83c2bc02a1cb0c017a16af1f0142b0224c95 Mon Sep 17 00:00:00 2001 From: Anselm Date: Mon, 31 Jul 2023 16:52:20 +0100 Subject: [PATCH 1/7] First test and implementation for sync subscribe --- src/node.ts | 97 +++++++++++++++++++++++++++++++++---- src/permissions.ts | 4 ++ src/sync.test.ts | 118 +++++++++++++++++++++++++++++++++++++++++++++ src/sync.ts | 55 +++++++++++++++++++++ 4 files changed, 264 insertions(+), 10 deletions(-) create mode 100644 src/sync.test.ts create mode 100644 src/sync.ts diff --git a/src/node.ts b/src/node.ts index 9e37ad48e..b61bf43d7 100644 --- a/src/node.ts +++ b/src/node.ts @@ -13,10 +13,20 @@ import { MultiLogHeader, } from "./multilog"; import { Team, expectTeamContent } from "./permissions"; +import { + NewContentMessage, + Peer, + PeerID, + SessionNewContent, + SubscribeMessage, + SyncMessage, + UnsubscribeMessage, + WrongAssumedKnownStateMessage, +} from "./sync"; export class LocalNode { multilogs: { [key: MultiLogID]: Promise | MultiLog } = {}; - // peers: {[key: Hostname]: Peer} = {}; + peers: { [key: PeerID]: Peer } = {}; agentCredential: AgentCredential; agentID: AgentID; ownSessionID: SessionID; @@ -109,13 +119,80 @@ export class LocalNode { return new Team(teamContent, this); } + + async addPeer(peer: Peer) { + this.peers[peer.id] = peer; + + const writer = peer.outgoing.getWriter(); + + for await (const msg of peer.incoming) { + const response = this.handleSyncMessage(msg); + + if (response) { + await writer.write(response); + } + } + } + + handleSyncMessage(msg: SyncMessage): SyncMessage | undefined { + // TODO: validate + switch (msg.type) { + case "subscribe": + return this.handleSubscribe(msg); + case "newContent": + return this.handleNewContent(msg); + case "wrongAssumedKnownState": + return this.handleWrongAssumedKnownState(msg); + case "unsubscribe": + return this.handleUnsubscribe(msg); + } + } + + handleSubscribe(msg: SubscribeMessage): SyncMessage | undefined { + const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); + + return { + type: "newContent", + multilogID: multilog.id, + header: multilog.header, + newContent: Object.fromEntries( + Object.entries(multilog.sessions) + .map(([sessionID, log]) => { + const newTransactions = log.transactions.slice( + msg.knownState.sessions[sessionID as SessionID] || 0 + ); + + if ( + newTransactions.length === 0 || + !log.lastHash || + !log.lastSignature + ) { + return undefined; + } + + return [ + sessionID, + { + after: + msg.knownState.sessions[ + sessionID as SessionID + ] || 0, + newTransactions, + lastHash: log.lastHash, + lastSignature: log.lastSignature, + }, + ]; + }) + .filter((x): x is Exclude => !!x) + ), + }; + } + + handleNewContent(msg: NewContentMessage): SyncMessage | undefined {} + + handleWrongAssumedKnownState( + msg: WrongAssumedKnownStateMessage + ): SyncMessage | undefined {} + + handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined {} } - -// type Hostname = string; - -// interface Peer { -// hostname: Hostname; -// incoming: ReadableStream; -// outgoing: WritableStream; -// optimisticKnownStates: {[multilogID: MultiLogID]: MultilogKnownState}; -// } diff --git a/src/permissions.ts b/src/permissions.ts index a2f4f1ce8..e5a6ee12d 100644 --- a/src/permissions.ts +++ b/src/permissions.ts @@ -228,6 +228,10 @@ export class Team { this.node = node; } + get id(): MultiLogID { + return this.teamMap.id; + } + addMember(agentID: AgentID, role: Role) { this.teamMap = this.teamMap.edit((map) => { const agent = this.node.knownAgents[agentID]; diff --git a/src/sync.test.ts b/src/sync.test.ts new file mode 100644 index 000000000..5132f9f6e --- /dev/null +++ b/src/sync.test.ts @@ -0,0 +1,118 @@ +import { test, expect } from "bun:test"; +import { + getAgent, + getAgentID, + newRandomAgentCredential, + newRandomSessionID, +} from "./multilog"; +import { LocalNode } from "./node"; +import { SyncMessage } from "./sync"; +import { MapOpPayload } from "./coValue"; + +test( + "Node replies with initial tx and header to empty subscribe", + async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + optimisticKnownStates: {}, + }); + + const writer = inTx.getWriter(); + + await writer.write({ + type: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: false, + sessions: {}, + }, + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: { + type: "comap", + ruleset: { type: "ownedByTeam", team: team.id }, + meta: null, + }, + newContent: { + [node.ownSessionID]: { + after: 0, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[0].madeAt, + changes: [ + { + op: "insert", + key: "hello", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: + map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); + }, + { timeout: 100 } +); + +function newStreamPair(): [ReadableStream, WritableStream] { + const queue: T[] = []; + let resolveNextItemReady: () => void = () => {}; + let nextItemReady: Promise = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + + const readable = new ReadableStream({ + async pull(controller) { + if (queue.length > 0) { + controller.enqueue(queue.shift()); + } else { + await nextItemReady; + nextItemReady = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + controller.enqueue(queue.shift()); + } + }, + }); + + const writable = new WritableStream({ + write(chunk) { + queue.push(chunk); + resolveNextItemReady(); + }, + }); + + return [readable, writable]; +} diff --git a/src/sync.ts b/src/sync.ts new file mode 100644 index 000000000..992cc2766 --- /dev/null +++ b/src/sync.ts @@ -0,0 +1,55 @@ +import { Hash } from "./crypto"; +import { MultiLogHeader, MultiLogID, SessionID, Transaction } from "./multilog"; + +type MultiLogKnownState = { + multilogID: MultiLogID; + header: boolean; + sessions: { [sessionID: SessionID]: number }; +}; + +export type SyncMessage = + | SubscribeMessage + | NewContentMessage + | WrongAssumedKnownStateMessage + | UnsubscribeMessage; + +export type SubscribeMessage = { + type: "subscribe"; + knownState: MultiLogKnownState; +}; + +export type NewContentMessage = { + type: "newContent"; + multilogID: MultiLogID; + header?: MultiLogHeader; + newContent: { + [sessionID: SessionID]: SessionNewContent; + }; +}; + +export type SessionNewContent = { + after: number; + newTransactions: Transaction[]; + lastHash: Hash; + lastSignature: string; +} + +export type WrongAssumedKnownStateMessage = { + type: "wrongAssumedKnownState"; + knownState: MultiLogKnownState; +}; + +export type UnsubscribeMessage = { + type: "unsubscribe"; + multilogID: MultiLogID; +}; + +export type PeerID = string; + +export interface Peer { + id: PeerID; + incoming: ReadableStream; + outgoing: WritableStream; + optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState}; +} + From ad8e51c66c5b963af1b67012978fe01d121c8a5c Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 1 Aug 2023 11:19:54 +0100 Subject: [PATCH 2/7] Test and implementation for syncing new transactions --- src/multilog.ts | 132 +++++++++++++++++++-------------- src/node.ts | 141 ++++++++++++++++++++--------------- src/permissions.ts | 8 +- src/sync.test.ts | 181 ++++++++++++++++++++++++++++++++++++++++++++- src/sync.ts | 9 ++- 5 files changed, 347 insertions(+), 124 deletions(-) diff --git a/src/multilog.ts b/src/multilog.ts index 221fe66ff..62d274649 100644 --- a/src/multilog.ts +++ b/src/multilog.ts @@ -30,6 +30,8 @@ import { determineValidTransactions, expectTeamContent, } from "./permissions"; +import { LocalNode } from "./node"; +import { MultiLogKnownState, NewContentMessage } from "./sync"; export type MultiLogID = `coval_${string}`; @@ -65,7 +67,10 @@ export type PrivateTransaction = { privacy: "private"; madeAt: number; keyUsed: KeyID; - encryptedChanges: Encrypted; + encryptedChanges: Encrypted< + JsonValue[], + { in: MultiLogID; tx: TransactionID } + >; }; export type TrustingTransaction = { @@ -86,61 +91,33 @@ export type TransactionID = { sessionID: SessionID; txIndex: number }; export class MultiLog { id: MultiLogID; + node: LocalNode; header: MultiLogHeader; sessions: { [key: SessionID]: SessionLog }; - agentCredential: AgentCredential; - ownSessionID: SessionID; - knownAgents: { [key: AgentID]: Agent }; - requiredMultiLogs: { [key: MultiLogID]: MultiLog }; content?: CoValue; - constructor( - header: MultiLogHeader, - agentCredential: AgentCredential, - ownSessionID: SessionID, - knownAgents: { [key: AgentID]: Agent }, - requiredMultiLogs: { [key: MultiLogID]: MultiLog } - ) { + constructor(header: MultiLogHeader, node: LocalNode) { this.id = multilogIDforHeader(header); this.header = header; this.sessions = {}; - this.agentCredential = agentCredential; - this.ownSessionID = ownSessionID; - this.knownAgents = knownAgents; - this.requiredMultiLogs = requiredMultiLogs; + this.node = node; } testWithDifferentCredentials( agentCredential: AgentCredential, ownSessionID: SessionID ): MultiLog { - const knownAgents = { - ...this.knownAgents, - [agentIDfromSessionID(ownSessionID)]: getAgent(agentCredential), - }; - const cloned = new MultiLog( - this.header, + const newNode = this.node.testWithDifferentCredentials( agentCredential, - ownSessionID, - knownAgents, - Object.fromEntries( - Object.entries(this.requiredMultiLogs).map(([id, multilog]) => [ - id, - multilog.testWithDifferentCredentials( - agentCredential, - ownSessionID - ), - ]) - ) + ownSessionID ); - cloned.sessions = JSON.parse(JSON.stringify(this.sessions)); - - return cloned; + return newNode.expectMultiLogLoaded(this.id); } - knownState(): MultilogKnownState { + knownState(): MultiLogKnownState { return { + multilogID: this.id, header: true, sessions: Object.fromEntries( Object.entries(this.sessions).map(([k, v]) => [ @@ -156,7 +133,7 @@ export class MultiLog { } nextTransactionID(): TransactionID { - const sessionID = this.ownSessionID; + const sessionID = this.node.ownSessionID; return { sessionID, txIndex: this.sessions[sessionID]?.transactions.length || 0, @@ -170,7 +147,7 @@ export class MultiLog { newSignature: Signature ): boolean { const signatoryID = - this.knownAgents[agentIDfromSessionID(sessionID)]?.signatoryID; + this.node.knownAgents[agentIDfromSessionID(sessionID)]?.signatoryID; if (!signatoryID) { console.warn("Unknown agent", agentIDfromSessionID(sessionID)); @@ -210,6 +187,8 @@ export class MultiLog { this.content = undefined; + this.node.syncMultiLog(this); + const _ = this.getCurrentContent(); return true; @@ -262,14 +241,14 @@ export class MultiLog { }; } - const sessionID = this.ownSessionID; + const sessionID = this.node.ownSessionID; const { expectedNewHash } = this.expectedNewHashAfter(sessionID, [ transaction, ]); const signature = sign( - this.agentCredential.signatorySecret, + this.node.agentCredential.signatorySecret, expectedNewHash ); @@ -353,9 +332,9 @@ export class MultiLog { id: currentKeyId, }; } else if (this.header.ruleset.type === "ownedByTeam") { - return this.requiredMultiLogs[ - this.header.ruleset.team - ].getCurrentReadKey(); + return this.node + .expectMultiLogLoaded(this.header.ruleset.team) + .getCurrentReadKey(); } else { throw new Error( "Only teams or values owned by teams have read secrets" @@ -374,7 +353,7 @@ export class MultiLog { for (const entry of readKeyHistory) { if (entry.value?.keyID === keyID) { const revealer = agentIDfromSessionID(entry.txID.sessionID); - const revealerAgent = this.knownAgents[revealer]; + const revealerAgent = this.node.knownAgents[revealer]; if (!revealerAgent) { throw new Error("Unknown revealer"); @@ -382,7 +361,7 @@ export class MultiLog { const secret = openAs( entry.value.revelation, - this.agentCredential.recipientSecret, + this.node.agentCredential.recipientSecret, revealerAgent.recipientID, { in: this.id, @@ -417,7 +396,9 @@ export class MultiLog { if (secret) { return secret; } else { - console.error(`Sealing ${sealingKeyID} key didn't unseal ${keyID}`); + console.error( + `Sealing ${sealingKeyID} key didn't unseal ${keyID}` + ); } } } @@ -426,12 +407,12 @@ export class MultiLog { "readKey " + keyID + " not revealed for " + - getAgentID(getAgent(this.agentCredential)) + getAgentID(getAgent(this.node.agentCredential)) ); } else if (this.header.ruleset.type === "ownedByTeam") { - return this.requiredMultiLogs[this.header.ruleset.team].getReadKey( - keyID - ); + return this.node + .expectMultiLogLoaded(this.header.ruleset.team) + .getReadKey(keyID); } else { throw new Error( "Only teams or values owned by teams have read secrets" @@ -442,12 +423,51 @@ export class MultiLog { getTx(txID: TransactionID): Transaction | undefined { return this.sessions[txID.sessionID]?.transactions[txID.txIndex]; } -} -type MultilogKnownState = { - header: boolean; - sessions: { [key: SessionID]: number }; -}; + newContentSince(knownState: MultiLogKnownState | undefined): NewContentMessage | undefined { + const newContent: NewContentMessage = { + type: "newContent", + multilogID: this.id, + header: knownState?.header ? undefined : this.header, + newContent: Object.fromEntries( + Object.entries(this.sessions) + .map(([sessionID, log]) => { + const newTransactions = log.transactions.slice( + knownState?.sessions[sessionID as SessionID] || 0 + ); + + if ( + newTransactions.length === 0 || + !log.lastHash || + !log.lastSignature + ) { + return undefined; + } + + return [ + sessionID, + { + after: + knownState?.sessions[ + sessionID as SessionID + ] || 0, + newTransactions, + lastHash: log.lastHash, + lastSignature: log.lastSignature, + }, + ]; + }) + .filter((x): x is Exclude => !!x) + ), + } + + if (!newContent.header && Object.keys(newContent.newContent).length === 0) { + return undefined; + } + + return newContent; + } +} export type AgentID = `agent_${string}`; diff --git a/src/node.ts b/src/node.ts index b61bf43d7..8ff1f6cf7 100644 --- a/src/node.ts +++ b/src/node.ts @@ -11,12 +11,14 @@ import { getAgentID, getAgentMultilogHeader, MultiLogHeader, + agentIDfromSessionID, } from "./multilog"; import { Team, expectTeamContent } from "./permissions"; import { NewContentMessage, Peer, PeerID, + PeerState, SessionNewContent, SubscribeMessage, SyncMessage, @@ -26,7 +28,7 @@ import { export class LocalNode { multilogs: { [key: MultiLogID]: Promise | MultiLog } = {}; - peers: { [key: PeerID]: Peer } = {}; + peers: { [key: PeerID]: PeerState } = {}; agentCredential: AgentCredential; agentID: AgentID; ownSessionID: SessionID; @@ -40,13 +42,7 @@ export class LocalNode { this.knownAgents[agentID] = agent; this.ownSessionID = ownSessionID; - const agentMultilog = new MultiLog( - getAgentMultilogHeader(agent), - agentCredential, - ownSessionID, - this.knownAgents, - {} - ); + const agentMultilog = new MultiLog(getAgentMultilogHeader(agent), this); this.multilogs[agentMultilog.id] = Promise.resolve(agentMultilog); } @@ -60,13 +56,7 @@ export class LocalNode { } : {}; - const multilog = new MultiLog( - header, - this.agentCredential, - this.ownSessionID, - this.knownAgents, - requiredMultiLogs - ); + const multilog = new MultiLog(header, this); this.multilogs[multilog.id] = multilog; return multilog; } @@ -121,24 +111,31 @@ export class LocalNode { } async addPeer(peer: Peer) { - this.peers[peer.id] = peer; + const peerState = { + id: peer.id, + optimisticKnownStates: {}, + incoming: peer.incoming, + outgoing: peer.outgoing.getWriter(), + }; + this.peers[peer.id] = peerState; - const writer = peer.outgoing.getWriter(); - - for await (const msg of peer.incoming) { - const response = this.handleSyncMessage(msg); + for await (const msg of peerState.incoming) { + const response = this.handleSyncMessage(msg, peerState); if (response) { - await writer.write(response); + await peerState.outgoing.write(response); } } } - handleSyncMessage(msg: SyncMessage): SyncMessage | undefined { + handleSyncMessage( + msg: SyncMessage, + peer: PeerState + ): SyncMessage | undefined { // TODO: validate switch (msg.type) { case "subscribe": - return this.handleSubscribe(msg); + return this.handleSubscribe(msg, peer); case "newContent": return this.handleNewContent(msg); case "wrongAssumedKnownState": @@ -148,51 +145,73 @@ export class LocalNode { } } - handleSubscribe(msg: SubscribeMessage): SyncMessage | undefined { + handleSubscribe( + msg: SubscribeMessage, + peer: PeerState + ): SyncMessage | undefined { const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); - return { - type: "newContent", - multilogID: multilog.id, - header: multilog.header, - newContent: Object.fromEntries( - Object.entries(multilog.sessions) - .map(([sessionID, log]) => { - const newTransactions = log.transactions.slice( - msg.knownState.sessions[sessionID as SessionID] || 0 - ); + peer.optimisticKnownStates[multilog.id] = multilog.knownState(); - if ( - newTransactions.length === 0 || - !log.lastHash || - !log.lastSignature - ) { - return undefined; - } - - return [ - sessionID, - { - after: - msg.knownState.sessions[ - sessionID as SessionID - ] || 0, - newTransactions, - lastHash: log.lastHash, - lastSignature: log.lastSignature, - }, - ]; - }) - .filter((x): x is Exclude => !!x) - ), - }; + return multilog.newContentSince(msg.knownState); } - handleNewContent(msg: NewContentMessage): SyncMessage | undefined {} + handleNewContent(msg: NewContentMessage): SyncMessage | undefined { + return undefined; + } handleWrongAssumedKnownState( msg: WrongAssumedKnownStateMessage - ): SyncMessage | undefined {} + ): SyncMessage | undefined { + return undefined; + } - handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined {} + handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined { + return undefined; + } + + async syncMultiLog(multilog: MultiLog) { + for (const peer of Object.values(this.peers)) { + const optimisticKnownState = + peer.optimisticKnownStates[multilog.id]; + + const newContent = multilog.newContentSince(optimisticKnownState); + + peer.optimisticKnownStates[multilog.id] = multilog.knownState(); + + if (newContent) { + await peer.outgoing.write(newContent); + } + } + } + + testWithDifferentCredentials( + agentCredential: AgentCredential, + ownSessionID: SessionID + ): LocalNode { + const newNode = new LocalNode(agentCredential, ownSessionID); + + newNode.multilogs = Object.fromEntries( + Object.entries(this.multilogs) + .map(([id, multilog]) => { + if (multilog instanceof Promise) { + return [id, undefined]; + } + + const newMultilog = new MultiLog(multilog.header, newNode); + + newMultilog.sessions = multilog.sessions; + + return [id, newMultilog]; + }) + .filter((x): x is Exclude => !!x) + ); + + newNode.knownAgents = { + ...this.knownAgents, + [agentIDfromSessionID(ownSessionID)]: getAgent(agentCredential), + }; + + return newNode; + } } diff --git a/src/permissions.ts b/src/permissions.ts index e5a6ee12d..1f3253255 100644 --- a/src/permissions.ts +++ b/src/permissions.ts @@ -155,9 +155,9 @@ export function determineValidTransactions( return validTransactions; } else if (multilog.header.ruleset.type === "ownedByTeam") { const teamContent = - multilog.requiredMultiLogs[ + multilog.node.expectMultiLogLoaded( multilog.header.ruleset.team - ].getCurrentContent(); + ).getCurrentContent(); if (teamContent.type !== "comap") { throw new Error("Team must be a map"); @@ -249,7 +249,7 @@ export class Team { const revelation = seal( currentReadKey.secret, - this.teamMap.multiLog.agentCredential.recipientSecret, + this.teamMap.multiLog.node.agentCredential.recipientSecret, new Set([agent.recipientID]), { in: this.teamMap.multiLog.id, @@ -283,7 +283,7 @@ export class Team { const newReadKeyRevelation = seal( newReadKey.secret, - this.teamMap.multiLog.agentCredential.recipientSecret, + this.teamMap.multiLog.node.agentCredential.recipientSecret, new Set( currentlyPermittedReaders.map( (reader) => this.node.knownAgents[reader].recipientID diff --git a/src/sync.test.ts b/src/sync.test.ts index 5132f9f6e..352a3affc 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -32,7 +32,6 @@ test( id: "test", incoming: inRx, outgoing: outTx, - optimisticKnownStates: {}, }); const writer = inTx.getWriter(); @@ -86,6 +85,186 @@ test( { timeout: 100 } ); +test("Node replies with only new tx to subscribe with some known state", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + editable.set("goodbye", "world", "trusting"); + }); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + }); + + const writer = inTx.getWriter(); + + await writer.write({ + type: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: true, + sessions: { + [node.ownSessionID]: 1, + }, + }, + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: undefined, + newContent: { + [node.ownSessionID]: { + after: 1, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[1].madeAt, + changes: [ + { + op: "insert", + key: "goodbye", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + +test("After subscribing, node sends new txs to peer", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + }); + + const writer = inTx.getWriter(); + + await writer.write({ + type: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: false, + sessions: { + [node.ownSessionID]: 0, + }, + }, + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: map.multiLog.header, + newContent: {}, + } satisfies SyncMessage); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const secondMessage = await reader.read(); + + expect(secondMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + newContent: { + [node.ownSessionID]: { + after: 0, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[0].madeAt, + changes: [ + { + op: "insert", + key: "hello", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); + + map.edit((editable) => { + editable.set("goodbye", "world", "trusting"); + }); + + const thirdMessage = await reader.read(); + + expect(thirdMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + newContent: { + [node.ownSessionID]: { + after: 1, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[1].madeAt, + changes: [ + { + op: "insert", + key: "goodbye", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; diff --git a/src/sync.ts b/src/sync.ts index 992cc2766..9c7c89a7c 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -1,7 +1,7 @@ import { Hash } from "./crypto"; import { MultiLogHeader, MultiLogID, SessionID, Transaction } from "./multilog"; -type MultiLogKnownState = { +export type MultiLogKnownState = { multilogID: MultiLogID; header: boolean; sessions: { [sessionID: SessionID]: number }; @@ -50,6 +50,11 @@ export interface Peer { id: PeerID; incoming: ReadableStream; outgoing: WritableStream; - optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState}; } +export interface PeerState { + id: PeerID; + optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState}; + incoming: ReadableStream; + outgoing: WritableStreamDefaultWriter; +} \ No newline at end of file From c5bc519fbe203169b69195416cf44e421c499799 Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 1 Aug 2023 13:23:25 +0100 Subject: [PATCH 3/7] Implement invalid known state handling --- src/node.ts | 11 +++-- src/sync.test.ts | 113 +++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 112 insertions(+), 12 deletions(-) diff --git a/src/node.ts b/src/node.ts index 8ff1f6cf7..326f80e22 100644 --- a/src/node.ts +++ b/src/node.ts @@ -139,7 +139,7 @@ export class LocalNode { case "newContent": return this.handleNewContent(msg); case "wrongAssumedKnownState": - return this.handleWrongAssumedKnownState(msg); + return this.handleWrongAssumedKnownState(msg, peer); case "unsubscribe": return this.handleUnsubscribe(msg); } @@ -161,9 +161,14 @@ export class LocalNode { } handleWrongAssumedKnownState( - msg: WrongAssumedKnownStateMessage + msg: WrongAssumedKnownStateMessage, + peer: PeerState ): SyncMessage | undefined { - return undefined; + const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); + + peer.optimisticKnownStates[msg.knownState.multilogID] = msg.knownState; + + return multilog.newContentSince(msg.knownState); } handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined { diff --git a/src/sync.test.ts b/src/sync.test.ts index 352a3affc..1ddf187d2 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -265,6 +265,95 @@ test("After subscribing, node sends new txs to peer", async () => { } satisfies SyncMessage); }); +test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + }); + + const writer = inTx.getWriter(); + + await writer.write({ + type: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: false, + sessions: { + [node.ownSessionID]: 0, + }, + }, + }); + + const reader = outRx.getReader(); + + const _firstMessage = await reader.read(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + map.edit((editable) => { + editable.set("goodbye", "world", "trusting"); + }); + + const _secondMessage = await reader.read(); + const _thirdMessage = await reader.read(); + + await writer.write({ + type: "wrongAssumedKnownState", + knownState: { + multilogID: map.multiLog.id, + header: true, + sessions: { + [node.ownSessionID]: 1, + }, + }, + } satisfies SyncMessage); + + const fourthMessage = await reader.read(); + + expect(fourthMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: undefined, + newContent: { + [node.ownSessionID]: { + after: 1, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[1].madeAt, + changes: [ + { + op: "insert", + key: "goodbye", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; @@ -274,14 +363,18 @@ function newStreamPair(): [ReadableStream, WritableStream] { const readable = new ReadableStream({ async pull(controller) { - if (queue.length > 0) { - controller.enqueue(queue.shift()); - } else { - await nextItemReady; - nextItemReady = new Promise((resolve) => { - resolveNextItemReady = resolve; - }); - controller.enqueue(queue.shift()); + while(true) { + if (queue.length > 0) { + controller.enqueue(queue.shift()); + if (queue.length === 0) { + nextItemReady = new Promise((resolve) => { + resolveNextItemReady = resolve; + }); + } + return; + } else { + await nextItemReady; + } } }, }); @@ -289,7 +382,9 @@ function newStreamPair(): [ReadableStream, WritableStream] { const writable = new WritableStream({ write(chunk) { queue.push(chunk); - resolveNextItemReady(); + if (queue.length === 1) { + resolveNextItemReady(); + } }, }); From b31bf7c9e95d5784401d31111cba4c2cad7dbe35 Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 1 Aug 2023 15:23:55 +0100 Subject: [PATCH 4/7] Add tests and implementation for peer roles --- src/node.ts | 15 +++-- src/sync.test.ts | 140 ++++++++++++++++++++++++++++++++++++++++++++++- src/sync.ts | 2 + 3 files changed, 152 insertions(+), 5 deletions(-) diff --git a/src/node.ts b/src/node.ts index 326f80e22..afa6c68c5 100644 --- a/src/node.ts +++ b/src/node.ts @@ -58,6 +58,9 @@ export class LocalNode { const multilog = new MultiLog(header, this); this.multilogs[multilog.id] = multilog; + + this.syncMultiLog(multilog); + return multilog; } @@ -116,6 +119,7 @@ export class LocalNode { optimisticKnownStates: {}, incoming: peer.incoming, outgoing: peer.outgoing.getWriter(), + role: peer.role, }; this.peers[peer.id] = peerState; @@ -180,12 +184,15 @@ export class LocalNode { const optimisticKnownState = peer.optimisticKnownStates[multilog.id]; - const newContent = multilog.newContentSince(optimisticKnownState); + if (optimisticKnownState || peer.role === "server") { + const newContent = + multilog.newContentSince(optimisticKnownState); - peer.optimisticKnownStates[multilog.id] = multilog.knownState(); + peer.optimisticKnownStates[multilog.id] = multilog.knownState(); - if (newContent) { - await peer.outgoing.write(newContent); + if (newContent) { + await peer.outgoing.write(newContent); + } } } } diff --git a/src/sync.test.ts b/src/sync.test.ts index 1ddf187d2..a4c934b5d 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -32,6 +32,7 @@ test( id: "test", incoming: inRx, outgoing: outTx, + role: "peer", }); const writer = inTx.getWriter(); @@ -107,6 +108,7 @@ test("Node replies with only new tx to subscribe with some known state", async ( id: "test", incoming: inRx, outgoing: outTx, + role: "peer", }); const writer = inTx.getWriter(); @@ -172,6 +174,7 @@ test("After subscribing, node sends new txs to peer", async () => { id: "test", incoming: inRx, outgoing: outTx, + role: "peer", }); const writer = inTx.getWriter(); @@ -282,6 +285,7 @@ test("No matter the optimistic known state, node respects invalid known state me id: "test", incoming: inRx, outgoing: outTx, + role: "peer", }); const writer = inTx.getWriter(); @@ -354,6 +358,126 @@ test("No matter the optimistic known state, node respects invalid known state me } satisfies SyncMessage); }); +test("If we add a peer, but it never subscribes to a multilog, it won't get any messages", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "peer", + }); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const reader = outRx.getReader(); + + await shouldNotResolve(reader.read(), { timeout: 50 }); +}); + +test("If we add a server peer, all updates to all multilogs are sent to it, even if it doesn't subscribe", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "server", + }); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: map.multiLog.header, + newContent: { + [node.ownSessionID]: { + after: 0, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[0].madeAt, + changes: [ + { + op: "insert", + key: "hello", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + +test("If we add a server peer, it even receives just headers of newly created multilogs", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "server", + }); + + const map = team.createMap(); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "newContent", + multilogID: map.multiLog.id, + header: map.multiLog.header, + newContent: {}, + } satisfies SyncMessage); +}) + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; @@ -363,7 +487,7 @@ function newStreamPair(): [ReadableStream, WritableStream] { const readable = new ReadableStream({ async pull(controller) { - while(true) { + while (true) { if (queue.length > 0) { controller.enqueue(queue.shift()); if (queue.length === 0) { @@ -390,3 +514,17 @@ function newStreamPair(): [ReadableStream, WritableStream] { return [readable, writable]; } + +function shouldNotResolve(promise: Promise, ops: { timeout: number }) { + return new Promise((resolve, reject) => { + promise.then((v) => + reject( + new Error( + "Should not have resolved, but resolved to " + + JSON.stringify(v) + ) + ) + ); + setTimeout(resolve, ops.timeout); + }); +} diff --git a/src/sync.ts b/src/sync.ts index 9c7c89a7c..ae187d36f 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -50,6 +50,7 @@ export interface Peer { id: PeerID; incoming: ReadableStream; outgoing: WritableStream; + role: 'peer' | 'server' | 'client'; } export interface PeerState { @@ -57,4 +58,5 @@ export interface PeerState { optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState}; incoming: ReadableStream; outgoing: WritableStreamDefaultWriter; + role: 'peer' | 'server' | 'client'; } \ No newline at end of file From d7682d73d8eeb74a5d04a9b5e4a79a532e2f607b Mon Sep 17 00:00:00 2001 From: Anselm Date: Tue, 1 Aug 2023 15:38:39 +0100 Subject: [PATCH 5/7] Initial sync to server --- src/node.ts | 15 +++++++++++++++ src/sync.test.ts | 45 ++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/node.ts b/src/node.ts index afa6c68c5..b6ae7d238 100644 --- a/src/node.ts +++ b/src/node.ts @@ -123,6 +123,21 @@ export class LocalNode { }; this.peers[peer.id] = peerState; + if (peer.role === "server") { + for (const multilog of Object.values(this.multilogs)) { + if (multilog instanceof Promise) { + continue; + } + + await peerState.outgoing.write( + { + type: "subscribe", + knownState: multilog.knownState(), + } + ); + } + } + for await (const msg of peerState.incoming) { const response = this.handleSyncMessage(msg, peerState); diff --git a/src/sync.test.ts b/src/sync.test.ts index a4c934b5d..530b8368f 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -412,6 +412,7 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even }); const reader = outRx.getReader(); + const _initialSyncMessage = await reader.read(); const firstMessage = await reader.read(); @@ -464,9 +465,10 @@ test("If we add a server peer, it even receives just headers of newly created mu role: "server", }); - const map = team.createMap(); - const reader = outRx.getReader(); + const _initialSyncMessage = await reader.read(); + + const map = team.createMap(); const firstMessage = await reader.read(); @@ -476,7 +478,44 @@ test("If we add a server peer, it even receives just headers of newly created mu header: map.multiLog.header, newContent: {}, } satisfies SyncMessage); -}) +}); + +test("When we connect a new server peer, we try to sync all existing multilogs to it", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "server", + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + type: "subscribe", + knownState: team.teamMap.multiLog.knownState(), + } satisfies SyncMessage); + + const secondMessage = await reader.read(); + + expect(secondMessage.value).toEqual({ + type: "subscribe", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); +}); function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; From c04d2797d21687fbd8cacdabf0a84fa8b51cf6f7 Mon Sep 17 00:00:00 2001 From: Anselm Date: Wed, 2 Aug 2023 15:27:27 +0100 Subject: [PATCH 6/7] Several further tests and improvements for syncing --- src/coValue.ts | 12 ++ src/multilog.ts | 4 +- src/node.ts | 293 +++++++++++++++++++++++++++++++++--------- src/permissions.ts | 8 +- src/sync.test.ts | 314 +++++++++++++++++++++++++++++++++++++++------ src/sync.ts | 71 ++++++++-- tsconfig.json | 4 +- 7 files changed, 592 insertions(+), 114 deletions(-) diff --git a/src/coValue.ts b/src/coValue.ts index b498f77c9..51c6f4007 100644 --- a/src/coValue.ts +++ b/src/coValue.ts @@ -198,6 +198,10 @@ export class CoList { constructor(multilog: MultiLog) { this.id = multilog.id as CoValueID>; } + + toJSON(): JsonObject { + throw new Error("Method not implemented."); + } } export class MultiStream { @@ -207,6 +211,10 @@ export class MultiStream { constructor(multilog: MultiLog) { this.id = multilog.id as CoValueID>; } + + toJSON(): JsonObject { + throw new Error("Method not implemented."); + } } export class Static { @@ -216,6 +224,10 @@ export class Static { constructor(multilog: MultiLog) { this.id = multilog.id as CoValueID>; } + + toJSON(): JsonObject { + throw new Error("Method not implemented."); + } } export function expectMap(content: CoValue): CoMap<{ [key: string]: string }, {}> { diff --git a/src/multilog.ts b/src/multilog.ts index 62d274649..99cf430d0 100644 --- a/src/multilog.ts +++ b/src/multilog.ts @@ -60,7 +60,7 @@ type SessionLog = { transactions: Transaction[]; lastHash?: Hash; streamingHash: StreamingHash; - lastSignature: string; + lastSignature: Signature; }; export type PrivateTransaction = { @@ -426,7 +426,7 @@ export class MultiLog { newContentSince(knownState: MultiLogKnownState | undefined): NewContentMessage | undefined { const newContent: NewContentMessage = { - type: "newContent", + action: "newContent", multilogID: this.id, header: knownState?.header ? undefined : this.header, newContent: Object.fromEntries( diff --git a/src/node.ts b/src/node.ts index b6ae7d238..5cc179978 100644 --- a/src/node.ts +++ b/src/node.ts @@ -21,13 +21,16 @@ import { PeerState, SessionNewContent, SubscribeMessage, + SubscribeResponseMessage, SyncMessage, UnsubscribeMessage, WrongAssumedKnownStateMessage, + combinedKnownStates, + weAreStrictlyAhead, } from "./sync"; export class LocalNode { - multilogs: { [key: MultiLogID]: Promise | MultiLog } = {}; + multilogs: { [key: MultiLogID]: MultilogState } = {}; peers: { [key: PeerID]: PeerState } = {}; agentCredential: AgentCredential; agentID: AgentID; @@ -43,36 +46,34 @@ export class LocalNode { this.ownSessionID = ownSessionID; const agentMultilog = new MultiLog(getAgentMultilogHeader(agent), this); - this.multilogs[agentMultilog.id] = Promise.resolve(agentMultilog); + this.multilogs[agentMultilog.id] = { + state: "loaded", + multilog: agentMultilog, + }; } createMultiLog(header: MultiLogHeader): MultiLog { - const requiredMultiLogs = - header.ruleset.type === "ownedByTeam" - ? { - [header.ruleset.team]: this.expectMultiLogLoaded( - header.ruleset.team - ), - } - : {}; - const multilog = new MultiLog(header, this); - this.multilogs[multilog.id] = multilog; + this.multilogs[multilog.id] = { state: "loaded", multilog }; this.syncMultiLog(multilog); return multilog; } - expectMultiLogLoaded(id: MultiLogID): MultiLog { - const multilog = this.multilogs[id]; - if (!multilog) { + expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog { + const entry = this.multilogs[id]; + if (!entry) { throw new Error(`Unknown multilog ${id}`); } - if (multilog instanceof Promise) { - throw new Error(`Multilog ${id} not yet loaded`); + if (entry.state === "loading") { + throw new Error( + `${ + expectation ? expectation + ": " : "" + }Multilog ${id} not yet loaded` + ); } - return multilog; + return entry.multilog; } addKnownAgent(agent: Agent) { @@ -113,8 +114,8 @@ export class LocalNode { return new Team(teamContent, this); } - async addPeer(peer: Peer) { - const peerState = { + addPeer(peer: Peer) { + const peerState: PeerState = { id: peer.id, optimisticKnownStates: {}, incoming: peer.incoming, @@ -124,74 +125,221 @@ export class LocalNode { this.peers[peer.id] = peerState; if (peer.role === "server") { - for (const multilog of Object.values(this.multilogs)) { - if (multilog instanceof Promise) { + for (const entry of Object.values(this.multilogs)) { + if (entry.state === "loading") { continue; } - await peerState.outgoing.write( - { - type: "subscribe", - knownState: multilog.knownState(), - } - ); + peerState.outgoing + .write({ + action: "subscribe", + knownState: entry.multilog.knownState(), + }) + .catch((e) => { + // TODO: handle error + console.error("Error writing to peer", e); + }); + + peerState.optimisticKnownStates[entry.multilog.id] = { + multilogID: entry.multilog.id, + header: false, + sessions: {}, + }; } } - for await (const msg of peerState.incoming) { - const response = this.handleSyncMessage(msg, peerState); - - if (response) { - await peerState.outgoing.write(response); + const readIncoming = async () => { + for await (const msg of peerState.incoming) { + for (const responseMsg of this.handleSyncMessage( + msg, + peerState + )) { + await peerState.outgoing.write(responseMsg); + } } - } + }; + + readIncoming().catch((e) => { + // TODO: handle error + console.error("Error reading from peer", e); + }); } - handleSyncMessage( - msg: SyncMessage, - peer: PeerState - ): SyncMessage | undefined { + handleSyncMessage(msg: SyncMessage, peer: PeerState): SyncMessage[] { // TODO: validate - switch (msg.type) { + switch (msg.action) { case "subscribe": return this.handleSubscribe(msg, peer); + case "subscribeResponse": + return this.handleSubscribeResponse(msg, peer); case "newContent": return this.handleNewContent(msg); case "wrongAssumedKnownState": return this.handleWrongAssumedKnownState(msg, peer); case "unsubscribe": return this.handleUnsubscribe(msg); + default: + throw new Error(`Unknown message type ${(msg as any).action}`); } } - handleSubscribe( - msg: SubscribeMessage, - peer: PeerState - ): SyncMessage | undefined { - const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); + handleSubscribe(msg: SubscribeMessage, peer: PeerState): SyncMessage[] { + const entry = this.multilogs[msg.knownState.multilogID]; - peer.optimisticKnownStates[multilog.id] = multilog.knownState(); + if (!entry || entry.state === "loading") { + if (!entry) { + let resolve: (multilog: MultiLog) => void; - return multilog.newContentSince(msg.knownState); + const promise = new Promise((r) => { + resolve = r; + }); + + this.multilogs[msg.knownState.multilogID] = { + state: "loading", + done: promise, + resolve: resolve!, + }; + } + + return [ + { + action: "subscribeResponse", + knownState: { + multilogID: msg.knownState.multilogID, + header: false, + sessions: {}, + }, + }, + ]; + } + + peer.optimisticKnownStates[entry.multilog.id] = + entry.multilog.knownState(); + + const newContent = entry.multilog.newContentSince(msg.knownState); + + return [ + { + action: "subscribeResponse", + knownState: entry.multilog.knownState(), + }, + ...(newContent ? [newContent] : []), + ]; } - handleNewContent(msg: NewContentMessage): SyncMessage | undefined { - return undefined; + handleSubscribeResponse( + msg: SubscribeResponseMessage, + peer: PeerState + ): SyncMessage[] { + const entry = this.multilogs[msg.knownState.multilogID]; + + if (!entry || entry.state === "loading") { + throw new Error( + "Expected multilog entry to be created, missing subscribe?" + ); + } + + const newContent = entry.multilog.newContentSince(msg.knownState); + peer.optimisticKnownStates[msg.knownState.multilogID] = + combinedKnownStates(msg.knownState, entry.multilog.knownState()); + + return newContent ? [newContent] : []; + } + + handleNewContent(msg: NewContentMessage): SyncMessage[] { + let entry = this.multilogs[msg.multilogID]; + + if (!entry) { + throw new Error( + "Expected multilog entry to be created, missing subscribe?" + ); + } + + let resolveAfterDone: ((multilog: MultiLog) => void) | undefined; + + if (entry.state === "loading") { + if (!msg.header) { + throw new Error("Expected header to be sent in first message"); + } + + const multilog = new MultiLog(msg.header, this); + + resolveAfterDone = entry.resolve; + + entry = { + state: "loaded", + multilog, + }; + + this.multilogs[msg.multilogID] = entry; + } + + const multilog = entry.multilog; + + let invalidStateAssumed = false; + + for (const sessionID of Object.keys(msg.newContent) as SessionID[]) { + const ourKnownTxIdx = + multilog.sessions[sessionID]?.transactions.length; + const theirFirstNewTxIdx = msg.newContent[sessionID].after; + + if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) { + invalidStateAssumed = true; + continue; + } + + const alreadyKnownOffset = ourKnownTxIdx + ? ourKnownTxIdx - theirFirstNewTxIdx + : 0; + + const newTransactions = + msg.newContent[sessionID].newTransactions.slice( + alreadyKnownOffset + ); + + const success = multilog.tryAddTransactions( + sessionID, + newTransactions, + msg.newContent[sessionID].lastHash, + msg.newContent[sessionID].lastSignature + ); + + if (!success) { + console.error("Failed to add transactions", newTransactions); + continue; + } + } + + if (resolveAfterDone) { + resolveAfterDone(multilog); + } + + return invalidStateAssumed + ? [ + { + action: "wrongAssumedKnownState", + knownState: multilog.knownState(), + }, + ] + : []; } handleWrongAssumedKnownState( msg: WrongAssumedKnownStateMessage, peer: PeerState - ): SyncMessage | undefined { + ): SyncMessage[] { const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID); - peer.optimisticKnownStates[msg.knownState.multilogID] = msg.knownState; + peer.optimisticKnownStates[msg.knownState.multilogID] = + combinedKnownStates(msg.knownState, multilog.knownState()); - return multilog.newContentSince(msg.knownState); + const newContent = multilog.newContentSince(msg.knownState); + + return newContent ? [newContent] : []; } - handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined { - return undefined; + handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage[] { + throw new Error("Method not implemented."); } async syncMultiLog(multilog: MultiLog) { @@ -203,7 +351,21 @@ export class LocalNode { const newContent = multilog.newContentSince(optimisticKnownState); - peer.optimisticKnownStates[multilog.id] = multilog.knownState(); + peer.optimisticKnownStates[multilog.id] = peer + .optimisticKnownStates[multilog.id] + ? combinedKnownStates( + peer.optimisticKnownStates[multilog.id], + multilog.knownState() + ) + : multilog.knownState(); + + if (!optimisticKnownState && peer.role === "server") { + // auto-subscribe + await peer.outgoing.write({ + action: "subscribe", + knownState: multilog.knownState(), + }); + } if (newContent) { await peer.outgoing.write(newContent); @@ -220,16 +382,19 @@ export class LocalNode { newNode.multilogs = Object.fromEntries( Object.entries(this.multilogs) - .map(([id, multilog]) => { - if (multilog instanceof Promise) { - return [id, undefined]; + .map(([id, entry]) => { + if (entry.state === "loading") { + return undefined; } - const newMultilog = new MultiLog(multilog.header, newNode); + const newMultilog = new MultiLog( + entry.multilog.header, + newNode + ); - newMultilog.sessions = multilog.sessions; + newMultilog.sessions = entry.multilog.sessions; - return [id, newMultilog]; + return [id, { state: "loaded", multilog: newMultilog }]; }) .filter((x): x is Exclude => !!x) ); @@ -242,3 +407,11 @@ export class LocalNode { return newNode; } } + +type MultilogState = + | { + state: "loading"; + done: Promise; + resolve: (multilog: MultiLog) => void; + } + | { state: "loaded"; multilog: MultiLog }; diff --git a/src/permissions.ts b/src/permissions.ts index 1f3253255..9d1ce83b7 100644 --- a/src/permissions.ts +++ b/src/permissions.ts @@ -156,7 +156,8 @@ export function determineValidTransactions( } else if (multilog.header.ruleset.type === "ownedByTeam") { const teamContent = multilog.node.expectMultiLogLoaded( - multilog.header.ruleset.team + multilog.header.ruleset.team, + "Determining valid transaction in owned object but its team wasn't loaded" ).getCurrentContent(); if (teamContent.type !== "comap") { @@ -193,8 +194,11 @@ export function determineValidTransactions( })); } ); + } else if (multilog.header.ruleset.type === "agent") { + // TODO + return []; } else { - throw new Error("Unknown ruleset type " + multilog.header.ruleset.type); + throw new Error("Unknown ruleset type " + (multilog.header.ruleset as any).type); } } diff --git a/src/sync.test.ts b/src/sync.test.ts index 530b8368f..f386f87ad 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -7,7 +7,7 @@ import { } from "./multilog"; import { LocalNode } from "./node"; import { SyncMessage } from "./sync"; -import { MapOpPayload } from "./coValue"; +import { MapOpPayload, expectMap } from "./coValue"; test( "Node replies with initial tx and header to empty subscribe", @@ -38,7 +38,7 @@ test( const writer = inTx.getWriter(); await writer.write({ - type: "subscribe", + action: "subscribe", knownState: { multilogID: map.multiLog.id, header: false, @@ -51,7 +51,14 @@ test( const firstMessage = await reader.read(); expect(firstMessage.value).toEqual({ - type: "newContent", + action: "subscribeResponse", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); + + const secondMessage = await reader.read(); + + expect(secondMessage.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: { type: "comap", @@ -114,7 +121,7 @@ test("Node replies with only new tx to subscribe with some known state", async ( const writer = inTx.getWriter(); await writer.write({ - type: "subscribe", + action: "subscribe", knownState: { multilogID: map.multiLog.id, header: true, @@ -126,10 +133,17 @@ test("Node replies with only new tx to subscribe with some known state", async ( const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const msg1 = await reader.read(); - expect(firstMessage.value).toEqual({ - type: "newContent", + expect(msg1.value).toEqual({ + action: "subscribeResponse", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); + + const msg2 = await reader.read(); + + expect(msg2.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: undefined, newContent: { @@ -157,7 +171,7 @@ test("Node replies with only new tx to subscribe with some known state", async ( } satisfies SyncMessage); }); -test("After subscribing, node sends new txs to peer", async () => { +test("After subscribing, node sends own known state and new txs to peer", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -180,7 +194,7 @@ test("After subscribing, node sends new txs to peer", async () => { const writer = inTx.getWriter(); await writer.write({ - type: "subscribe", + action: "subscribe", knownState: { multilogID: map.multiLog.id, header: false, @@ -192,10 +206,17 @@ test("After subscribing, node sends new txs to peer", async () => { const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const msg1 = await reader.read(); - expect(firstMessage.value).toEqual({ - type: "newContent", + expect(msg1.value).toEqual({ + action: "subscribeResponse", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); + + const msg2 = await reader.read(); + + expect(msg2.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: map.multiLog.header, newContent: {}, @@ -205,10 +226,10 @@ test("After subscribing, node sends new txs to peer", async () => { editable.set("hello", "world", "trusting"); }); - const secondMessage = await reader.read(); + const msg3 = await reader.read(); - expect(secondMessage.value).toEqual({ - type: "newContent", + expect(msg3.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, newContent: { [node.ownSessionID]: { @@ -238,10 +259,10 @@ test("After subscribing, node sends new txs to peer", async () => { editable.set("goodbye", "world", "trusting"); }); - const thirdMessage = await reader.read(); + const msg4 = await reader.read(); - expect(thirdMessage.value).toEqual({ - type: "newContent", + expect(msg4.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, newContent: { [node.ownSessionID]: { @@ -268,6 +289,76 @@ test("After subscribing, node sends new txs to peer", async () => { } satisfies SyncMessage); }); +test("Client replies with known new content to subscribeResponse from server", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "peer", + }); + + const writer = inTx.getWriter(); + + await writer.write({ + action: "subscribeResponse", + knownState: { + multilogID: map.multiLog.id, + header: false, + sessions: { + [node.ownSessionID]: 0, + }, + }, + }); + + const reader = outRx.getReader(); + + const msg1 = await reader.read(); + + expect(msg1.value).toEqual({ + action: "newContent", + multilogID: map.multiLog.id, + header: map.multiLog.header, + newContent: { + [node.ownSessionID]: { + after: 0, + newTransactions: [ + { + privacy: "trusting", + madeAt: map.multiLog.sessions[node.ownSessionID] + .transactions[0].madeAt, + changes: [ + { + op: "insert", + key: "hello", + value: "world", + } satisfies MapOpPayload, + ], + }, + ], + lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!, + lastSignature: + map.multiLog.sessions[node.ownSessionID].lastSignature!, + }, + }, + } satisfies SyncMessage); +}); + test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -291,7 +382,7 @@ test("No matter the optimistic known state, node respects invalid known state me const writer = inTx.getWriter(); await writer.write({ - type: "subscribe", + action: "subscribe", knownState: { multilogID: map.multiLog.id, header: false, @@ -303,7 +394,8 @@ test("No matter the optimistic known state, node respects invalid known state me const reader = outRx.getReader(); - const _firstMessage = await reader.read(); + const _msg1 = await reader.read(); + const _msg2 = await reader.read(); map.edit((editable) => { editable.set("hello", "world", "trusting"); @@ -313,11 +405,11 @@ test("No matter the optimistic known state, node respects invalid known state me editable.set("goodbye", "world", "trusting"); }); - const _secondMessage = await reader.read(); - const _thirdMessage = await reader.read(); + const _msg3 = await reader.read(); + const _msg4 = await reader.read(); await writer.write({ - type: "wrongAssumedKnownState", + action: "wrongAssumedKnownState", knownState: { multilogID: map.multiLog.id, header: true, @@ -327,10 +419,10 @@ test("No matter the optimistic known state, node respects invalid known state me }, } satisfies SyncMessage); - const fourthMessage = await reader.read(); + const msg5 = await reader.read(); - expect(fourthMessage.value).toEqual({ - type: "newContent", + expect(msg5.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: undefined, newContent: { @@ -412,12 +504,24 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even }); const reader = outRx.getReader(); - const _initialSyncMessage = await reader.read(); + const _adminSubscribeMsg = await reader.read(); + const _teamSubscribeMsg = await reader.read(); - const firstMessage = await reader.read(); + const subscribeMsg = await reader.read(); - expect(firstMessage.value).toEqual({ - type: "newContent", + expect(subscribeMsg.value).toEqual({ + action: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: true, + sessions: {}, + }, + } satisfies SyncMessage); + + const newContentMsg = await reader.read(); + + expect(newContentMsg.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: map.multiLog.header, newContent: { @@ -445,7 +549,7 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even } satisfies SyncMessage); }); -test("If we add a server peer, it even receives just headers of newly created multilogs", async () => { +test("If we add a server peer, newly created multilogs are auto-subscribed to", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -466,20 +570,30 @@ test("If we add a server peer, it even receives just headers of newly created mu }); const reader = outRx.getReader(); - const _initialSyncMessage = await reader.read(); + const _initialMsg1 = await reader.read(); + const _initialMsg2 = await reader.read(); const map = team.createMap(); - const firstMessage = await reader.read(); + const msg1 = await reader.read(); - expect(firstMessage.value).toEqual({ - type: "newContent", + expect(msg1.value).toEqual({ + action: "subscribe", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); + + const msg2 = await reader.read(); + + expect(msg2.value).toEqual({ + action: "newContent", multilogID: map.multiLog.id, header: map.multiLog.header, newContent: {}, } satisfies SyncMessage); }); +test.skip("TODO: when receiving a subscribe response that is behind our optimistic state (due to already sent content), we ignore it", () => {}); + test("When we connect a new server peer, we try to sync all existing multilogs to it", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -502,21 +616,143 @@ test("When we connect a new server peer, we try to sync all existing multilogs t const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const _adminSubscribeMessage = await reader.read(); + const teamSubscribeMessage = await reader.read(); - expect(firstMessage.value).toEqual({ - type: "subscribe", + expect(teamSubscribeMessage.value).toEqual({ + action: "subscribe", knownState: team.teamMap.multiLog.knownState(), } satisfies SyncMessage); const secondMessage = await reader.read(); expect(secondMessage.value).toEqual({ - type: "subscribe", + action: "subscribe", knownState: map.multiLog.knownState(), } satisfies SyncMessage); }); +test("When receiving a subscribe with a known state that is ahead of our own, peers should respond with a corresponding subscribe response message", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node.createTeam(); + + const map = team.createMap(); + + const [inRx, inTx] = newStreamPair(); + const [outRx, outTx] = newStreamPair(); + + node.addPeer({ + id: "test", + incoming: inRx, + outgoing: outTx, + role: "peer", + }); + + const writer = inTx.getWriter(); + + await writer.write({ + action: "subscribe", + knownState: { + multilogID: map.multiLog.id, + header: true, + sessions: { + [node.ownSessionID]: 1, + }, + }, + }); + + const reader = outRx.getReader(); + + const firstMessage = await reader.read(); + + expect(firstMessage.value).toEqual({ + action: "subscribeResponse", + knownState: map.multiLog.knownState(), + } satisfies SyncMessage); +}); + +test("When replaying creation and transactions of a multilog as new content, the receiving peer integrates this information", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node1 = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node1.createTeam(); + + const [inRx1, inTx1] = newStreamPair(); + const [outRx1, outTx1] = newStreamPair(); + + node1.addPeer({ + id: "test2", + incoming: inRx1, + outgoing: outTx1, + role: "server", + }); + + const reader1 = outRx1.getReader(); + + const _adminSubscriptionMsg = await reader1.read(); + const teamSubscribeMsg = await reader1.read(); + + const map = team.createMap(); + + const mapSubscriptionMsg = await reader1.read(); + const mapNewContentMsg = await reader1.read(); + + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const mapEditMsg = await reader1.read(); + + const node2 = new LocalNode(admin, newRandomSessionID(adminID)); + + const [inRx2, inTx2] = newStreamPair(); + const [outRx2, outTx2] = newStreamPair(); + + node2.addPeer({ + id: "test1", + incoming: inRx2, + outgoing: outTx2, + role: "client", + }); + + const writer2 = inTx2.getWriter(); + const reader2 = outRx2.getReader(); + + await writer2.write(teamSubscribeMsg.value); + const teamSubscribeResponseMsg = await reader2.read(); + + expect(node2.multilogs[team.teamMap.multiLog.id]?.state).toEqual("loading"); + + const writer1 = inTx1.getWriter(); + + await writer1.write(teamSubscribeResponseMsg.value); + const teamContentMsg = await reader1.read(); + + await writer2.write(teamContentMsg.value); + + await writer2.write(mapSubscriptionMsg.value); + const _mapSubscribeResponseMsg = await reader2.read(); + await writer2.write(mapNewContentMsg.value); + + expect(node2.multilogs[map.multiLog.id]?.state).toEqual("loading"); + + await writer2.write(mapEditMsg.value); + + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect( + expectMap( + node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent() + ).get("hello") + ).toEqual("world"); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; diff --git a/src/sync.ts b/src/sync.ts index ae187d36f..cf8367f01 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -1,4 +1,4 @@ -import { Hash } from "./crypto"; +import { Hash, Signature } from "./crypto"; import { MultiLogHeader, MultiLogID, SessionID, Transaction } from "./multilog"; export type MultiLogKnownState = { @@ -9,17 +9,23 @@ export type MultiLogKnownState = { export type SyncMessage = | SubscribeMessage + | SubscribeResponseMessage | NewContentMessage | WrongAssumedKnownStateMessage | UnsubscribeMessage; export type SubscribeMessage = { - type: "subscribe"; + action: "subscribe"; + knownState: MultiLogKnownState; +}; + +export type SubscribeResponseMessage = { + action: "subscribeResponse"; knownState: MultiLogKnownState; }; export type NewContentMessage = { - type: "newContent"; + action: "newContent"; multilogID: MultiLogID; header?: MultiLogHeader; newContent: { @@ -30,17 +36,18 @@ export type NewContentMessage = { export type SessionNewContent = { after: number; newTransactions: Transaction[]; + // TODO: is lastHash needed here? lastHash: Hash; - lastSignature: string; -} + lastSignature: Signature; +}; export type WrongAssumedKnownStateMessage = { - type: "wrongAssumedKnownState"; + action: "wrongAssumedKnownState"; knownState: MultiLogKnownState; }; export type UnsubscribeMessage = { - type: "unsubscribe"; + action: "unsubscribe"; multilogID: MultiLogID; }; @@ -50,13 +57,57 @@ export interface Peer { id: PeerID; incoming: ReadableStream; outgoing: WritableStream; - role: 'peer' | 'server' | 'client'; + role: "peer" | "server" | "client"; } export interface PeerState { id: PeerID; - optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState}; + optimisticKnownStates: { [multilogID: MultiLogID]: MultiLogKnownState }; incoming: ReadableStream; outgoing: WritableStreamDefaultWriter; - role: 'peer' | 'server' | 'client'; + role: "peer" | "server" | "client"; +} + +export function weAreStrictlyAhead( + ourKnownState: MultiLogKnownState, + theirKnownState: MultiLogKnownState +): boolean { + if (theirKnownState.header && !ourKnownState.header) { + return false; + } + + const allSessions = new Set([ + ...(Object.keys(ourKnownState.sessions) as SessionID[]), + ...(Object.keys(theirKnownState.sessions) as SessionID[]), + ]); + + for (const sessionID of allSessions) { + const ourSession = ourKnownState.sessions[sessionID]; + const theirSession = theirKnownState.sessions[sessionID]; + + if ((ourSession || 0) < (theirSession || 0)) { + return false; + } + } + + return true; +} + +export function combinedKnownStates(stateA: MultiLogKnownState, stateB: MultiLogKnownState): MultiLogKnownState { + const sessionStates: MultiLogKnownState["sessions"] = {}; + + const allSessions = new Set([...Object.keys(stateA.sessions), ...Object.keys(stateB.sessions)] as SessionID[]); + + for (const sessionID of allSessions) { + const stateAValue = stateA.sessions[sessionID]; + const stateBValue = stateB.sessions[sessionID]; + + sessionStates[sessionID] = Math.max(stateAValue || 0, stateBValue || 0); + } + + return { + multilogID: stateA.multilogID, + header: stateA.header || stateB.header, + sessions: sessionStates, + }; } \ No newline at end of file diff --git a/tsconfig.json b/tsconfig.json index 29f8aa003..91aa71fb6 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -16,6 +16,8 @@ "noEmit": true, "types": [ "bun-types" // add Bun global - ] + ], + + // "noUncheckedIndexedAccess": true } } From a5f4bbf3dc3883a8f84556f1376ae38d7119cebb Mon Sep 17 00:00:00 2001 From: Anselm Date: Wed, 2 Aug 2023 16:13:42 +0100 Subject: [PATCH 7/7] Ensure depended on multilogs are loaded/sent first --- src/multilog.ts | 4 ++ src/node.ts | 114 +++++++++++++++++++++++++++++------ src/sync.test.ts | 150 +++++++++++++++++++++++++++++++++++++++-------- src/sync.ts | 1 + 4 files changed, 226 insertions(+), 43 deletions(-) diff --git a/src/multilog.ts b/src/multilog.ts index 99cf430d0..a7282aa76 100644 --- a/src/multilog.ts +++ b/src/multilog.ts @@ -501,6 +501,10 @@ export function getAgentID(agent: Agent): AgentID { )}`; } +export function agentIDasMultiLogID(agentID: AgentID): MultiLogID { + return `coval_${agentID.substring("agent_".length)}`; +} + export type AgentCredential = { signatorySecret: SignatorySecret; recipientSecret: RecipientSecret; diff --git a/src/node.ts b/src/node.ts index 5cc179978..c824702c5 100644 --- a/src/node.ts +++ b/src/node.ts @@ -12,6 +12,7 @@ import { getAgentMultilogHeader, MultiLogHeader, agentIDfromSessionID, + agentIDasMultiLogID, } from "./multilog"; import { Team, expectTeamContent } from "./permissions"; import { @@ -61,10 +62,40 @@ export class LocalNode { return multilog; } + loadMultiLog(id: MultiLogID): Promise { + let entry = this.multilogs[id]; + if (!entry) { + entry = newLoadingState(); + + this.multilogs[id] = entry; + + for (const peer of Object.values(this.peers)) { + peer.outgoing + .write({ + action: "subscribe", + knownState: { + multilogID: id, + header: false, + sessions: {}, + }, + }) + .catch((e) => { + console.error("Error writing to peer", e); + }); + } + } + if (entry.state === "loaded") { + return Promise.resolve(entry.multilog); + } + return entry.done; + } + expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog { const entry = this.multilogs[id]; if (!entry) { - throw new Error(`Unknown multilog ${id}`); + throw new Error( + `${expectation ? expectation + ": " : ""}Unknown multilog ${id}` + ); } if (entry.state === "loading") { throw new Error( @@ -183,22 +214,16 @@ export class LocalNode { } } - handleSubscribe(msg: SubscribeMessage, peer: PeerState): SyncMessage[] { + handleSubscribe( + msg: SubscribeMessage, + peer: PeerState, + asDependencyOf?: MultiLogID + ): SyncMessage[] { const entry = this.multilogs[msg.knownState.multilogID]; if (!entry || entry.state === "loading") { if (!entry) { - let resolve: (multilog: MultiLog) => void; - - const promise = new Promise((r) => { - resolve = r; - }); - - this.multilogs[msg.knownState.multilogID] = { - state: "loading", - done: promise, - resolve: resolve!, - }; + this.multilogs[msg.knownState.multilogID] = newLoadingState(); } return [ @@ -218,10 +243,35 @@ export class LocalNode { const newContent = entry.multilog.newContentSince(msg.knownState); + const dependedOnMultilogs = + entry.multilog.header.ruleset.type === "team" + ? expectTeamContent(entry.multilog.getCurrentContent()) + .keys() + .filter((k): k is AgentID => k.startsWith("agent_")) + .map((agent) => agentIDasMultiLogID(agent)) + : entry.multilog.header.ruleset.type === "ownedByTeam" + ? [entry.multilog.header.ruleset.team] + : []; + return [ + ...dependedOnMultilogs.flatMap((multilogID) => + this.handleSubscribe( + { + action: "subscribe", + knownState: { + multilogID, + header: false, + sessions: {}, + }, + }, + peer, + asDependencyOf || msg.knownState.multilogID + ) + ), { action: "subscribeResponse", knownState: entry.multilog.knownState(), + asDependencyOf, }, ...(newContent ? [newContent] : []), ]; @@ -231,12 +281,26 @@ export class LocalNode { msg: SubscribeResponseMessage, peer: PeerState ): SyncMessage[] { - const entry = this.multilogs[msg.knownState.multilogID]; + let entry = this.multilogs[msg.knownState.multilogID]; - if (!entry || entry.state === "loading") { - throw new Error( - "Expected multilog entry to be created, missing subscribe?" - ); + if (!entry) { + if (msg.asDependencyOf) { + if (this.multilogs[msg.asDependencyOf]) { + entry = newLoadingState(); + + this.multilogs[msg.knownState.multilogID] = entry; + } + } else { + throw new Error( + "Expected multilog entry to be created, missing subscribe?" + ); + } + } + + if (entry.state === "loading") { + peer.optimisticKnownStates[msg.knownState.multilogID] = + msg.knownState; + return []; } const newContent = entry.multilog.newContentSince(msg.knownState); @@ -415,3 +479,17 @@ type MultilogState = resolve: (multilog: MultiLog) => void; } | { state: "loaded"; multilog: MultiLog }; + +function newLoadingState(): MultilogState { + let resolve: (multilog: MultiLog) => void; + + const promise = new Promise((r) => { + resolve = r; + }); + + return { + state: "loading", + done: promise, + resolve: resolve!, + }; +} diff --git a/src/sync.test.ts b/src/sync.test.ts index f386f87ad..1c7edbea3 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -6,7 +6,7 @@ import { newRandomSessionID, } from "./multilog"; import { LocalNode } from "./node"; -import { SyncMessage } from "./sync"; +import { Peer, SyncMessage } from "./sync"; import { MapOpPayload, expectMap } from "./coValue"; test( @@ -48,16 +48,21 @@ test( const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(firstMessage.value).toEqual({ + const subscribeResponseMsg = await reader.read(); + + expect(subscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const secondMessage = await reader.read(); + const newContentMsg = await reader.read(); - expect(secondMessage.value).toEqual({ + expect(newContentMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: { @@ -133,16 +138,21 @@ test("Node replies with only new tx to subscribe with some known state", async ( const reader = outRx.getReader(); - const msg1 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(msg1.value).toEqual({ + const mapSubscribeResponseMsg = await reader.read(); + + expect(mapSubscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const msg2 = await reader.read(); + const mapNewContentMsg = await reader.read(); - expect(msg2.value).toEqual({ + expect(mapNewContentMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: undefined, @@ -171,6 +181,8 @@ test("Node replies with only new tx to subscribe with some known state", async ( } satisfies SyncMessage); }); +test.skip("TODO: node only replies with new tx to subscribe with some known state, even in the depended on multilogs", () => {}); + test("After subscribing, node sends own known state and new txs to peer", async () => { const admin = newRandomAgentCredential(); const adminID = getAgentID(getAgent(admin)); @@ -206,16 +218,21 @@ test("After subscribing, node sends own known state and new txs to peer", async const reader = outRx.getReader(); - const msg1 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); - expect(msg1.value).toEqual({ + const mapSubscribeResponseMsg = await reader.read(); + + expect(mapSubscribeResponseMsg.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); - const msg2 = await reader.read(); + const mapNewContentHeaderOnlyMsg = await reader.read(); - expect(msg2.value).toEqual({ + expect(mapNewContentHeaderOnlyMsg.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: map.multiLog.header, @@ -226,9 +243,9 @@ test("After subscribing, node sends own known state and new txs to peer", async editable.set("hello", "world", "trusting"); }); - const msg3 = await reader.read(); + const mapEditMsg1 = await reader.read(); - expect(msg3.value).toEqual({ + expect(mapEditMsg1.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, newContent: { @@ -259,9 +276,9 @@ test("After subscribing, node sends own known state and new txs to peer", async editable.set("goodbye", "world", "trusting"); }); - const msg4 = await reader.read(); + const mapEditMsg2 = await reader.read(); - expect(msg4.value).toEqual({ + expect(mapEditMsg2.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, newContent: { @@ -394,8 +411,12 @@ test("No matter the optimistic known state, node respects invalid known state me const reader = outRx.getReader(); - const _msg1 = await reader.read(); - const _msg2 = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); + const _mapSubscribeResponseMsg = await reader.read(); + const _mapNewContentHeaderOnlyMsg = await reader.read(); map.edit((editable) => { editable.set("hello", "world", "trusting"); @@ -405,8 +426,8 @@ test("No matter the optimistic known state, node respects invalid known state me editable.set("goodbye", "world", "trusting"); }); - const _msg3 = await reader.read(); - const _msg4 = await reader.read(); + const _mapEditMsg1 = await reader.read(); + const _mapEditMsg2 = await reader.read(); await writer.write({ action: "wrongAssumedKnownState", @@ -419,9 +440,9 @@ test("No matter the optimistic known state, node respects invalid known state me }, } satisfies SyncMessage); - const msg5 = await reader.read(); + const newContentAfterWrongAssumedState = await reader.read(); - expect(msg5.value).toEqual({ + expect(newContentAfterWrongAssumedState.value).toEqual({ action: "newContent", multilogID: map.multiLog.id, header: undefined, @@ -667,9 +688,13 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe const reader = outRx.getReader(); - const firstMessage = await reader.read(); + const _adminSubscribeResponseMsg = await reader.read(); + const _adminNewContentMsg = await reader.read(); + const _teamSubscribeResponseMsg = await reader.read(); + const _teamNewContentMsg = await reader.read(); + const mapSubscribeResponse = await reader.read(); - expect(firstMessage.value).toEqual({ + expect(mapSubscribeResponse.value).toEqual({ action: "subscribeResponse", knownState: map.multiLog.knownState(), } satisfies SyncMessage); @@ -753,6 +778,35 @@ test("When replaying creation and transactions of a multilog as new content, the ).toEqual("world"); }); +test("When loading a multilog on one node, the server node it is requested from replies with all the necessary depended on multilogs to make it work", async () => { + const admin = newRandomAgentCredential(); + const adminID = getAgentID(getAgent(admin)); + + const node1 = new LocalNode(admin, newRandomSessionID(adminID)); + + const team = node1.createTeam(); + + const map = team.createMap(); + map.edit((editable) => { + editable.set("hello", "world", "trusting"); + }); + + const node2 = new LocalNode(admin, newRandomSessionID(adminID)); + + const [node2asPeer, node1asPeer] = connectedPeers(); + + node1.addPeer(node2asPeer); + node2.addPeer(node1asPeer); + + await node2.loadMultiLog(map.multiLog.id); + + expect( + expectMap( + node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent() + ).get("hello") + ).toEqual("world"); +}); + function newStreamPair(): [ReadableStream, WritableStream] { const queue: T[] = []; let resolveNextItemReady: () => void = () => {}; @@ -803,3 +857,49 @@ function shouldNotResolve(promise: Promise, ops: { timeout: number }) { setTimeout(resolve, ops.timeout); }); } + +function connectedPeers(trace?: boolean): [Peer, Peer] { + const [inRx1, inTx1] = newStreamPair(); + const [outRx1, outTx1] = newStreamPair(); + + const [inRx2, inTx2] = newStreamPair(); + const [outRx2, outTx2] = newStreamPair(); + + outRx2 + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + trace && console.log("peer 2 -> peer 1", chunk); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx1); + + outRx1 + .pipeThrough( + new TransformStream({ + transform(chunk, controller) { + trace && console.log("peer 1 -> peer 2", chunk); + controller.enqueue(chunk); + }, + }) + ) + .pipeTo(inTx2); + + const peer2AsPeer: Peer = { + id: "test2", + incoming: inRx1, + outgoing: outTx1, + role: "peer", + }; + + const peer1AsPeer: Peer = { + id: "test1", + incoming: inRx2, + outgoing: outTx2, + role: "peer", + }; + + return [peer2AsPeer, peer1AsPeer]; +} diff --git a/src/sync.ts b/src/sync.ts index cf8367f01..276d84bbe 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -22,6 +22,7 @@ export type SubscribeMessage = { export type SubscribeResponseMessage = { action: "subscribeResponse"; knownState: MultiLogKnownState; + asDependencyOf?: MultiLogID; }; export type NewContentMessage = {