Compare commits

...

12 Commits

18 changed files with 393 additions and 707 deletions

View File

@@ -9,7 +9,7 @@ export class CoValuesStore {
let entry = this.coValues.get(id);
if (!entry) {
entry = CoValueState.Unknown(id);
entry = new CoValueState(id);
this.coValues.set(id, entry);
}
@@ -18,10 +18,7 @@ export class CoValuesStore {
setAsAvailable(id: RawCoID, coValue: CoValueCore) {
const entry = this.get(id);
entry.dispatch({
type: "available",
coValue,
});
entry.markAvailable(coValue);
}
getEntries() {

View File

@@ -5,57 +5,37 @@ import {
emptyKnownState,
} from "./sync.js";
export type PeerKnownStateActions =
| {
type: "SET_AS_EMPTY";
id: RawCoID;
}
| {
type: "UPDATE_HEADER";
id: RawCoID;
header: boolean;
}
| {
type: "UPDATE_SESSION_COUNTER";
id: RawCoID;
sessionId: SessionID;
value: number;
}
| {
type: "SET";
id: RawCoID;
value: CoValueKnownState;
}
| {
type: "COMBINE_WITH";
id: RawCoID;
value: CoValueKnownState;
};
export class PeerKnownStates {
private coValues = new Map<RawCoID, CoValueKnownState>();
private updateHeader(id: RawCoID, header: boolean) {
updateHeader(id: RawCoID, header: boolean) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
knownState.header = header;
this.coValues.set(id, knownState);
this.triggerUpdate(id);
}
private combineWith(id: RawCoID, value: CoValueKnownState) {
combineWith(id: RawCoID, value: CoValueKnownState) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
this.coValues.set(id, combinedKnownStates(knownState, value));
this.triggerUpdate(id);
}
private updateSessionCounter(
id: RawCoID,
sessionId: SessionID,
value: number,
) {
updateSessionCounter(id: RawCoID, sessionId: SessionID, value: number) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
const currentValue = knownState.sessions[sessionId] || 0;
knownState.sessions[sessionId] = Math.max(currentValue, value);
this.coValues.set(id, knownState);
this.triggerUpdate(id);
}
set(id: RawCoID, knownState: CoValueKnownState | "empty") {
this.coValues.set(
id,
knownState === "empty" ? emptyKnownState(id) : knownState,
);
this.triggerUpdate(id);
}
get(id: RawCoID) {
@@ -72,28 +52,6 @@ export class PeerKnownStates {
return clone;
}
dispatch(action: PeerKnownStateActions) {
switch (action.type) {
case "UPDATE_HEADER":
this.updateHeader(action.id, action.header);
break;
case "UPDATE_SESSION_COUNTER":
this.updateSessionCounter(action.id, action.sessionId, action.value);
break;
case "SET":
this.coValues.set(action.id, action.value);
break;
case "COMBINE_WITH":
this.combineWith(action.id, action.value);
break;
case "SET_AS_EMPTY":
this.coValues.set(action.id, emptyKnownState(action.id));
break;
}
this.triggerUpdate(action.id);
}
listeners = new Set<(id: RawCoID, knownState: CoValueKnownState) => void>();
triggerUpdate(id: RawCoID) {
@@ -114,3 +72,8 @@ export class PeerKnownStates {
};
}
}
export type ReadonlyPeerKnownStates = Pick<
PeerKnownStates,
"get" | "has" | "clone" | "subscribe"
>;

View File

@@ -1,4 +1,4 @@
import { PeerKnownStateActions, PeerKnownStates } from "./PeerKnownStates.js";
import { PeerKnownStates, ReadonlyPeerKnownStates } from "./PeerKnownStates.js";
import {
PriorityBasedMessageQueue,
QueueEntry,
@@ -7,7 +7,7 @@ import { TryAddTransactionsError } from "./coValueCore.js";
import { RawCoID, SessionID } from "./ids.js";
import { logger } from "./logger.js";
import { CO_VALUE_PRIORITY } from "./priority.js";
import { Peer, SyncMessage } from "./sync.js";
import { CoValueKnownState, Peer, SyncMessage } from "./sync.js";
export class PeerState {
private queue: PriorityBasedMessageQueue;
@@ -17,7 +17,7 @@ export class PeerState {
constructor(
private peer: Peer,
knownStates: PeerKnownStates | undefined,
knownStates: ReadonlyPeerKnownStates | undefined,
) {
/**
* We set as default priority HIGH to handle all the messages without a
@@ -28,14 +28,16 @@ export class PeerState {
this.queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.HIGH, {
peerRole: peer.role,
});
this.optimisticKnownStates = knownStates?.clone() ?? new PeerKnownStates();
this._knownStates = knownStates?.clone() ?? new PeerKnownStates();
// We assume that exchanges with storage peers are always successful
// hence we don't need to differentiate between knownStates and optimisticKnownStates
if (peer.role === "storage") {
this.knownStates = this.optimisticKnownStates;
this._optimisticKnownStates = "assumeInfallible";
} else {
this.knownStates = knownStates?.clone() ?? new PeerKnownStates();
this._optimisticKnownStates =
knownStates?.clone() ?? new PeerKnownStates();
}
}
@@ -44,7 +46,11 @@ export class PeerState {
*
* This can be used to safely track the sync state of a coValue in a given peer.
*/
readonly knownStates: PeerKnownStates;
readonly _knownStates: PeerKnownStates;
get knownStates(): ReadonlyPeerKnownStates {
return this._knownStates;
}
/**
* This one collects the known states "optimistically".
@@ -53,18 +59,68 @@ export class PeerState {
* The main difference with knownState is that this is updated when the content is sent to the peer without
* waiting for any acknowledgement from the peer.
*/
readonly optimisticKnownStates: PeerKnownStates;
readonly _optimisticKnownStates: PeerKnownStates | "assumeInfallible";
get optimisticKnownStates(): ReadonlyPeerKnownStates {
if (this._optimisticKnownStates === "assumeInfallible") {
return this.knownStates;
}
return this._optimisticKnownStates;
}
readonly toldKnownState: Set<RawCoID> = new Set();
dispatchToKnownStates(action: PeerKnownStateActions) {
this.knownStates.dispatch(action);
updateHeader(id: RawCoID, header: boolean) {
this._knownStates.updateHeader(id, header);
if (this.role !== "storage") {
this.optimisticKnownStates.dispatch(action);
if (this._optimisticKnownStates !== "assumeInfallible") {
this._optimisticKnownStates.updateHeader(id, header);
}
}
readonly erroredCoValues: Map<RawCoID, TryAddTransactionsError> = new Map();
combineWith(id: RawCoID, value: CoValueKnownState) {
this._knownStates.combineWith(id, value);
if (this._optimisticKnownStates !== "assumeInfallible") {
this._optimisticKnownStates.combineWith(id, value);
}
}
combineOptimisticWith(id: RawCoID, value: CoValueKnownState) {
if (this._optimisticKnownStates === "assumeInfallible") {
this._knownStates.combineWith(id, value);
} else {
this._optimisticKnownStates.combineWith(id, value);
}
}
updateSessionCounter(id: RawCoID, sessionId: SessionID, value: number) {
this._knownStates.updateSessionCounter(id, sessionId, value);
if (this._optimisticKnownStates !== "assumeInfallible") {
this._optimisticKnownStates.updateSessionCounter(id, sessionId, value);
}
}
setKnownState(id: RawCoID, knownState: CoValueKnownState | "empty") {
this._knownStates.set(id, knownState);
if (this._optimisticKnownStates !== "assumeInfallible") {
this._optimisticKnownStates.set(id, knownState);
}
}
setOptimisticKnownState(
id: RawCoID,
knownState: CoValueKnownState | "empty",
) {
if (this._optimisticKnownStates === "assumeInfallible") {
this._knownStates.set(id, knownState);
} else {
this._optimisticKnownStates.set(id, knownState);
}
}
get id() {
return this.peer.id;

View File

@@ -116,11 +116,11 @@ export class SyncStateManager {
const entry = this.syncManager.local.coValuesStore.get(id);
if (entry.state.type !== "available") {
if (!entry.isAvailable()) {
return undefined;
}
const coValue = entry.state.coValue;
const coValue = entry.core;
const coValueSessions = coValue.knownState().sessions;
return {

View File

@@ -1,370 +1,189 @@
import { ValueType } from "@opentelemetry/api";
import { UpDownCounter, metrics } from "@opentelemetry/api";
import { PeerState } from "./PeerState.js";
import { CoValueCore } from "./coValueCore.js";
import { CoValueCore, TryAddTransactionsError } from "./coValueCore.js";
import { RawCoID } from "./ids.js";
import { logger } from "./logger.js";
import { PeerID } from "./sync.js";
import { PeerID, emptyKnownState } from "./sync.js";
export const CO_VALUE_LOADING_CONFIG = {
MAX_RETRIES: 2,
TIMEOUT: 30_000,
};
export class CoValueUnknownState {
type = "unknown" as const;
}
export class CoValueLoadingState {
type = "loading" as const;
export class CoValueState {
private peers = new Map<
PeerID,
ReturnType<typeof createResolvablePromise<void>>
| { type: "unknown" | "pending" | "available" | "unavailable" }
| { type: "errored"; error: TryAddTransactionsError }
>();
private resolveResult: (value: CoValueCore | "unavailable") => void;
result: Promise<CoValueCore | "unavailable">;
private peersToRequestFrom = new Map<PeerID, PeerState>();
loading = false;
constructor(peersIds: Iterable<PeerID>) {
this.peers = new Map();
core: CoValueCore | null = null;
id: RawCoID;
for (const peerId of peersIds) {
this.peers.set(peerId, createResolvablePromise<void>());
listeners: Set<(state: CoValueState) => void> = new Set();
constructor(id: RawCoID) {
this.id = id;
}
addListener(listener: (state: CoValueState) => void) {
this.listeners.add(listener);
listener(this);
}
removeListener(listener: (state: CoValueState) => void) {
this.listeners.delete(listener);
}
private notifyListeners() {
this.loadFromNextPeer();
for (const listener of this.listeners) {
listener(this);
}
const { resolve, promise } = createResolvablePromise<
CoValueCore | "unavailable"
>();
this.result = promise;
this.resolveResult = resolve;
}
markAsUnavailable(peerId: PeerID) {
const entry = this.peers.get(peerId);
if (entry) {
entry.resolve();
}
this.peers.delete(peerId);
// If none of the peers have the coValue, we resolve to unavailable
if (this.peers.size === 0) {
this.resolve("unavailable");
}
}
resolve(value: CoValueCore | "unavailable") {
this.resolveResult(value);
for (const entry of this.peers.values()) {
entry.resolve();
}
this.peers.clear();
}
// Wait for a specific peer to have a known state
waitForPeer(peerId: PeerID) {
const entry = this.peers.get(peerId);
if (!entry) {
return Promise.resolve();
}
return entry.promise;
}
}
export class CoValueAvailableState {
type = "available" as const;
constructor(public coValue: CoValueCore) {}
}
export class CoValueUnavailableState {
type = "unavailable" as const;
}
type CoValueStateAction =
| {
type: "load-requested";
peersIds: PeerID[];
}
| {
type: "not-found-in-peer";
peerId: PeerID;
}
| {
type: "available";
coValue: CoValueCore;
};
type CoValueStateType =
| CoValueUnknownState
| CoValueLoadingState
| CoValueAvailableState
| CoValueUnavailableState;
export class CoValueState {
promise?: Promise<CoValueCore | "unavailable">;
private resolve?: (value: CoValueCore | "unavailable") => void;
private counter: UpDownCounter;
constructor(
public id: RawCoID,
public state: CoValueStateType,
) {
this.counter = metrics
.getMeter("cojson")
.createUpDownCounter("jazz.covalues.loaded", {
description: "The number of covalues in the system",
unit: "covalue",
valueType: ValueType.INT,
});
this.counter.add(1, {
state: this.state.type,
});
}
static Unknown(id: RawCoID) {
return new CoValueState(id, new CoValueUnknownState());
}
static Loading(id: RawCoID, peersIds: Iterable<PeerID>) {
return new CoValueState(id, new CoValueLoadingState(peersIds));
}
static Available(coValue: CoValueCore) {
return new CoValueState(coValue.id, new CoValueAvailableState(coValue));
}
static Unavailable(id: RawCoID) {
return new CoValueState(id, new CoValueUnavailableState());
}
async getCoValue() {
if (this.state.type === "available") {
return this.state.coValue;
if (this.core) {
return this.core;
}
if (this.state.type === "unavailable") {
if (this.isDefinitelyUnavailable()) {
return "unavailable";
}
// If we don't have a resolved state we return a new promise
// that will be resolved when the state will move to available or unavailable
if (!this.promise) {
const { promise, resolve } = createResolvablePromise<
CoValueCore | "unavailable"
>();
return new Promise<CoValueCore | "unavailable">((resolve) => {
const listener = (state: CoValueState) => {
if (state.core) {
resolve(state.core);
this.removeListener(listener);
} else if (this.isDefinitelyUnavailable()) {
resolve("unavailable");
this.removeListener(listener);
}
};
this.promise = promise;
this.resolve = resolve;
}
return this.promise;
}
private moveToState(value: CoValueStateType) {
this.counter.add(-1, {
state: this.state.type,
this.addListener(listener);
});
this.state = value;
this.counter.add(1, {
state: this.state.type,
});
if (!this.resolve) {
return;
}
// If the state is available we resolve the promise
// and clear it to handle the possible transition from unavailable to available
if (value.type === "available") {
this.resolve(value.coValue);
this.clearPromise();
} else if (value.type === "unavailable") {
this.resolve("unavailable");
this.clearPromise();
}
}
private clearPromise() {
this.promise = undefined;
this.resolve = undefined;
}
async loadFromPeers(peers: PeerState[]) {
const state = this.state;
for (const peer of peers) {
this.peersToRequestFrom.set(peer.id, peer);
}
if (state.type === "loading" || state.type === "available") {
this.loadFromNextPeer();
}
private loadFromNextPeer() {
if (this.isLoading() || this.peersToRequestFrom.size === 0) {
return;
}
if (peers.length === 0) {
this.moveToState(new CoValueUnavailableState());
// TODO: Load the peers with the same priority in parallel
let selectedPeer: PeerState | undefined;
for (const peer of this.peersToRequestFrom.values()) {
const currentState = this.peers.get(peer.id);
switch (currentState?.type) {
case "available":
case "errored":
case "pending":
this.peersToRequestFrom.delete(peer.id);
continue;
case "unavailable":
case "unknown":
default:
if (
!peer.shouldRetryUnavailableCoValues() &&
currentState?.type === "unavailable"
) {
this.peersToRequestFrom.delete(peer.id);
} else if (
!selectedPeer ||
(peer.priority ?? 0) > (selectedPeer.priority ?? 0)
) {
selectedPeer = peer;
}
break;
}
}
if (!selectedPeer) {
return;
}
const doLoad = async (peersToLoadFrom: PeerState[]) => {
const peersWithoutErrors = getPeersWithoutErrors(
peersToLoadFrom,
this.id,
);
this.peersToRequestFrom.delete(selectedPeer.id);
this.peers.set(selectedPeer.id, { type: "pending" });
// If we are in the loading state we move to a new loading state
// to reset all the loading promises
if (
this.state.type === "loading" ||
this.state.type === "unknown" ||
this.state.type === "unavailable"
) {
this.moveToState(
new CoValueLoadingState(peersWithoutErrors.map((p) => p.id)),
);
}
const knownState = this.core
? this.core.knownState()
: emptyKnownState(this.id);
// Assign the current state to a variable to not depend on the state changes
// that may happen while we wait for loadCoValueFromPeers to complete
const currentState = this.state;
selectedPeer
.pushOutgoingMessage({
action: "load",
...knownState,
})
.catch((err) => {
logger.warn(`Failed to push load message to peer ${selectedPeer.id}`, {
err,
});
});
}
// If we entered successfully the loading state, we load the coValue from the peers
//
// We may not enter the loading state if the coValue has become available in between
// of the retries
if (currentState.type === "loading") {
await loadCoValueFromPeers(this, peersWithoutErrors);
markNotFoundInPeer(peerId: PeerID) {
this.peers.set(peerId, { type: "unavailable" });
this.notifyListeners();
}
const result = await currentState.result;
return result !== "unavailable";
}
markAvailable(coValue: CoValueCore) {
this.core = coValue;
this.notifyListeners();
}
return currentState.type === "available";
};
markErrored(peerId: PeerID, error: TryAddTransactionsError) {
this.peers.set(peerId, { type: "errored", error });
this.notifyListeners();
}
await doLoad(peers);
isErroredInPeer(peerId: PeerID) {
return this.peers.get(peerId)?.type === "errored";
}
// Retry loading from peers that have the retry flag enabled
const peersWithRetry = peers.filter((p) =>
p.shouldRetryUnavailableCoValues(),
isAvailable(): this is { type: "available"; core: CoValueCore } {
return !!this.core;
}
isUnknown() {
if (this.core) {
return false;
}
return this.peers.values().every((p) => p.type === "unknown");
}
isLoading() {
return this.peers.values().some((p) => p.type === "pending");
}
isDefinitelyUnavailable() {
if (this.core) {
return false;
}
return (
this.peers
.values()
.every((p) => p.type === "unavailable" || p.type === "errored") &&
!this.isAvailable()
);
if (peersWithRetry.length > 0) {
// We want to exit early if the coValue becomes available in between the retries
await Promise.race([
this.getCoValue(),
runWithRetry(
() => doLoad(peersWithRetry),
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
),
]);
}
// If after the retries the coValue is still loading, we consider the load failed
if (this.state.type === "loading") {
this.moveToState(new CoValueUnavailableState());
}
}
dispatch(action: CoValueStateAction) {
const currentState = this.state;
switch (action.type) {
case "available":
if (currentState.type === "loading") {
currentState.resolve(action.coValue);
}
// It should be always possible to move to the available state
this.moveToState(new CoValueAvailableState(action.coValue));
break;
case "not-found-in-peer":
if (currentState.type === "loading") {
currentState.markAsUnavailable(action.peerId);
}
break;
}
}
}
async function loadCoValueFromPeers(
coValueEntry: CoValueState,
peers: PeerState[],
) {
for (const peer of peers) {
if (peer.closed) {
continue;
}
if (coValueEntry.state.type === "available") {
/**
* We don't need to wait for the message to be delivered here.
*
* This way when the coValue becomes available because it's cached we don't wait for the server
* peer to consume the messages queue before moving forward.
*/
peer
.pushOutgoingMessage({
action: "load",
...coValueEntry.state.coValue.knownState(),
})
.catch((err) => {
logger.warn(`Failed to push load message to peer ${peer.id}`, {
err,
});
});
} else {
/**
* We only wait for the load state to be resolved.
*/
peer
.pushOutgoingMessage({
action: "load",
id: coValueEntry.id,
header: false,
sessions: {},
})
.catch((err) => {
logger.warn(`Failed to push load message to peer ${peer.id}`, {
err,
});
});
}
if (coValueEntry.state.type === "loading") {
const { promise, resolve } = createResolvablePromise<void>();
/**
* Use a very long timeout for storage peers, because under pressure
* they may take a long time to consume the messages queue
*
* TODO: Track errors on storage and do not rely on timeout
*/
const timeoutDuration =
peer.role === "storage"
? CO_VALUE_LOADING_CONFIG.TIMEOUT * 10
: CO_VALUE_LOADING_CONFIG.TIMEOUT;
const timeout = setTimeout(() => {
if (coValueEntry.state.type === "loading") {
logger.warn("Failed to load coValue from peer", {
coValueId: coValueEntry.id,
peerId: peer.id,
peerRole: peer.role,
});
coValueEntry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
resolve();
}
}, timeoutDuration);
await Promise.race([promise, coValueEntry.state.waitForPeer(peer.id)]);
clearTimeout(timeout);
}
}
}
@@ -391,29 +210,6 @@ async function runWithRetry<T>(fn: () => Promise<T>, maxRetries: number) {
}
}
function createResolvablePromise<T>() {
let resolve!: (value: T) => void;
const promise = new Promise<T>((res) => {
resolve = res;
});
return { promise, resolve };
}
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function getPeersWithoutErrors(peers: PeerState[], coValueId: RawCoID) {
return peers.filter((p) => {
if (p.erroredCoValues.has(coValueId)) {
logger.warn(
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
);
return false;
}
return true;
});
}

View File

@@ -185,10 +185,7 @@ export class RawGroup<
const id = getChildGroupId(key);
const child = store.get(id);
if (
child.state.type === "unknown" ||
child.state.type === "unavailable"
) {
if (child.isUnknown() || child.isDefinitelyUnavailable()) {
child.loadFromPeers(peers).catch(() => {
logger.error(`Failed to load child group ${id}`);
});

View File

@@ -139,10 +139,8 @@ export class LocalNode {
// we shouldn't need this, but it fixes account data not syncing for new accounts
function syncAllCoValuesAfterCreateAccount() {
for (const coValueEntry of nodeWithAccount.coValuesStore.getValues()) {
if (coValueEntry.state.type === "available") {
void nodeWithAccount.syncManager.syncCoValue(
coValueEntry.state.coValue,
);
if (coValueEntry.isAvailable()) {
void nodeWithAccount.syncManager.syncCoValue(coValueEntry.core);
}
}
}
@@ -265,7 +263,7 @@ export class LocalNode {
const entry = this.coValuesStore.get(id);
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
if (entry.isUnknown() || entry.isDefinitelyUnavailable()) {
const peers =
this.syncManager.getServerAndStoragePeers(skipLoadingFromPeer);
@@ -309,8 +307,8 @@ export class LocalNode {
getLoaded<T extends RawCoValue>(id: CoID<T>): T | undefined {
const entry = this.coValuesStore.get(id);
if (entry.state.type === "available") {
return entry.state.coValue.getCurrentContent() as T;
if (entry.isAvailable()) {
return entry.core.getCurrentContent() as T;
}
return undefined;
@@ -439,12 +437,12 @@ export class LocalNode {
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
const entry = this.coValuesStore.get(id);
if (entry.state.type !== "available") {
if (!entry.isAvailable()) {
throw new Error(
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${JSON.stringify(entry)}`,
);
}
return entry.state.coValue;
return entry.core;
}
/** @internal */
@@ -638,15 +636,13 @@ export class LocalNode {
while (coValuesToCopy.length > 0) {
const [coValueID, entry] = coValuesToCopy[coValuesToCopy.length - 1]!;
if (entry.state.type !== "available") {
if (!entry.isAvailable()) {
coValuesToCopy.pop();
continue;
} else {
const allDepsCopied = entry.state.coValue
const allDepsCopied = entry.core
.getDependedOnCoValues()
.every(
(dep) => newNode.coValuesStore.get(dep).state.type === "available",
);
.every((dep) => newNode.coValuesStore.get(dep).isAvailable());
if (!allDepsCopied) {
// move to end of queue
@@ -655,9 +651,9 @@ export class LocalNode {
}
const newCoValue = new CoValueCore(
entry.state.coValue.header,
entry.core.header,
newNode,
new Map(entry.state.coValue.sessionLogs),
new Map(entry.core.sessionLogs),
);
newNode.coValuesStore.setAsAvailable(coValueID, newCoValue);

View File

@@ -160,7 +160,7 @@ export class SyncManager {
}
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
if (peer.erroredCoValues.has(msg.id)) {
if (this.local.coValuesStore.get(msg.id).isErroredInPeer(peer.id)) {
logger.warn(
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
);
@@ -220,11 +220,7 @@ export class SyncManager {
}
peer.toldKnownState.add(id);
peer.optimisticKnownStates.dispatch({
type: "COMBINE_WITH",
id: id,
value: coValue.knownState(),
});
peer.combineOptimisticWith(id, coValue.knownState());
} else if (!peer.toldKnownState.has(id)) {
this.trySendToPeer(peer, {
action: "known",
@@ -252,8 +248,8 @@ export class SyncManager {
for (const id of coValue.getDependedOnCoValues()) {
const entry = this.local.coValuesStore.get(id);
if (entry.state.type === "available") {
buildOrderedCoValueList(entry.state.coValue);
if (entry.isAvailable()) {
buildOrderedCoValueList(entry.core);
}
}
@@ -261,31 +257,25 @@ export class SyncManager {
};
for (const entry of this.local.coValuesStore.getValues()) {
switch (entry.state.type) {
case "unavailable":
// If the coValue is unavailable and we never tried this peer
// we try to load it from the peer
if (!peer.toldKnownState.has(entry.id)) {
await entry.loadFromPeers([peer]).catch((e: unknown) => {
logger.error("Error sending load", { err: e });
});
}
break;
case "available":
const coValue = entry.state.coValue;
if (!entry.isAvailable()) {
// If the coValue is unavailable and we never tried this peer
// we try to load it from the peer
if (!peer.toldKnownState.has(entry.id)) {
await entry.loadFromPeers([peer]).catch((e: unknown) => {
logger.error("Error sending load", { err: e });
});
}
} else {
const coValue = entry.core;
// Build the list of coValues ordered by dependency
// so we can send the load message in the correct order
buildOrderedCoValueList(coValue);
break;
// Build the list of coValues ordered by dependency
// so we can send the load message in the correct order
buildOrderedCoValueList(coValue);
}
// Fill the missing known states with empty known states
if (!peer.optimisticKnownStates.has(entry.id)) {
peer.optimisticKnownStates.dispatch({
type: "SET_AS_EMPTY",
id: entry.id,
});
peer.setOptimisticKnownState(entry.id, "empty");
}
}
@@ -400,14 +390,10 @@ export class SyncManager {
* This way we can track part of the data loss that may occur when the other peer is restarted
*
*/
peer.dispatchToKnownStates({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
peer.setKnownState(msg.id, knownStateIn(msg));
const entry = this.local.coValuesStore.get(msg.id);
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
if (entry.isUnknown() || entry.isDefinitelyUnavailable()) {
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
if (eligiblePeers.length === 0) {
@@ -434,7 +420,7 @@ export class SyncManager {
}
}
if (entry.state.type === "loading") {
if (entry.isLoading()) {
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
// in a new task, otherwise we might block further incoming content messages that would
// resolve the CoValue as available. This can happen when we receive fresh
@@ -464,7 +450,7 @@ export class SyncManager {
err: e,
});
});
} else if (entry.state.type === "available") {
} else if (entry.isAvailable()) {
await this.sendNewContentIncludingDependencies(msg.id, peer);
} else {
this.trySendToPeer(peer, {
@@ -479,28 +465,17 @@ export class SyncManager {
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
const entry = this.local.coValuesStore.get(msg.id);
peer.dispatchToKnownStates({
type: "COMBINE_WITH",
id: msg.id,
value: knownStateIn(msg),
});
peer.combineWith(msg.id, knownStateIn(msg));
// The header is a boolean value that tells us if the other peer do have information about the header.
// If it's false in this point it means that the coValue is unavailable on the other peer.
if (entry.state.type !== "available") {
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
if (!availableOnPeer) {
entry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
}
return;
if (!availableOnPeer) {
entry.markNotFoundInPeer(peer.id);
}
if (entry.state.type === "available") {
if (entry.isAvailable()) {
await this.sendNewContentIncludingDependencies(msg.id, peer);
}
}
@@ -535,11 +510,11 @@ export class SyncManager {
* If this load fails we send a correction request, because the client has the wrong assumption that
* we have the coValue while we don't.
*/
if (entry.state.type !== "available" && !msg.header) {
if (!entry.isAvailable() && !msg.header) {
await this.local.loadCoValueCore(msg.id, peer.id);
}
if (entry.state.type !== "available") {
if (!entry.isAvailable()) {
if (!msg.header) {
this.trySendToPeer(peer, {
action: "known",
@@ -557,20 +532,13 @@ export class SyncManager {
return;
}
peer.dispatchToKnownStates({
type: "UPDATE_HEADER",
id: msg.id,
header: true,
});
peer.updateHeader(msg.id, true);
coValue = new CoValueCore(msg.header, this.local);
entry.dispatch({
type: "available",
coValue,
});
entry.markAvailable(coValue);
} else {
coValue = entry.state.coValue;
coValue = entry.core;
}
let invalidStateAssumed = false;
@@ -613,20 +581,18 @@ export class SyncManager {
id: msg.id,
err: result.error,
});
peer.erroredCoValues.set(msg.id, result.error);
entry.markErrored(peer.id, result.error);
continue;
}
this.recordTransactionsSize(newTransactions, peer.role);
peer.dispatchToKnownStates({
type: "UPDATE_SESSION_COUNTER",
id: msg.id,
sessionId: sessionID,
value:
newContentForSession.after +
peer.updateSessionCounter(
msg.id,
sessionID,
newContentForSession.after +
newContentForSession.newTransactions.length,
});
);
}
if (invalidStateAssumed) {
@@ -672,11 +638,7 @@ export class SyncManager {
}
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
peer.dispatchToKnownStates({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
peer.setKnownState(msg.id, knownStateIn(msg));
return this.sendNewContentIncludingDependencies(msg.id, peer);
}
@@ -709,7 +671,8 @@ export class SyncManager {
async actuallySyncCoValue(coValue: CoValueCore) {
for (const peer of this.peersInPriorityOrder()) {
if (peer.closed) continue;
if (peer.erroredCoValues.has(coValue.id)) continue;
if (this.local.coValuesStore.get(coValue.id).isErroredInPeer(peer.id))
continue;
if (peer.optimisticKnownStates.has(coValue.id)) {
await this.sendNewContentIncludingDependencies(coValue.id, peer);
@@ -763,8 +726,7 @@ export class SyncManager {
async waitForAllCoValuesSync(timeout = 60_000) {
const coValues = this.local.coValuesStore.getValues();
const validCoValues = Array.from(coValues).filter(
(coValue) =>
coValue.state.type === "available" || coValue.state.type === "loading",
(coValue) => coValue.isAvailable() || coValue.isLoading(),
);
return Promise.all(

View File

@@ -9,7 +9,7 @@ describe("PeerKnownStates", () => {
const id = "test-id" as RawCoID;
const knownState: CoValueKnownState = emptyKnownState(id);
peerKnownStates.dispatch({ type: "SET", id, value: knownState });
peerKnownStates.set(id, knownState);
expect(peerKnownStates.get(id)).toEqual(knownState);
expect(peerKnownStates.has(id)).toBe(true);
@@ -19,7 +19,7 @@ describe("PeerKnownStates", () => {
const peerKnownStates = new PeerKnownStates();
const id = "test-id" as RawCoID;
peerKnownStates.dispatch({ type: "UPDATE_HEADER", id, header: true });
peerKnownStates.updateHeader(id, true);
const result = peerKnownStates.get(id);
expect(result?.header).toBe(true);
@@ -30,12 +30,7 @@ describe("PeerKnownStates", () => {
const id = "test-id" as RawCoID;
const sessionId = "session-1" as SessionID;
peerKnownStates.dispatch({
type: "UPDATE_SESSION_COUNTER",
id,
sessionId,
value: 5,
});
peerKnownStates.updateSessionCounter(id, sessionId, 5);
const result = peerKnownStates.get(id);
expect(result?.sessions[sessionId]).toBe(5);
@@ -55,8 +50,8 @@ describe("PeerKnownStates", () => {
sessions: { [session2]: 10 },
};
peerKnownStates.dispatch({ type: "SET", id, value: initialState });
peerKnownStates.dispatch({ type: "COMBINE_WITH", id, value: combineState });
peerKnownStates.set(id, initialState);
peerKnownStates.combineWith(id, combineState);
const result = peerKnownStates.get(id);
expect(result?.sessions).toEqual({ [session1]: 5, [session2]: 10 });
@@ -71,8 +66,8 @@ describe("PeerKnownStates", () => {
sessions: { [sessionId]: 5 },
};
peerKnownStates.dispatch({ type: "SET", id, value: initialState });
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
peerKnownStates.set(id, initialState);
peerKnownStates.set(id, "empty");
const result = peerKnownStates.get(id);
expect(result).toEqual(emptyKnownState(id));
@@ -84,7 +79,7 @@ describe("PeerKnownStates", () => {
const listener = vi.fn();
peerKnownStates.subscribe(listener);
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
peerKnownStates.set(id, "empty");
expect(listener).toHaveBeenCalledWith(id, emptyKnownState(id));
});
@@ -97,7 +92,7 @@ describe("PeerKnownStates", () => {
const unsubscribe = peerKnownStates.subscribe(listener);
unsubscribe();
peerKnownStates.dispatch({ type: "SET_AS_EMPTY", id });
peerKnownStates.set(id, "empty");
expect(listener).not.toHaveBeenCalled();
});

View File

@@ -1,8 +1,7 @@
import { describe, expect, test, vi } from "vitest";
import { PeerKnownStateActions } from "../PeerKnownStates.js";
import { PeerState } from "../PeerState.js";
import { CO_VALUE_PRIORITY } from "../priority.js";
import { Peer, SyncMessage } from "../sync.js";
import { CoValueKnownState, Peer, SyncMessage } from "../sync.js";
function setup() {
const mockPeer: Peer = {
@@ -146,16 +145,11 @@ describe("PeerState", () => {
test("should clone the knownStates into optimisticKnownStates and knownStates when passed as argument", () => {
const { peerState, mockPeer } = setup();
const action: PeerKnownStateActions = {
type: "SET",
peerState.setKnownState("co_z1", {
id: "co_z1",
value: {
id: "co_z1",
header: false,
sessions: {},
},
};
peerState.dispatchToKnownStates(action);
header: false,
sessions: {},
});
const newPeerState = new PeerState(mockPeer, peerState.knownStates);
@@ -165,25 +159,26 @@ describe("PeerState", () => {
test("should dispatch to both states", () => {
const { peerState } = setup();
const knownStatesSpy = vi.spyOn(peerState.knownStates, "dispatch");
const knownStatesSpy = vi.spyOn(peerState._knownStates, "set");
if (peerState._optimisticKnownStates === "assumeInfallible") {
throw new Error("Expected normal optimisticKnownStates");
}
const optimisticKnownStatesSpy = vi.spyOn(
peerState.optimisticKnownStates,
"dispatch",
peerState._optimisticKnownStates,
"set",
);
const action: PeerKnownStateActions = {
type: "SET",
const state: CoValueKnownState = {
id: "co_z1",
value: {
id: "co_z1",
header: false,
sessions: {},
},
header: false,
sessions: {},
};
peerState.dispatchToKnownStates(action);
expect(knownStatesSpy).toHaveBeenCalledWith(action);
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith(action);
peerState.setKnownState("co_z1", state);
expect(knownStatesSpy).toHaveBeenCalledWith("co_z1", state);
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith("co_z1", state);
});
test("should use same reference for knownStates and optimisticKnownStates for storage peers", () => {
@@ -204,28 +199,20 @@ describe("PeerState", () => {
expect(peerState.knownStates).toBe(peerState.optimisticKnownStates);
// Verify that dispatching only updates one state
const knownStatesSpy = vi.spyOn(peerState.knownStates, "dispatch");
const optimisticKnownStatesSpy = vi.spyOn(
peerState.optimisticKnownStates,
"dispatch",
);
const knownStatesSpy = vi.spyOn(peerState._knownStates, "set");
expect(peerState._optimisticKnownStates).toBe("assumeInfallible");
const action: PeerKnownStateActions = {
type: "SET",
const state: CoValueKnownState = {
id: "co_z1",
value: {
id: "co_z1",
header: false,
sessions: {},
},
header: false,
sessions: {},
};
peerState.dispatchToKnownStates(action);
peerState.setKnownState("co_z1", state);
// Only one dispatch should happen since they're the same reference
expect(knownStatesSpy).toHaveBeenCalledTimes(1);
expect(knownStatesSpy).toHaveBeenCalledWith(action);
expect(optimisticKnownStatesSpy).toHaveBeenCalledTimes(1);
expect(optimisticKnownStatesSpy).toHaveBeenCalledWith(action);
expect(knownStatesSpy).toHaveBeenCalledWith("co_z1", state);
});
test("should use separate references for knownStates and optimisticKnownStates for non-storage peers", () => {

View File

@@ -40,10 +40,10 @@ describe("CoValueState", () => {
const mockCoValueId = "co_test123" as RawCoID;
test("should create unknown state", async () => {
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
expect(state.id).toBe(mockCoValueId);
expect(state.state.type).toBe("unknown");
expect(state.isUnknown()).toBe(true);
expect(
await metricReader.getMetricValue("jazz.covalues.loaded", {
state: "unknown",
@@ -52,11 +52,14 @@ describe("CoValueState", () => {
});
test("should create loading state", async () => {
const peerIds = ["peer1", "peer2"];
const state = CoValueState.Loading(mockCoValueId, peerIds);
const state = new CoValueState(mockCoValueId);
state.loadFromPeers([
createMockPeerState({ id: "peer1", role: "server" }),
createMockPeerState({ id: "peer2", role: "server" }),
]);
expect(state.id).toBe(mockCoValueId);
expect(state.state.type).toBe("loading");
expect(state.isLoading()).toBe(true);
expect(
await metricReader.getMetricValue("jazz.covalues.loaded", {
state: "loading",
@@ -66,11 +69,12 @@ describe("CoValueState", () => {
test("should create available state", async () => {
const mockCoValue = createMockCoValueCore(mockCoValueId);
const state = CoValueState.Available(mockCoValue);
const state = new CoValueState(mockCoValueId);
state.markAvailable(mockCoValue);
expect(state.id).toBe(mockCoValueId);
assert(state.state.type === "available");
expect(state.state.coValue).toBe(mockCoValue);
expect(state.isAvailable()).toBe(true);
expect(state.core).toBe(mockCoValue);
await expect(state.getCoValue()).resolves.toEqual(mockCoValue);
expect(
await metricReader.getMetricValue("jazz.covalues.loaded", {
@@ -81,7 +85,11 @@ describe("CoValueState", () => {
test("should handle found action", async () => {
const mockCoValue = createMockCoValueCore(mockCoValueId);
const state = CoValueState.Loading(mockCoValueId, ["peer1", "peer2"]);
const state = new CoValueState(mockCoValueId);
state.loadFromPeers([
createMockPeerState({ id: "peer1", role: "server" }),
createMockPeerState({ id: "peer2", role: "server" }),
]);
expect(
await metricReader.getMetricValue("jazz.covalues.loaded", {
@@ -96,10 +104,7 @@ describe("CoValueState", () => {
const stateValuePromise = state.getCoValue();
state.dispatch({
type: "available",
coValue: mockCoValue,
});
state.markAvailable(mockCoValue);
const result = await state.getCoValue();
expect(result).toBe(mockCoValue);
@@ -117,17 +122,6 @@ describe("CoValueState", () => {
).toBe(0);
});
test("should ignore actions when not in loading state", () => {
const state = CoValueState.Unknown(mockCoValueId);
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
expect(state.state.type).toBe("unknown");
});
test("should retry loading from peers when unsuccessful", async () => {
vi.useFakeTimers();
@@ -137,10 +131,7 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markNotFoundInPeer("peer1");
},
);
const peer2 = createMockPeerState(
@@ -149,15 +140,12 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer2",
});
state.markNotFoundInPeer("peer2");
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
@@ -173,7 +161,7 @@ describe("CoValueState", () => {
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
);
expect(state.state.type).toBe("unavailable");
expect(state.isDefinitelyUnavailable()).toBe(true);
await expect(state.getCoValue()).resolves.toBe("unavailable");
vi.useRealTimers();
@@ -188,11 +176,7 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
peer1.erroredCoValues.set(mockCoValueId, new Error("test") as any);
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markErrored("peer1", {} as any);
},
);
const peer2 = createMockPeerState(
@@ -201,16 +185,13 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer2",
});
state.markNotFoundInPeer("peer2");
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
@@ -224,7 +205,7 @@ describe("CoValueState", () => {
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
);
expect(state.state.type).toBe("unavailable");
expect(state.isDefinitelyUnavailable()).toBe(true);
await expect(state.getCoValue()).resolves.toBe("unavailable");
vi.useRealTimers();
@@ -239,10 +220,7 @@ describe("CoValueState", () => {
role: "storage",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markNotFoundInPeer("peer1");
},
);
const peer2 = createMockPeerState(
@@ -251,15 +229,12 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer2",
});
state.markNotFoundInPeer("peer2");
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
@@ -273,7 +248,7 @@ describe("CoValueState", () => {
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
);
expect(state.state.type).toBe("unavailable");
expect(state.isDefinitelyUnavailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual("unavailable");
vi.useRealTimers();
@@ -293,17 +268,11 @@ describe("CoValueState", () => {
},
async () => {
retries++;
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markNotFoundInPeer("peer1");
if (retries === 2) {
setTimeout(() => {
state.dispatch({
type: "available",
coValue: createMockCoValueCore(mockCoValueId),
});
state.markAvailable(createMockCoValueCore(mockCoValueId));
}, 100);
}
},
@@ -311,7 +280,7 @@ describe("CoValueState", () => {
const mockPeers = [peer1] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
@@ -322,7 +291,7 @@ describe("CoValueState", () => {
await loadPromise;
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(2);
expect(state.state.type).toBe("available");
expect(state.isAvailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
vi.useRealTimers();
});
@@ -336,16 +305,13 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markNotFoundInPeer("peer1");
},
);
const mockPeers = [peer1] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_CONFIG.MAX_RETRIES retries
@@ -353,17 +319,14 @@ describe("CoValueState", () => {
await vi.runAllTimersAsync();
}
state.dispatch({
type: "available",
coValue: createMockCoValueCore(mockCoValueId),
});
state.markAvailable(createMockCoValueCore(mockCoValueId));
await loadPromise;
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
);
expect(state.state.type).toBe("available");
expect(state.isAvailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
vi.useRealTimers();
@@ -383,22 +346,16 @@ describe("CoValueState", () => {
},
async () => {
if (run > 2) {
state.dispatch({
type: "available",
coValue: createMockCoValueCore(mockCoValueId),
});
state.markAvailable(createMockCoValueCore(mockCoValueId));
}
state.dispatch({
type: "not-found-in-peer",
peerId: "peer1",
});
state.markNotFoundInPeer("peer1");
run++;
},
);
const mockPeers = [peer1] as unknown as PeerState[];
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
@@ -407,7 +364,7 @@ describe("CoValueState", () => {
await loadPromise;
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(3);
expect(state.state.type).toBe("available");
expect(state.isAvailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
vi.useRealTimers();
@@ -424,10 +381,7 @@ describe("CoValueState", () => {
role: "storage",
},
async () => {
state.dispatch({
type: "available",
coValue: mockCoValue,
});
state.markAvailable(mockCoValue);
},
);
const peer2 = createMockPeerState(
@@ -436,14 +390,11 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "not-found-in-peer",
peerId: "peer2",
});
state.markNotFoundInPeer("peer2");
},
);
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1, peer2]);
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
@@ -457,7 +408,7 @@ describe("CoValueState", () => {
action: "load",
...mockCoValue.knownState(),
});
expect(state.state.type).toBe("available");
expect(state.isAvailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
vi.useRealTimers();
@@ -483,16 +434,13 @@ describe("CoValueState", () => {
role: "server",
},
async () => {
state.dispatch({
type: "available",
coValue: mockCoValue,
});
state.markAvailable(mockCoValue);
},
);
peer1.closed = true;
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1, peer2]);
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES; i++) {
@@ -503,7 +451,7 @@ describe("CoValueState", () => {
expect(peer1.pushOutgoingMessage).toHaveBeenCalledTimes(0);
expect(peer2.pushOutgoingMessage).toHaveBeenCalledTimes(1);
expect(state.state.type).toBe("available");
expect(state.isAvailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual({ id: mockCoValueId });
vi.useRealTimers();
@@ -520,7 +468,7 @@ describe("CoValueState", () => {
async () => {},
);
const state = CoValueState.Unknown(mockCoValueId);
const state = new CoValueState(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1]);
for (let i = 0; i < CO_VALUE_LOADING_CONFIG.MAX_RETRIES * 2; i++) {
@@ -532,7 +480,7 @@ describe("CoValueState", () => {
CO_VALUE_LOADING_CONFIG.MAX_RETRIES,
);
expect(state.state.type).toBe("unavailable");
expect(state.isDefinitelyUnavailable()).toBe(true);
await expect(state.getCoValue()).resolves.toEqual("unavailable");
vi.useRealTimers();

View File

@@ -245,8 +245,6 @@ describe("loading coValues from server", () => {
[
"client -> server | LOAD Map sessions: empty",
"server -> client | KNOWN Map sessions: empty",
"client -> server | LOAD Map sessions: empty",
"server -> client | KNOWN Map sessions: empty",
]
`);
});

View File

@@ -78,10 +78,10 @@ describe("peer reconciliation", () => {
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
assert(mapOnSyncServer.state.type === "available");
assert(mapOnSyncServer.isAvailable());
expect(
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
).toEqual("updated");
expect(
@@ -125,10 +125,10 @@ describe("peer reconciliation", () => {
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
assert(mapOnSyncServer.state.type === "available");
assert(mapOnSyncServer.isAvailable());
expect(
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
).toEqual("updated");
expect(peer.outgoing).toMatchObject({
@@ -175,10 +175,10 @@ describe("peer reconciliation", () => {
const mapOnSyncServer = jazzCloud.node.coValuesStore.get(map.id);
await waitFor(() => {
expect(mapOnSyncServer.state.type).toBe("available");
expect(mapOnSyncServer.isAvailable()).toBe(true);
});
assert(mapOnSyncServer.state.type === "available");
assert(mapOnSyncServer.isAvailable());
expect(
SyncMessagesLog.getMessages({
@@ -195,7 +195,7 @@ describe("peer reconciliation", () => {
`);
expect(
expectMap(mapOnSyncServer.state.coValue.getCurrentContent()).get("hello"),
expectMap(mapOnSyncServer.core.getCurrentContent()).get("hello"),
).toEqual("updated");
});
});

View File

@@ -44,7 +44,6 @@ describe("client with storage syncs with server", () => {
"storage -> client | KNOWN Group sessions: header/3",
"client -> server | KNOWN Map sessions: header/1",
"client -> storage | CONTENT Map header: true new: After: 0 New: 1",
"storage -> client | KNOWN Map sessions: header/1",
]
`);
});
@@ -78,11 +77,6 @@ describe("client with storage syncs with server", () => {
"client -> server | CONTENT Group header: true new: After: 0 New: 3",
"server -> client | KNOWN Group sessions: header/3",
"client -> storage | KNOWN Map sessions: header/1",
"client -> server | LOAD Map sessions: header/1",
"server -> client | CONTENT Group header: true new: After: 0 New: 3",
"client -> server | KNOWN Group sessions: header/3",
"server -> client | KNOWN Map sessions: header/1",
"client -> server | CONTENT Map header: true new: After: 0 New: 1",
]
`);
});
@@ -126,8 +120,6 @@ describe("client with storage syncs with server", () => {
"client -> storage | CONTENT Group header: true new: After: 0 New: 5",
"storage -> client | KNOWN Group sessions: header/5",
"client -> server | KNOWN Map sessions: header/1",
"client -> storage | CONTENT Map header: true new: After: 0 New: 1",
"storage -> client | KNOWN Map sessions: header/1",
]
`);
});
@@ -147,6 +139,8 @@ describe("client with storage syncs with server", () => {
client.node.syncManager.getPeers()[0]?.gracefulShutdown();
await new Promise((resolve) => setTimeout(resolve, 100));
SyncMessagesLog.clear();
map.set("hello", "updated", "trusting");

View File

@@ -759,9 +759,7 @@ describe("SyncManager.addPeer", () => {
await map.core.waitForSync();
expect(jazzCloud.node.coValuesStore.get(map.id).state.type).toBe(
"available",
);
expect(jazzCloud.node.coValuesStore.get(map.id).isAvailable()).toBe(true);
});
});
@@ -1087,10 +1085,9 @@ describe("SyncManager.handleSyncMessage", () => {
// Add a coValue to the errored set
const erroredId = "co_z123" as const;
peerState.erroredCoValues.set(
erroredId,
new Error("Test error") as unknown as TryAddTransactionsError,
);
client.node.coValuesStore.get(erroredId).markErrored(peerState.id, {
message: "Test error",
} as any);
const message = {
action: "load" as const,

View File

@@ -103,7 +103,7 @@ export class Ref<out V extends CoValue> {
this.id as unknown as CoID<RawCoValue>,
);
if (entry.state.type === "available") {
if (entry.isAvailable()) {
return new Ref(this.id, this.controlledAccount, this.schema).value!;
}

View File

@@ -155,8 +155,8 @@ function loadCoValue(
) {
const entry = node.coValuesStore.get(id);
if (entry.state.type === "available" && syncResolution) {
callback(entry.state.coValue);
if (entry.isAvailable() && syncResolution) {
callback(entry.core);
} else {
void node.loadCoValueCore(id).then((core) => {
callback(core);

View File

@@ -487,7 +487,7 @@ describe("createCoValueObservable", () => {
expect(observable.getCurrentValue()).toBeUndefined();
});
it("should return null if the coValue is not found", async () => {
it.only("should return null if the coValue is not found", async () => {
const { meOnSecondPeer } = await setupAccount();
const observable = createCoValueObservable<
TestMap,