Initial sync to server

This commit is contained in:
Anselm
2023-08-01 15:38:39 +01:00
parent b31bf7c9e9
commit d7682d73d8
2 changed files with 57 additions and 3 deletions

View File

@@ -123,6 +123,21 @@ export class LocalNode {
};
this.peers[peer.id] = peerState;
if (peer.role === "server") {
for (const multilog of Object.values(this.multilogs)) {
if (multilog instanceof Promise) {
continue;
}
await peerState.outgoing.write(
{
type: "subscribe",
knownState: multilog.knownState(),
}
);
}
}
for await (const msg of peerState.incoming) {
const response = this.handleSyncMessage(msg, peerState);

View File

@@ -412,6 +412,7 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even
});
const reader = outRx.getReader();
const _initialSyncMessage = await reader.read();
const firstMessage = await reader.read();
@@ -464,9 +465,10 @@ test("If we add a server peer, it even receives just headers of newly created mu
role: "server",
});
const map = team.createMap();
const reader = outRx.getReader();
const _initialSyncMessage = await reader.read();
const map = team.createMap();
const firstMessage = await reader.read();
@@ -476,7 +478,44 @@ test("If we add a server peer, it even receives just headers of newly created mu
header: map.multiLog.header,
newContent: {},
} satisfies SyncMessage);
})
});
test("When we connect a new server peer, we try to sync all existing multilogs to it", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node = new LocalNode(admin, newRandomSessionID(adminID));
const team = node.createTeam();
const map = team.createMap();
const [inRx, inTx] = newStreamPair<SyncMessage>();
const [outRx, outTx] = newStreamPair<SyncMessage>();
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
role: "server",
});
const reader = outRx.getReader();
const firstMessage = await reader.read();
expect(firstMessage.value).toEqual({
type: "subscribe",
knownState: team.teamMap.multiLog.knownState(),
} satisfies SyncMessage);
const secondMessage = await reader.read();
expect(secondMessage.value).toEqual({
type: "subscribe",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
});
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];