Compare commits
12 Commits
jazz-inspe
...
refactor/s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
598265f5be | ||
|
|
df16629eac | ||
|
|
e5eed7bd35 | ||
|
|
39ae497153 | ||
|
|
e0b5df7f9e | ||
|
|
264c1c1fa5 | ||
|
|
54b2907f08 | ||
|
|
f3e0b1ed74 | ||
|
|
ca9162476c | ||
|
|
f98f6f1a7a | ||
|
|
2624d037f3 | ||
|
|
2d7917b169 |
@@ -9,7 +9,7 @@ export class CoValuesStore {
|
||||
let entry = this.coValues.get(id);
|
||||
|
||||
if (!entry) {
|
||||
entry = CoValueState.Unknown(id);
|
||||
entry = new CoValueState(id);
|
||||
this.coValues.set(id, entry);
|
||||
}
|
||||
|
||||
@@ -18,10 +18,7 @@ export class CoValuesStore {
|
||||
|
||||
setAsAvailable(id: RawCoID, coValue: CoValueCore) {
|
||||
const entry = this.get(id);
|
||||
entry.dispatch({
|
||||
type: "available",
|
||||
coValue,
|
||||
});
|
||||
entry.markAvailable(coValue);
|
||||
}
|
||||
|
||||
getEntries() {
|
||||
|
||||
@@ -5,57 +5,37 @@ import {
|
||||
emptyKnownState,
|
||||
} from "./sync.js";
|
||||
|
||||
export type PeerKnownStateActions =
|
||||
| {
|
||||
type: "SET_AS_EMPTY";
|
||||
id: RawCoID;
|
||||
}
|
||||
| {
|
||||
type: "UPDATE_HEADER";
|
||||
id: RawCoID;
|
||||
header: boolean;
|
||||
}
|
||||
| {
|
||||
type: "UPDATE_SESSION_COUNTER";
|
||||
id: RawCoID;
|
||||
sessionId: SessionID;
|
||||
value: number;
|
||||
}
|
||||
| {
|
||||
type: "SET";
|
||||
id: RawCoID;
|
||||
value: CoValueKnownState;
|
||||
}
|
||||
| {
|
||||
type: "COMBINE_WITH";
|
||||
id: RawCoID;
|
||||
value: CoValueKnownState;
|
||||
};
|
||||
|
||||
export class PeerKnownStates {
|
||||
private coValues = new Map<RawCoID, CoValueKnownState>();
|
||||
|
||||
private updateHeader(id: RawCoID, header: boolean) {
|
||||
updateHeader(id: RawCoID, header: boolean) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
knownState.header = header;
|
||||
this.coValues.set(id, knownState);
|
||||
this.triggerUpdate(id);
|
||||
}
|
||||
|
||||
private combineWith(id: RawCoID, value: CoValueKnownState) {
|
||||
combineWith(id: RawCoID, value: CoValueKnownState) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
this.coValues.set(id, combinedKnownStates(knownState, value));
|
||||
this.triggerUpdate(id);
|
||||
}
|
||||
|
||||
private updateSessionCounter(
|
||||
id: RawCoID,
|
||||
sessionId: SessionID,
|
||||
value: number,
|
||||
) {
|
||||
updateSessionCounter(id: RawCoID, sessionId: SessionID, value: number) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
const currentValue = knownState.sessions[sessionId] || 0;
|
||||
knownState.sessions[sessionId] = Math.max(currentValue, value);
|
||||
|
||||
this.coValues.set(id, knownState);
|
||||
this.triggerUpdate(id);
|
||||
}
|
||||
|
||||
set(id: RawCoID, knownState: CoValueKnownState | "empty") {
|
||||
this.coValues.set(
|
||||
id,
|
||||
knownState === "empty" ? emptyKnownState(id) : knownState,
|
||||
);
|
||||
this.triggerUpdate(id);
|
||||
}
|
||||
|
||||
get(id: RawCoID) {
|
||||
@@ -72,28 +52,6 @@ export class PeerKnownStates {
|
||||
return clone;
|
||||
}
|
||||
|
||||
dispatch(action: PeerKnownStateActions) {
|
||||
switch (action.type) {
|
||||
case "UPDATE_HEADER":
|
||||
this.updateHeader(action.id, action.header);
|
||||
break;
|
||||
case "UPDATE_SESSION_COUNTER":
|
||||
this.updateSessionCounter(action.id, action.sessionId, action.value);
|
||||
break;
|
||||
case "SET":
|
||||
this.coValues.set(action.id, action.value);
|
||||
break;
|
||||
case "COMBINE_WITH":
|
||||
this.combineWith(action.id, action.value);
|
||||
break;
|
||||
case "SET_AS_EMPTY":
|
||||
this.coValues.set(action.id, emptyKnownState(action.id));
|
||||
break;
|
||||
}
|
||||
|
||||
this.triggerUpdate(action.id);
|
||||
}
|
||||
|
||||
listeners = new Set<(id: RawCoID, knownState: CoValueKnownState) => void>();
|
||||
|
||||
triggerUpdate(id: RawCoID) {
|
||||
@@ -114,3 +72,8 @@ export class PeerKnownStates {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
export type ReadonlyPeerKnownStates = Pick<
|
||||
PeerKnownStates,
|
||||
"get" | "has" | "clone" | "subscribe"
|
||||
>;
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { PeerKnownStateActions, PeerKnownStates } from "./PeerKnownStates.js";
|
||||
import { PeerKnownStates, ReadonlyPeerKnownStates } from "./PeerKnownStates.js";
|
||||
import {
|
||||
PriorityBasedMessageQueue,
|
||||
QueueEntry,
|
||||
@@ -7,7 +7,7 @@ import { TryAddTransactionsError } from "./coValueCore.js";
|
||||
import { RawCoID, SessionID } from "./ids.js";
|
||||
import { logger } from "./logger.js";
|
||||
import { CO_VALUE_PRIORITY } from "./priority.js";
|
||||
import { Peer, SyncMessage } from "./sync.js";
|
||||
import { CoValueKnownState, Peer, SyncMessage } from "./sync.js";
|
||||
|
||||
export class PeerState {
|
||||
private queue: PriorityBasedMessageQueue;
|
||||
@@ -17,7 +17,7 @@ export class PeerState {
|
||||
|
||||
constructor(
|
||||
private peer: Peer,
|
||||
knownStates: PeerKnownStates | undefined,
|
||||
knownStates: ReadonlyPeerKnownStates | undefined,
|
||||
) {
|
||||
/**
|
||||
* We set as default priority HIGH to handle all the messages without a
|
||||
@@ -28,14 +28,16 @@ export class PeerState {
|
||||
this.queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.HIGH, {
|
||||
peerRole: peer.role,
|
||||
});
|
||||
this.optimisticKnownStates = knownStates?.clone() ?? new PeerKnownStates();
|
||||
|
||||
this._knownStates = knownStates?.clone() ?? new PeerKnownStates();
|
||||
|
||||
// We assume that exchanges with storage peers are always successful
|
||||
// hence we don't need to differentiate between knownStates and optimisticKnownStates
|
||||
if (peer.role === "storage") {
|
||||
this.knownStates = this.optimisticKnownStates;
|
||||
this._optimisticKnownStates = "assumeInfallible";
|
||||
} else {
|
||||
this.knownStates = knownStates?.clone() ?? new PeerKnownStates();
|
||||
this._optimisticKnownStates =
|
||||
knownStates?.clone() ?? new PeerKnownStates();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -44,7 +46,11 @@ export class PeerState {
|
||||
*
|
||||
* This can be used to safely track the sync state of a coValue in a given peer.
|
||||
*/
|
||||
readonly knownStates: PeerKnownStates;
|
||||
readonly _knownStates: PeerKnownStates;
|
||||
|
||||
get knownStates(): ReadonlyPeerKnownStates {
|
||||
return this._knownStates;
|
||||
}
|
||||
|
||||
/**
|
||||
* This one collects the known states "optimistically".
|
||||
@@ -53,18 +59,68 @@ export class PeerState {
|
||||
* The main difference with knownState is that this is updated when the content is sent to the peer without
|
||||
* waiting for any acknowledgement from the peer.
|
||||
*/
|
||||
readonly optimisticKnownStates: PeerKnownStates;
|
||||
readonly _optimisticKnownStates: PeerKnownStates | "assumeInfallible";
|
||||
|
||||
get optimisticKnownStates(): ReadonlyPeerKnownStates {
|
||||
if (this._optimisticKnownStates === "assumeInfallible") {
|
||||
return this.knownStates;
|
||||
}
|
||||
|
||||
return this._optimisticKnownStates;
|
||||
}
|
||||
|
||||
readonly toldKnownState: Set<RawCoID> = new Set();
|
||||
|
||||
dispatchToKnownStates(action: PeerKnownStateActions) {
|
||||
this.knownStates.dispatch(action);
|
||||
updateHeader(id: RawCoID, header: boolean) {
|
||||
this._knownStates.updateHeader(id, header);
|
||||
|
||||
if (this.role !== "storage") {
|
||||
this.optimisticKnownStates.dispatch(action);
|
||||
if (this._optimisticKnownStates !== "assumeInfallible") {
|
||||
this._optimisticKnownStates.updateHeader(id, header);
|
||||
}
|
||||
}
|
||||
|
||||
readonly erroredCoValues: Map<RawCoID, TryAddTransactionsError> = new Map();
|
||||
combineWith(id: RawCoID, value: CoValueKnownState) {
|
||||
this._knownStates.combineWith(id, value);
|
||||
|
||||
if (this._optimisticKnownStates !== "assumeInfallible") {
|
||||
this._optimisticKnownStates.combineWith(id, value);
|
||||
}
|
||||
}
|
||||
|
||||
combineOptimisticWith(id: RawCoID, value: CoValueKnownState) {
|
||||
if (this._optimisticKnownStates === "assumeInfallible") {
|
||||
this._knownStates.combineWith(id, value);
|
||||
} else {
|
||||
this._optimisticKnownStates.combineWith(id, value);
|
||||
}
|
||||
}
|
||||
|
||||
updateSessionCounter(id: RawCoID, sessionId: SessionID, value: number) {
|
||||
this._knownStates.updateSessionCounter(id, sessionId, value);
|
||||
|
||||
if (this._optimisticKnownStates !== "assumeInfallible") {
|
||||
this._optimisticKnownStates.updateSessionCounter(id, sessionId, value);
|
||||
}
|
||||
}
|
||||
|
||||
setKnownState(id: RawCoID, knownState: CoValueKnownState | "empty") {
|
||||
this._knownStates.set(id, knownState);
|
||||
|
||||
if (this._optimisticKnownStates !== "assumeInfallible") {
|
||||
this._optimisticKnownStates.set(id, knownState);
|
||||
}
|
||||
}
|
||||
|
||||
setOptimisticKnownState(
|
||||
id: RawCoID,
|
||||
knownState: CoValueKnownState | "empty",
|
||||
) {
|
||||
if (this._optimisticKnownStates === "assumeInfallible") {
|
||||
this._knownStates.set(id, knownState);
|
||||
} else {
|
||||
this._optimisticKnownStates.set(id, knownState);
|
||||
}
|
||||
}
|
||||
|
||||
get id() {
|
||||
return this.peer.id;
|
||||
|
||||
@@ -116,11 +116,11 @@ export class SyncStateManager {
|
||||
|
||||
const entry = this.syncManager.local.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
if (!entry.isAvailable()) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const coValue = entry.state.coValue;
|
||||
const coValue = entry.core;
|
||||
const coValueSessions = coValue.knownState().sessions;
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,370 +1,189 @@
|
||||
import { ValueType } from "@opentelemetry/api";
|
||||
import { UpDownCounter, metrics } from "@opentelemetry/api";
|
||||
import { PeerState } from "./PeerState.js";
|
||||
import { CoValueCore } from "./coValueCore.js";
|
||||
import { CoValueCore, TryAddTransactionsError } from "./coValueCore.js";
|
||||
import { RawCoID } from "./ids.js";
|
||||
import { logger } from "./logger.js";
|
||||
import { PeerID } from "./sync.js";
|
||||
import { PeerID, emptyKnownState } from "./sync.js";
|
||||
|
||||
export const CO_VALUE_LOADING_CONFIG = {
|
||||
MAX_RETRIES: 2,
|
||||
TIMEOUT: 30_000,
|
||||
};
|
||||
|
||||
export class CoValueUnknownState {
|
||||
type = "unknown" as const;
|
||||
}
|
||||
|
||||
export class CoValueLoadingState {
|
||||
type = "loading" as const;
|
||||
export class CoValueState {
|
||||
private peers = new Map<
|
||||
PeerID,
|
||||
ReturnType<typeof createResolvablePromise<void>>
|
||||
| { type: "unknown" | "pending" | "available" | "unavailable" }
|
||||
| { type: "errored"; error: TryAddTransactionsError }
|
||||
>();
|
||||
private resolveResult: (value: CoValueCore | "unavailable") => void;
|
||||
|
||||
result: Promise<CoValueCore | "unavailable">;
|
||||
private peersToRequestFrom = new Map<PeerID, PeerState>();
|
||||
loading = false;
|
||||
|
||||
constructor(peersIds: Iterable<PeerID>) {
|
||||
this.peers = new Map();
|
||||
core: CoValueCore | null = null;
|
||||
id: RawCoID;
|
||||
|
||||
for (const peerId of peersIds) {
|
||||
this.peers.set(peerId, createResolvablePromise<void>());
|
||||
listeners: Set<(state: CoValueState) => void> = new Set();
|
||||
|
||||
constructor(id: RawCoID) {
|
||||
this.id = id;
|
||||
}
|
||||
|
||||
addListener(listener: (state: CoValueState) => void) {
|
||||
this.listeners.add(listener);
|
||||
listener(this);
|
||||
}
|
||||
|
||||
removeListener(listener: (state: CoValueState) => void) {
|
||||
this.listeners.delete(listener);
|
||||
}
|
||||
|
||||
private notifyListeners() {
|
||||
this.loadFromNextPeer();
|
||||
|
||||
for (const listener of this.listeners) {
|
||||
listener(this);
|
||||
}
|
||||
|
||||
const { resolve, promise } = createResolvablePromise<
|
||||
CoValueCore | "unavailable"
|
||||
>();
|
||||
|
||||
this.result = promise;
|
||||
this.resolveResult = resolve;
|
||||
}
|
||||
|
||||
markAsUnavailable(peerId: PeerID) {
|
||||
const entry = this.peers.get(peerId);
|
||||
|
||||
if (entry) {
|
||||
entry.resolve();
|
||||
}
|
||||
|
||||
this.peers.delete(peerId);
|
||||
|
||||
// If none of the peers have the coValue, we resolve to unavailable
|
||||
if (this.peers.size === 0) {
|
||||
this.resolve("unavailable");
|
||||
}
|
||||
}
|
||||
|
||||
resolve(value: CoValueCore | "unavailable") {
|
||||
this.resolveResult(value);
|
||||
for (const entry of this.peers.values()) {
|
||||
entry.resolve();
|
||||
}
|
||||
this.peers.clear();
|
||||
}
|
||||
|
||||
// Wait for a specific peer to have a known state
|
||||
waitForPeer(peerId: PeerID) {
|
||||
const entry = this.peers.get(peerId);
|
||||
|
||||
if (!entry) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return entry.promise;
|
||||
}
|
||||
}
|
||||
|
||||
export class CoValueAvailableState {
|
||||
type = "available" as const;
|
||||
|
||||
constructor(public coValue: CoValueCore) {}
|
||||
}
|
||||
|
||||
export class CoValueUnavailableState {
|
||||
type = "unavailable" as const;
|
||||
}
|
||||
|
||||
type CoValueStateAction =
|
||||
| {
|
||||
type: "load-requested";
|
||||
peersIds: PeerID[];
|
||||
}
|
||||
| {
|
||||
type: "not-found-in-peer";
|
||||
peerId: PeerID;
|
||||
}
|
||||
| {
|
||||
type: "available";
|
||||
coValue: CoValueCore;
|
||||
};
|
||||
|
||||
type CoValueStateType =
|
||||
| CoValueUnknownState
|
||||
| CoValueLoadingState
|
||||
| CoValueAvailableState
|
||||
| CoValueUnavailableState;
|
||||
|
||||
export class CoValueState {
|
||||
promise?: Promise<CoValueCore | "unavailable">;
|
||||
private resolve?: (value: CoValueCore | "unavailable") => void;
|
||||
private counter: UpDownCounter;
|
||||
|
||||
constructor(
|
||||
public id: RawCoID,
|
||||
public state: CoValueStateType,
|
||||
) {
|
||||
this.counter = metrics
|
||||
.getMeter("cojson")
|
||||
.createUpDownCounter("jazz.covalues.loaded", {
|
||||
description: "The number of covalues in the system",
|
||||
unit: "covalue",
|
||||
valueType: ValueType.INT,
|
||||
});
|
||||
|
||||
this.counter.add(1, {
|
||||
state: this.state.type,
|
||||
});
|
||||
}
|
||||
|
||||
static Unknown(id: RawCoID) {
|
||||
return new CoValueState(id, new CoValueUnknownState());
|
||||
}
|
||||
|
||||
static Loading(id: RawCoID, peersIds: Iterable<PeerID>) {
|
||||
return new CoValueState(id, new CoValueLoadingState(peersIds));
|
||||
}
|
||||
|
||||
static Available(coValue: CoValueCore) {
|
||||
return new CoValueState(coValue.id, new CoValueAvailableState(coValue));
|
||||
}
|
||||
|
||||
static Unavailable(id: RawCoID) {
|
||||
return new CoValueState(id, new CoValueUnavailableState());
|
||||
}
|
||||
|
||||
async getCoValue() {
|
||||
if (this.state.type === "available") {
|
||||
return this.state.coValue;
|
||||
if (this.core) {
|
||||
return this.core;
|
||||
}
|
||||
if (this.state.type === "unavailable") {
|
||||
|
||||
if (this.isDefinitelyUnavailable()) {
|
||||
return "unavailable";
|
||||
}
|
||||
|
||||
// If we don't have a resolved state we return a new promise
|
||||
// that will be resolved when the state will move to available or unavailable
|
||||
if (!this.promise) {
|
||||
const { promise, resolve } = createResolvablePromise<
|
||||
CoValueCore | "unavailable"
|
||||
>();
|
||||
return new Promise<CoValueCore | "unavailable">((resolve) => {
|
||||
const listener = (state: CoValueState) => {
|
||||
if (state.core) {
|
||||
resolve(state.core);
|
||||
this.removeListener(listener);
|
||||
} else if (this.isDefinitelyUnavailable()) {
|
||||
resolve("unavailable");
|
||||
this.removeListener(listener);
|
||||
}
|
||||
};
|
||||
|
||||
this.promise = promise;
|
||||
this.resolve = resolve;
|
||||
}
|
||||
|
||||
return this.promise;
|
||||
}
|
||||
|
||||
private moveToState(value: CoValueStateType) {
|
||||
this.counter.add(-1, {
|
||||
state: this.state.type,
|
||||
this.addListener(listener);
|
||||
});
|
||||
this.state = value;
|
||||
|
||||
this.counter.add(1, {
|
||||
state: this.state.type,
|
||||
});
|
||||
|
||||
if (!this.resolve) {
|
||||
return;
|
||||
}
|
||||
|
||||
// If the state is available we resolve the promise
|
||||
// and clear it to handle the possible transition from unavailable to available
|
||||
if (value.type === "available") {
|
||||
this.resolve(value.coValue);
|
||||
this.clearPromise();
|
||||
} else if (value.type === "unavailable") {
|
||||
this.resolve("unavailable");
|
||||
this.clearPromise();
|
||||
}
|
||||
}
|
||||
|
||||
private clearPromise() {
|
||||
this.promise = undefined;
|
||||
this.resolve = undefined;
|
||||
}
|
||||
|
||||
async loadFromPeers(peers: PeerState[]) {
|
||||
const state = this.state;
|
||||
for (const peer of peers) {
|
||||
this.peersToRequestFrom.set(peer.id, peer);
|
||||
}
|
||||
|
||||
if (state.type === "loading" || state.type === "available") {
|
||||
this.loadFromNextPeer();
|
||||
}
|
||||
|
||||
private loadFromNextPeer() {
|
||||
if (this.isLoading() || this.peersToRequestFrom.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (peers.length === 0) {
|
||||
this.moveToState(new CoValueUnavailableState());
|
||||
// TODO: Load the peers with the same priority in parallel
|
||||
let selectedPeer: PeerState | undefined;
|
||||
|
||||
for (const peer of this.peersToRequestFrom.values()) {
|
||||
const currentState = this.peers.get(peer.id);
|
||||
|
||||
switch (currentState?.type) {
|
||||
case "available":
|
||||
case "errored":
|
||||
case "pending":
|
||||
this.peersToRequestFrom.delete(peer.id);
|
||||
continue;
|
||||
|
||||
case "unavailable":
|
||||
case "unknown":
|
||||
default:
|
||||
if (
|
||||
!peer.shouldRetryUnavailableCoValues() &&
|
||||
currentState?.type === "unavailable"
|
||||
) {
|
||||
this.peersToRequestFrom.delete(peer.id);
|
||||
} else if (
|
||||
!selectedPeer ||
|
||||
(peer.priority ?? 0) > (selectedPeer.priority ?? 0)
|
||||
) {
|
||||
selectedPeer = peer;
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!selectedPeer) {
|
||||
return;
|
||||
}
|
||||
|
||||
const doLoad = async (peersToLoadFrom: PeerState[]) => {
|
||||
const peersWithoutErrors = getPeersWithoutErrors(
|
||||
peersToLoadFrom,
|
||||
this.id,
|
||||
);
|
||||
this.peersToRequestFrom.delete(selectedPeer.id);
|
||||
this.peers.set(selectedPeer.id, { type: "pending" });
|
||||
|
||||
// If we are in the loading state we move to a new loading state
|
||||
// to reset all the loading promises
|
||||
if (
|
||||
this.state.type === "loading" ||
|
||||
this.state.type === "unknown" ||
|
||||
this.state.type === "unavailable"
|
||||
) {
|
||||
this.moveToState(
|
||||
new CoValueLoadingState(peersWithoutErrors.map((p) => p.id)),
|
||||
);
|
||||
}
|
||||
const knownState = this.core
|
||||
? this.core.knownState()
|
||||
: emptyKnownState(this.id);
|
||||
|
||||
// Assign the current state to a variable to not depend on the state changes
|
||||
// that may happen while we wait for loadCoValueFromPeers to complete
|
||||
const currentState = this.state;
|
||||
selectedPeer
|
||||
.pushOutgoingMessage({
|
||||
action: "load",
|
||||
...knownState,
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.warn(`Failed to push load message to peer ${selectedPeer.id}`, {
|
||||
err,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// If we entered successfully the loading state, we load the coValue from the peers
|
||||
//
|
||||
// We may not enter the loading state if the coValue has become available in between
|
||||
// of the retries
|
||||
if (currentState.type === "loading") {
|
||||
await loadCoValueFromPeers(this, peersWithoutErrors);
|
||||
markNotFoundInPeer(peerId: PeerID) {
|
||||
this.peers.set(peerId, { type: "unavailable" });
|
||||
this.notifyListeners();
|
||||
}
|
||||
|
||||
const result = await currentState.result;
|
||||
return result !== "unavailable";
|
||||
}
|
||||
markAvailable(coValue: CoValueCore) {
|
||||
this.core = coValue;
|
||||
this.notifyListeners();
|
||||
}
|
||||
|
||||
return currentState.type === "available";
|
||||
};
|
||||
markErrored(peerId: PeerID, error: TryAddTransactionsError) {
|
||||
this.peers.set(peerId, { type: "errored", error });
|
||||
this.notifyListeners();
|
||||
}
|
||||
|
||||
await doLoad(peers);
|
||||
isErroredInPeer(peerId: PeerID) {
|
||||
return this.peers.get(peerId)?.type === "errored";
|
||||
}
|
||||
|
||||
// Retry loading from peers that have the retry flag enabled
|
||||
const peersWithRetry = peers.filter((p) =>
|
||||
p.shouldRetryUnavailableCoValues(),
|
||||
isAvailable(): this is { type: "available"; core: CoValueCore } {
|
||||
return !!this.core;
|
||||
}
|
||||
|
||||
isUnknown() {
|
||||
if (this.core) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return this.peers.values().every((p) => p.type === "unknown");
|
||||
}
|
||||
|
||||
isLoading() {
|
||||
return this.peers.values().some((p) => p.type === "pending");
|
||||
}
|
||||
|
||||
isDefinitelyUnavailable() {
|
||||
if (this.core) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return (
|
||||
this.peers
|
||||
.values()
|
||||
.every((p) => p.type === "unavailable" || p.type === "errored") &&
|
||||
!this.isAvailable()
|
||||
);
|
||||
|
||||
if (peersWithRetry.length > 0) {
|
||||
// We want to exit early if the coValue becomes available in between the retries
|
||||
await Promise.race([
|
||||
this.getCoValue(),
|
||||
runWithRetry(
|
||||
() => doLoad(peersWithRetry),
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
// If after the retries the coValue is still loading, we consider the load failed
|
||||
if (this.state.type === "loading") {
|
||||
this.moveToState(new CoValueUnavailableState());
|
||||
}
|
||||
}
|
||||
|
||||
dispatch(action: CoValueStateAction) {
|
||||
const currentState = this.state;
|
||||
|
||||
switch (action.type) {
|
||||
case "available":
|
||||
if (currentState.type === "loading") {
|
||||
currentState.resolve(action.coValue);
|
||||
}
|
||||
|
||||
// It should be always possible to move to the available state
|
||||
this.moveToState(new CoValueAvailableState(action.coValue));
|
||||
|
||||
break;
|
||||
case "not-found-in-peer":
|
||||
if (currentState.type === "loading") {
|
||||
currentState.markAsUnavailable(action.peerId);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function loadCoValueFromPeers(
|
||||
coValueEntry: CoValueState,
|
||||
peers: PeerState[],
|
||||
) {
|
||||
for (const peer of peers) {
|
||||
if (peer.closed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (coValueEntry.state.type === "available") {
|
||||
/**
|
||||
* We don't need to wait for the message to be delivered here.
|
||||
*
|
||||
* This way when the coValue becomes available because it's cached we don't wait for the server
|
||||
* peer to consume the messages queue before moving forward.
|
||||
*/
|
||||
peer
|
||||
.pushOutgoingMessage({
|
||||
action: "load",
|
||||
...coValueEntry.state.coValue.knownState(),
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.warn(`Failed to push load message to peer ${peer.id}`, {
|
||||
err,
|
||||
});
|
||||
});
|
||||
} else {
|
||||
/**
|
||||
* We only wait for the load state to be resolved.
|
||||
*/
|
||||
peer
|
||||
.pushOutgoingMessage({
|
||||
action: "load",
|
||||
id: coValueEntry.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
})
|
||||
.catch((err) => {
|
||||
logger.warn(`Failed to push load message to peer ${peer.id}`, {
|
||||
err,
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
const { promise, resolve } = createResolvablePromise<void>();
|
||||
|
||||
/**
|
||||
* Use a very long timeout for storage peers, because under pressure
|
||||
* they may take a long time to consume the messages queue
|
||||
*
|
||||
* TODO: Track errors on storage and do not rely on timeout
|
||||
*/
|
||||
const timeoutDuration =
|
||||
peer.role === "storage"
|
||||
? CO_VALUE_LOADING_CONFIG.TIMEOUT * 10
|
||||
: CO_VALUE_LOADING_CONFIG.TIMEOUT;
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
logger.warn("Failed to load coValue from peer", {
|
||||
coValueId: coValueEntry.id,
|
||||
peerId: peer.id,
|
||||
peerRole: peer.role,
|
||||
});
|
||||
coValueEntry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
resolve();
|
||||
}
|
||||
}, timeoutDuration);
|
||||
await Promise.race([promise, coValueEntry.state.waitForPeer(peer.id)]);
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -391,29 +210,6 @@ async function runWithRetry<T>(fn: () => Promise<T>, maxRetries: number) {
|
||||
}
|
||||
}
|
||||
|
||||
function createResolvablePromise<T>() {
|
||||
let resolve!: (value: T) => void;
|
||||
|
||||
const promise = new Promise<T>((res) => {
|
||||
resolve = res;
|
||||
});
|
||||
|
||||
return { promise, resolve };
|
||||
}
|
||||
|
||||
function sleep(ms: number) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function getPeersWithoutErrors(peers: PeerState[], coValueId: RawCoID) {
|
||||
return peers.filter((p) => {
|
||||
if (p.erroredCoValues.has(coValueId)) {
|
||||
logger.warn(
|
||||
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
|
||||
@@ -185,10 +185,7 @@ export class RawGroup<
|
||||
const id = getChildGroupId(key);
|
||||
const child = store.get(id);
|
||||
|
||||
if (
|
||||
child.state.type === "unknown" ||
|
||||
child.state.type === "unavailable"
|
||||
) {
|
||||
if (child.isUnknown() || child.isDefinitelyUnavailable()) {
|
||||
child.loadFromPeers(peers).catch(() => {
|
||||
logger.error(`Failed to load child group ${id}`);
|
||||
});
|
||||
|
||||
@@ -139,10 +139,8 @@ export class LocalNode {
|
||||
// we shouldn't need this, but it fixes account data not syncing for new accounts
|
||||
function syncAllCoValuesAfterCreateAccount() {
|
||||
for (const coValueEntry of nodeWithAccount.coValuesStore.getValues()) {
|
||||
if (coValueEntry.state.type === "available") {
|
||||
void nodeWithAccount.syncManager.syncCoValue(
|
||||
coValueEntry.state.coValue,
|
||||
);
|
||||
if (coValueEntry.isAvailable()) {
|
||||
void nodeWithAccount.syncManager.syncCoValue(coValueEntry.core);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -265,7 +263,7 @@ export class LocalNode {
|
||||
|
||||
const entry = this.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
|
||||
if (entry.isUnknown() || entry.isDefinitelyUnavailable()) {
|
||||
const peers =
|
||||
this.syncManager.getServerAndStoragePeers(skipLoadingFromPeer);
|
||||
|
||||
@@ -309,8 +307,8 @@ export class LocalNode {
|
||||
getLoaded<T extends RawCoValue>(id: CoID<T>): T | undefined {
|
||||
const entry = this.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
return entry.state.coValue.getCurrentContent() as T;
|
||||
if (entry.isAvailable()) {
|
||||
return entry.core.getCurrentContent() as T;
|
||||
}
|
||||
|
||||
return undefined;
|
||||
@@ -439,12 +437,12 @@ export class LocalNode {
|
||||
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
|
||||
const entry = this.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
if (!entry.isAvailable()) {
|
||||
throw new Error(
|
||||
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
|
||||
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${JSON.stringify(entry)}`,
|
||||
);
|
||||
}
|
||||
return entry.state.coValue;
|
||||
return entry.core;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
@@ -638,15 +636,13 @@ export class LocalNode {
|
||||
while (coValuesToCopy.length > 0) {
|
||||
const [coValueID, entry] = coValuesToCopy[coValuesToCopy.length - 1]!;
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
if (!entry.isAvailable()) {
|
||||
coValuesToCopy.pop();
|
||||
continue;
|
||||
} else {
|
||||
const allDepsCopied = entry.state.coValue
|
||||
const allDepsCopied = entry.core
|
||||
.getDependedOnCoValues()
|
||||
.every(
|
||||
(dep) => newNode.coValuesStore.get(dep).state.type === "available",
|
||||
);
|
||||
.every((dep) => newNode.coValuesStore.get(dep).isAvailable());
|
||||
|
||||
if (!allDepsCopied) {
|
||||
// move to end of queue
|
||||
@@ -655,9 +651,9 @@ export class LocalNode {
|
||||
}
|
||||
|
||||
const newCoValue = new CoValueCore(
|
||||
entry.state.coValue.header,
|
||||
entry.core.header,
|
||||
newNode,
|
||||
new Map(entry.state.coValue.sessionLogs),
|
||||
new Map(entry.core.sessionLogs),
|
||||
);
|
||||
|
||||
newNode.coValuesStore.setAsAvailable(coValueID, newCoValue);
|
||||
|
||||
@@ -160,7 +160,7 @@ export class SyncManager {
|
||||
}
|
||||
|
||||
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
|
||||
if (peer.erroredCoValues.has(msg.id)) {
|
||||
if (this.local.coValuesStore.get(msg.id).isErroredInPeer(peer.id)) {
|
||||
logger.warn(
|
||||
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
|
||||
);
|
||||
@@ -220,11 +220,7 @@ export class SyncManager {
|
||||
}
|
||||
|
||||
peer.toldKnownState.add(id);
|
||||
peer.optimisticKnownStates.dispatch({
|
||||
type: "COMBINE_WITH",
|
||||
id: id,
|
||||
value: coValue.knownState(),
|
||||
});
|
||||
peer.combineOptimisticWith(id, coValue.knownState());
|
||||
} else if (!peer.toldKnownState.has(id)) {
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
@@ -252,8 +248,8 @@ export class SyncManager {
|
||||
for (const id of coValue.getDependedOnCoValues()) {
|
||||
const entry = this.local.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
buildOrderedCoValueList(entry.state.coValue);
|
||||
if (entry.isAvailable()) {
|
||||
buildOrderedCoValueList(entry.core);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -261,31 +257,25 @@ export class SyncManager {
|
||||
};
|
||||
|
||||
for (const entry of this.local.coValuesStore.getValues()) {
|
||||
switch (entry.state.type) {
|
||||
case "unavailable":
|
||||
// If the coValue is unavailable and we never tried this peer
|
||||
// we try to load it from the peer
|
||||
if (!peer.toldKnownState.has(entry.id)) {
|
||||
await entry.loadFromPeers([peer]).catch((e: unknown) => {
|
||||
logger.error("Error sending load", { err: e });
|
||||
});
|
||||
}
|
||||
break;
|
||||
case "available":
|
||||
const coValue = entry.state.coValue;
|
||||
if (!entry.isAvailable()) {
|
||||
// If the coValue is unavailable and we never tried this peer
|
||||
// we try to load it from the peer
|
||||
if (!peer.toldKnownState.has(entry.id)) {
|
||||
await entry.loadFromPeers([peer]).catch((e: unknown) => {
|
||||
logger.error("Error sending load", { err: e });
|
||||
});
|
||||
}
|
||||
} else {
|
||||
const coValue = entry.core;
|
||||
|
||||
// Build the list of coValues ordered by dependency
|
||||
// so we can send the load message in the correct order
|
||||
buildOrderedCoValueList(coValue);
|
||||
break;
|
||||
// Build the list of coValues ordered by dependency
|
||||
// so we can send the load message in the correct order
|
||||
buildOrderedCoValueList(coValue);
|
||||
}
|
||||
|
||||
// Fill the missing known states with empty known states
|
||||
if (!peer.optimisticKnownStates.has(entry.id)) {
|
||||
peer.optimisticKnownStates.dispatch({
|
||||
type: "SET_AS_EMPTY",
|
||||
id: entry.id,
|
||||
});
|
||||
peer.setOptimisticKnownState(entry.id, "empty");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -400,14 +390,10 @@ export class SyncManager {
|
||||
* This way we can track part of the data loss that may occur when the other peer is restarted
|
||||
*
|
||||
*/
|
||||
peer.dispatchToKnownStates({
|
||||
type: "SET",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
peer.setKnownState(msg.id, knownStateIn(msg));
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
|
||||
if (entry.isUnknown() || entry.isDefinitelyUnavailable()) {
|
||||
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
|
||||
|
||||
if (eligiblePeers.length === 0) {
|
||||
@@ -434,7 +420,7 @@ export class SyncManager {
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.state.type === "loading") {
|
||||
if (entry.isLoading()) {
|
||||
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
|
||||
// in a new task, otherwise we might block further incoming content messages that would
|
||||
// resolve the CoValue as available. This can happen when we receive fresh
|
||||
@@ -464,7 +450,7 @@ export class SyncManager {
|
||||
err: e,
|
||||
});
|
||||
});
|
||||
} else if (entry.state.type === "available") {
|
||||
} else if (entry.isAvailable()) {
|
||||
await this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
} else {
|
||||
this.trySendToPeer(peer, {
|
||||
@@ -479,28 +465,17 @@ export class SyncManager {
|
||||
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "COMBINE_WITH",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
peer.combineWith(msg.id, knownStateIn(msg));
|
||||
|
||||
// The header is a boolean value that tells us if the other peer do have information about the header.
|
||||
// If it's false in this point it means that the coValue is unavailable on the other peer.
|
||||
if (entry.state.type !== "available") {
|
||||
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
|
||||
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
|
||||
|
||||
if (!availableOnPeer) {
|
||||
entry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
if (!availableOnPeer) {
|
||||
entry.markNotFoundInPeer(peer.id);
|
||||
}
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
if (entry.isAvailable()) {
|
||||
await this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
}
|
||||
@@ -535,11 +510,11 @@ export class SyncManager {
|
||||
* If this load fails we send a correction request, because the client has the wrong assumption that
|
||||
* we have the coValue while we don't.
|
||||
*/
|
||||
if (entry.state.type !== "available" && !msg.header) {
|
||||
if (!entry.isAvailable() && !msg.header) {
|
||||
await this.local.loadCoValueCore(msg.id, peer.id);
|
||||
}
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
if (!entry.isAvailable()) {
|
||||
if (!msg.header) {
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
@@ -557,20 +532,13 @@ export class SyncManager {
|
||||
return;
|
||||
}
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "UPDATE_HEADER",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
});
|
||||
peer.updateHeader(msg.id, true);
|
||||
|
||||
coValue = new CoValueCore(msg.header, this.local);
|
||||
|
||||
entry.dispatch({
|
||||
type: "available",
|
||||
coValue,
|
||||
});
|
||||
entry.markAvailable(coValue);
|
||||
} else {
|
||||
coValue = entry.state.coValue;
|
||||
coValue = entry.core;
|
||||
}
|
||||
|
||||
let invalidStateAssumed = false;
|
||||
@@ -613,20 +581,18 @@ export class SyncManager {
|
||||
id: msg.id,
|
||||
err: result.error,
|
||||
});
|
||||
peer.erroredCoValues.set(msg.id, result.error);
|
||||
entry.markErrored(peer.id, result.error);
|
||||
continue;
|
||||
}
|
||||
|
||||
this.recordTransactionsSize(newTransactions, peer.role);
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "UPDATE_SESSION_COUNTER",
|
||||
id: msg.id,
|
||||
sessionId: sessionID,
|
||||
value:
|
||||
newContentForSession.after +
|
||||
peer.updateSessionCounter(
|
||||
msg.id,
|
||||
sessionID,
|
||||
newContentForSession.after +
|
||||
newContentForSession.newTransactions.length,
|
||||
});
|
||||
);
|
||||
}
|
||||
|
||||
if (invalidStateAssumed) {
|
||||
@@ -672,11 +638,7 @@ export class SyncManager {
|
||||
}
|
||||
|
||||
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
|
||||
peer.dispatchToKnownStates({
|
||||
type: "SET",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
peer.setKnownState(msg.id, knownStateIn(msg));
|
||||
|
||||
return this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
@@ -709,7 +671,8 @@ export class SyncManager {
|
||||
async actuallySyncCoValue(coValue: CoValueCore) {
|
||||
for (const peer of this.peersInPriorityOrder()) {
|
||||
if (peer.closed) continue;
|
||||
if (peer.erroredCoValues.has(coValue.id)) continue;
|
||||
if (this.local.coValuesStore.get(coValue.id).isErroredInPeer(peer.id))
|
||||
continue;
|
||||
|
||||
if (peer.optimisticKnownStates.has(coValue.id)) {
|
||||
await this.sendNewContentIncludingDependencies(coValue.id, peer);
|
||||
@@ -763,8 +726,7 @@ export class SyncManager {
|
||||
async waitForAllCoValuesSync(timeout = 60_000) {
|
||||
const coValues = this.local.coValuesStore.getValues();
|
||||
const validCoValues = Array.from(coValues).filter(
|
||||
(coValue) =>
|
||||
coValue.state.type === "available" || coValue.state.type === "loading",
|
||||
(coValue) => coValue.isAvailable() || coValue.isLoading(),
|
||||
);
|
||||
|
||||
return Promise.all(
|
||||
|
||||
@@ -9,7 +9,7 @@ describe("PeerKnownStates", () => {
|
||||
const id = "test-id" as RawCoID;
|
||||
const knownState: CoValueKnownState = emptyKnownState(id);
|
||||
|
||||
peerKnownStates.dispatch({ type: "SET", id, value: knownState });
|
||||
peerKnownStates.set(id, knownState);
|
||||
|
||||
expect(peerKnownStates.get(id)).toEqual(knownState);
|
||||
expect(peerKnownStates.has(id)).toBe(true);
|
||||
@@ -19,7 +19,7 @@ describe("PeerKnownStates", () => {
|
||||
const peerKnownStates = new PeerKnownStates();
|
||||
const id = "test-id" as RawCoID;
|
||||
|
||||
peerKnownStates.dispatch({ type: "UPDATE_HEADER", id, header: true });
|
||||
peerKnownStates.updateHeader(id, true);
|
||||
|
||||
const result = peerKnownStates.get(id);
|
||||
expect(result?.header).toBe(true);
|
||||
@@ -30,12 +30,7 @@ describe("PeerKnownStates", () => {
|
||||
const id = "test-id" as RawCoID;
|
||||
const sessionId = "session-1" as SessionID;
|
||||
|
||||
peerKnownStates.dispatch({
|
||||
type: "UPDATE_SESSION_COUNTER",
|
||||
id,
|
||||
sessionId,
|
||||
value: 5,
|
||||
});
|
||||
peerKnownStates.updateSessionCounter(id, sessionId, 5);
|
||||
|
||||
const result = peerKnownStates.get(id);
|
||||
expect(result?.sessions[sessionId]).toBe(5);
|
||||
@@ -55,8 +50,8 @@ describe("PeerKnownStates", () => {
|
||||
sessions: { [session2]: 10 },
|
||||
};
|
||||
|
||||
peerKnownStates.dispatch({ type: "SET", id, value: initialState });
|
||||
peerKnownStates.dispatch({ type: "COMBINE_WITH", id, value: combineState });
|
||||
peerKnownStates.set(id, initialState);
|
||||
peerKnownStates.combineWith(id, combineState);
|
||||
|
||||
const result = peerKnownStates.get(id);
|
||||
expect(result?.sessions).toEqual({ [session1]: 5, [session2]: 10 });
|
||||
@@ -71,8 +66,8 @@ describe("PeerKnownStates", () => {
|
||||
sessions: { [sessionId]: 5 },
|
||||
};
|
||||
|
||||
peerKnownStates.dispatch({ type: "SET", id, value: initialState });
|
||||
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
|
||||
peerKnownStates.set(id, initialState);
|
||||
peerKnownStates.set(id, "empty");
|
||||
|
||||
const result = peerKnownStates.get(id);
|
||||
expect(result).toEqual(emptyKnownState(id));
|
||||
@@ -84,7 +79,7 @@ describe("PeerKnownStates", () => {
|
||||
const listener = vi.fn();
|
||||
|
||||
peerKnownStates.subscribe(listener);
|
||||
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
|
||||
peerKnownStates.set(id, "empty");
|
||||
|
||||
expect(listener).toHaveBeenCalledWith(id, emptyKnownState(id));
|
||||
});
|
||||
@@ -97,7 +92,7 @@ describe("PeerKnownStates", () => {
|
||||
const unsubscribe = peerKnownStates.subscribe(listener);
|
||||
unsubscribe();
|
||||
|
||||
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
|
||||
peerKnownStates.set(id, "empty");
|
||||
|
||||
expect(listener).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { PeerKnownStateActions } from "../PeerKnownStates.js";
|
||||
import { PeerState } from "../PeerState.js";
|
||||
import { CO_VALUE_PRIORITY } from "../priority.js";
|
||||
import { Peer, SyncMessage } from "../sync.js";
|
||||
import { CoValueKnownState, Peer, SyncMessage } from "../sync.js";
|
||||
|
||||
function setup() {
|
||||
const mockPeer: Peer = {
|
||||
@@ -146,16 +145,11 @@ describe("PeerState", () => {
|
||||
|
||||
test("should clone the knownStates into optimisticKnownStates and knownStates when passed as argument", () => {
|
||||
const { peerState, mockPeer } = setup();
|
||||
const action: PeerKnownStateActions = {
|
||||
type: "SET",
|
||||
peerState.setKnownState("co_z1", {
|
||||
id: "co_z1",
|
||||
value: {
|
||||
id: "co_z1",
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
};
|
||||
peerState.dispatchToKnownStates(action);
|
||||
header: false,
|
||||
sessions: {},
|
||||
});
|
||||
|
||||
const newPeerState = new PeerState(mockPeer, peerState.knownStates);
|
||||
|
||||
@@ -165,25 +159,26 @@ describe("PeerState", () => {
|
||||
|
||||
test("should dispatch to both states", () => {
|
||||
const { peerState } = setup();
|
||||
const knownStatesSpy = vi.spyOn(peerState.knownStates, "dispatch");
|
||||
const knownStatesSpy = vi.spyOn(peerState._knownStates, "set");
|
||||
if (peerState._optimisticKnownStates === "assumeInfallible") {
|
||||
throw new Error("Expected normal optimisticKnownStates");
|
||||
}
|
||||
|
||||
const optimisticKnownStatesSpy = vi.spyOn(
|
||||
peerState.optimisticKnownStates,
|
||||
"dispatch",
|
||||
peerState._optimisticKnownStates,
|
||||
"set",
|
||||
);
|
||||
|
||||
const action: PeerKnownStateActions = {
|
||||
type: "SET",
|
||||
const state: CoValueKnownState = {
|
||||
id: "co_z1",
|
||||
value: {
|
||||
id: "co_z1",
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
header: false,
|
||||
sessions: {},
|
||||
};
|
||||
peerState.dispatchToKnownStates(action);
|
||||
|
||||
expect(knownStatesSpy).toHaveBeenCalledWith(action);
|
||||
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith(action);
|
||||
peerState.setKnownState("co_z1", state);
|
||||
|
||||
expect(knownStatesSpy).toHaveBeenCalledWith("co_z1", state);
|
||||
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith("co_z1", state);
|
||||
});
|
||||
|
||||
test("should use same reference for knownStates and optimisticKnownStates for storage peers", () => {
|
||||
@@ -204,28 +199,20 @@ describe("PeerState", () => {
|
||||
expect(peerState.knownStates).toBe(peerState.optimisticKnownStates);
|
||||
|
||||
// Verify that dispatching only updates one state
|
||||
const knownStatesSpy = vi.spyOn(peerState.knownStates, "dispatch");
|
||||
const optimisticKnownStatesSpy = vi.spyOn(
|
||||
peerState.optimisticKnownStates,
|
||||
"dispatch",
|
||||
);
|
||||
const knownStatesSpy = vi.spyOn(peerState._knownStates, "set");
|
||||
expect(peerState._optimisticKnownStates).toBe("assumeInfallible");
|
||||
|
||||
const action: PeerKnownStateActions = {
|
||||
type: "SET",
|
||||
const state: CoValueKnownState = {
|
||||
id: "co_z1",
|
||||
value: {
|
||||
id: "co_z1",
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
header: false,
|
||||
sessions: {},
|
||||
};
|
||||
peerState.dispatchToKnownStates(action);
|
||||
|
||||
peerState.setKnownState("co_z1", state);
|
||||
|
||||
// Only one dispatch should happen since they're the same reference
|
||||
expect(knownStatesSpy).toHaveBeenCalledTimes(1);
|
||||
expect(knownStatesSpy).toHaveBeenCalledWith(action);
|
||||
expect(optimisticKnownStatesSpy).toHaveBeenCalledTimes(1);
|
||||
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith(action);
|
||||
expect(knownStatesSpy).toHaveBeenCalledWith("co_z1", state);
|
||||
});
|
||||
|
||||
test("should use separate references for knownStates and optimisticKnownStates for non-storage peers", () => {
|
||||
|
||||
@@ -40,10 +40,10 @@ describe("CoValueState", () => {
|
||||
const mockCoValueId = "co_test123" as RawCoID;
|
||||
|
||||
test("should create unknown state", async () => {
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
expect(state.state.type).toBe("unknown");
|
||||
expect(state.isUnknown()).toBe(true);
|
||||
expect(
|
||||
await metricReader.getMetricValue("jazz.covalues.loaded", {
|
||||
state: "unknown",
|
||||
@@ -52,11 +52,14 @@ describe("CoValueState", () => {
|
||||
});
|
||||
|
||||
test("should create loading state", async () => {
|
||||
const peerIds = ["peer1", "peer2"];
|
||||
const state = CoValueState.Loading(mockCoValueId, peerIds);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
state.loadFromPeers([
|
||||
createMockPeerState({ id: "peer1", role: "server" }),
|
||||
createMockPeerState({ id: "peer2", role: "server" }),
|
||||
]);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
expect(state.state.type).toBe("loading");
|
||||
expect(state.isLoading()).toBe(true);
|
||||
expect(
|
||||
await metricReader.getMetricValue("jazz.covalues.loaded", {
|
||||
state: "loading",
|
||||
@@ -66,11 +69,12 @@ describe("CoValueState", () => {
|
||||
|
||||
test("should create available state", async () => {
|
||||
const mockCoValue = createMockCoValueCore(mockCoValueId);
|
||||
const state = CoValueState.Available(mockCoValue);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
state.markAvailable(mockCoValue);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
assert(state.state.type === "available");
|
||||
expect(state.state.coValue).toBe(mockCoValue);
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
expect(state.core).toBe(mockCoValue);
|
||||
await expect(state.getCoValue()).resolves.toEqual(mockCoValue);
|
||||
expect(
|
||||
await metricReader.getMetricValue("jazz.covalues.loaded", {
|
||||
@@ -81,7 +85,11 @@ describe("CoValueState", () => {
|
||||
|
||||
test("should handle found action", async () => {
|
||||
const mockCoValue = createMockCoValueCore(mockCoValueId);
|
||||
const state = CoValueState.Loading(mockCoValueId, ["peer1", "peer2"]);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
state.loadFromPeers([
|
||||
createMockPeerState({ id: "peer1", role: "server" }),
|
||||
createMockPeerState({ id: "peer2", role: "server" }),
|
||||
]);
|
||||
|
||||
expect(
|
||||
await metricReader.getMetricValue("jazz.covalues.loaded", {
|
||||
@@ -96,10 +104,7 @@ describe("CoValueState", () => {
|
||||
|
||||
const stateValuePromise = state.getCoValue();
|
||||
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: mockCoValue,
|
||||
});
|
||||
state.markAvailable(mockCoValue);
|
||||
|
||||
const result = await state.getCoValue();
|
||||
expect(result).toBe(mockCoValue);
|
||||
@@ -117,17 +122,6 @@ describe("CoValueState", () => {
|
||||
).toBe(0);
|
||||
});
|
||||
|
||||
test("should ignore actions when not in loading state", () => {
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
|
||||
expect(state.state.type).toBe("unknown");
|
||||
});
|
||||
|
||||
test("should retry loading from peers when unsuccessful", async () => {
|
||||
vi.useFakeTimers();
|
||||
|
||||
@@ -137,10 +131,7 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markNotFoundInPeer("peer1");
|
||||
},
|
||||
);
|
||||
const peer2 = createMockPeerState(
|
||||
@@ -149,15 +140,12 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer2",
|
||||
});
|
||||
state.markNotFoundInPeer("peer2");
|
||||
},
|
||||
);
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
|
||||
@@ -173,7 +161,7 @@ describe("CoValueState", () => {
|
||||
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
);
|
||||
expect(state.state.type).toBe("unavailable");
|
||||
expect(state.isDefinitelyUnavailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toBe("unavailable");
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -188,11 +176,7 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
peer1.erroredCoValues.set(mockCoValueId, new Error("test") as any);
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markErrored("peer1", {} as any);
|
||||
},
|
||||
);
|
||||
const peer2 = createMockPeerState(
|
||||
@@ -201,16 +185,13 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer2",
|
||||
});
|
||||
state.markNotFoundInPeer("peer2");
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
|
||||
@@ -224,7 +205,7 @@ describe("CoValueState", () => {
|
||||
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
);
|
||||
expect(state.state.type).toBe("unavailable");
|
||||
expect(state.isDefinitelyUnavailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toBe("unavailable");
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -239,10 +220,7 @@ describe("CoValueState", () => {
|
||||
role: "storage",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markNotFoundInPeer("peer1");
|
||||
},
|
||||
);
|
||||
const peer2 = createMockPeerState(
|
||||
@@ -251,15 +229,12 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer2",
|
||||
});
|
||||
state.markNotFoundInPeer("peer2");
|
||||
},
|
||||
);
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
|
||||
@@ -273,7 +248,7 @@ describe("CoValueState", () => {
|
||||
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
);
|
||||
expect(state.state.type).toBe("unavailable");
|
||||
expect(state.isDefinitelyUnavailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual("unavailable");
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -293,17 +268,11 @@ describe("CoValueState", () => {
|
||||
},
|
||||
async () => {
|
||||
retries++;
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markNotFoundInPeer("peer1");
|
||||
|
||||
if (retries === 2) {
|
||||
setTimeout(() => {
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: createMockCoValueCore(mockCoValueId),
|
||||
});
|
||||
state.markAvailable(createMockCoValueCore(mockCoValueId));
|
||||
}, 100);
|
||||
}
|
||||
},
|
||||
@@ -311,7 +280,7 @@ describe("CoValueState", () => {
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
|
||||
@@ -322,7 +291,7 @@ describe("CoValueState", () => {
|
||||
await loadPromise;
|
||||
|
||||
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(2);
|
||||
expect(state.state.type).toBe("available");
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
|
||||
vi.useRealTimers();
|
||||
});
|
||||
@@ -336,16 +305,13 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markNotFoundInPeer("peer1");
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
|
||||
@@ -353,17 +319,14 @@ describe("CoValueState", () => {
|
||||
await vi.runAllTimersAsync();
|
||||
}
|
||||
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: createMockCoValueCore(mockCoValueId),
|
||||
});
|
||||
state.markAvailable(createMockCoValueCore(mockCoValueId));
|
||||
|
||||
await loadPromise;
|
||||
|
||||
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
);
|
||||
expect(state.state.type).toBe("available");
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -383,22 +346,16 @@ describe("CoValueState", () => {
|
||||
},
|
||||
async () => {
|
||||
if (run > 2) {
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: createMockCoValueCore(mockCoValueId),
|
||||
});
|
||||
state.markAvailable(createMockCoValueCore(mockCoValueId));
|
||||
}
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer1",
|
||||
});
|
||||
state.markNotFoundInPeer("peer1");
|
||||
run++;
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
|
||||
@@ -407,7 +364,7 @@ describe("CoValueState", () => {
|
||||
await loadPromise;
|
||||
|
||||
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(3);
|
||||
expect(state.state.type).toBe("available");
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -424,10 +381,7 @@ describe("CoValueState", () => {
|
||||
role: "storage",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: mockCoValue,
|
||||
});
|
||||
state.markAvailable(mockCoValue);
|
||||
},
|
||||
);
|
||||
const peer2 = createMockPeerState(
|
||||
@@ -436,14 +390,11 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: "peer2",
|
||||
});
|
||||
state.markNotFoundInPeer("peer2");
|
||||
},
|
||||
);
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1, peer2]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
|
||||
@@ -457,7 +408,7 @@ describe("CoValueState", () => {
|
||||
action: "load",
|
||||
...mockCoValue.knownState(),
|
||||
});
|
||||
expect(state.state.type).toBe("available");
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -483,16 +434,13 @@ describe("CoValueState", () => {
|
||||
role: "server",
|
||||
},
|
||||
async () => {
|
||||
state.dispatch({
|
||||
type: "available",
|
||||
coValue: mockCoValue,
|
||||
});
|
||||
state.markAvailable(mockCoValue);
|
||||
},
|
||||
);
|
||||
|
||||
peer1.closed = true;
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1, peer2]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
|
||||
@@ -503,7 +451,7 @@ describe("CoValueState", () => {
|
||||
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(0);
|
||||
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(1);
|
||||
|
||||
expect(state.state.type).toBe("available");
|
||||
expect(state.isAvailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
|
||||
|
||||
vi.useRealTimers();
|
||||
@@ -520,7 +468,7 @@ describe("CoValueState", () => {
|
||||
async () => {},
|
||||
);
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = new CoValueState(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES * 2; i++) {
|
||||
@@ -532,7 +480,7 @@ describe("CoValueState", () => {
|
||||
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
|
||||
);
|
||||
|
||||
expect(state.state.type).toBe("unavailable");
|
||||
expect(state.isDefinitelyUnavailable()).toBe(true);
|
||||
await expect(state.getCoValue()).resolves.toEqual("unavailable");
|
||||
|
||||
vi.useRealTimers();
|
||||
|
||||
@@ -245,8 +245,6 @@ describe("loading coValues from server", () => {
|
||||
[
|
||||
"client -> server | LOAD Map sessions: empty",
|
||||
"server -> client | KNOWN Map sessions: empty",
|
||||
"client -> server | LOAD Map sessions: empty",
|
||||
"server -> client | KNOWN Map sessions: empty",
|
||||
]
|
||||
`);
|
||||
});
|
||||
|
||||
@@ -78,10 +78,10 @@ describe("peer reconciliation", () => {
|
||||
|
||||
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
|
||||
|
||||
assert(mapOnSyncServer.state.type === "available");
|
||||
assert(mapOnSyncServer.isAvailable());
|
||||
|
||||
expect(
|
||||
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
|
||||
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
|
||||
).toEqual("updated");
|
||||
|
||||
expect(
|
||||
@@ -125,10 +125,10 @@ describe("peer reconciliation", () => {
|
||||
|
||||
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
|
||||
|
||||
assert(mapOnSyncServer.state.type === "available");
|
||||
assert(mapOnSyncServer.isAvailable());
|
||||
|
||||
expect(
|
||||
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
|
||||
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
|
||||
).toEqual("updated");
|
||||
|
||||
expect(peer.outgoing).toMatchObject({
|
||||
@@ -175,10 +175,10 @@ describe("peer reconciliation", () => {
|
||||
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
|
||||
|
||||
await waitFor(() => {
|
||||
expect(mapOnSyncServer.state.type).toBe("available");
|
||||
expect(mapOnSyncServer.isAvailable()).toBe(true);
|
||||
});
|
||||
|
||||
assert(mapOnSyncServer.state.type === "available");
|
||||
assert(mapOnSyncServer.isAvailable());
|
||||
|
||||
expect(
|
||||
SyncMessagesLog.getMessages({
|
||||
@@ -195,7 +195,7 @@ describe("peer reconciliation", () => {
|
||||
`);
|
||||
|
||||
expect(
|
||||
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
|
||||
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
|
||||
).toEqual("updated");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -44,7 +44,6 @@ describe("client with storage syncs with server", () => {
|
||||
"storage -> client | KNOWN Group sessions: header/3",
|
||||
"client -> server | KNOWN Map sessions: header/1",
|
||||
"client -> storage | CONTENT Map header: true new: After: 0 New: 1",
|
||||
"storage -> client | KNOWN Map sessions: header/1",
|
||||
]
|
||||
`);
|
||||
});
|
||||
@@ -78,11 +77,6 @@ describe("client with storage syncs with server", () => {
|
||||
"client -> server | CONTENT Group header: true new: After: 0 New: 3",
|
||||
"server -> client | KNOWN Group sessions: header/3",
|
||||
"client -> storage | KNOWN Map sessions: header/1",
|
||||
"client -> server | LOAD Map sessions: header/1",
|
||||
"server -> client | CONTENT Group header: true new: After: 0 New: 3",
|
||||
"client -> server | KNOWN Group sessions: header/3",
|
||||
"server -> client | KNOWN Map sessions: header/1",
|
||||
"client -> server | CONTENT Map header: true new: After: 0 New: 1",
|
||||
]
|
||||
`);
|
||||
});
|
||||
@@ -126,8 +120,6 @@ describe("client with storage syncs with server", () => {
|
||||
"client -> storage | CONTENT Group header: true new: After: 0 New: 5",
|
||||
"storage -> client | KNOWN Group sessions: header/5",
|
||||
"client -> server | KNOWN Map sessions: header/1",
|
||||
"client -> storage | CONTENT Map header: true new: After: 0 New: 1",
|
||||
"storage -> client | KNOWN Map sessions: header/1",
|
||||
]
|
||||
`);
|
||||
});
|
||||
@@ -147,6 +139,8 @@ describe("client with storage syncs with server", () => {
|
||||
|
||||
client.node.syncManager.getPeers()[0]?.gracefulShutdown();
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
SyncMessagesLog.clear();
|
||||
map.set("hello", "updated", "trusting");
|
||||
|
||||
|
||||
@@ -759,9 +759,7 @@ describe("SyncManager.addPeer", () => {
|
||||
|
||||
await map.core.waitForSync();
|
||||
|
||||
expect(jazzCloud.node.coValuesStore.get(map.id).state.type).toBe(
|
||||
"available",
|
||||
);
|
||||
expect(jazzCloud.node.coValuesStore.get(map.id).isAvailable()).toBe(true);
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1087,10 +1085,9 @@ describe("SyncManager.handleSyncMessage", () => {
|
||||
|
||||
// Add a coValue to the errored set
|
||||
const erroredId = "co_z123" as const;
|
||||
peerState.erroredCoValues.set(
|
||||
erroredId,
|
||||
new Error("Test error") as unknown as TryAddTransactionsError,
|
||||
);
|
||||
client.node.coValuesStore.get(erroredId).markErrored(peerState.id, {
|
||||
message: "Test error",
|
||||
} as any);
|
||||
|
||||
const message = {
|
||||
action: "load" as const,
|
||||
|
||||
@@ -103,7 +103,7 @@ export class Ref<out V extends CoValue> {
|
||||
this.id as unknown as CoID<RawCoValue>,
|
||||
);
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
if (entry.isAvailable()) {
|
||||
return new Ref(this.id, this.controlledAccount, this.schema).value!;
|
||||
}
|
||||
|
||||
|
||||
@@ -155,8 +155,8 @@ function loadCoValue(
|
||||
) {
|
||||
const entry = node.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type === "available" && syncResolution) {
|
||||
callback(entry.state.coValue);
|
||||
if (entry.isAvailable() && syncResolution) {
|
||||
callback(entry.core);
|
||||
} else {
|
||||
void node.loadCoValueCore(id).then((core) => {
|
||||
callback(core);
|
||||
|
||||
@@ -487,7 +487,7 @@ describe("createCoValueObservable", () => {
|
||||
expect(observable.getCurrentValue()).toBeUndefined();
|
||||
});
|
||||
|
||||
it("should return null if the coValue is not found", async () => {
|
||||
it.only("should return null if the coValue is not found", async () => {
|
||||
const { meOnSecondPeer } = await setupAccount();
|
||||
const observable = createCoValueObservable<
|
||||
TestMap,
|
||||
|
||||
Reference in New Issue
Block a user