Review sync message properties and names

This commit is contained in:
Anselm
2023-08-14 18:27:01 +01:00
parent c352e9ac7e
commit 4336d2aa7d
3 changed files with 102 additions and 104 deletions

View File

@@ -491,10 +491,10 @@ export class CoValue {
knownState: CoValueKnownState | undefined
): NewContentMessage | undefined {
const newContent: NewContentMessage = {
action: "newContent",
action: "content",
id: this.id,
header: knownState?.header ? undefined : this.header,
newContent: Object.fromEntries(
new: Object.fromEntries(
Object.entries(this.sessions)
.map(([sessionID, log]) => {
const newTransactions = log.transactions.slice(
@@ -528,7 +528,7 @@ export class CoValue {
if (
!newContent.header &&
Object.keys(newContent.newContent).length === 0
Object.keys(newContent.new).length === 0
) {
return undefined;
}

View File

@@ -37,7 +37,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: false,
sessions: {},
@@ -50,7 +50,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
const mapTellKnownStateMsg = await reader.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -60,7 +60,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
const newContentMsg = await reader.read();
expect(newContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: {
type: "comap",
@@ -69,7 +69,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
createdAt: map.coValue.header.createdAt,
uniqueness: map.coValue.header.uniqueness,
},
newContent: {
new: {
[node.ownSessionID]: {
after: 0,
newTransactions: [
@@ -120,7 +120,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: true,
sessions: {
@@ -135,7 +135,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
const mapTellKnownStateMsg = await reader.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -145,10 +145,10 @@ test("Node replies with only new tx to subscribe with some known state", async (
const mapNewContentMsg = await reader.read();
expect(mapNewContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: undefined,
newContent: {
new: {
[node.ownSessionID]: {
after: 1,
newTransactions: [
@@ -198,7 +198,7 @@ test("After subscribing, node sends own known state and new txs to peer", async
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: false,
sessions: {
@@ -213,7 +213,7 @@ test("After subscribing, node sends own known state and new txs to peer", async
const mapTellKnownStateMsg = await reader.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -223,10 +223,10 @@ test("After subscribing, node sends own known state and new txs to peer", async
const mapNewContentHeaderOnlyMsg = await reader.read();
expect(mapNewContentHeaderOnlyMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
map.edit((editable) => {
@@ -236,9 +236,9 @@ test("After subscribing, node sends own known state and new txs to peer", async
const mapEditMsg1 = await reader.read();
expect(mapEditMsg1.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
newContent: {
new: {
[node.ownSessionID]: {
after: 0,
newTransactions: [
@@ -269,9 +269,9 @@ test("After subscribing, node sends own known state and new txs to peer", async
const mapEditMsg2 = await reader.read();
expect(mapEditMsg2.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
newContent: {
new: {
[node.ownSessionID]: {
after: 1,
newTransactions: [
@@ -325,7 +325,7 @@ test("Client replies with known new content to tellKnownState from server", asyn
const writer = inTx.getWriter();
await writer.write({
action: "tellKnownState",
action: "known",
id: map.coValue.id,
header: false,
sessions: {
@@ -338,7 +338,7 @@ test("Client replies with known new content to tellKnownState from server", asyn
const mapTellKnownStateMsg = await reader.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -348,10 +348,10 @@ test("Client replies with known new content to tellKnownState from server", asyn
const mapNewContentMsg = await reader.read();
expect(mapNewContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {
new: {
[node.ownSessionID]: {
after: 0,
newTransactions: [
@@ -397,7 +397,7 @@ test("No matter the optimistic known state, node respects invalid known state me
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: false,
sessions: {
@@ -412,7 +412,7 @@ test("No matter the optimistic known state, node respects invalid known state me
const mapTellKnownStateMsg = await reader.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -422,10 +422,10 @@ test("No matter the optimistic known state, node respects invalid known state me
const mapNewContentHeaderOnlyMsg = await reader.read();
expect(mapNewContentHeaderOnlyMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
map.edit((editable) => {
@@ -440,7 +440,8 @@ test("No matter the optimistic known state, node respects invalid known state me
const _mapEditMsg2 = await reader.read();
await writer.write({
action: "wrongAssumedKnownState",
action: "known",
isCorrection: true,
id: map.coValue.id,
header: true,
sessions: {
@@ -451,10 +452,10 @@ test("No matter the optimistic known state, node respects invalid known state me
const newContentAfterWrongAssumedState = await reader.read();
expect(newContentAfterWrongAssumedState.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: undefined,
newContent: {
new: {
[node.ownSessionID]: {
after: 1,
newTransactions: [
@@ -528,18 +529,18 @@ test("If we add a server peer, all updates to all coValues are sent to it, even
const reader = outRx.getReader();
// expect((await reader.read()).value).toMatchObject({
// action: "subscribe",
// action: "load",
// id: adminID,
// });
expect((await reader.read()).value).toMatchObject({
action: "subscribe",
action: "load",
id: team.teamMap.coValue.id,
});
const mapSubscribeMsg = await reader.read();
expect(mapSubscribeMsg.value).toEqual({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: true,
sessions: {},
@@ -555,10 +556,10 @@ test("If we add a server peer, all updates to all coValues are sent to it, even
const mapNewContentMsg = await reader.read();
expect(mapNewContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {
new: {
[node.ownSessionID]: {
after: 0,
newTransactions: [
@@ -601,11 +602,11 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a
const reader = outRx.getReader();
// expect((await reader.read()).value).toMatchObject({
// action: "subscribe",
// action: "load",
// id: admin.id,
// });
expect((await reader.read()).value).toMatchObject({
action: "subscribe",
action: "load",
id: team.teamMap.coValue.id,
});
@@ -614,7 +615,7 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a
const mapSubscribeMsg = await reader.read();
expect(mapSubscribeMsg.value).toEqual({
action: "subscribe",
action: "load",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -624,10 +625,10 @@ test("If we add a server peer, newly created coValues are auto-subscribed to", a
const mapContentMsg = await reader.read();
expect(mapContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
});
@@ -659,14 +660,14 @@ test("When we connect a new server peer, we try to sync all existing coValues to
const teamSubscribeMessage = await reader.read();
expect(teamSubscribeMessage.value).toEqual({
action: "subscribe",
action: "load",
...team.teamMap.coValue.knownState(),
} satisfies SyncMessage);
const secondMessage = await reader.read();
expect(secondMessage.value).toEqual({
action: "subscribe",
action: "load",
...map.coValue.knownState(),
} satisfies SyncMessage);
});
@@ -692,7 +693,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
action: "load",
id: map.coValue.id,
header: true,
sessions: {
@@ -707,7 +708,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe
const mapTellKnownState = await reader.read();
expect(mapTellKnownState.value).toEqual({
action: "tellKnownState",
action: "known",
...map.coValue.knownState(),
} satisfies SyncMessage);
});
@@ -750,12 +751,12 @@ test.skip("When replaying creation and transactions of a coValue as new content,
const adminSubscribeMessage = await from1.read();
expect(adminSubscribeMessage.value).toMatchObject({
action: "subscribe",
action: "load",
id: admin.id,
});
const teamSubscribeMsg = await from1.read();
expect(teamSubscribeMsg.value).toMatchObject({
action: "subscribe",
action: "load",
id: team.teamMap.coValue.id,
});
@@ -790,23 +791,23 @@ test.skip("When replaying creation and transactions of a coValue as new content,
const mapSubscriptionMsg = await from1.read();
expect(mapSubscriptionMsg.value).toMatchObject({
action: "subscribe",
action: "load",
id: map.coValue.id,
});
const mapNewContentMsg = await from1.read();
expect(mapNewContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
await to2.write(mapSubscriptionMsg.value!);
const mapTellKnownStateMsg = await from2.read();
expect(mapTellKnownStateMsg.value).toEqual({
action: "tellKnownState",
action: "known",
id: map.coValue.id,
header: false,
sessions: {},
@@ -965,11 +966,11 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async
const reader = outRx.getReader();
// expect((await reader.read()).value).toMatchObject({
// action: "subscribe",
// action: "load",
// id: admin.id,
// });
expect((await reader.read()).value).toMatchObject({
action: "subscribe",
action: "load",
id: team.teamMap.coValue.id,
});
@@ -978,7 +979,7 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async
const mapSubscribeMsg = await reader.read();
expect(mapSubscribeMsg.value).toEqual({
action: "subscribe",
action: "load",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -988,10 +989,10 @@ test("When a peer's incoming/readable stream closes, we remove the peer", async
const mapContentMsg = await reader.read();
expect(mapContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
await inTx.abort();
@@ -1019,11 +1020,11 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async
const reader = outRx.getReader();
// expect((await reader.read()).value).toMatchObject({
// action: "subscribe",
// action: "load",
// id: admin.id,
// });
expect((await reader.read()).value).toMatchObject({
action: "subscribe",
action: "load",
id: team.teamMap.coValue.id,
});
@@ -1032,7 +1033,7 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async
const mapSubscribeMsg = await reader.read();
expect(mapSubscribeMsg.value).toEqual({
action: "subscribe",
action: "load",
...map.coValue.knownState(),
} satisfies SyncMessage);
@@ -1042,10 +1043,10 @@ test("When a peer's outgoing/writable stream closes, we remove the peer", async
const mapContentMsg = await reader.read();
expect(mapContentMsg.value).toEqual({
action: "newContent",
action: "content",
id: map.coValue.id,
header: map.coValue.header,
newContent: {},
new: {},
} satisfies SyncMessage);
reader.releaseLock();
@@ -1097,28 +1098,28 @@ test("If we start loading a coValue before connecting to a peer that has it, it
function teamContentEx(team: Team) {
return {
action: "newContent",
action: "content",
id: team.teamMap.coValue.id,
};
}
function admContEx(adminID: AccountID) {
return {
action: "newContent",
action: "content",
id: adminID,
};
}
function teamStateEx(team: Team) {
return {
action: "tellKnownState",
action: "known",
id: team.teamMap.coValue.id,
};
}
function admStateEx(adminID: AccountID) {
return {
action: "tellKnownState",
action: "known",
id: adminID,
};
}

View File

@@ -25,26 +25,26 @@ export function emptyKnownState(id: RawCoID): CoValueKnownState {
}
export type SyncMessage =
| SubscribeMessage
| TellKnownStateMessage
| LoadMessage
| KnownStateMessage
| NewContentMessage
| WrongAssumedKnownStateMessage
| UnsubscribeMessage;
| DoneMessage;
export type SubscribeMessage = {
action: "subscribe";
export type LoadMessage = {
action: "load";
} & CoValueKnownState;
export type TellKnownStateMessage = {
action: "tellKnownState";
export type KnownStateMessage = {
action: "known";
asDependencyOf?: RawCoID;
isCorrection?: boolean;
} & CoValueKnownState;
export type NewContentMessage = {
action: "newContent";
action: "content";
id: RawCoID;
header?: CoValueHeader;
newContent: {
new: {
[sessionID: SessionID]: SessionNewContent;
};
};
@@ -56,13 +56,8 @@ export type SessionNewContent = {
lastHash: Hash;
lastSignature: Signature;
};
export type WrongAssumedKnownStateMessage = {
action: "wrongAssumedKnownState";
} & CoValueKnownState;
export type UnsubscribeMessage = {
action: "unsubscribe";
export type DoneMessage = {
action: "done";
id: RawCoID;
};
@@ -121,7 +116,7 @@ export class SyncManager {
for (const peer of Object.values(this.peers)) {
peer.outgoing
.write({
action: "subscribe",
action: "load",
id: id,
header: false,
sessions: {},
@@ -135,15 +130,17 @@ export class SyncManager {
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
// TODO: validate
switch (msg.action) {
case "subscribe":
return await this.handleSubscribe(msg, peer);
case "tellKnownState":
return await this.handleTellKnownState(msg, peer);
case "newContent":
case "load":
return await this.handleLoad(msg, peer);
case "known":
if (msg.isCorrection) {
return await this.handleCorrection(msg, peer);
} else {
return await this.handleKnownState(msg, peer);
}
case "content":
return await this.handleNewContent(msg, peer);
case "wrongAssumedKnownState":
return await this.handleWrongAssumedKnownState(msg, peer);
case "unsubscribe":
case "done":
return await this.handleUnsubscribe(msg);
default:
throw new Error(
@@ -168,7 +165,7 @@ export class SyncManager {
if (entry.state === "loading") {
await this.trySendToPeer(peer, {
action: "subscribe",
action: "load",
id,
header: false,
sessions: {},
@@ -185,7 +182,7 @@ export class SyncManager {
if (!peer.toldKnownState.has(id)) {
peer.toldKnownState.add(id);
await this.trySendToPeer(peer, {
action: "subscribe",
action: "load",
...coValue.knownState(),
});
}
@@ -208,7 +205,7 @@ export class SyncManager {
if (!peer.toldKnownState.has(id)) {
await this.trySendToPeer(peer, {
action: "tellKnownState",
action: "known",
asDependencyOf,
...coValue.knownState(),
});
@@ -295,7 +292,7 @@ export class SyncManager {
});
}
async handleSubscribe(msg: SubscribeMessage, peer: PeerState) {
async handleLoad(msg: LoadMessage, peer: PeerState) {
const entry = this.local.coValues[msg.id];
if (!entry || entry.state === "loading") {
@@ -307,7 +304,7 @@ export class SyncManager {
peer.toldKnownState.add(msg.id);
await this.trySendToPeer(peer, {
action: "tellKnownState",
action: "known",
id: msg.id,
header: false,
sessions: {},
@@ -326,7 +323,7 @@ export class SyncManager {
await this.sendNewContentIncludingDependencies(msg.id, peer);
}
async handleTellKnownState(msg: TellKnownStateMessage, peer: PeerState) {
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
let entry = this.local.coValues[msg.id];
peer.optimisticKnownStates[msg.id] = combinedKnownStates(
@@ -408,7 +405,7 @@ export class SyncManager {
let invalidStateAssumed = false;
for (const [sessionID, newContentForSession] of Object.entries(
msg.newContent
msg.new
) as [SessionID, SessionNewContent][]) {
const ourKnownTxIdx =
coValue.sessions[sessionID]?.transactions.length;
@@ -451,14 +448,15 @@ export class SyncManager {
if (invalidStateAssumed) {
await this.trySendToPeer(peer, {
action: "wrongAssumedKnownState",
action: "known",
isCorrection: true,
...coValue.knownState(),
});
}
}
async handleWrongAssumedKnownState(
msg: WrongAssumedKnownStateMessage,
async handleCorrection(
msg: KnownStateMessage,
peer: PeerState
) {
const coValue = this.local.expectCoValueLoaded(msg.id);
@@ -475,7 +473,7 @@ export class SyncManager {
}
}
handleUnsubscribe(_msg: UnsubscribeMessage) {
handleUnsubscribe(_msg: DoneMessage) {
throw new Error("Method not implemented.");
}
@@ -505,9 +503,8 @@ export class SyncManager {
function knownStateIn(
msg:
| SubscribeMessage
| TellKnownStateMessage
| WrongAssumedKnownStateMessage
| LoadMessage
| KnownStateMessage
) {
return {
id: msg.id,