Compare commits

...

2 Commits

Author SHA1 Message Date
Guido D'Orsi
ff44f51a14 feat: make the PeerKnownState subscribable 2024-10-20 01:24:59 +02:00
Guido D'Orsi
a9d2c82557 chore: refactor peer known states management 2024-10-19 16:02:22 +02:00
8 changed files with 292 additions and 88 deletions

View File

@@ -0,0 +1,100 @@
import { PeerKnownStates } from './PeerKnownStates.js';
import { CoValueKnownState, emptyKnownState } from './sync.js';
import { RawCoID, SessionID } from './ids.js';
import { vi, describe, test, expect } from 'vitest';
describe('PeerKnownStates', () => {
test('should set and get a known state', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const knownState: CoValueKnownState = emptyKnownState(id);
peerKnownStates.dispatch({ type: 'SET', id, value: knownState });
expect(peerKnownStates.get(id)).toEqual(knownState);
expect(peerKnownStates.has(id)).toBe(true);
});
test('should update header', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
peerKnownStates.dispatch({ type: 'UPDATE_HEADER', id, header: true });
const result = peerKnownStates.get(id);
expect(result?.header).toBe(true);
});
test('should update session counter', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const sessionId = 'session-1' as SessionID;
peerKnownStates.dispatch({ type: 'UPDATE_SESSION_COUNTER', id, sessionId, value: 5 });
const result = peerKnownStates.get(id);
expect(result?.sessions[sessionId]).toBe(5);
});
test('should combine with existing state', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const session1 = 'session-1' as SessionID;
const session2 = 'session-2' as SessionID;
const initialState: CoValueKnownState = {
...emptyKnownState(id),
sessions: { [session1]: 5 },
};
const combineState: CoValueKnownState = {
...emptyKnownState(id),
sessions: { [session2]: 10 },
};
peerKnownStates.dispatch({ type: 'SET', id, value: initialState });
peerKnownStates.dispatch({ type: 'COMBINE_WITH', id, value: combineState });
const result = peerKnownStates.get(id);
expect(result?.sessions).toEqual({ [session1]: 5, [session2]: 10 });
});
test('should set as empty', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const sessionId = 'session-1' as SessionID;
const initialState: CoValueKnownState = {
...emptyKnownState(id),
sessions: { [sessionId]: 5 },
};
peerKnownStates.dispatch({ type: 'SET', id, value: initialState });
peerKnownStates.dispatch({ type: 'SET_AS_EMPTY', id });
const result = peerKnownStates.get(id);
expect(result).toEqual(emptyKnownState(id));
});
test('should trigger listeners on dispatch', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const listener = vi.fn();
peerKnownStates.subscribe(listener);
peerKnownStates.dispatch({ type: 'SET_AS_EMPTY', id });
expect(listener).toHaveBeenCalledWith(id, emptyKnownState(id));
});
test('should unsubscribe listener', () => {
const peerKnownStates = new PeerKnownStates();
const id = 'test-id' as RawCoID;
const listener = vi.fn();
const unsubscribe = peerKnownStates.subscribe(listener);
unsubscribe();
peerKnownStates.dispatch({ type: 'SET_AS_EMPTY', id });
expect(listener).not.toHaveBeenCalled();
});
});

View File

@@ -0,0 +1,103 @@
import { RawCoID, SessionID } from "./ids.js";
import { CoValueKnownState, emptyKnownState, combinedKnownStates } from "./sync.js";
type PeerKnownStateActions = {
type: "SET_AS_EMPTY";
id: RawCoID;
} | {
type: "UPDATE_HEADER";
id: RawCoID;
header: boolean;
} |
{
type: "UPDATE_SESSION_COUNTER";
id: RawCoID;
sessionId: SessionID;
value: number;
} |
{
type: "SET";
id: RawCoID;
value: CoValueKnownState;
} |
{
type: "COMBINE_WITH";
id: RawCoID;
value: CoValueKnownState;
};
export class PeerKnownStates {
private coValues = new Map<RawCoID, CoValueKnownState>();
private updateHeader(id: RawCoID, header: boolean) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
knownState.header = header;
this.coValues.set(id, knownState);
}
private combineWith(id: RawCoID, value: CoValueKnownState) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
this.coValues.set(id, combinedKnownStates(knownState, value));
}
private updateSessionCounter(
id: RawCoID,
sessionId: SessionID,
value: number
) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
const currentValue = knownState.sessions[sessionId] || 0;
knownState.sessions[sessionId] = Math.max(currentValue, value);
this.coValues.set(id, knownState);
}
get(id: RawCoID) {
return this.coValues.get(id);
}
has(id: RawCoID) {
return this.coValues.has(id);
}
dispatch(action: PeerKnownStateActions) {
switch (action.type) {
case "UPDATE_HEADER":
this.updateHeader(action.id, action.header);
break;
case "UPDATE_SESSION_COUNTER":
this.updateSessionCounter(
action.id,
action.sessionId,
action.value
);
break;
case "SET":
this.coValues.set(action.id, action.value);
break;
case "COMBINE_WITH":
this.combineWith(action.id, action.value);
break;
case "SET_AS_EMPTY":
this.coValues.set(action.id, emptyKnownState(action.id));
break;
}
this.trigger(action.id, this.coValues.get(action.id));
}
listeners = new Set<(id: RawCoID, knownState: CoValueKnownState | undefined) => void>();
private trigger(id: RawCoID, knownState: CoValueKnownState | undefined) {
for (const listener of this.listeners) {
listener(id, knownState);
}
}
subscribe(listener: (id: RawCoID, knownState: CoValueKnownState | undefined) => void) {
this.listeners.add(listener);
return () => {
this.listeners.delete(listener);
};
}
}

View File

@@ -1,16 +1,21 @@
import { RawCoID } from "./ids.js";
import { PeerKnownStates } from "./PeerKnownStates.js";
import { CO_VALUE_PRIORITY } from "./priority.js";
import {
PriorityBasedMessageQueue,
QueueEntry,
} from "./PriorityBasedMessageQueue.js";
import { CoValueKnownState, Peer, SyncMessage } from "./sync.js";
import {
Peer,
SyncMessage,
} from "./sync.js";
export class PeerState {
constructor(private peer: Peer) {}
readonly optimisticKnownStates: { [id: RawCoID]: CoValueKnownState } = {};
public knownStates = new PeerKnownStates();
readonly toldKnownState: Set<RawCoID> = new Set();
get id() {
return this.peer.id;
}
@@ -44,7 +49,6 @@ export class PeerState {
this.processing = true;
let entry: QueueEntry | undefined;
while ((entry = this.queue.pull())) {
// Awaiting the push to send one message at a time

View File

@@ -1,15 +1,6 @@
import { CoValueCore } from "./coValueCore.js";
import { PeerID } from "./sync.js";
function createResolvablePromise<T>() {
let resolve!: (value: T) => void;
const promise = new Promise<T>((res) => {
resolve = res;
});
return { promise, resolve };
}
import { createResolvablePromise } from "./utils.js";
class CoValueUnknownState {
type = "unknown" as const;

View File

@@ -6,6 +6,7 @@ import { CoValueState } from "./coValueState.js";
import { RawCoID, SessionID } from "./ids.js";
import { PeerState } from "./PeerState.js";
import { CoValuePriority } from "./priority.js";
import { createResolvablePromise } from "./utils.js";
export type CoValueKnownState = {
id: RawCoID;
@@ -108,11 +109,7 @@ export function combinedKnownStates(
export class SyncManager {
peers: { [key: PeerID]: PeerState } = {};
local: LocalNode;
requestedSyncs: {
[id: RawCoID]:
| { done: Promise<void>; nRequestsThisTick: number }
| undefined;
} = {};
requestedSyncs = new Map<RawCoID, Promise<void>>();
constructor(local: LocalNode) {
this.local = local;
@@ -253,12 +250,11 @@ export class SyncManager {
);
const newContentPieces = coValue.newContentSince(
peer.optimisticKnownStates[id],
peer.knownStates.get(id),
);
if (newContentPieces) {
const optimisticKnownStateBefore =
peer.optimisticKnownStates[id] || emptyKnownState(id);
const optimisticKnownStateBefore = peer.knownStates.get(id);
const sendPieces = async () => {
let lastYield = performance.now();
@@ -285,14 +281,19 @@ export class SyncManager {
sendPieces().catch((e) => {
console.error("Error sending new content piece, retrying", e);
peer.optimisticKnownStates[id] = optimisticKnownStateBefore;
peer.knownStates.dispatch({
type: "SET",
id,
value: optimisticKnownStateBefore ?? emptyKnownState(id),
});
return this.sendNewContentIncludingDependencies(id, peer);
});
peer.optimisticKnownStates[id] = combinedKnownStates(
optimisticKnownStateBefore,
coValue.knownState(),
);
peer.knownStates.dispatch({
type: "COMBINE_WITH",
id,
value: coValue.knownState(),
});
}
}
@@ -308,11 +309,10 @@ export class SyncManager {
// console.log("subscribing to after peer added", id, peer.id)
await this.subscribeToIncludingDependencies(id, peerState);
peerState.optimisticKnownStates[id] = {
id: id,
header: false,
sessions: {},
};
peerState.knownStates.dispatch({
type: "SET_AS_EMPTY",
id,
});
}
};
void initialSync();
@@ -376,7 +376,12 @@ export class SyncManager {
}
async handleLoad(msg: LoadMessage, peer: PeerState) {
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
peer.knownStates.dispatch({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
let entry = this.local.coValues[msg.id];
if (!entry) {
@@ -426,7 +431,11 @@ export class SyncManager {
const loaded = await entry.state.ready;
if (loaded === "unavailable") {
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
peer.knownStates.dispatch({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
peer.toldKnownState.add(msg.id);
this.trySendToPeer(peer, {
@@ -449,10 +458,11 @@ export class SyncManager {
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
let entry = this.local.coValues[msg.id];
peer.optimisticKnownStates[msg.id] = combinedKnownStates(
peer.optimisticKnownStates[msg.id] || emptyKnownState(msg.id),
knownStateIn(msg),
);
peer.knownStates.dispatch({
type: "COMBINE_WITH",
id: msg.id,
value: knownStateIn(msg),
});
if (!entry) {
if (msg.asDependencyOf) {
@@ -479,7 +489,7 @@ export class SyncManager {
}
if (entry.state.type === "unknown") {
const availableOnPeer = peer.optimisticKnownStates[msg.id]?.header;
const availableOnPeer = peer.knownStates.get(msg.id)?.header;
if (!availableOnPeer) {
entry.dispatch({
@@ -505,15 +515,6 @@ export class SyncManager {
return;
}
const peerOptimisticKnownState = peer.optimisticKnownStates[msg.id];
if (!peerOptimisticKnownState) {
console.error(
"Expected optimisticKnownState to be set for coValue we receive new content for",
);
return;
}
let coValue: CoValueCore;
if (entry.state.type === "unknown") {
@@ -522,7 +523,11 @@ export class SyncManager {
return;
}
peerOptimisticKnownState.header = true;
peer.knownStates.dispatch({
type: "UPDATE_HEADER",
id: msg.id,
header: true,
});
coValue = new CoValueCore(msg.header, this.local);
@@ -588,14 +593,6 @@ export class SyncManager {
);
}
// const theirTotalnTxs = Object.values(
// peer.optimisticKnownStates[msg.id]?.sessions || {},
// ).reduce((sum, nTxs) => sum + nTxs, 0);
// const ourTotalnTxs = [...coValue.sessionLogs.values()].reduce(
// (sum, session) => sum + session.transactions.length,
// 0,
// );
if (result.isErr()) {
console.error(
"Failed to add transactions from",
@@ -611,11 +608,14 @@ export class SyncManager {
continue;
}
peerOptimisticKnownState.sessions[sessionID] = Math.max(
peerOptimisticKnownState.sessions[sessionID] || 0,
newContentForSession.after +
peer.knownStates.dispatch({
type: "UPDATE_SESSION_COUNTER",
id: msg.id,
sessionId: sessionID,
value:
newContentForSession.after +
newContentForSession.newTransactions.length,
);
});
}
await this.syncCoValue(coValue);
@@ -632,7 +632,11 @@ export class SyncManager {
}
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
peer.optimisticKnownStates[msg.id] = msg;
peer.knownStates.dispatch({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
return this.sendNewContentIncludingDependencies(msg.id, peer);
}
@@ -642,27 +646,24 @@ export class SyncManager {
}
async syncCoValue(coValue: CoValueCore) {
if (this.requestedSyncs[coValue.id]) {
this.requestedSyncs[coValue.id]!.nRequestsThisTick++;
return this.requestedSyncs[coValue.id]!.done;
} else {
const done = new Promise<void>((resolve) => {
queueMicrotask(async () => {
delete this.requestedSyncs[coValue.id];
// if (entry.nRequestsThisTick >= 2) {
// console.log("Syncing", coValue.id, "for", entry.nRequestsThisTick, "requests");
// }
await this.actuallySyncCoValue(coValue);
resolve();
});
});
const entry = {
done,
nRequestsThisTick: 1,
};
this.requestedSyncs[coValue.id] = entry;
return done;
const syncRequest = this.requestedSyncs.get(coValue.id);
if (syncRequest) {
return syncRequest;
}
const { promise, resolve } = createResolvablePromise<void>();
this.requestedSyncs.set(coValue.id, promise);
queueMicrotask(async () => {
this.requestedSyncs.delete(coValue.id);
await this.actuallySyncCoValue(coValue);
resolve();
});
return promise;
}
async actuallySyncCoValue(coValue: CoValueCore) {
@@ -674,9 +675,7 @@ export class SyncManager {
// });
// blockingSince = performance.now();
// }
const optimisticKnownState = peer.optimisticKnownStates[coValue.id];
if (optimisticKnownState) {
if (peer.knownStates.has(coValue.id)) {
await this.tellUntoldKnownStateIncludingDependencies(
coValue.id,
peer,

View File

@@ -27,8 +27,6 @@ describe("PeerState", () => {
expect(peerState.priority).toBe(1);
expect(peerState.crashOnClose).toBe(false);
expect(peerState.closed).toBe(false);
expect(peerState.optimisticKnownStates).toEqual({});
expect(peerState.toldKnownState).toEqual(new Set());
});
test("should push outgoing message to peer", async () => {

View File

@@ -761,8 +761,8 @@ test.skip("When replaying creation and transactions of a coValue as new content,
expect(groupTellKnownStateMsg).toMatchObject(groupStateEx(group));
expect(
node2.syncManager.peers["test1"]!.optimisticKnownStates[group.core.id],
).toBeDefined();
node2.syncManager.peers["test1"]!.knownStates.has(group.core.id),
).toBe(true);
// await inTx1.push(adminTellKnownStateMsg);
await inTx1.push(groupTellKnownStateMsg);

View File

@@ -0,0 +1,9 @@
export function createResolvablePromise<T>() {
let resolve!: (value: T) => void;
const promise = new Promise<T>((res) => {
resolve = res;
});
return { promise, resolve };
}