Merge pull request #2214 from garden-co/feat/sync-polish

feat: make the SyncManager async-free, support parallel load on server peers
This commit is contained in:
Guido D'Orsi
2025-05-14 12:55:36 +02:00
committed by GitHub
6 changed files with 192 additions and 131 deletions

View File

@@ -943,7 +943,11 @@ export class CoValueCore {
return;
}
const peersToActuallyLoadFrom = [];
const peersToActuallyLoadFrom = {
storage: [] as PeerState[],
server: [] as PeerState[],
};
for (const peer of peers) {
const currentState = this.peers.get(peer.id);
@@ -959,78 +963,87 @@ export class CoValueCore {
}
if (currentState?.type === "unavailable") {
if (peer.shouldRetryUnavailableCoValues()) {
if (peer.role === "server") {
peersToActuallyLoadFrom.server.push(peer);
this.markPending(peer.id);
peersToActuallyLoadFrom.push(peer);
}
continue;
}
if (!currentState || currentState?.type === "unknown") {
if (peer.role === "storage") {
peersToActuallyLoadFrom.storage.push(peer);
} else {
peersToActuallyLoadFrom.server.push(peer);
}
this.markPending(peer.id);
peersToActuallyLoadFrom.push(peer);
}
}
for (const peer of peersToActuallyLoadFrom) {
if (peer.closed) {
this.markNotFoundInPeer(peer.id);
continue;
}
peer.pushOutgoingMessage({
action: "load",
...this.knownState(),
});
peer.trackLoadRequestSent(this.id);
/**
* Use a very long timeout for storage peers, because under pressure
* they may take a long time to consume the messages queue
*
* TODO: Track errors on storage and do not rely on timeout
*/
const timeoutDuration =
peer.role === "storage"
? CO_VALUE_LOADING_CONFIG.TIMEOUT * 10
: CO_VALUE_LOADING_CONFIG.TIMEOUT;
const waitingForPeer = new Promise<void>((resolve) => {
const markNotFound = () => {
if (this.peers.get(peer.id)?.type === "pending") {
logger.warn("Timeout waiting for peer to load coValue", {
id: this.id,
peerID: peer.id,
});
this.markNotFoundInPeer(peer.id);
}
};
const timeout = setTimeout(markNotFound, timeoutDuration);
const removeCloseListener = peer.addCloseListener(markNotFound);
const listener = (state: CoValueCore) => {
const peerState = state.peers.get(peer.id);
if (
state.isAvailable() || // might have become available from another peer e.g. through handleNewContent
peerState?.type === "available" ||
peerState?.type === "errored" ||
peerState?.type === "unavailable"
) {
this.listeners.delete(listener);
removeCloseListener();
clearTimeout(timeout);
resolve();
}
};
this.listeners.add(listener);
listener(this);
});
await waitingForPeer;
// Load from storage peers first, then from server peers
if (peersToActuallyLoadFrom.storage.length > 0) {
await Promise.all(
peersToActuallyLoadFrom.storage.map((peer) =>
this.internalLoadFromPeer(peer),
),
);
}
if (peersToActuallyLoadFrom.server.length > 0) {
await Promise.all(
peersToActuallyLoadFrom.server.map((peer) =>
this.internalLoadFromPeer(peer),
),
);
}
}
/**
 * Sends a "load" request for this CoValue to `peer` and waits for that
 * request to settle.
 *
 * @param peer - The peer to request the CoValue from.
 * @returns `undefined` when the peer is already closed (the CoValue is marked
 * as not found in that peer and no message is sent). Otherwise a promise that
 * resolves once the CoValue is available (possibly through another peer) or
 * the state tracked for this peer is "available", "errored" or "unavailable".
 *
 * While the peer's state is still "pending", the CoValue is marked as not
 * found in that peer when either CO_VALUE_LOADING_CONFIG.TIMEOUT elapses or
 * the peer closes. Each case logs its own warning so the two causes can be
 * told apart.
 */
internalLoadFromPeer(peer: PeerState): Promise<void> | undefined {
  if (peer.closed) {
    this.markNotFoundInPeer(peer.id);
    return undefined;
  }

  peer.pushOutgoingMessage({
    action: "load",
    ...this.knownState(),
  });
  peer.trackLoadRequestSent(this.id);

  return new Promise<void>((resolve) => {
    // Gives up on this peer, but only if it has not answered in the meantime.
    const markNotFoundIfPending = (reason: string) => {
      if (this.peers.get(peer.id)?.type !== "pending") {
        return;
      }

      logger.warn(reason, {
        id: this.id,
        peerID: peer.id,
      });
      this.markNotFoundInPeer(peer.id);
    };

    const timeout = setTimeout(
      () => markNotFoundIfPending("Timeout waiting for peer to load coValue"),
      CO_VALUE_LOADING_CONFIG.TIMEOUT,
    );
    const removeCloseListener = peer.addCloseListener(() =>
      markNotFoundIfPending("Peer closed while waiting for it to load coValue"),
    );

    const listener = (state: CoValueCore) => {
      const peerState = state.peers.get(peer.id);
      const settled =
        state.isAvailable() || // might have become available from another peer e.g. through handleNewContent
        peerState?.type === "available" ||
        peerState?.type === "errored" ||
        peerState?.type === "unavailable";

      if (!settled) {
        return;
      }

      // Release everything registered for this request before resolving.
      this.listeners.delete(listener);
      removeCloseListener();
      clearTimeout(timeout);
      resolve();
    };

    this.listeners.add(listener);
    // Evaluate immediately: the request may already be settled.
    listener(this);
  });
}
}

View File

@@ -298,7 +298,7 @@ export class SyncManager {
}
}
async addPeer(peer: Peer) {
addPeer(peer: Peer) {
const prevPeer = this.peers[peer.id];
if (prevPeer && !prevPeer.closed) {
@@ -379,65 +379,17 @@ export class SyncManager {
peer.setKnownState(msg.id, knownStateIn(msg));
const coValue = this.local.getCoValue(msg.id);
if (
coValue.loadingState === "unknown" ||
coValue.loadingState === "unavailable"
) {
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
if (eligiblePeers.length === 0) {
// We don't have any eligible peers to load the coValue from
// so we send a known state back to the sender to let it know
// that the coValue is unavailable
peer.trackToldKnownState(msg.id);
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
});
return;
} else {
// Synchronously updates the state if loading is possible
coValue
.loadFromPeers(this.getServerAndStoragePeers(peer.id))
.catch((e) => {
logger.error("Error loading coValue in handleLoad", { err: e });
});
}
if (coValue.isAvailable()) {
this.sendNewContentIncludingDependencies(msg.id, peer);
return;
}
if (coValue.loadingState === "loading") {
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
// in a new task, otherwise we might block further incoming content messages that would
// resolve the CoValue as available. This can happen when we receive fresh
// content from a client, but we are a server with our own upstream server(s)
coValue
.waitForAvailableOrUnavailable()
.then(async (value) => {
if (!value.isAvailable()) {
peer.trackToldKnownState(msg.id);
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
});
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
return;
}
this.sendNewContentIncludingDependencies(msg.id, peer);
})
.catch((e) => {
logger.error("Error loading coValue in handleLoad loading state", {
err: e,
});
});
} else if (coValue.isAvailable()) {
this.sendNewContentIncludingDependencies(msg.id, peer);
} else {
if (eligiblePeers.length === 0) {
// We don't have any eligible peers to load the coValue from
// so we send a known state back to the sender to let it know
// that the coValue is unavailable
peer.trackToldKnownState(msg.id);
this.trySendToPeer(peer, {
action: "known",
@@ -445,7 +397,40 @@ export class SyncManager {
header: false,
sessions: {},
});
return;
}
coValue.loadFromPeers(eligiblePeers).catch((e) => {
logger.error("Error loading coValue in handleLoad", { err: e });
});
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
// in a new task, otherwise we might block further incoming content messages that would
// resolve the CoValue as available. This can happen when we receive fresh
// content from a client, but we are a server with our own upstream server(s)
coValue
.waitForAvailableOrUnavailable()
.then((value) => {
if (!value.isAvailable()) {
peer.trackToldKnownState(msg.id);
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
});
return;
}
this.sendNewContentIncludingDependencies(msg.id, peer);
})
.catch((e) => {
logger.error("Error loading coValue in handleLoad loading state", {
err: e,
});
});
}
handleKnownState(msg: KnownStateMessage, peer: PeerState) {
@@ -607,6 +592,8 @@ export class SyncManager {
// Check if there is an in-flight load operation and we
// are waiting for other peers to send the load request
if (state === "unknown" || state === undefined) {
// Sending a load message to the peer to get to know how much content is missing
// before sending the new content
this.trySendToPeer(peer, {
action: "load",
...coValue.knownState(),
@@ -645,7 +632,7 @@ export class SyncManager {
this.requestedSyncs.add(coValue.id);
}
async syncCoValue(coValue: CoValueCore) {
syncCoValue(coValue: CoValueCore) {
this.requestedSyncs.delete(coValue.id);
for (const peer of this.peersInPriorityOrder()) {
@@ -668,21 +655,21 @@ export class SyncManager {
}
}
async waitForSyncWithPeer(peerId: PeerID, id: RawCoID, timeout: number) {
waitForSyncWithPeer(peerId: PeerID, id: RawCoID, timeout: number) {
const { syncState } = this;
const currentSyncState = syncState.getCurrentSyncState(peerId, id);
const isTheConditionAlreadyMet = currentSyncState.uploaded;
if (isTheConditionAlreadyMet) {
return true;
return;
}
const peerState = this.peers[peerId];
// The peer has been closed, so it isn't possible to sync
if (!peerState || peerState.closed) {
return true;
return;
}
// The client isn't subscribed to the coValue, so we won't sync it
@@ -690,7 +677,7 @@ export class SyncManager {
peerState.role === "client" &&
!peerState.optimisticKnownStates.has(id)
) {
return true;
return;
}
return new Promise((resolve, reject) => {
@@ -712,25 +699,25 @@ export class SyncManager {
});
}
async waitForStorageSync(id: RawCoID, timeout = 30_000) {
waitForStorageSync(id: RawCoID, timeout = 30_000) {
const peers = this.getPeers();
await Promise.all(
return Promise.all(
peers
.filter((peer) => peer.role === "storage")
.map((peer) => this.waitForSyncWithPeer(peer.id, id, timeout)),
);
}
async waitForSync(id: RawCoID, timeout = 30_000) {
waitForSync(id: RawCoID, timeout = 30_000) {
const peers = this.getPeers();
await Promise.all(
return Promise.all(
peers.map((peer) => this.waitForSyncWithPeer(peer.id, id, timeout)),
);
}
async waitForAllCoValuesSync(timeout = 60_000) {
waitForAllCoValuesSync(timeout = 60_000) {
const coValues = this.local.allCoValues();
const validCoValues = Array.from(coValues).filter(
(coValue) =>

View File

@@ -257,7 +257,7 @@ describe("SyncStateManager", () => {
const group = client.node.createGroup();
const map = group.createMap();
await expect(map.core.waitForSync()).resolves.toBeUndefined();
await map.core.waitForSync();
});
test("should skip client peers that are not subscribed to the coValue", async () => {

View File

@@ -4,6 +4,7 @@ import { expectMap } from "../coValue";
import {
SyncMessagesLog,
blockMessageTypeOnOutgoingPeer,
connectedPeersWithMessagesTracking,
loadCoValueOrFail,
setupTestNode,
waitFor,
@@ -76,8 +77,8 @@ describe("multiple clients syncing with the a cloud-like server mesh", () => {
"core -> storage | CONTENT Group header: true new: After: 0 New: 3",
"storage -> core | KNOWN Group sessions: header/3",
"core -> storage | CONTENT Map header: true new: After: 0 New: 1",
"storage -> core | KNOWN Map sessions: header/1",
"client -> edge-italy | LOAD Map sessions: empty",
"storage -> core | KNOWN Map sessions: header/1",
"edge-italy -> core | LOAD Map sessions: empty",
"core -> edge-italy | CONTENT Group header: true new: After: 0 New: 3",
"edge-italy -> core | KNOWN Group sessions: header/3",
@@ -138,8 +139,8 @@ describe("multiple clients syncing with the a cloud-like server mesh", () => {
"core -> storage | CONTENT Group header: true new: After: 0 New: 5",
"storage -> core | KNOWN Group sessions: header/5",
"core -> storage | CONTENT Map header: true new: After: 0 New: 1",
"storage -> core | KNOWN Map sessions: header/1",
"client -> edge-italy | LOAD Map sessions: empty",
"storage -> core | KNOWN Map sessions: header/1",
"edge-italy -> core | LOAD Map sessions: empty",
"core -> edge-italy | CONTENT ParentGroup header: true new: After: 0 New: 6",
"edge-italy -> core | KNOWN ParentGroup sessions: header/6",
@@ -355,6 +356,7 @@ describe("multiple clients syncing with the a cloud-like server mesh", () => {
syncServer: storage.node,
});
storagePeer.role = "storage";
storagePeer.priority = 100;
const group = coreServer.node.createGroup();
@@ -402,4 +404,60 @@ describe("multiple clients syncing with the a cloud-like server mesh", () => {
expect(mapOnClient.get("hello")).toEqual("world");
});
// Regression test: the client has two upstream server peers ("core" and
// "another-server"). "load" messages towards "core" are blocked, and the
// CoValue must still be loaded through "another-server".
test("a stuck server peer should not block the load from other server peers", async () => {
const client = setupTestNode();
const coreServer = setupTestNode({
isSyncServer: true,
});
const anotherServer = setupTestNode({});
const { peer: peerToCoreServer } = client.connectToSyncServer({
syncServerName: "core",
syncServer: coreServer.node,
});
// Manually wire a second connection: peer1 is the client's view of
// "another-server" (role "server"), peer2 is that server's view of the client.
const { peer1, peer2 } = connectedPeersWithMessagesTracking({
peer1: {
id: anotherServer.node.getCurrentAgent().id,
role: "server",
name: "another-server",
},
peer2: {
id: client.node.getCurrentAgent().id,
role: "client",
name: "client",
},
});
// Presumably prevents outgoing "load" messages on the client -> core peer
// from being delivered, simulating a stuck server (helper is defined elsewhere).
blockMessageTypeOnOutgoingPeer(peerToCoreServer, "load");
client.node.syncManager.addPeer(peer1);
anotherServer.node.syncManager.addPeer(peer2);
// The data exists only on "another-server".
const group = anotherServer.node.createGroup();
const map = group.createMap();
map.set("hello", "world", "trusting");
const mapOnClient = await loadCoValueOrFail(client.node, map.id);
// Only client <-> another-server traffic is expected in the log; nothing
// involving "core" appears in the snapshot.
expect(
SyncMessagesLog.getMessages({
Group: group.core,
Map: map.core,
}),
).toMatchInlineSnapshot(`
[
"client -> another-server | LOAD Map sessions: empty",
"another-server -> client | CONTENT Group header: true new: After: 0 New: 3",
"client -> another-server | KNOWN Group sessions: header/3",
"another-server -> client | CONTENT Map header: true new: After: 0 New: 1",
"client -> another-server | KNOWN Map sessions: header/1",
]
`);
expect(mapOnClient.get("hello")).toEqual("world");
});
});

View File

@@ -463,6 +463,7 @@ export function createMockStoragePeer(opts: {
},
});
peer1.role = "storage";
peer1.priority = 100;
storage.syncManager.addPeer(peer2);

View File

@@ -72,6 +72,8 @@ async function setup() {
const organizationId = organization.id;
await admin1.waitForAllCoValuesSync();
return {
admin1,
admin2,