Several further tests and improvements for syncing

This commit is contained in:
Anselm
2023-08-02 15:27:27 +01:00
parent d7682d73d8
commit c04d2797d2
7 changed files with 592 additions and 114 deletions

View File

@@ -198,6 +198,10 @@ export class CoList<T extends JsonValue, Meta extends JsonValue> {
constructor(multilog: MultiLog) {
this.id = multilog.id as CoValueID<CoList<T, Meta>>;
}
toJSON(): JsonObject {
throw new Error("Method not implemented.");
}
}
export class MultiStream<T extends JsonValue, Meta extends JsonValue> {
@@ -207,6 +211,10 @@ export class MultiStream<T extends JsonValue, Meta extends JsonValue> {
constructor(multilog: MultiLog) {
this.id = multilog.id as CoValueID<MultiStream<T, Meta>>;
}
toJSON(): JsonObject {
throw new Error("Method not implemented.");
}
}
export class Static<T extends JsonValue> {
@@ -216,6 +224,10 @@ export class Static<T extends JsonValue> {
constructor(multilog: MultiLog) {
this.id = multilog.id as CoValueID<Static<T>>;
}
toJSON(): JsonObject {
throw new Error("Method not implemented.");
}
}
export function expectMap(content: CoValue): CoMap<{ [key: string]: string }, {}> {

View File

@@ -60,7 +60,7 @@ type SessionLog = {
transactions: Transaction[];
lastHash?: Hash;
streamingHash: StreamingHash;
lastSignature: string;
lastSignature: Signature;
};
export type PrivateTransaction = {
@@ -426,7 +426,7 @@ export class MultiLog {
newContentSince(knownState: MultiLogKnownState | undefined): NewContentMessage | undefined {
const newContent: NewContentMessage = {
type: "newContent",
action: "newContent",
multilogID: this.id,
header: knownState?.header ? undefined : this.header,
newContent: Object.fromEntries(

View File

@@ -21,13 +21,16 @@ import {
PeerState,
SessionNewContent,
SubscribeMessage,
SubscribeResponseMessage,
SyncMessage,
UnsubscribeMessage,
WrongAssumedKnownStateMessage,
combinedKnownStates,
weAreStrictlyAhead,
} from "./sync";
export class LocalNode {
multilogs: { [key: MultiLogID]: Promise<MultiLog> | MultiLog } = {};
multilogs: { [key: MultiLogID]: MultilogState } = {};
peers: { [key: PeerID]: PeerState } = {};
agentCredential: AgentCredential;
agentID: AgentID;
@@ -43,36 +46,34 @@ export class LocalNode {
this.ownSessionID = ownSessionID;
const agentMultilog = new MultiLog(getAgentMultilogHeader(agent), this);
this.multilogs[agentMultilog.id] = Promise.resolve(agentMultilog);
this.multilogs[agentMultilog.id] = {
state: "loaded",
multilog: agentMultilog,
};
}
createMultiLog(header: MultiLogHeader): MultiLog {
const requiredMultiLogs =
header.ruleset.type === "ownedByTeam"
? {
[header.ruleset.team]: this.expectMultiLogLoaded(
header.ruleset.team
),
}
: {};
const multilog = new MultiLog(header, this);
this.multilogs[multilog.id] = multilog;
this.multilogs[multilog.id] = { state: "loaded", multilog };
this.syncMultiLog(multilog);
return multilog;
}
expectMultiLogLoaded(id: MultiLogID): MultiLog {
const multilog = this.multilogs[id];
if (!multilog) {
expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog {
const entry = this.multilogs[id];
if (!entry) {
throw new Error(`Unknown multilog ${id}`);
}
if (multilog instanceof Promise) {
throw new Error(`Multilog ${id} not yet loaded`);
if (entry.state === "loading") {
throw new Error(
`${
expectation ? expectation + ": " : ""
}Multilog ${id} not yet loaded`
);
}
return multilog;
return entry.multilog;
}
addKnownAgent(agent: Agent) {
@@ -113,8 +114,8 @@ export class LocalNode {
return new Team(teamContent, this);
}
async addPeer(peer: Peer) {
const peerState = {
addPeer(peer: Peer) {
const peerState: PeerState = {
id: peer.id,
optimisticKnownStates: {},
incoming: peer.incoming,
@@ -124,74 +125,221 @@ export class LocalNode {
this.peers[peer.id] = peerState;
if (peer.role === "server") {
for (const multilog of Object.values(this.multilogs)) {
if (multilog instanceof Promise) {
for (const entry of Object.values(this.multilogs)) {
if (entry.state === "loading") {
continue;
}
await peerState.outgoing.write(
{
type: "subscribe",
knownState: multilog.knownState(),
}
);
peerState.outgoing
.write({
action: "subscribe",
knownState: entry.multilog.knownState(),
})
.catch((e) => {
// TODO: handle error
console.error("Error writing to peer", e);
});
peerState.optimisticKnownStates[entry.multilog.id] = {
multilogID: entry.multilog.id,
header: false,
sessions: {},
};
}
}
for await (const msg of peerState.incoming) {
const response = this.handleSyncMessage(msg, peerState);
if (response) {
await peerState.outgoing.write(response);
const readIncoming = async () => {
for await (const msg of peerState.incoming) {
for (const responseMsg of this.handleSyncMessage(
msg,
peerState
)) {
await peerState.outgoing.write(responseMsg);
}
}
}
};
readIncoming().catch((e) => {
// TODO: handle error
console.error("Error reading from peer", e);
});
}
handleSyncMessage(
msg: SyncMessage,
peer: PeerState
): SyncMessage | undefined {
handleSyncMessage(msg: SyncMessage, peer: PeerState): SyncMessage[] {
// TODO: validate
switch (msg.type) {
switch (msg.action) {
case "subscribe":
return this.handleSubscribe(msg, peer);
case "subscribeResponse":
return this.handleSubscribeResponse(msg, peer);
case "newContent":
return this.handleNewContent(msg);
case "wrongAssumedKnownState":
return this.handleWrongAssumedKnownState(msg, peer);
case "unsubscribe":
return this.handleUnsubscribe(msg);
default:
throw new Error(`Unknown message type ${(msg as any).action}`);
}
}
handleSubscribe(
msg: SubscribeMessage,
peer: PeerState
): SyncMessage | undefined {
const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID);
handleSubscribe(msg: SubscribeMessage, peer: PeerState): SyncMessage[] {
const entry = this.multilogs[msg.knownState.multilogID];
peer.optimisticKnownStates[multilog.id] = multilog.knownState();
if (!entry || entry.state === "loading") {
if (!entry) {
let resolve: (multilog: MultiLog) => void;
return multilog.newContentSince(msg.knownState);
const promise = new Promise<MultiLog>((r) => {
resolve = r;
});
this.multilogs[msg.knownState.multilogID] = {
state: "loading",
done: promise,
resolve: resolve!,
};
}
return [
{
action: "subscribeResponse",
knownState: {
multilogID: msg.knownState.multilogID,
header: false,
sessions: {},
},
},
];
}
peer.optimisticKnownStates[entry.multilog.id] =
entry.multilog.knownState();
const newContent = entry.multilog.newContentSince(msg.knownState);
return [
{
action: "subscribeResponse",
knownState: entry.multilog.knownState(),
},
...(newContent ? [newContent] : []),
];
}
handleNewContent(msg: NewContentMessage): SyncMessage | undefined {
return undefined;
handleSubscribeResponse(
msg: SubscribeResponseMessage,
peer: PeerState
): SyncMessage[] {
const entry = this.multilogs[msg.knownState.multilogID];
if (!entry || entry.state === "loading") {
throw new Error(
"Expected multilog entry to be created, missing subscribe?"
);
}
const newContent = entry.multilog.newContentSince(msg.knownState);
peer.optimisticKnownStates[msg.knownState.multilogID] =
combinedKnownStates(msg.knownState, entry.multilog.knownState());
return newContent ? [newContent] : [];
}
handleNewContent(msg: NewContentMessage): SyncMessage[] {
let entry = this.multilogs[msg.multilogID];
if (!entry) {
throw new Error(
"Expected multilog entry to be created, missing subscribe?"
);
}
let resolveAfterDone: ((multilog: MultiLog) => void) | undefined;
if (entry.state === "loading") {
if (!msg.header) {
throw new Error("Expected header to be sent in first message");
}
const multilog = new MultiLog(msg.header, this);
resolveAfterDone = entry.resolve;
entry = {
state: "loaded",
multilog,
};
this.multilogs[msg.multilogID] = entry;
}
const multilog = entry.multilog;
let invalidStateAssumed = false;
for (const sessionID of Object.keys(msg.newContent) as SessionID[]) {
const ourKnownTxIdx =
multilog.sessions[sessionID]?.transactions.length;
const theirFirstNewTxIdx = msg.newContent[sessionID].after;
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
invalidStateAssumed = true;
continue;
}
const alreadyKnownOffset = ourKnownTxIdx
? ourKnownTxIdx - theirFirstNewTxIdx
: 0;
const newTransactions =
msg.newContent[sessionID].newTransactions.slice(
alreadyKnownOffset
);
const success = multilog.tryAddTransactions(
sessionID,
newTransactions,
msg.newContent[sessionID].lastHash,
msg.newContent[sessionID].lastSignature
);
if (!success) {
console.error("Failed to add transactions", newTransactions);
continue;
}
}
if (resolveAfterDone) {
resolveAfterDone(multilog);
}
return invalidStateAssumed
? [
{
action: "wrongAssumedKnownState",
knownState: multilog.knownState(),
},
]
: [];
}
handleWrongAssumedKnownState(
msg: WrongAssumedKnownStateMessage,
peer: PeerState
): SyncMessage | undefined {
): SyncMessage[] {
const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID);
peer.optimisticKnownStates[msg.knownState.multilogID] = msg.knownState;
peer.optimisticKnownStates[msg.knownState.multilogID] =
combinedKnownStates(msg.knownState, multilog.knownState());
return multilog.newContentSince(msg.knownState);
const newContent = multilog.newContentSince(msg.knownState);
return newContent ? [newContent] : [];
}
handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage | undefined {
return undefined;
handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage[] {
throw new Error("Method not implemented.");
}
async syncMultiLog(multilog: MultiLog) {
@@ -203,7 +351,21 @@ export class LocalNode {
const newContent =
multilog.newContentSince(optimisticKnownState);
peer.optimisticKnownStates[multilog.id] = multilog.knownState();
peer.optimisticKnownStates[multilog.id] = peer
.optimisticKnownStates[multilog.id]
? combinedKnownStates(
peer.optimisticKnownStates[multilog.id],
multilog.knownState()
)
: multilog.knownState();
if (!optimisticKnownState && peer.role === "server") {
// auto-subscribe
await peer.outgoing.write({
action: "subscribe",
knownState: multilog.knownState(),
});
}
if (newContent) {
await peer.outgoing.write(newContent);
@@ -220,16 +382,19 @@ export class LocalNode {
newNode.multilogs = Object.fromEntries(
Object.entries(this.multilogs)
.map(([id, multilog]) => {
if (multilog instanceof Promise) {
return [id, undefined];
.map(([id, entry]) => {
if (entry.state === "loading") {
return undefined;
}
const newMultilog = new MultiLog(multilog.header, newNode);
const newMultilog = new MultiLog(
entry.multilog.header,
newNode
);
newMultilog.sessions = multilog.sessions;
newMultilog.sessions = entry.multilog.sessions;
return [id, newMultilog];
return [id, { state: "loaded", multilog: newMultilog }];
})
.filter((x): x is Exclude<typeof x, undefined> => !!x)
);
@@ -242,3 +407,11 @@ export class LocalNode {
return newNode;
}
}
type MultilogState =
| {
state: "loading";
done: Promise<MultiLog>;
resolve: (multilog: MultiLog) => void;
}
| { state: "loaded"; multilog: MultiLog };

View File

@@ -156,7 +156,8 @@ export function determineValidTransactions(
} else if (multilog.header.ruleset.type === "ownedByTeam") {
const teamContent =
multilog.node.expectMultiLogLoaded(
multilog.header.ruleset.team
multilog.header.ruleset.team,
"Determining valid transaction in owned object but its team wasn't loaded"
).getCurrentContent();
if (teamContent.type !== "comap") {
@@ -193,8 +194,11 @@ export function determineValidTransactions(
}));
}
);
} else if (multilog.header.ruleset.type === "agent") {
// TODO
return [];
} else {
throw new Error("Unknown ruleset type " + multilog.header.ruleset.type);
throw new Error("Unknown ruleset type " + (multilog.header.ruleset as any).type);
}
}

View File

@@ -7,7 +7,7 @@ import {
} from "./multilog";
import { LocalNode } from "./node";
import { SyncMessage } from "./sync";
import { MapOpPayload } from "./coValue";
import { MapOpPayload, expectMap } from "./coValue";
test(
"Node replies with initial tx and header to empty subscribe",
@@ -38,7 +38,7 @@ test(
const writer = inTx.getWriter();
await writer.write({
type: "subscribe",
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: false,
@@ -51,7 +51,14 @@ test(
const firstMessage = await reader.read();
expect(firstMessage.value).toEqual({
type: "newContent",
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const secondMessage = await reader.read();
expect(secondMessage.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: {
type: "comap",
@@ -114,7 +121,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
const writer = inTx.getWriter();
await writer.write({
type: "subscribe",
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: true,
@@ -126,10 +133,17 @@ test("Node replies with only new tx to subscribe with some known state", async (
const reader = outRx.getReader();
const firstMessage = await reader.read();
const msg1 = await reader.read();
expect(firstMessage.value).toEqual({
type: "newContent",
expect(msg1.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const msg2 = await reader.read();
expect(msg2.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: undefined,
newContent: {
@@ -157,7 +171,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
} satisfies SyncMessage);
});
test("After subscribing, node sends new txs to peer", async () => {
test("After subscribing, node sends own known state and new txs to peer", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
@@ -180,7 +194,7 @@ test("After subscribing, node sends new txs to peer", async () => {
const writer = inTx.getWriter();
await writer.write({
type: "subscribe",
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: false,
@@ -192,10 +206,17 @@ test("After subscribing, node sends new txs to peer", async () => {
const reader = outRx.getReader();
const firstMessage = await reader.read();
const msg1 = await reader.read();
expect(firstMessage.value).toEqual({
type: "newContent",
expect(msg1.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const msg2 = await reader.read();
expect(msg2.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: map.multiLog.header,
newContent: {},
@@ -205,10 +226,10 @@ test("After subscribing, node sends new txs to peer", async () => {
editable.set("hello", "world", "trusting");
});
const secondMessage = await reader.read();
const msg3 = await reader.read();
expect(secondMessage.value).toEqual({
type: "newContent",
expect(msg3.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
newContent: {
[node.ownSessionID]: {
@@ -238,10 +259,10 @@ test("After subscribing, node sends new txs to peer", async () => {
editable.set("goodbye", "world", "trusting");
});
const thirdMessage = await reader.read();
const msg4 = await reader.read();
expect(thirdMessage.value).toEqual({
type: "newContent",
expect(msg4.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
newContent: {
[node.ownSessionID]: {
@@ -268,6 +289,76 @@ test("After subscribing, node sends new txs to peer", async () => {
} satisfies SyncMessage);
});
test("Client replies with known new content to subscribeResponse from server", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node = new LocalNode(admin, newRandomSessionID(adminID));
const team = node.createTeam();
const map = team.createMap();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
});
const [inRx, inTx] = newStreamPair<SyncMessage>();
const [outRx, outTx] = newStreamPair<SyncMessage>();
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
role: "peer",
});
const writer = inTx.getWriter();
await writer.write({
action: "subscribeResponse",
knownState: {
multilogID: map.multiLog.id,
header: false,
sessions: {
[node.ownSessionID]: 0,
},
},
});
const reader = outRx.getReader();
const msg1 = await reader.read();
expect(msg1.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: map.multiLog.header,
newContent: {
[node.ownSessionID]: {
after: 0,
newTransactions: [
{
privacy: "trusting",
madeAt: map.multiLog.sessions[node.ownSessionID]
.transactions[0].madeAt,
changes: [
{
op: "insert",
key: "hello",
value: "world",
} satisfies MapOpPayload<string, string>,
],
},
],
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
lastSignature:
map.multiLog.sessions[node.ownSessionID].lastSignature!,
},
},
} satisfies SyncMessage);
});
test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
@@ -291,7 +382,7 @@ test("No matter the optimistic known state, node respects invalid known state me
const writer = inTx.getWriter();
await writer.write({
type: "subscribe",
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: false,
@@ -303,7 +394,8 @@ test("No matter the optimistic known state, node respects invalid known state me
const reader = outRx.getReader();
const _firstMessage = await reader.read();
const _msg1 = await reader.read();
const _msg2 = await reader.read();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
@@ -313,11 +405,11 @@ test("No matter the optimistic known state, node respects invalid known state me
editable.set("goodbye", "world", "trusting");
});
const _secondMessage = await reader.read();
const _thirdMessage = await reader.read();
const _msg3 = await reader.read();
const _msg4 = await reader.read();
await writer.write({
type: "wrongAssumedKnownState",
action: "wrongAssumedKnownState",
knownState: {
multilogID: map.multiLog.id,
header: true,
@@ -327,10 +419,10 @@ test("No matter the optimistic known state, node respects invalid known state me
},
} satisfies SyncMessage);
const fourthMessage = await reader.read();
const msg5 = await reader.read();
expect(fourthMessage.value).toEqual({
type: "newContent",
expect(msg5.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: undefined,
newContent: {
@@ -412,12 +504,24 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even
});
const reader = outRx.getReader();
const _initialSyncMessage = await reader.read();
const _adminSubscribeMsg = await reader.read();
const _teamSubscribeMsg = await reader.read();
const firstMessage = await reader.read();
const subscribeMsg = await reader.read();
expect(firstMessage.value).toEqual({
type: "newContent",
expect(subscribeMsg.value).toEqual({
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: true,
sessions: {},
},
} satisfies SyncMessage);
const newContentMsg = await reader.read();
expect(newContentMsg.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: map.multiLog.header,
newContent: {
@@ -445,7 +549,7 @@ test("If we add a server peer, all updates to all multilogs are sent to it, even
} satisfies SyncMessage);
});
test("If we add a server peer, it even receives just headers of newly created multilogs", async () => {
test("If we add a server peer, newly created multilogs are auto-subscribed to", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
@@ -466,20 +570,30 @@ test("If we add a server peer, it even receives just headers of newly created mu
});
const reader = outRx.getReader();
const _initialSyncMessage = await reader.read();
const _initialMsg1 = await reader.read();
const _initialMsg2 = await reader.read();
const map = team.createMap();
const firstMessage = await reader.read();
const msg1 = await reader.read();
expect(firstMessage.value).toEqual({
type: "newContent",
expect(msg1.value).toEqual({
action: "subscribe",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
const msg2 = await reader.read();
expect(msg2.value).toEqual({
action: "newContent",
multilogID: map.multiLog.id,
header: map.multiLog.header,
newContent: {},
} satisfies SyncMessage);
});
test.skip("TODO: when receiving a subscribe response that is behind our optimistic state (due to already sent content), we ignore it", () => {});
test("When we connect a new server peer, we try to sync all existing multilogs to it", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
@@ -502,21 +616,143 @@ test("When we connect a new server peer, we try to sync all existing multilogs t
const reader = outRx.getReader();
const firstMessage = await reader.read();
const _adminSubscribeMessage = await reader.read();
const teamSubscribeMessage = await reader.read();
expect(firstMessage.value).toEqual({
type: "subscribe",
expect(teamSubscribeMessage.value).toEqual({
action: "subscribe",
knownState: team.teamMap.multiLog.knownState(),
} satisfies SyncMessage);
const secondMessage = await reader.read();
expect(secondMessage.value).toEqual({
type: "subscribe",
action: "subscribe",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
});
test("When receiving a subscribe with a known state that is ahead of our own, peers should respond with a corresponding subscribe response message", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node = new LocalNode(admin, newRandomSessionID(adminID));
const team = node.createTeam();
const map = team.createMap();
const [inRx, inTx] = newStreamPair<SyncMessage>();
const [outRx, outTx] = newStreamPair<SyncMessage>();
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
role: "peer",
});
const writer = inTx.getWriter();
await writer.write({
action: "subscribe",
knownState: {
multilogID: map.multiLog.id,
header: true,
sessions: {
[node.ownSessionID]: 1,
},
},
});
const reader = outRx.getReader();
const firstMessage = await reader.read();
expect(firstMessage.value).toEqual({
action: "subscribeResponse",
knownState: map.multiLog.knownState(),
} satisfies SyncMessage);
});
test("When replaying creation and transactions of a multilog as new content, the receiving peer integrates this information", async () => {
const admin = newRandomAgentCredential();
const adminID = getAgentID(getAgent(admin));
const node1 = new LocalNode(admin, newRandomSessionID(adminID));
const team = node1.createTeam();
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
node1.addPeer({
id: "test2",
incoming: inRx1,
outgoing: outTx1,
role: "server",
});
const reader1 = outRx1.getReader();
const _adminSubscriptionMsg = await reader1.read();
const teamSubscribeMsg = await reader1.read();
const map = team.createMap();
const mapSubscriptionMsg = await reader1.read();
const mapNewContentMsg = await reader1.read();
map.edit((editable) => {
editable.set("hello", "world", "trusting");
});
const mapEditMsg = await reader1.read();
const node2 = new LocalNode(admin, newRandomSessionID(adminID));
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
node2.addPeer({
id: "test1",
incoming: inRx2,
outgoing: outTx2,
role: "client",
});
const writer2 = inTx2.getWriter();
const reader2 = outRx2.getReader();
await writer2.write(teamSubscribeMsg.value);
const teamSubscribeResponseMsg = await reader2.read();
expect(node2.multilogs[team.teamMap.multiLog.id]?.state).toEqual("loading");
const writer1 = inTx1.getWriter();
await writer1.write(teamSubscribeResponseMsg.value);
const teamContentMsg = await reader1.read();
await writer2.write(teamContentMsg.value);
await writer2.write(mapSubscriptionMsg.value);
const _mapSubscribeResponseMsg = await reader2.read();
await writer2.write(mapNewContentMsg.value);
expect(node2.multilogs[map.multiLog.id]?.state).toEqual("loading");
await writer2.write(mapEditMsg.value);
await new Promise((resolve) => setTimeout(resolve, 100));
expect(
expectMap(
node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent()
).get("hello")
).toEqual("world");
});
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];
let resolveNextItemReady: () => void = () => {};

View File

@@ -1,4 +1,4 @@
import { Hash } from "./crypto";
import { Hash, Signature } from "./crypto";
import { MultiLogHeader, MultiLogID, SessionID, Transaction } from "./multilog";
export type MultiLogKnownState = {
@@ -9,17 +9,23 @@ export type MultiLogKnownState = {
export type SyncMessage =
| SubscribeMessage
| SubscribeResponseMessage
| NewContentMessage
| WrongAssumedKnownStateMessage
| UnsubscribeMessage;
export type SubscribeMessage = {
type: "subscribe";
action: "subscribe";
knownState: MultiLogKnownState;
};
export type SubscribeResponseMessage = {
action: "subscribeResponse";
knownState: MultiLogKnownState;
};
export type NewContentMessage = {
type: "newContent";
action: "newContent";
multilogID: MultiLogID;
header?: MultiLogHeader;
newContent: {
@@ -30,17 +36,18 @@ export type NewContentMessage = {
export type SessionNewContent = {
after: number;
newTransactions: Transaction[];
// TODO: is lastHash needed here?
lastHash: Hash;
lastSignature: string;
}
lastSignature: Signature;
};
export type WrongAssumedKnownStateMessage = {
type: "wrongAssumedKnownState";
action: "wrongAssumedKnownState";
knownState: MultiLogKnownState;
};
export type UnsubscribeMessage = {
type: "unsubscribe";
action: "unsubscribe";
multilogID: MultiLogID;
};
@@ -50,13 +57,57 @@ export interface Peer {
id: PeerID;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStream<SyncMessage>;
role: 'peer' | 'server' | 'client';
role: "peer" | "server" | "client";
}
export interface PeerState {
id: PeerID;
optimisticKnownStates: {[multilogID: MultiLogID]: MultiLogKnownState};
optimisticKnownStates: { [multilogID: MultiLogID]: MultiLogKnownState };
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStreamDefaultWriter<SyncMessage>;
role: 'peer' | 'server' | 'client';
role: "peer" | "server" | "client";
}
export function weAreStrictlyAhead(
ourKnownState: MultiLogKnownState,
theirKnownState: MultiLogKnownState
): boolean {
if (theirKnownState.header && !ourKnownState.header) {
return false;
}
const allSessions = new Set([
...(Object.keys(ourKnownState.sessions) as SessionID[]),
...(Object.keys(theirKnownState.sessions) as SessionID[]),
]);
for (const sessionID of allSessions) {
const ourSession = ourKnownState.sessions[sessionID];
const theirSession = theirKnownState.sessions[sessionID];
if ((ourSession || 0) < (theirSession || 0)) {
return false;
}
}
return true;
}
export function combinedKnownStates(stateA: MultiLogKnownState, stateB: MultiLogKnownState): MultiLogKnownState {
const sessionStates: MultiLogKnownState["sessions"] = {};
const allSessions = new Set([...Object.keys(stateA.sessions), ...Object.keys(stateB.sessions)] as SessionID[]);
for (const sessionID of allSessions) {
const stateAValue = stateA.sessions[sessionID];
const stateBValue = stateB.sessions[sessionID];
sessionStates[sessionID] = Math.max(stateAValue || 0, stateBValue || 0);
}
return {
multilogID: stateA.multilogID,
header: stateA.header || stateB.header,
sessions: sessionStates,
};
}

View File

@@ -16,6 +16,8 @@
"noEmit": true,
"types": [
"bun-types" // add Bun global
]
],
// "noUncheckedIndexedAccess": true
}
}