From 4336d2aa7dd7acbdf3ad5ede5a0fcf307771680c Mon Sep 17 00:00:00 2001 From: Anselm Date: Mon, 14 Aug 2023 18:27:01 +0100 Subject: [PATCH] Review sync message properties and names --- src/coValue.ts | 6 +-- src/sync.test.ts | 123 ++++++++++++++++++++++++----------------------- src/sync.ts | 77 ++++++++++++++--------------- 3 files changed, 102 insertions(+), 104 deletions(-) diff --git a/src/coValue.ts b/src/coValue.ts index f440002c2..afea7cbeb 100644 --- a/src/coValue.ts +++ b/src/coValue.ts @@ -491,10 +491,10 @@ export class CoValue { knownState: CoValueKnownState | undefined ): NewContentMessage | undefined { const newContent: NewContentMessage = { - action: "newContent", + action: "content", id: this.id, header: knownState?.header ? undefined : this.header, - newContent: Object.fromEntries( + new: Object.fromEntries( Object.entries(this.sessions) .map(([sessionID, log]) => { const newTransactions = log.transactions.slice( @@ -528,7 +528,7 @@ export class CoValue { if ( !newContent.header && - Object.keys(newContent.newContent).length === 0 + Object.keys(newContent.new).length === 0 ) { return undefined; } diff --git a/src/sync.test.ts b/src/sync.test.ts index 1d6528172..d85143d15 100644 --- a/src/sync.test.ts +++ b/src/sync.test.ts @@ -37,7 +37,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => { const writer = inTx.getWriter(); await writer.write({ - action: "subscribe", + action: "load", id: map.coValue.id, header: false, sessions: {}, @@ -50,7 +50,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => { const mapTellKnownStateMsg = await reader.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -60,7 +60,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => { const newContentMsg = await reader.read(); expect(newContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: { type: "comap", @@ -69,7 +69,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => { createdAt: map.coValue.header.createdAt, uniqueness: map.coValue.header.uniqueness, }, - newContent: { + new: { [node.ownSessionID]: { after: 0, newTransactions: [ @@ -120,7 +120,7 @@ test("Node replies with only new tx to subscribe with some known state", async ( const writer = inTx.getWriter(); await writer.write({ - action: "subscribe", + action: "load", id: map.coValue.id, header: true, sessions: { @@ -135,7 +135,7 @@ test("Node replies with only new tx to subscribe with some known state", async ( const mapTellKnownStateMsg = await reader.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -145,10 +145,10 @@ test("Node replies with only new tx to subscribe with some known state", async ( const mapNewContentMsg = await reader.read(); expect(mapNewContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: undefined, - newContent: { + new: { [node.ownSessionID]: { after: 1, newTransactions: [ @@ -198,7 +198,7 @@ test("After subscribing, node sends own known state and new txs to peer", async const writer = inTx.getWriter(); await writer.write({ - action: "subscribe", + action: "load", id: map.coValue.id, header: false, sessions: { @@ -213,7 +213,7 @@ test("After subscribing, node sends own known state and new txs to peer", async const mapTellKnownStateMsg = await reader.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -223,10 +223,10 @@ test("After subscribing, node sends own known state and new txs to peer", async const mapNewContentHeaderOnlyMsg = await reader.read(); expect(mapNewContentHeaderOnlyMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); map.edit((editable) => { @@ -236,9 +236,9 @@ test("After subscribing, node sends own known state and new txs to peer", async const mapEditMsg1 = await reader.read(); expect(mapEditMsg1.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, - newContent: { + new: { [node.ownSessionID]: { after: 0, newTransactions: [ @@ -269,9 +269,9 @@ test("After subscribing, node sends own known state and new txs to peer", async const mapEditMsg2 = await reader.read(); expect(mapEditMsg2.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, - newContent: { + new: { [node.ownSessionID]: { after: 1, newTransactions: [ @@ -325,7 +325,7 @@ test("Client replies with known new content to tellKnownState from server", asyn const writer = inTx.getWriter(); await writer.write({ - action: "tellKnownState", + action: "known", id: map.coValue.id, header: false, sessions: { @@ -338,7 +338,7 @@ test("Client replies with known new content to tellKnownState from server", asyn const mapTellKnownStateMsg = await reader.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -348,10 +348,10 @@ test("Client replies with known new content to tellKnownState from server", asyn const mapNewContentMsg = await reader.read(); expect(mapNewContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: { + new: { [node.ownSessionID]: { after: 0, newTransactions: [ @@ -397,7 +397,7 @@ test("No matter the optimistic known state, node respects invalid known state me const writer = inTx.getWriter(); await writer.write({ - action: "subscribe", + action: "load", id: map.coValue.id, header: false, sessions: { @@ -412,7 +412,7 @@ test("No matter the optimistic known state, node respects invalid known state me const mapTellKnownStateMsg = await reader.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -422,10 +422,10 @@ test("No matter the optimistic known state, node respects invalid known state me const mapNewContentHeaderOnlyMsg = await reader.read(); expect(mapNewContentHeaderOnlyMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); map.edit((editable) => { @@ -440,7 +440,8 @@ test("No matter the optimistic known state, node respects invalid known state me const _mapEditMsg2 = await reader.read(); await writer.write({ - action: "wrongAssumedKnownState", + action: "known", + isCorrection: true, id: map.coValue.id, header: true, sessions: { @@ -451,10 +452,10 @@ test("No matter the optimistic known state, node respects invalid known state me const newContentAfterWrongAssumedState = await reader.read(); expect(newContentAfterWrongAssumedState.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: undefined, - newContent: { + new: { [node.ownSessionID]: { after: 1, newTransactions: [ @@ -528,18 +529,18 @@ test("If we add a server peer, all updates to all coValues are sent to it, even const reader = outRx.getReader(); // expect((await reader.read()).value).toMatchObject({ - // action: "subscribe", + // action: "load", // id: adminID, // }); expect((await reader.read()).value).toMatchObject({ - action: "subscribe", + action: "load", id: team.teamMap.coValue.id, }); const mapSubscribeMsg = await reader.read(); expect(mapSubscribeMsg.value).toEqual({ - action: "subscribe", + action: "load", id: map.coValue.id, header: true, sessions: {}, @@ -555,10 +556,10 @@ test("If we add a server peer, all updates to all coValues are sent to it, even const mapNewContentMsg = await reader.read(); expect(mapNewContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: { + new: { [node.ownSessionID]: { after: 0, newTransactions: [ @@ -601,11 +602,11 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a const reader = outRx.getReader(); // expect((await reader.read()).value).toMatchObject({ - // action: "subscribe", + // action: "load", // id: admin.id, // }); expect((await reader.read()).value).toMatchObject({ - action: "subscribe", + action: "load", id: team.teamMap.coValue.id, }); @@ -614,7 +615,7 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a const mapSubscribeMsg = await reader.read(); expect(mapSubscribeMsg.value).toEqual({ - action: "subscribe", + action: "load", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -624,10 +625,10 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a const mapContentMsg = await reader.read(); expect(mapContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); }); @@ -659,14 +660,14 @@ test("When we connect a new server peer, we try to sync all existing coValues to const teamSubscribeMessage = await reader.read(); expect(teamSubscribeMessage.value).toEqual({ - action: "subscribe", + action: "load", ...team.teamMap.coValue.knownState(), } satisfies SyncMessage); const secondMessage = await reader.read(); expect(secondMessage.value).toEqual({ - action: "subscribe", + action: "load", ...map.coValue.knownState(), } satisfies SyncMessage); }); @@ -692,7 +693,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe const writer = inTx.getWriter(); await writer.write({ - action: "subscribe", + action: "load", id: map.coValue.id, header: true, sessions: { @@ -707,7 +708,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe const mapTellKnownState = await reader.read(); expect(mapTellKnownState.value).toEqual({ - action: "tellKnownState", + action: "known", ...map.coValue.knownState(), } satisfies SyncMessage); }); @@ -750,12 +751,12 @@ test.skip("When replaying creation and transactions of a coValue as new content, const adminSubscribeMessage = await from1.read(); expect(adminSubscribeMessage.value).toMatchObject({ - action: "subscribe", + action: "load", id: admin.id, }); const teamSubscribeMsg = await from1.read(); expect(teamSubscribeMsg.value).toMatchObject({ - action: "subscribe", + action: "load", id: team.teamMap.coValue.id, }); @@ -790,23 +791,23 @@ test.skip("When replaying creation and transactions of a coValue as new content, const mapSubscriptionMsg = await from1.read(); expect(mapSubscriptionMsg.value).toMatchObject({ - action: "subscribe", + action: "load", id: map.coValue.id, }); const mapNewContentMsg = await from1.read(); expect(mapNewContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); await to2.write(mapSubscriptionMsg.value!); const mapTellKnownStateMsg = await from2.read(); expect(mapTellKnownStateMsg.value).toEqual({ - action: "tellKnownState", + action: "known", id: map.coValue.id, header: false, sessions: {}, @@ -965,11 +966,11 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async const reader = outRx.getReader(); // expect((await reader.read()).value).toMatchObject({ - // action: "subscribe", + // action: "load", // id: admin.id, // }); expect((await reader.read()).value).toMatchObject({ - action: "subscribe", + action: "load", id: team.teamMap.coValue.id, }); @@ -978,7 +979,7 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async const mapSubscribeMsg = await reader.read(); expect(mapSubscribeMsg.value).toEqual({ - action: "subscribe", + action: "load", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -988,10 +989,10 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async const mapContentMsg = await reader.read(); expect(mapContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); await inTx.abort(); @@ -1019,11 +1020,11 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async const reader = outRx.getReader(); // expect((await reader.read()).value).toMatchObject({ - // action: "subscribe", + // action: "load", // id: admin.id, // }); expect((await reader.read()).value).toMatchObject({ - action: "subscribe", + action: "load", id: team.teamMap.coValue.id, }); @@ -1032,7 +1033,7 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async const mapSubscribeMsg = await reader.read(); expect(mapSubscribeMsg.value).toEqual({ - action: "subscribe", + action: "load", ...map.coValue.knownState(), } satisfies SyncMessage); @@ -1042,10 +1043,10 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async const mapContentMsg = await reader.read(); expect(mapContentMsg.value).toEqual({ - action: "newContent", + action: "content", id: map.coValue.id, header: map.coValue.header, - newContent: {}, + new: {}, } satisfies SyncMessage); reader.releaseLock(); @@ -1097,28 +1098,28 @@ test("If we start loading a coValue before connecting to a peer that has it, it function teamContentEx(team: Team) { return { - action: "newContent", + action: "content", id: team.teamMap.coValue.id, }; } function admContEx(adminID: AccountID) { return { - action: "newContent", + action: "content", id: adminID, }; } function teamStateEx(team: Team) { return { - action: "tellKnownState", + action: "known", id: team.teamMap.coValue.id, }; } function admStateEx(adminID: AccountID) { return { - action: "tellKnownState", + action: "known", id: adminID, }; } diff --git a/src/sync.ts b/src/sync.ts index d65a4930c..2241806c5 100644 --- a/src/sync.ts +++ b/src/sync.ts @@ -25,26 +25,26 @@ export function emptyKnownState(id: RawCoID): CoValueKnownState { } export type SyncMessage = - | SubscribeMessage - | TellKnownStateMessage + | LoadMessage + | KnownStateMessage | NewContentMessage - | WrongAssumedKnownStateMessage - | UnsubscribeMessage; + | DoneMessage; -export type SubscribeMessage = { - action: "subscribe"; +export type LoadMessage = { + action: "load"; } & CoValueKnownState; -export type TellKnownStateMessage = { - action: "tellKnownState"; +export type KnownStateMessage = { + action: "known"; asDependencyOf?: RawCoID; + isCorrection?: boolean; } & CoValueKnownState; export type NewContentMessage = { - action: "newContent"; + action: "content"; id: RawCoID; header?: CoValueHeader; - newContent: { + new: { [sessionID: SessionID]: SessionNewContent; }; }; @@ -56,13 +56,8 @@ export type SessionNewContent = { lastHash: Hash; lastSignature: Signature; }; - -export type WrongAssumedKnownStateMessage = { - action: "wrongAssumedKnownState"; -} & CoValueKnownState; - -export type UnsubscribeMessage = { - action: "unsubscribe"; +export type DoneMessage = { + action: "done"; id: RawCoID; }; @@ -121,7 +116,7 @@ export class SyncManager { for (const peer of Object.values(this.peers)) { peer.outgoing .write({ - action: "subscribe", + action: "load", id: id, header: false, sessions: {}, @@ -135,15 +130,17 @@ export class SyncManager { async handleSyncMessage(msg: SyncMessage, peer: PeerState) { // TODO: validate switch (msg.action) { - case "subscribe": - return await this.handleSubscribe(msg, peer); - case "tellKnownState": - return await this.handleTellKnownState(msg, peer); - case "newContent": + case "load": + return await this.handleLoad(msg, peer); + case "known": + if (msg.isCorrection) { + return await this.handleCorrection(msg, peer); + } else { + return await this.handleKnownState(msg, peer); + } + case "content": return await this.handleNewContent(msg, peer); - case "wrongAssumedKnownState": - return await this.handleWrongAssumedKnownState(msg, peer); - case "unsubscribe": + case "done": return await this.handleUnsubscribe(msg); default: throw new Error( @@ -168,7 +165,7 @@ export class SyncManager { if (entry.state === "loading") { await this.trySendToPeer(peer, { - action: "subscribe", + action: "load", id, header: false, sessions: {}, @@ -185,7 +182,7 @@ export class SyncManager { if (!peer.toldKnownState.has(id)) { peer.toldKnownState.add(id); await this.trySendToPeer(peer, { - action: "subscribe", + action: "load", ...coValue.knownState(), }); } @@ -208,7 +205,7 @@ export class SyncManager { if (!peer.toldKnownState.has(id)) { await this.trySendToPeer(peer, { - action: "tellKnownState", + action: "known", asDependencyOf, ...coValue.knownState(), }); @@ -295,7 +292,7 @@ export class SyncManager { }); } - async handleSubscribe(msg: SubscribeMessage, peer: PeerState) { + async handleLoad(msg: LoadMessage, peer: PeerState) { const entry = this.local.coValues[msg.id]; if (!entry || entry.state === "loading") { @@ -307,7 +304,7 @@ export class SyncManager { peer.toldKnownState.add(msg.id); await this.trySendToPeer(peer, { - action: "tellKnownState", + action: "known", id: msg.id, header: false, sessions: {}, @@ -326,7 +323,7 @@ export class SyncManager { await this.sendNewContentIncludingDependencies(msg.id, peer); } - async handleTellKnownState(msg: TellKnownStateMessage, peer: PeerState) { + async handleKnownState(msg: KnownStateMessage, peer: PeerState) { let entry = this.local.coValues[msg.id]; peer.optimisticKnownStates[msg.id] = combinedKnownStates( @@ -408,7 +405,7 @@ export class SyncManager { let invalidStateAssumed = false; for (const [sessionID, newContentForSession] of Object.entries( - msg.newContent + msg.new ) as [SessionID, SessionNewContent][]) { const ourKnownTxIdx = coValue.sessions[sessionID]?.transactions.length; @@ -451,14 +448,15 @@ export class SyncManager { if (invalidStateAssumed) { await this.trySendToPeer(peer, { - action: "wrongAssumedKnownState", + action: "known", + isCorrection: true, ...coValue.knownState(), }); } } - async handleWrongAssumedKnownState( - msg: WrongAssumedKnownStateMessage, + async handleCorrection( + msg: KnownStateMessage, peer: PeerState ) { const coValue = this.local.expectCoValueLoaded(msg.id); @@ -475,7 +473,7 @@ export class SyncManager { } } - handleUnsubscribe(_msg: UnsubscribeMessage) { + handleUnsubscribe(_msg: DoneMessage) { throw new Error("Method not implemented."); } @@ -505,9 +503,8 @@ export class SyncManager { function knownStateIn( msg: - | SubscribeMessage - | TellKnownStateMessage - | WrongAssumedKnownStateMessage + | LoadMessage + | KnownStateMessage ) { return { id: msg.id,