Compare commits

...

3 Commits

Author SHA1 Message Date
Guido D'Orsi
01f404c72b remove chat-vue dist files 2024-11-12 22:50:04 +01:00
Guido D'Orsi
b0c6436c00 Merge remote-tracking branch 'origin/main' into feature/background-sync 2024-11-12 22:34:14 +01:00
Guido D'Orsi
9451f3628c poc: resume the partial syncs on app start 2024-11-12 22:29:44 +01:00
8 changed files with 313 additions and 41 deletions

View File

@@ -29,6 +29,7 @@
"@types/react": "^18.2.19",
"@types/react-dom": "^18.2.7",
"@vitejs/plugin-react-swc": "^3.3.2",
"jazz-run": "workspace:*",
"jstat": "^1.9.6",
"typescript": "^5.3.3",
"vite": "^5.0.10"

View File

@@ -12,42 +12,47 @@ import isCI from "is-ci";
* See https://playwright.dev/docs/test-configuration.
*/
export default defineConfig({
testDir: "./tests",
/* Run tests in files in parallel */
fullyParallel: true,
/* Fail the build on CI if you accidentally left test.only in the source code. */
forbidOnly: isCI,
/* Retry on CI only */
retries: isCI ? 2 : 0,
/* Opt out of parallel tests on CI. */
workers: isCI ? 1 : undefined,
/* Reporter to use. See https://playwright.dev/docs/test-reporters */
reporter: "html",
testDir: "./tests",
/* Run tests in files in parallel */
fullyParallel: true,
/* Fail the build on CI if you accidentally left test.only in the source code. */
forbidOnly: isCI,
/* Retry on CI only */
retries: isCI ? 2 : 0,
/* Opt out of parallel tests on CI. */
workers: isCI ? 1 : undefined,
/* Reporter to use. See https://playwright.dev/docs/test-reporters */
reporter: "html",
/* Shared settings for all the projects below. See https://playwright.dev/docs/api/class-testoptions. */
use: {
/* Base URL to use in actions like `await page.goto('/')`. */
baseURL: "http://localhost:5173/",
/* Shared settings for all the projects below. See https://playwright.dev/docs/api/class-testoptions. */
use: {
/* Base URL to use in actions like `await page.goto('/')`. */
baseURL: "http://localhost:5173/",
/* Collect trace when retrying the failed test. See https://playwright.dev/docs/trace-viewer */
trace: "on-first-retry",
permissions: ["clipboard-read", "clipboard-write"],
},
/* Configure projects for major browsers */
projects: [
{
name: "chromium",
use: { ...devices["Desktop Chrome"] },
/* Collect trace when retrying the failed test. See https://playwright.dev/docs/trace-viewer */
trace: "on-first-retry",
permissions: ["clipboard-read", "clipboard-write"],
},
],
/* Run your local dev server before starting the tests */
webServer: [
{
command: "pnpm preview --port 5173",
url: "http://localhost:5173/",
reuseExistingServer: !isCI,
},
],
/* Configure projects for major browsers */
projects: [
{
name: "chromium",
use: { ...devices["Desktop Chrome"] },
},
],
/* Run your local dev server before starting the tests */
webServer: [
{
command: "pnpm preview --port 5173",
url: "http://localhost:5173/",
reuseExistingServer: !isCI,
},
{
command: "pnpm exec jazz-run sync --in-memory --port 4200",
url: "http://localhost:4200/health",
reuseExistingServer: !isCI,
},
],
});

View File

@@ -2,13 +2,15 @@ import { setTimeout } from "node:timers/promises";
import { expect, test } from "@playwright/test";
test.describe("ResumeSyncState", () => {
test.skip("should resume the sync even after a page reload", async ({
test("should resume the sync even after a page reload", async ({
page,
browser,
}) => {
const context = page.context();
await page.goto("/resume-sync?userName=SuperMario");
await page.goto(
"/resume-sync?userName=SuperMario&peer=ws://localhost:4200",
);
const id = await page.getByTestId("id").textContent();
@@ -29,7 +31,9 @@ test.describe("ResumeSyncState", () => {
// Reload the page but without loading the coValue
// await page.goto(`/resume-sync?userName=SuperMario`);
await page.goto(`/resume-sync?userName=SuperMario`);
await page.goto(
`/resume-sync?userName=SuperMario&peer=ws://localhost:4200`,
);
await setTimeout(1000);
@@ -37,7 +41,9 @@ test.describe("ResumeSyncState", () => {
// Create a new incognito instance and try to load the coValue
const newUserPage = await (await browser.newContext()).newPage();
await newUserPage.goto(`/resume-sync?userName=Luigi&id=${id}`);
await newUserPage.goto(
`/resume-sync?userName=Luigi&id=${id}&peer=ws://localhost:4200`,
);
await expect(newUserPage.getByTestId("id")).toBeInViewport();

View File

@@ -110,7 +110,7 @@ export class IDBStorage {
toLocalNode: OutgoingSyncQueue,
) {
const dbPromise = new Promise<IDBDatabase>((resolve, reject) => {
const request = indexedDB.open("jazz-storage", 4);
const request = indexedDB.open("jazz-storage", 5);
request.onerror = () => {
reject(request.error);
};
@@ -148,6 +148,15 @@ export class IDBStorage {
keyPath: ["ses", "idx"],
});
}
if (ev.oldVersion <= 5) {
const peersKnownStates = db.createObjectStore("peersKnownStates", {
keyPath: "id",
});
peersKnownStates.createIndex("peersKnownStatesByPeerId", "peerId", {
multiEntry: true,
});
}
};
});
@@ -168,6 +177,12 @@ export class IDBStorage {
case "done":
await this.handleDone(msg);
break;
case "persistSyncState":
await this.handlePersistSyncState(msg);
break;
case "requestSyncStateHydration":
await this.handleRequestSyncStateHydration(msg);
break;
}
}
@@ -180,6 +195,7 @@ export class IDBStorage {
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
peersKnownStates: IDBObjectStore;
};
startedAt: number;
pendingRequests: ((txEntry: {
@@ -188,6 +204,7 @@ export class IDBStorage {
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
peersKnownStates: IDBObjectStore;
};
}) => void)[];
}
@@ -200,6 +217,7 @@ export class IDBStorage {
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
peersKnownStates: IDBObjectStore;
}) => IDBRequest,
): SyncPromise<T> {
return new SyncPromise((resolve, reject) => {
@@ -213,6 +231,7 @@ export class IDBStorage {
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
peersKnownStates: IDBObjectStore;
};
}) => {
const request = handler(stores);
@@ -240,7 +259,13 @@ export class IDBStorage {
if (!txEntry || performance.now() - txEntry.startedAt > 20) {
const tx = this.db.transaction(
["coValues", "sessions", "transactions", "signatureAfter"],
[
"coValues",
"sessions",
"transactions",
"signatureAfter",
"peersKnownStates",
],
"readwrite",
);
txEntry = {
@@ -251,6 +276,7 @@ export class IDBStorage {
sessions: tx.objectStore("sessions"),
transactions: tx.objectStore("transactions"),
signatureAfter: tx.objectStore("signatureAfter"),
peersKnownStates: tx.objectStore("peersKnownStates"),
},
startedAt: performance.now(),
pendingRequests: [],
@@ -275,6 +301,42 @@ export class IDBStorage {
});
}
async handlePersistSyncState(
msg: CojsonInternalTypes.PersistSyncStateMessage,
) {
if (msg.fullySynced) {
await this.makeRequest(({ peersKnownStates }) =>
peersKnownStates.delete(`${msg.peerId}-${msg.id}`),
);
} else {
await this.makeRequest(({ peersKnownStates }) =>
peersKnownStates.put({
id: `${msg.peerId}-${msg.id}`,
peerId: msg.peerId,
knownState: msg.payload,
}),
);
}
}
async handleRequestSyncStateHydration(
msg: CojsonInternalTypes.RequestSyncStateHydrationMessage,
) {
const result = await this.makeRequest<
{
knownState: CojsonInternalTypes.CoValueKnownState;
}[]
>(({ peersKnownStates }) =>
peersKnownStates.index("peersKnownStatesByPeerId").getAll(msg.peerId),
);
void this.toLocalNode.push({
action: "hydrateSyncState",
peerId: msg.peerId,
knownStates: result.map((row) => row.knownState),
});
}
sendNewContentAfter(
theirKnown: CojsonInternalTypes.CoValueKnownState,
asDependencyOf?: CojsonInternalTypes.RawCoID,

View File

@@ -137,6 +137,15 @@ export namespace CojsonInternalTypes {
export type KnownStateMessage = import("./sync.js").KnownStateMessage;
export type LoadMessage = import("./sync.js").LoadMessage;
export type NewContentMessage = import("./sync.js").NewContentMessage;
export type PersistSyncStateMessage = import(
"./sync.js",
).PersistSyncStateMessage;
export type RequestSyncStateHydrationMessage = import(
"./sync.js",
).RequestSyncStateHydrationMessage;
export type HydrateSyncStateMessage = import(
"./sync.js",
).HydrateSyncStateMessage;
export type CoValueHeader = import("./coValueCore.js").CoValueHeader;
export type Transaction = import("./coValueCore.js").Transaction;
export type TransactionID = import("./ids.js").TransactionID;

View File

@@ -145,6 +145,8 @@ export class LocalNode {
}
}
LocalNode.storeServerPeersKnownStates(nodeWithAccount);
syncAllCoValuesAfterCreateAccount();
setTimeout(syncAllCoValuesAfterCreateAccount, 500);
@@ -157,6 +159,35 @@ export class LocalNode {
};
}
static storeServerPeersKnownStates(node: LocalNode) {
const serverPeersIds = new Set<PeerID>(
node.syncManager
.getPeers()
.filter((peer) => peer.role === "server")
.map((peer) => peer.id),
);
const storagePeers = node.syncManager
.getPeers()
.filter((peer) => peer.role === "storage");
node.syncManager.subscribeToSyncStateUpdate(
(peerId, knownState, fullySynced) => {
if (serverPeersIds.has(peerId)) {
for (const storagePeer of storagePeers) {
void storagePeer.pushOutgoingMessage({
action: "persistSyncState",
payload: knownState,
fullySynced,
peerId,
id: knownState.id,
});
}
}
},
);
}
/** @category 2. Node Creation */
static async withLoadedAccount<Meta extends AccountMeta = AccountMeta>({
accountID,
@@ -209,6 +240,8 @@ export class LocalNode {
node.coValues[accountID] = CoValueState.Available(controlledAccount.core);
controlledAccount.core._cachedContent = undefined;
LocalNode.storeServerPeersKnownStates(node);
const profileID = account.get("profile");
if (!profileID) {
throw new Error("Account has no profile");
@@ -227,6 +260,14 @@ export class LocalNode {
);
}
const serverPeers = node.syncManager
.getPeers()
.filter((peer) => peer.role === "server");
for (const peer of serverPeers) {
void node.syncManager.requestSyncStateHydration(peer.id);
}
return node;
} catch (e) {
console.error("Error withLoadedAccount", e);

View File

@@ -26,7 +26,10 @@ export type SyncMessage =
| LoadMessage
| KnownStateMessage
| NewContentMessage
| DoneMessage;
| DoneMessage
| PersistSyncStateMessage
| RequestSyncStateHydrationMessage
| HydrateSyncStateMessage;
export type LoadMessage = {
action: "load";
@@ -58,6 +61,25 @@ export type DoneMessage = {
id: RawCoID;
};
export type PersistSyncStateMessage = {
action: "persistSyncState";
id: RawCoID;
payload: CoValueKnownState;
fullySynced: boolean;
peerId: PeerID;
};
export type RequestSyncStateHydrationMessage = {
action: "requestSyncStateHydration";
peerId: PeerID;
};
export type HydrateSyncStateMessage = {
action: "hydrateSyncState";
knownStates: CoValueKnownState[];
peerId: PeerID;
};
export type PeerID = string;
export type DisconnectedError = "Disconnected";
@@ -184,6 +206,11 @@ export class SyncManager {
return await this.handleNewContent(msg, peer);
case "done":
return await this.handleUnsubscribe(msg);
case "hydrateSyncState":
return await this.handleHydrateSyncState(msg);
case "persistSyncState":
case "requestSyncStateHydration":
return;
default:
throw new Error(
`Unknown message type ${(msg as { action: "string" }).action}`,
@@ -191,6 +218,52 @@ export class SyncManager {
}
}
async requestSyncStateHydration(peerId: PeerID) {
const peers = this.peersInPriorityOrder().filter(
(peer) => peer.id !== peerId && peer.role === "storage",
);
for (const peer of peers) {
await peer.pushOutgoingMessage({
action: "requestSyncStateHydration",
peerId,
});
}
}
async handleHydrateSyncState(msg: HydrateSyncStateMessage) {
const peer = this.peers[msg.peerId];
if (!peer) {
throw new Error(`Unknown peer ${msg.peerId}`);
}
const coValuesToResume = new Set<RawCoID>();
for (const knownState of msg.knownStates) {
if (peer.knownStates.has(knownState.id)) {
peer.knownStates.dispatch({
type: "COMBINE_WITH",
id: knownState.id,
value: knownState,
});
continue;
}
peer.knownStates.dispatch({
type: "SET",
id: knownState.id,
value: knownState,
});
coValuesToResume.add(knownState.id);
}
for (const id of coValuesToResume) {
void this.local.loadCoValueCore(id);
}
}
async subscribeToIncludingDependencies(id: RawCoID, peer: PeerState) {
const entry = this.local.coValues[id];
@@ -395,6 +468,65 @@ export class SyncManager {
});
}
syncStateUpdateListeners = new Set<
(
peerId: PeerID,
knownState: CoValueKnownState,
fullySynced: boolean,
) => void
>();
subscribeToSyncStateUpdate(
listener: (
peerId: PeerID,
knownState: CoValueKnownState,
fullySynced: boolean,
) => void,
) {
this.syncStateUpdateListeners.add(listener);
return () => {
this.syncStateUpdateListeners.delete(listener);
};
}
triggerSyncStateUpdate(peerId: PeerID, id: RawCoID) {
const peer = this.peers[peerId];
if (!peer) {
return;
}
const knownState = peer.knownStates.get(id) ?? emptyKnownState(id);
const fullySynced = this.getIsCoValueFullySyncedIntoPeer(peerId, id);
for (const listener of this.syncStateUpdateListeners) {
listener(peerId, knownState, fullySynced);
}
}
getIsCoValueFullySyncedIntoPeer(peerId: PeerID, id: RawCoID) {
const peer = this.peers[peerId];
const entry = this.local.coValues[id];
if (!peer) {
return false;
}
if (entry?.state.type !== "available") {
return false;
}
const coValue = entry.state.coValue;
const knownState = peer.knownStates.get(id);
if (!knownState) {
return false;
}
return getIsFullySynced(knownState.sessions, coValue.knownState().sessions);
}
trySendToPeer(peer: PeerState, msg: SyncMessage) {
return peer.pushOutgoingMessage(msg);
}
@@ -789,3 +921,16 @@ function knownStateIn(msg: LoadMessage | KnownStateMessage) {
sessions: msg.sessions,
};
}
function getIsFullySynced(
localSessions: Record<string, number>,
remoteSessions: Record<string, number>,
) {
for (const sessionId of Object.keys(localSessions)) {
if (localSessions[sessionId] !== remoteSessions[sessionId]) {
return false;
}
}
return true;
}

3
pnpm-lock.yaml generated
View File

@@ -134,6 +134,9 @@ importers:
'@vitejs/plugin-react-swc':
specifier: ^3.3.2
version: 3.5.0(@swc/helpers@0.5.5)(vite@5.0.10(@types/node@22.5.1)(terser@5.33.0))
jazz-run:
specifier: workspace:*
version: link:../../packages/jazz-run
jstat:
specifier: ^1.9.6
version: 1.9.6