Implement invalid known state handling

This commit is contained in:
Anselm
2023-08-01 13:23:25 +01:00
parent ad8e51c66c
commit c5bc519fbe
2 changed files with 112 additions and 12 deletions

View File

@@ -139,7 +139,7 @@ export class LocalNode {
case "newContent":
return this.handleNewContent(msg);
case "wrongAssumedKnownState":
return this.handleWrongAssumedKnownState(msg);
return this.handleWrongAssumedKnownState(msg, peer);
case "unsubscribe":
return this.handleUnsubscribe(msg);
}
@@ -161,9 +161,14 @@ export class LocalNode {
}
handleWrongAssumedKnownState(
msg: WrongAssumedKnownStateMessage
msg: WrongAssumedKnownStateMessage,
peer: PeerState
): SyncMessage | undefined {
return undefined;
const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID);
peer.optimisticKnownStates[msg.knownState.multilogID] = msg.knownState;
return multilog.newContentSince(msg.knownState);
}
handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined {

View File

@@ -265,6 +265,95 @@ test("After subscribing, node sends new txs to peer", async () => {
} satisfies SyncMessage);
});
test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node = new LocalNode(admin, newRandomSessionID(adminID));
const team = node.createTeam();
const map = team.createMap();
const [inRx, inTx] = newStreamPair<SyncMessage>();
const [outRx, outTx] = newStreamPair<SyncMessage>();
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
});
const writer = inTx.getWriter();
await writer.write({
type: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: false,
sessions: {
[node.ownSessionID]: 0,
},
},
});
const reader = outRx.getReader();
const _firstMessage = await reader.read();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
});
map.edit((editable) => {
editable.set("goodbye", "world", "trusting");
});
const _secondMessage = await reader.read();
const _thirdMessage = await reader.read();
await writer.write({
type: "wrongAssumedKnownState",
knownState: {
multilogID: map.multiLog.id,
header: true,
sessions: {
[node.ownSessionID]: 1,
},
},
} satisfies SyncMessage);
const fourthMessage = await reader.read();
expect(fourthMessage.value).toEqual({
type: "newContent",
multilogID: map.multiLog.id,
header: undefined,
newContent: {
[node.ownSessionID]: {
after: 1,
newTransactions: [
{
privacy: "trusting",
madeAt: map.multiLog.sessions[node.ownSessionID]
.transactions[1].madeAt,
changes: [
{
op: "insert",
key: "goodbye",
value: "world",
} satisfies MapOpPayload<string, string>,
],
},
],
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
lastSignature:
map.multiLog.sessions[node.ownSessionID].lastSignature!,
},
},
} satisfies SyncMessage);
});
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];
let resolveNextItemReady: () => void = () => {};
@@ -274,14 +363,18 @@ function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const readable = new ReadableStream<T>({
async pull(controller) {
if (queue.length > 0) {
controller.enqueue(queue.shift());
} else {
await nextItemReady;
nextItemReady = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
controller.enqueue(queue.shift());
while(true) {
if (queue.length > 0) {
controller.enqueue(queue.shift());
if (queue.length === 0) {
nextItemReady = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
}
return;
} else {
await nextItemReady;
}
}
},
});
@@ -289,7 +382,9 @@ function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const writable = new WritableStream<T>({
write(chunk) {
queue.push(chunk);
resolveNextItemReady();
if (queue.length === 1) {
resolveNextItemReady();
}
},
});