Ensure depended on multilogs are loaded/sent first

This commit is contained in:
Anselm
2023-08-02 16:13:42 +01:00
parent c04d2797d2
commit a5f4bbf3dc
4 changed files with 226 additions and 43 deletions

View File

@@ -501,6 +501,10 @@ export function getAgentID(agent: Agent): AgentID {
)}`;
}
export function agentIDasMultiLogID(agentID: AgentID): MultiLogID {
return `coval_${agentID.substring("agent_".length)}`;
}
export type AgentCredential = {
signatorySecret: SignatorySecret;
recipientSecret: RecipientSecret;

View File

@@ -12,6 +12,7 @@ import {
getAgentMultilogHeader,
MultiLogHeader,
agentIDfromSessionID,
agentIDasMultiLogID,
} from "./multilog";
import { Team, expectTeamContent } from "./permissions";
import {
@@ -61,10 +62,40 @@ export class LocalNode {
return multilog;
}
loadMultiLog(id: MultiLogID): Promise<MultiLog> {
let entry = this.multilogs[id];
if (!entry) {
entry = newLoadingState();
this.multilogs[id] = entry;
for (const peer of Object.values(this.peers)) {
peer.outgoing
.write({
action: "subscribe",
knownState: {
multilogID: id,
header: false,
sessions: {},
},
})
.catch((e) => {
console.error("Error writing to peer", e);
});
}
}
if (entry.state === "loaded") {
return Promise.resolve(entry.multilog);
}
return entry.done;
}
expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog {
const entry = this.multilogs[id];
if (!entry) {
throw new Error(`Unknown multilog ${id}`);
throw new Error(
`${expectation ? expectation + ": " : ""}Unknown multilog ${id}`
);
}
if (entry.state === "loading") {
throw new Error(
@@ -183,22 +214,16 @@ export class LocalNode {
}
}
handleSubscribe(msg: SubscribeMessage, peer: PeerState): SyncMessage[] {
handleSubscribe(
msg: SubscribeMessage,
peer: PeerState,
asDependencyOf?: MultiLogID
): SyncMessage[] {
const entry = this.multilogs[msg.knownState.multilogID];
if (!entry || entry.state === "loading") {
if (!entry) {
let resolve: (multilog: MultiLog) => void;
const promise = new Promise<MultiLog>((r) => {
resolve = r;
});
this.multilogs[msg.knownState.multilogID] = {
state: "loading",
done: promise,
resolve: resolve!,
};
this.multilogs[msg.knownState.multilogID] = newLoadingState();
}
return [
@@ -218,10 +243,35 @@ export class LocalNode {
const newContent = entry.multilog.newContentSince(msg.knownState);
const dependedOnMultilogs =
entry.multilog.header.ruleset.type === "team"
? expectTeamContent(entry.multilog.getCurrentContent())
.keys()
.filter((k): k is AgentID => k.startsWith("agent_"))
.map((agent) => agentIDasMultiLogID(agent))
: entry.multilog.header.ruleset.type === "ownedByTeam"
? [entry.multilog.header.ruleset.team]
: [];
return [
...dependedOnMultilogs.flatMap((multilogID) =>
this.handleSubscribe(
{
action: "subscribe",
knownState: {
multilogID,
header: false,
sessions: {},
},
},
peer,
asDependencyOf || msg.knownState.multilogID
)
),
{
action: "subscribeResponse",
knownState: entry.multilog.knownState(),
asDependencyOf,
},
...(newContent ? [newContent] : []),
];
@@ -231,12 +281,26 @@ export class LocalNode {
msg: SubscribeResponseMessage,
peer: PeerState
): SyncMessage[] {
const entry = this.multilogs[msg.knownState.multilogID];
let entry = this.multilogs[msg.knownState.multilogID];
if (!entry || entry.state === "loading") {
throw new Error(
"Expected multilog entry to be created, missing subscribe?"
);
if (!entry) {
if (msg.asDependencyOf) {
if (this.multilogs[msg.asDependencyOf]) {
entry = newLoadingState();
this.multilogs[msg.knownState.multilogID] = entry;
}
} else {
throw new Error(
"Expected multilog entry to be created, missing subscribe?"
);
}
}
if (entry.state === "loading") {
peer.optimisticKnownStates[msg.knownState.multilogID] =
msg.knownState;
return [];
}
const newContent = entry.multilog.newContentSince(msg.knownState);
@@ -415,3 +479,17 @@ type MultilogState =
resolve: (multilog: MultiLog) => void;
}
| { state: "loaded"; multilog: MultiLog };
function newLoadingState(): MultilogState {
let resolve: (multilog: MultiLog) => void;
const promise = new Promise<MultiLog>((r) => {
resolve = r;
});
return {
state: "loading",
done: promise,
resolve: resolve!,
};
}

View File

@@ -6,7 +6,7 @@ import {
newRandomSessionID,
} from "./multilog";
import { LocalNode } from "./node";
import { SyncMessage } from "./sync";
import { Peer, SyncMessage } from "./sync";
import { MapOpPayload, expectMap } from "./coValue";
test(
@@ -48,16 +48,21 @@ test(
const reader = outRx.getReader();
const firstMessage = await reader.read();
const _adminSubscribeResponseMsg = await reader.read();
const _adminNewContentMsg = await reader.read();
const _teamSubscribeResponseMsg = await reader.read();
const _teamNewContentMsg = await reader.read();
expect(firstMessage.value).toEqual({
const subscribeResponseMsg = await reader.read();
expect(subscribeResponseMsg.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const secondMessage = await reader.read();
const newContentMsg = await reader.read();
expect(secondMessage.value).toEqual({
expect(newContentMsg.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: {
@@ -133,16 +138,21 @@ test("Node replies with only new tx to subscribe with some known state", async (
const reader = outRx.getReader();
const msg1 = await reader.read();
const _adminSubscribeResponseMsg = await reader.read();
const _adminNewContentMsg = await reader.read();
const _teamSubscribeResponseMsg = await reader.read();
const _teamNewContentMsg = await reader.read();
expect(msg1.value).toEqual({
const mapSubscribeResponseMsg = await reader.read();
expect(mapSubscribeResponseMsg.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const msg2 = await reader.read();
const mapNewContentMsg = await reader.read();
expect(msg2.value).toEqual({
expect(mapNewContentMsg.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: undefined,
@@ -171,6 +181,8 @@ test("Node replies with only new tx to subscribe with some known state", async (
} satisfies SyncMessage);
});
test.skip("TODO: node only replies with new tx to subscribe with some known state, even in the depended on multilogs", () => {});
test("After subscribing, node sends own known state and new txs to peer", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
@@ -206,16 +218,21 @@ test("After subscribing, node sends own known state and new txs to peer", async
const reader = outRx.getReader();
const msg1 = await reader.read();
const _adminSubscribeResponseMsg = await reader.read();
const _adminNewContentMsg = await reader.read();
const _teamSubscribeResponseMsg = await reader.read();
const _teamNewContentMsg = await reader.read();
expect(msg1.value).toEqual({
const mapSubscribeResponseMsg = await reader.read();
expect(mapSubscribeResponseMsg.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const msg2 = await reader.read();
const mapNewContentHeaderOnlyMsg = await reader.read();
expect(msg2.value).toEqual({
expect(mapNewContentHeaderOnlyMsg.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: map.multiLog.header,
@@ -226,9 +243,9 @@ test("After subscribing, node sends own known state and new txs to peer", async
editable.set("hello", "world", "trusting");
});
const msg3 = await reader.read();
const mapEditMsg1 = await reader.read();
expect(msg3.value).toEqual({
expect(mapEditMsg1.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
newContent: {
@@ -259,9 +276,9 @@ test("After subscribing, node sends own known state and new txs to peer", async
editable.set("goodbye", "world", "trusting");
});
const msg4 = await reader.read();
const mapEditMsg2 = await reader.read();
expect(msg4.value).toEqual({
expect(mapEditMsg2.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
newContent: {
@@ -394,8 +411,12 @@ test("No matter the optimistic known state, node respects invalid known state me
const reader = outRx.getReader();
const _msg1 = await reader.read();
const _msg2 = await reader.read();
const _adminSubscribeResponseMsg = await reader.read();
const _adminNewContentMsg = await reader.read();
const _teamSubscribeResponseMsg = await reader.read();
const _teamNewContentMsg = await reader.read();
const _mapSubscribeResponseMsg = await reader.read();
const _mapNewContentHeaderOnlyMsg = await reader.read();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
@@ -405,8 +426,8 @@ test("No matter the optimistic known state, node respects invalid known state me
editable.set("goodbye", "world", "trusting");
});
const _msg3 = await reader.read();
const _msg4 = await reader.read();
const _mapEditMsg1 = await reader.read();
const _mapEditMsg2 = await reader.read();
await writer.write({
action: "wrongAssumedKnownState",
@@ -419,9 +440,9 @@ test("No matter the optimistic known state, node respects invalid known state me
},
} satisfies SyncMessage);
const msg5 = await reader.read();
const newContentAfterWrongAssumedState = await reader.read();
expect(msg5.value).toEqual({
expect(newContentAfterWrongAssumedState.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: undefined,
@@ -667,9 +688,13 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe
const reader = outRx.getReader();
const firstMessage = await reader.read();
const _adminSubscribeResponseMsg = await reader.read();
const _adminNewContentMsg = await reader.read();
const _teamSubscribeResponseMsg = await reader.read();
const _teamNewContentMsg = await reader.read();
const mapSubscribeResponse = await reader.read();
expect(firstMessage.value).toEqual({
expect(mapSubscribeResponse.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
@@ -753,6 +778,35 @@ test("When replaying creation and transactions of a multilog as new content, the
).toEqual("world");
});
test("When loading a multilog on one node, the server node it is requested from replies with all the necessary depended on multilogs to make it work", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node1 = new LocalNode(admin, newRandomSessionID(adminID));
const team = node1.createTeam();
const map = team.createMap();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
});
const node2 = new LocalNode(admin, newRandomSessionID(adminID));
const [node2asPeer, node1asPeer] = connectedPeers();
node1.addPeer(node2asPeer);
node2.addPeer(node1asPeer);
await node2.loadMultiLog(map.multiLog.id);
expect(
expectMap(
node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent()
).get("hello")
).toEqual("world");
});
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];
let resolveNextItemReady: () => void = () => {};
@@ -803,3 +857,49 @@ function shouldNotResolve(promise: Promise<any>, ops: { timeout: number }) {
setTimeout(resolve, ops.timeout);
});
}
function connectedPeers(trace?: boolean): [Peer, Peer] {
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
outRx2
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
trace && console.log("peer 2 -> peer 1", chunk);
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx1);
outRx1
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
trace && console.log("peer 1 -> peer 2", chunk);
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx2);
const peer2AsPeer: Peer = {
id: "test2",
incoming: inRx1,
outgoing: outTx1,
role: "peer",
};
const peer1AsPeer: Peer = {
id: "test1",
incoming: inRx2,
outgoing: outTx2,
role: "peer",
};
return [peer2AsPeer, peer1AsPeer];
}

View File

@@ -22,6 +22,7 @@ export type SubscribeMessage = {
export type SubscribeResponseMessage = {
action: "subscribeResponse";
knownState: MultiLogKnownState;
asDependencyOf?: MultiLogID;
};
export type NewContentMessage = {