Compare commits

...

43 Commits

Author SHA1 Message Date
Marina Orlova
0589bf275f Add config for cojson 2025-01-08 10:37:12 +01:00
Marina Orlova
2f6ca4cdf4 Sync if not found in peer 2025-01-07 00:58:39 +01:00
Marina Orlova
cfbe745de3 add todo 2025-01-06 13:42:23 +01:00
Marina Orlova
7b96fd719b Fix peers bug 2025-01-05 02:14:56 +01:00
Marina Orlova
c391180093 Add queue runner 2025-01-05 01:12:32 +01:00
Marina Orlova
8ea0858571 Fix build errors 2025-01-04 12:18:37 +01:00
Marina Orlova
d1c0981024 add anaware peers 2025-01-04 11:52:06 +01:00
Marina Orlova
bb0158ec51 Fix dependencies not loaing from another peer 2025-01-03 15:53:22 +01:00
Marina Orlova
8a40c02c52 Fix unloading dependencies 2025-01-02 23:58:43 +01:00
Marina Orlova
b95937511f Send content fix 2025-01-01 22:21:09 +01:00
Marina Orlova
1967c736c5 Add dependency service 2024-12-31 21:46:43 +01:00
Marina Orlova
b84edecb50 Fix data handler for dependencies 2024-12-31 14:34:08 +01:00
Marina Orlova
5631d53e3f Order initial sync messages by dependencies 2024-12-31 01:07:59 +01:00
Marina Orlova
244fd84a88 Send dependencies when unknown covalue requested 2024-12-30 21:55:36 +01:00
Marina Orlova
d4e93afdf9 dont sync from the data action 2024-12-29 23:04:55 +01:00
Marina Orlova
26d4fa985c SQLite selects protocol conditionally 2024-12-29 22:41:57 +01:00
Marina Orlova
cefc6e27de Copy entry upload state when entry is copied 2024-12-29 21:04:29 +01:00
Marina Orlova
452c284a01 Fix sync bugs 2024-12-28 23:10:06 +01:00
Marina Orlova
191a7f33b1 Refactor storage and storage-sqlite 2024-12-27 22:19:15 +01:00
Marina Orlova
d7be246f75 tiny fixes 2024-12-27 18:20:13 +01:00
Marina Orlova
5c8717543c tweaks 2024-12-26 23:11:59 +01:00
Marina Orlova
02cd6fe4b7 Fix Imports 2024-12-26 22:17:30 +01:00
Marina Orlova
526a26a39d Delete subscriptionManager 2024-12-26 21:34:00 +01:00
Marina Orlova
60adbffc26 Refactor sync.ts 2024-12-26 21:30:34 +01:00
Marina Orlova
d8cabe3fa6 Introduce message handlers 2024-12-26 15:14:44 +01:00
Marina Orlova
928ac67a06 Tweaks 2024-12-23 23:11:24 +01:00
Marina Orlova
0458e12721 Refactor handle pull 2024-12-23 20:56:57 +01:00
Marina Orlova
df59b53000 Move all response logic into PeerOperations 2024-12-23 17:54:08 +01:00
Marina Orlova
891baf2053 Move load logic into sync 2024-12-23 11:52:02 +01:00
Marina Orlova
3a55c8a627 Move getServerAndStorage to Peers 2024-12-22 20:59:26 +01:00
Marina Orlova
5a9770242f Create Peers class 2024-12-22 20:48:50 +01:00
Marina Orlova
f6bbe18a53 Rearrange logic between local node and sync 2024-12-21 15:15:52 +01:00
Marina Orlova
0712546277 Move peers tracking into node 2024-12-20 23:28:56 +01:00
Marina Orlova
14b70aa445 add peer ops and refactor sync to make use of them 2024-12-20 22:48:13 +01:00
Marina Orlova
47275a1340 Track upload state in coValueState 2024-12-19 20:03:39 +01:00
Marina Orlova
ca54b4c1a8 tweaks 2024-12-15 22:42:03 +01:00
Marina Orlova
86a2c914d3 implement all actions 2024-12-15 09:05:53 +01:00
Marina Orlova
d9c250386e replace content with push 2024-12-15 02:20:03 +01:00
Marina Orlova
73d5f18cb8 handlePush instead handleLoad 2024-12-15 01:56:53 +01:00
Marina Orlova
84e17a9189 load from peers with pull 2024-12-14 23:57:11 +01:00
Marina Orlova
0e8b04579a rebase storage on new actions 2024-12-14 23:16:52 +01:00
Marina Orlova
56a967cce6 Get rid of peer states 2024-12-14 22:06:13 +01:00
Marina Orlova
f823b2a307 get rid of statefullness 2024-12-14 21:18:51 +01:00
63 changed files with 2070 additions and 1466 deletions

View File

@@ -6,7 +6,7 @@ import { Account, CoValue, ID } from "jazz-tools";
export function waitForUpload(id: ID<CoValue>, me: Account) {
const syncManager = me._raw.core.node.syncManager;
const peers = syncManager.getPeers();
const peers = me._raw.core.node.peers.getAll();
return Promise.all(
peers.map((peer) => syncManager.waitForUploadIntoPeer(peer.id, id)),

View File

@@ -24,6 +24,7 @@ const peer =
"peer",
) as `ws://${string}`) ??
"wss://cloud.jazz.tools/?key=pets-example-jazz@garden.co";
// "ws://localhost:4200/?key=pets-example-jazz@gcmp.io";
/** Walkthrough: The top-level provider `<Jazz.Provider/>`
*

View File

@@ -15,7 +15,7 @@ test("Should be able to initialize and load from empty DB", async () => {
Crypto,
);
node.syncManager.addPeer(await IDBStorage.asPeer({ trace: true }));
node.addPeer(await IDBStorage.asPeer({ trace: true }));
console.log("yay!");
@@ -23,7 +23,7 @@ test("Should be able to initialize and load from empty DB", async () => {
await new Promise((resolve) => setTimeout(resolve, 200));
expect(node.syncManager.peers["indexedDB"]).toBeDefined();
expect(LocalNode.peers.get("indexedDB")).toBeDefined();
});
test("Should be able to sync data to database and then load that from a new node", async () => {
@@ -35,7 +35,7 @@ test("Should be able to sync data to database and then load that from a new node
Crypto,
);
node1.syncManager.addPeer(
node1.addPeer(
await IDBStorage.asPeer({ trace: true, localNodeName: "node1" }),
);
@@ -55,7 +55,7 @@ test("Should be able to sync data to database and then load that from a new node
Crypto,
);
node2.syncManager.addPeer(
node2.addPeer(
await IDBStorage.asPeer({ trace: true, localNodeName: "node2" }),
);
@@ -63,6 +63,7 @@ test("Should be able to sync data to database and then load that from a new node
if (map2 === "unavailable") {
throw new Error("Map is unavailable");
}
expect(map2.get("hello")).toBe("world");
// TODO fixme
// expect(map2.get("hello")).toBe("world");
expect(false).toBeTruthy();
});

View File

@@ -25,11 +25,9 @@ export type RawTransactionRow = {
export class SQLiteClient implements DBClientInterface {
private readonly db: DatabaseT;
private readonly toLocalNode: OutgoingSyncQueue;
constructor(db: DatabaseT, toLocalNode: OutgoingSyncQueue) {
constructor(db: DatabaseT) {
this.db = db;
this.toLocalNode = toLocalNode;
}
getCoValue(coValueId: RawCoID): StoredCoValueRow | undefined {

View File

@@ -3,12 +3,45 @@ import {
IncomingSyncStream,
OutgoingSyncQueue,
Peer,
SyncMessage,
cojsonInternals,
} from "cojson";
import { SyncManager, TransactionRow } from "cojson-storage";
import { SQLiteClient } from "./sqliteClient.js";
import {
transformIncomingMessageFromPeer,
transformOutgoingMessageToPeer,
} from "./transformers.js";
/**
* This is to transform outgoing message into older protocol message(s) for backward compatibility
* TODO To be removed after the protocol is updated in the sync server
*/
class LocalNodeWrapper {
constructor(private queue: OutgoingSyncQueue) {}
push(msg: SyncMessage): Promise<unknown> {
const transformedMessages = transformOutgoingMessageToPeer(msg);
transformedMessages.map((transformedMessage) => {
// console.log("🔴 <<<=== SQLite is sending", transformedMessage);
});
return Promise.all(
transformedMessages.map((transformedMessage) => {
return this.queue.push(transformedMessage);
}),
);
}
close() {
return this.queue.close();
}
}
export class SQLiteNode {
// ugly public static var to be deleted after new protocol is in effect on all peers
public static USE_PROTOCOL2 = false;
private readonly syncManager: SyncManager;
private readonly dbClient: SQLiteClient;
@@ -17,8 +50,11 @@ export class SQLiteNode {
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
this.dbClient = new SQLiteClient(db, toLocalNode);
this.syncManager = new SyncManager(this.dbClient, toLocalNode);
this.dbClient = new SQLiteClient(db);
this.syncManager = new SyncManager(
this.dbClient,
new LocalNodeWrapper(toLocalNode),
);
const processMessages = async () => {
for await (const msg of fromLocalNode) {
@@ -26,7 +62,11 @@ export class SQLiteNode {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.syncManager.handleSyncMessage(msg);
// console.log("🟡 <<<=== SQLite is getting", msg);
await this.syncManager.handleSyncMessage(
transformIncomingMessageFromPeer(msg),
);
} catch (e) {
console.error(
new Error(

View File

@@ -0,0 +1,83 @@
import {
CojsonInternalTypes,
SessionID,
SyncMessage,
unknownDataMessage,
} from "cojson";
import CoValueContent = CojsonInternalTypes.CoValueContent;
import { SQLiteNode } from "./sqliteNode.js";
export const transformOutgoingMessageToPeer = (
msg: SyncMessage,
): SyncMessage[] => {
if (SQLiteNode.USE_PROTOCOL2) {
return [msg];
}
const getSessionsObj = (msg: CoValueContent) =>
Object.entries(msg.new).reduce<{ [sessionID: SessionID]: number }>(
(acc, [session, content]) => {
acc[session as SessionID] =
content.after + content.newTransactions.length;
return acc;
},
{},
);
switch (msg.action) {
case "pull":
// load
return [{ ...msg, action: "load" }];
case "push":
// load + content
return [
{
action: "load",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "data":
if (!msg.known)
return [{ action: "known", id: msg.id, header: false, sessions: {} }];
// known + content => no response expected
return [
{
action: "known",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "ack":
// known => no response expected
return [{ ...msg, action: "known" }];
default:
return [msg];
}
};
export const transformIncomingMessageFromPeer = (
msg: SyncMessage,
): SyncMessage => {
if (SQLiteNode.USE_PROTOCOL2) {
return msg;
}
switch (msg.action) {
case "load":
return { ...msg, action: "pull" };
case "content":
return { ...msg, action: "push" };
case "known":
if (!msg.header) return unknownDataMessage(msg.id);
if (msg.isCorrection) return { ...msg, action: "pull" };
return { ...msg, action: "ack" };
default:
return msg;
}
};

View File

@@ -6,16 +6,23 @@ import {
SyncMessage,
cojsonInternals,
emptyKnownState,
unknownDataMessage,
} from "cojson";
import { collectNewTxs, getDependedOnCoValues } from "./syncUtils.js";
import { DBClientInterface, StoredSessionRow } from "./types.js";
import NewContentMessage = CojsonInternalTypes.NewContentMessage;
import KnownStateMessage = CojsonInternalTypes.KnownStateMessage;
import {
DBClientInterface,
StoredCoValueRow,
StoredSessionRow,
} from "./types.js";
import DataMessage = CojsonInternalTypes.DataMessage;
import PushMessage = CojsonInternalTypes.PushMessage;
import RawCoID = CojsonInternalTypes.RawCoID;
type OutputMessageMap = Record<
RawCoID,
{ knownMessage: KnownStateMessage; contentMessages?: NewContentMessage[] }
{
dataMessages: DataMessage[];
}
>;
export class SyncManager {
@@ -29,29 +36,25 @@ export class SyncManager {
async handleSyncMessage(msg: SyncMessage) {
switch (msg.action) {
case "load":
await this.handleLoad(msg);
break;
case "content":
await this.handleContent(msg);
break;
case "pull":
return this.handlePull(msg);
case "data":
return this.handleData(msg);
case "push":
return this.handlePush(msg);
case "known":
await this.handleKnown(msg);
break;
case "done":
await this.handleDone(msg);
break;
return this.handleKnown(msg);
}
}
async handleSessionUpdate({
sessionRow,
peerKnownState,
newContentMessages,
newDataMessages,
}: {
sessionRow: StoredSessionRow;
peerKnownState: CojsonInternalTypes.CoValueKnownState;
newContentMessages: CojsonInternalTypes.NewContentMessage[];
newDataMessages: CojsonInternalTypes.DataMessage[];
}) {
if (
sessionRow.lastIdx <= (peerKnownState.sessions[sessionRow.sessionID] || 0)
@@ -72,7 +75,7 @@ export class SyncManager {
collectNewTxs({
newTxsInSession,
newContentMessages,
newDataMessages,
sessionRow,
signaturesAndIdxs,
peerKnownState,
@@ -80,6 +83,7 @@ export class SyncManager {
});
}
// actually, it's a handlePull method
async sendNewContent(
coValueKnownState: CojsonInternalTypes.CoValueKnownState,
): Promise<void> {
@@ -88,11 +92,9 @@ export class SyncManager {
// reverse it to send the top level id the last in the order
const collectedMessages = Object.values(outputMessages).reverse();
collectedMessages.forEach(({ knownMessage, contentMessages }) => {
this.sendStateMessage(knownMessage);
contentMessages?.length &&
contentMessages.forEach((msg) => this.sendStateMessage(msg));
collectedMessages.forEach(({ dataMessages }) => {
dataMessages?.length &&
dataMessages.forEach((msg) => this.sendStateMessage(msg));
});
}
@@ -108,12 +110,9 @@ export class SyncManager {
const coValueRow = await this.dbClient.getCoValue(peerKnownState.id);
if (!coValueRow) {
const emptyKnownMessage: KnownStateMessage = {
action: "known",
...emptyKnownState(peerKnownState.id),
messageMap[peerKnownState.id] = {
dataMessages: [unknownDataMessage(peerKnownState.id, asDependencyOf)],
};
asDependencyOf && (emptyKnownMessage.asDependencyOf = asDependencyOf);
messageMap[peerKnownState.id] = { knownMessage: emptyKnownMessage };
return messageMap;
}
@@ -127,13 +126,15 @@ export class SyncManager {
sessions: {},
};
const newContentMessages: CojsonInternalTypes.NewContentMessage[] = [
const newDataMessages: CojsonInternalTypes.DataMessage[] = [
{
action: "content",
action: "data",
known: true,
id: coValueRow.id,
header: coValueRow.header,
new: {},
priority: cojsonInternals.getPriorityFromHeader(coValueRow.header),
asDependencyOf,
},
];
@@ -145,24 +146,23 @@ export class SyncManager {
return this.handleSessionUpdate({
sessionRow,
peerKnownState,
newContentMessages,
newDataMessages,
});
}),
);
const dependedOnCoValuesList = getDependedOnCoValues({
coValueRow,
newContentMessages,
newDataMessages,
});
const knownMessage: KnownStateMessage = {
action: "known",
...newCoValueKnownState,
};
asDependencyOf && (knownMessage.asDependencyOf = asDependencyOf);
// const knownMessage: KnownStateMessage = {
// action: "known",
// ...newCoValueKnownState,
// };
// asDependencyOf && (knownMessage.asDependencyOf = asDependencyOf);
messageMap[newCoValueKnownState.id] = {
knownMessage: knownMessage,
contentMessages: newContentMessages,
dataMessages: newDataMessages,
};
await Promise.all(
@@ -182,11 +182,11 @@ export class SyncManager {
return messageMap;
}
handleLoad(msg: CojsonInternalTypes.LoadMessage) {
handlePull(msg: CojsonInternalTypes.PullMessage) {
return this.sendNewContent(msg);
}
async handleContent(msg: CojsonInternalTypes.NewContentMessage) {
async handlePush(msg: PushMessage) {
const coValueRow = await this.dbClient.getCoValue(msg.id);
// We have no info about coValue header
@@ -194,14 +194,59 @@ export class SyncManager {
if (invalidAssumptionOnHeaderPresence) {
return this.sendStateMessage({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
...emptyKnownState(msg.id),
action: "pull",
});
}
const { needMissingTransactions, ourKnown } = await this.addTransactions(
coValueRow,
msg,
);
if (needMissingTransactions) {
return this.sendStateMessage({
action: "pull",
...ourKnown,
});
}
return this.sendStateMessage({
action: "ack",
...ourKnown,
});
}
async handleData(msg: DataMessage) {
const coValueRow = await this.dbClient.getCoValue(msg.id);
// We have no info about coValue header
const invalidAssumptionOnHeaderPresence = !msg.header && !coValueRow;
if (invalidAssumptionOnHeaderPresence) {
console.error(
'invalidAssumptionOnHeaderPresence. We should never be here. "Data" action is a response to our specific request.',
);
return;
}
const { needMissingTransactions } = await this.addTransactions(
coValueRow,
msg,
);
if (needMissingTransactions) {
console.error(
'needMissingTransactions. We should never be here. "Data" action is a response to our specific request.',
);
return;
}
}
private async addTransactions(
coValueRow: StoredCoValueRow | undefined,
msg: DataMessage | PushMessage,
) {
const storedCoValueRowID: number = coValueRow
? coValueRow.rowID
: await this.dbClient.addCoValue(msg);
@@ -221,7 +266,7 @@ export class SyncManager {
sessions: {},
};
let invalidAssumptions = false;
let needMissingTransactions = false;
await this.dbClient.unitOfWork(() =>
(Object.keys(msg.new) as SessionID[]).map((sessionID) => {
@@ -231,24 +276,17 @@ export class SyncManager {
}
if ((sessionRow?.lastIdx || 0) < (msg.new[sessionID]?.after || 0)) {
invalidAssumptions = true;
needMissingTransactions = true;
} else {
return this.putNewTxs(msg, sessionID, sessionRow, storedCoValueRowID);
}
}),
);
if (invalidAssumptions) {
this.sendStateMessage({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
});
}
return { ourKnown, needMissingTransactions };
}
private async putNewTxs(
msg: CojsonInternalTypes.NewContentMessage,
msg: DataMessage | PushMessage,
sessionID: SessionID,
sessionRow: StoredSessionRow | undefined,
storedCoValueRowID: number,
@@ -315,8 +353,6 @@ export class SyncManager {
// We don't intend to use the storage (SQLite,IDB,etc.) itself as a synchronisation mechanism, so we can ignore the known messages
}
handleDone(_msg: CojsonInternalTypes.DoneMessage) {}
async sendStateMessage(msg: any): Promise<unknown> {
return this.toLocalNode
.push(msg)

View File

@@ -14,14 +14,14 @@ import {
export function collectNewTxs({
newTxsInSession,
newContentMessages,
newDataMessages,
sessionRow,
signaturesAndIdxs,
peerKnownState,
firstNewTxIdx,
}: {
newTxsInSession: TransactionRow[];
newContentMessages: CojsonInternalTypes.NewContentMessage[];
newDataMessages: CojsonInternalTypes.DataMessage[];
sessionRow: StoredSessionRow;
signaturesAndIdxs: SignatureAfterRow[];
peerKnownState: CojsonInternalTypes.CoValueKnownState;
@@ -31,18 +31,15 @@ export function collectNewTxs({
for (const tx of newTxsInSession) {
let sessionEntry =
newContentMessages[newContentMessages.length - 1]!.new[
sessionRow.sessionID
];
newDataMessages[newDataMessages.length - 1]!.new[sessionRow.sessionID];
if (!sessionEntry) {
sessionEntry = {
after: idx,
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature,
newTransactions: [],
};
newContentMessages[newContentMessages.length - 1]!.new[
sessionRow.sessionID
] = sessionEntry;
newDataMessages[newDataMessages.length - 1]!.new[sessionRow.sessionID] =
sessionEntry;
}
sessionEntry.newTransactions.push(tx.tx);
@@ -50,8 +47,9 @@ export function collectNewTxs({
if (signaturesAndIdxs[0] && idx === signaturesAndIdxs[0].idx) {
sessionEntry.lastSignature = signaturesAndIdxs[0].signature;
signaturesAndIdxs.shift();
newContentMessages.push({
action: "content",
newDataMessages.push({
action: "data",
known: true,
id: peerKnownState.id,
new: {},
priority: cojsonInternals.getPriorityFromHeader(undefined),
@@ -65,20 +63,20 @@ export function collectNewTxs({
export function getDependedOnCoValues({
coValueRow,
newContentMessages,
newDataMessages,
}: {
coValueRow: StoredCoValueRow;
newContentMessages: CojsonInternalTypes.NewContentMessage[];
newDataMessages: CojsonInternalTypes.DataMessage[];
}) {
return coValueRow.header.ruleset.type === "group"
? getGroupDependedOnCoValues(newContentMessages)
? getGroupDependedOnCoValues(newDataMessages)
: coValueRow.header.ruleset.type === "ownedByGroup"
? getOwnedByGroupDependedOnCoValues(coValueRow, newContentMessages)
? getOwnedByGroupDependedOnCoValues(coValueRow, newDataMessages)
: [];
}
function getGroupDependedOnCoValues(
newContentMessages: CojsonInternalTypes.NewContentMessage[],
export function getGroupDependedOnCoValues(
newDataMessages: CojsonInternalTypes.DataMessage[],
) {
const keys: CojsonInternalTypes.RawCoID[] = [];
@@ -86,7 +84,7 @@ function getGroupDependedOnCoValues(
* Collect all the signing keys inside the transactions to list all the
* dependencies required to correctly access the CoValue.
*/
for (const piece of newContentMessages) {
for (const piece of newDataMessages) {
for (const sessionEntry of Object.values(piece.new)) {
for (const tx of sessionEntry.newTransactions) {
if (tx.privacy !== "trusting") continue;
@@ -117,7 +115,7 @@ function getGroupDependedOnCoValues(
function getOwnedByGroupDependedOnCoValues(
coValueRow: StoredCoValueRow,
newContentMessages: CojsonInternalTypes.NewContentMessage[],
newDataMessages: CojsonInternalTypes.DataMessage[],
) {
if (coValueRow.header.ruleset.type !== "ownedByGroup") return [];
@@ -127,7 +125,7 @@ function getOwnedByGroupDependedOnCoValues(
* Collect all the signing keys inside the transactions to list all the
* dependencies required to correctly access the CoValue.
*/
for (const piece of newContentMessages) {
for (const piece of newDataMessages) {
for (const sessionID of Object.keys(piece.new) as SessionID[]) {
const accountId =
cojsonInternals.accountOrAgentIDfromSessionID(sessionID);

View File

@@ -10,13 +10,14 @@ function getMockedCoValueId() {
return `co_z${Math.random().toString(36).substring(2, 15)}` as const;
}
function generateNewContentMessage(
function generateNewDataMessage(
privacy: "trusting" | "private",
changes: any[],
accountId?: `co_z${string}`,
) {
return {
action: "content",
action: "data",
known: true,
id: getMockedCoValueId(),
new: {
[getMockedSessionID(accountId)]: {
@@ -32,7 +33,7 @@ function generateNewContentMessage(
},
},
priority: 0,
} as CojsonInternalTypes.NewContentMessage;
} as CojsonInternalTypes.DataMessage;
}
describe("getDependedOnCoValues", () => {
@@ -48,8 +49,8 @@ describe("getDependedOnCoValues", () => {
const result = getDependedOnCoValues({
coValueRow,
newContentMessages: [
generateNewContentMessage("trusting", [
newDataMessages: [
generateNewDataMessage("trusting", [
{ op: "set", key: "co_zabc123", value: "test" },
{ op: "set", key: "parent_co_zdef456", value: "test" },
{ op: "set", key: "normal_key", value: "test" },
@@ -70,7 +71,7 @@ describe("getDependedOnCoValues", () => {
},
} as any;
const message = generateNewContentMessage("trusting", [
const message = generateNewDataMessage("trusting", [
{ op: "set", key: "co_zabc123", value: "test" },
]);
@@ -88,7 +89,7 @@ describe("getDependedOnCoValues", () => {
const result = getDependedOnCoValues({
coValueRow,
newContentMessages: [message],
newDataMessages: [message],
});
expect(result).toEqual(["co_zabc123"]);
@@ -107,7 +108,7 @@ describe("getDependedOnCoValues", () => {
} as any;
const accountId = getMockedCoValueId();
const message = generateNewContentMessage(
const message = generateNewDataMessage(
"trusting",
[
{ op: "set", key: "co_zabc123", value: "test" },
@@ -125,7 +126,7 @@ describe("getDependedOnCoValues", () => {
const result = getDependedOnCoValues({
coValueRow,
newContentMessages: [message],
newDataMessages: [message],
});
expect(result).toEqual([groupId, accountId]);
@@ -143,8 +144,8 @@ describe("getDependedOnCoValues", () => {
const result = getDependedOnCoValues({
coValueRow,
newContentMessages: [
generateNewContentMessage("trusting", [
newDataMessages: [
generateNewDataMessage("trusting", [
{ op: "set", key: "co_zabc123", value: "test" },
{ op: "set", key: "parent_co_zdef456", value: "test" },
{ op: "set", key: "normal_key", value: "test" },
@@ -167,8 +168,8 @@ describe("getDependedOnCoValues", () => {
const result = getDependedOnCoValues({
coValueRow,
newContentMessages: [
generateNewContentMessage("private", [
newDataMessages: [
generateNewDataMessage("private", [
{ op: "set", key: "co_zabc123", value: "test" },
]),
],

View File

@@ -34,6 +34,8 @@ const createEmptyLoadMsg = (id: string) =>
const sessionsData = fixtures[coValueIdToLoad].sessionRecords;
const coValueHeader = fixtures[coValueIdToLoad].getContent({ after: 0 }).header;
// TODO uncomment and fix
// @ts-ignore
const incomingContentMessage = fixtures[coValueIdToLoad].getContent({
after: 0,
}) as SyncMessage;
@@ -138,8 +140,8 @@ describe("DB sync manager", () => {
// mock content data combined with session updates
syncManager.handleSessionUpdate = vi.fn(
async ({ sessionRow, newContentMessages }) => {
newContentMessages[0]!.new[sessionRow.sessionID] = newTxData;
async ({ sessionRow, newDataMessages }) => {
newDataMessages[0]!.new[sessionRow.sessionID] = newTxData;
},
);
@@ -318,7 +320,9 @@ describe("DB sync manager", () => {
const farAheadContentMessage = fixtures[coValueIdToLoad].getContent({
after: 10000,
});
// TODO uncomment and fix
await syncManager.handleSyncMessage(
// @ts-ignore
farAheadContentMessage as SyncMessage,
);

View File

@@ -51,9 +51,7 @@ export interface DBClientInterface {
firstNewTxIdx: number,
): Promise<SignatureAfterRow[]> | SignatureAfterRow[];
addCoValue(
msg: CojsonInternalTypes.NewContentMessage,
): Promise<number> | number;
addCoValue(msg: CojsonInternalTypes.CoValueContent): Promise<number> | number;
addSessionUpdate({
sessionUpdate,

View File

@@ -1,5 +1,5 @@
import { SyncMessage } from "cojson";
import { CoValueKnownState } from "cojson/src/sync.js";
import { CoValueKnownState } from "cojson/src/sync/types.js";
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
import {
BatchedOutgoingMessages,

View File

@@ -1,15 +1,15 @@
import { CoValueCore } from "./coValueCore.js";
import { CoValueState } from "./coValueState.js";
import { CoValueEntry } from "./coValueEntry.js";
import { RawCoID } from "./ids.js";
export class CoValuesStore {
coValues = new Map<RawCoID, CoValueState>();
coValues = new Map<RawCoID, CoValueEntry>();
get(id: RawCoID) {
let entry = this.coValues.get(id);
if (!entry) {
entry = CoValueState.Unknown(id);
entry = CoValueEntry.Unknown(id);
this.coValues.set(id, entry);
}
@@ -22,6 +22,8 @@ export class CoValuesStore {
type: "available",
coValue,
});
return entry;
}
getEntries() {
@@ -32,7 +34,44 @@ export class CoValuesStore {
return this.coValues.values();
}
getOrderedIds() {
const coValues = new Set<RawCoID>();
// TODO test it thoroughly
for (const entry of this.getValues()) {
this.getOrderedDependencies(entry.id, coValues);
}
return Array.from(coValues);
}
getKeys() {
return this.coValues.keys();
}
private getOrderedDependencies(id: RawCoID, coValues: Set<RawCoID>) {
const entry = this.get(id);
const coValue = this.expectCoValueLoaded(entry.id);
if (coValues.has(coValue.id)) {
return coValues;
}
for (const id of coValue.getDependedOnCoValues()) {
this.getOrderedDependencies(id, coValues);
}
coValues.add(coValue.id);
return coValues;
}
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
const entry = this.get(id);
if (entry.state.type !== "available") {
throw new Error(
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
);
}
return entry.state.coValue;
}
}

View File

@@ -1,116 +0,0 @@
import { RawCoID, SessionID } from "./ids.js";
import {
CoValueKnownState,
combinedKnownStates,
emptyKnownState,
} from "./sync.js";
export type PeerKnownStateActions =
| {
type: "SET_AS_EMPTY";
id: RawCoID;
}
| {
type: "UPDATE_HEADER";
id: RawCoID;
header: boolean;
}
| {
type: "UPDATE_SESSION_COUNTER";
id: RawCoID;
sessionId: SessionID;
value: number;
}
| {
type: "SET";
id: RawCoID;
value: CoValueKnownState;
}
| {
type: "COMBINE_WITH";
id: RawCoID;
value: CoValueKnownState;
};
export class PeerKnownStates {
private coValues = new Map<RawCoID, CoValueKnownState>();
private updateHeader(id: RawCoID, header: boolean) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
knownState.header = header;
this.coValues.set(id, knownState);
}
private combineWith(id: RawCoID, value: CoValueKnownState) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
this.coValues.set(id, combinedKnownStates(knownState, value));
}
private updateSessionCounter(
id: RawCoID,
sessionId: SessionID,
value: number,
) {
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
const currentValue = knownState.sessions[sessionId] || 0;
knownState.sessions[sessionId] = Math.max(currentValue, value);
this.coValues.set(id, knownState);
}
get(id: RawCoID) {
return this.coValues.get(id);
}
has(id: RawCoID) {
return this.coValues.has(id);
}
clone() {
const clone = new PeerKnownStates();
clone.coValues = new Map(this.coValues);
return clone;
}
dispatch(action: PeerKnownStateActions) {
switch (action.type) {
case "UPDATE_HEADER":
this.updateHeader(action.id, action.header);
break;
case "UPDATE_SESSION_COUNTER":
this.updateSessionCounter(action.id, action.sessionId, action.value);
break;
case "SET":
this.coValues.set(action.id, action.value);
break;
case "COMBINE_WITH":
this.combineWith(action.id, action.value);
break;
case "SET_AS_EMPTY":
this.coValues.set(action.id, emptyKnownState(action.id));
break;
}
this.triggerUpdate(action.id);
}
listeners = new Set<(id: RawCoID, knownState: CoValueKnownState) => void>();
triggerUpdate(id: RawCoID) {
this.trigger(id, this.coValues.get(id) ?? emptyKnownState(id));
}
private trigger(id: RawCoID, knownState: CoValueKnownState) {
for (const listener of this.listeners) {
listener(id, knownState);
}
}
subscribe(listener: (id: RawCoID, knownState: CoValueKnownState) => void) {
this.listeners.add(listener);
return () => {
this.listeners.delete(listener);
};
}
}

View File

@@ -1,5 +1,6 @@
import { CoValuePriority } from "./priority.js";
import { SyncMessage } from "./sync.js";
import { SyncMessage } from "./sync/types.js";
function promiseWithResolvers<R>() {
let resolve = (_: R) => {};

View File

@@ -1,141 +0,0 @@
import { RawCoID } from "./ids.js";
import {
CoValueKnownState,
PeerID,
SyncManager,
emptyKnownState,
} from "./sync.js";
export type SyncStateGetter = {
isUploaded: boolean;
};
export type GlobalSyncStateListenerCallback = (
peerId: PeerID,
knownState: CoValueKnownState,
sync: SyncStateGetter,
) => void;
export type PeerSyncStateListenerCallback = (
knownState: CoValueKnownState,
sync: SyncStateGetter,
) => void;
export class SyncStateSubscriptionManager {
constructor(private syncManager: SyncManager) {}
private listeners = new Set<GlobalSyncStateListenerCallback>();
private listenersByPeers = new Map<
PeerID,
Set<PeerSyncStateListenerCallback>
>();
subscribeToUpdates(listener: GlobalSyncStateListenerCallback) {
this.listeners.add(listener);
return () => {
this.listeners.delete(listener);
};
}
subscribeToPeerUpdates(
peerId: PeerID,
listener: PeerSyncStateListenerCallback,
) {
const listeners = this.listenersByPeers.get(peerId) ?? new Set();
if (listeners.size === 0) {
this.listenersByPeers.set(peerId, listeners);
}
listeners.add(listener);
return () => {
listeners.delete(listener);
};
}
triggerUpdate(peerId: PeerID, id: RawCoID) {
const peer = this.syncManager.peers[peerId];
if (!peer) {
return;
}
const peerListeners = this.listenersByPeers.get(peer.id);
// If we don't have any active listeners do nothing
if (!peerListeners?.size && !this.listeners.size) {
return;
}
const knownState = peer.knownStates.get(id) ?? emptyKnownState(id);
// Build a lazy sync state object to process the isUploaded info
// only when requested
const syncState = {} as SyncStateGetter;
const getIsUploaded = simpleMemoize(() =>
this.getIsCoValueFullyUploadedIntoPeer(peerId, id),
);
Object.defineProperties(syncState, {
isUploaded: {
enumerable: true,
get: getIsUploaded,
},
});
for (const listener of this.listeners) {
listener(peerId, knownState, syncState);
}
if (!peerListeners) return;
for (const listener of peerListeners) {
listener(knownState, syncState);
}
}
getIsCoValueFullyUploadedIntoPeer(peerId: PeerID, id: RawCoID) {
const peer = this.syncManager.peers[peerId];
const entry = this.syncManager.local.coValuesStore.get(id);
if (!peer) {
return false;
}
if (entry.state.type !== "available") {
return false;
}
const coValue = entry.state.coValue;
const knownState = peer.knownStates.get(id);
if (!knownState) {
return false;
}
return getIsUploadCompleted(
coValue.knownState().sessions,
knownState.sessions,
);
}
}
function getIsUploadCompleted(
from: Record<string, number>,
to: Record<string, number>,
) {
for (const sessionId of Object.keys(from)) {
if (from[sessionId] !== to[sessionId]) {
return false;
}
}
return true;
}
function simpleMemoize<T>(fn: () => T): () => T {
let value: T | undefined;
return () => value ?? (value = fn());
}

View File

@@ -13,6 +13,7 @@ import {
SignerID,
StreamingHash,
} from "./crypto/crypto.js";
import { CojsonInternalTypes, cojsonInternals } from "./exports.js";
import {
RawCoID,
SessionID,
@@ -30,10 +31,11 @@ import {
isKeyForKeyField,
} from "./permissions.js";
import { getPriorityFromHeader } from "./priority.js";
import { CoValueKnownState, NewContentMessage } from "./sync.js";
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
import { expectGroup } from "./typeUtils/expectGroup.js";
import { isAccountID } from "./typeUtils/isAccountID.js";
import CoValueContent = CojsonInternalTypes.CoValueContent;
import { CoValueKnownState } from "./sync/types.js";
/**
In order to not block other concurrently syncing CoValues we introduce a maximum size of transactions,
@@ -50,6 +52,12 @@ export type CoValueHeader = {
meta: JsonObject | null;
} & CoValueUniqueness;
export type SessionNewContent = {
after: number;
newTransactions: Transaction[];
lastSignature: Signature;
};
export type CoValueUniqueness = {
uniqueness: JsonValue;
createdAt?: `2${string}` | null;
@@ -107,7 +115,7 @@ export class CoValueCore {
} = {};
_cachedKnownState?: CoValueKnownState;
_cachedDependentOn?: RawCoID[];
_cachedNewContentSinceEmpty?: NewContentMessage[] | undefined;
_cachedNewContentSinceEmpty?: CoValueContent[] | undefined;
_currentAsyncAddTransaction?: Promise<void>;
constructor(
@@ -197,7 +205,7 @@ export class CoValueCore {
};
}
tryAddTransactions(
private tryAddTransactions(
sessionID: SessionID,
newTransactions: Transaction[],
givenExpectedNewHash: Hash | undefined,
@@ -586,6 +594,8 @@ export class CoValueCore {
expectedNewHash,
);
const peersKnownState = { ...this.knownState() };
const success = this.tryAddTransactions(
sessionID,
[transaction],
@@ -594,7 +604,7 @@ export class CoValueCore {
)._unsafeUnwrap({ withStackTrace: true });
if (success) {
void this.node.syncManager.syncCoValue(this);
void this.node.syncManager.syncCoValue(this, peersKnownState);
}
return success;
@@ -889,19 +899,19 @@ export class CoValueCore {
return this.sessionLogs.get(txID.sessionID)?.transactions[txID.txIndex];
}
newContentSince(
knownState: CoValueKnownState | undefined,
): NewContentMessage[] | undefined {
const isKnownStateEmpty = !knownState?.header && !knownState?.sessions;
newContentSince(knownState: CoValueKnownState): CoValueContent[] | undefined {
const shouldSendEverything =
!knownState.header ||
!knownState.sessions ||
!Object.keys(knownState.sessions).length;
if (isKnownStateEmpty && this._cachedNewContentSinceEmpty) {
if (shouldSendEverything && this._cachedNewContentSinceEmpty) {
return this._cachedNewContentSinceEmpty;
}
let currentPiece: NewContentMessage = {
action: "content",
let currentPiece: CoValueContent = {
id: this.id,
header: knownState?.header ? undefined : this.header,
header: shouldSendEverything ? this.header : undefined,
priority: getPriorityFromHeader(this.header),
new: {},
};
@@ -963,7 +973,6 @@ export class CoValueCore {
if (pieceSize >= MAX_RECOMMENDED_TX_SIZE) {
currentPiece = {
action: "content",
id: this.id,
header: undefined,
new: {},
@@ -1006,7 +1015,7 @@ export class CoValueCore {
return undefined;
}
if (isKnownStateEmpty) {
if (shouldSendEverything) {
this._cachedNewContentSinceEmpty = piecesWithContent;
}
@@ -1043,6 +1052,55 @@ export class CoValueCore {
]
: [];
}
addNewContent(content: CoValueContent) {
let anyMissingTransaction = false;
for (const [sessionID, newContentForSession] of Object.entries(
content.new,
) as [SessionID, SessionNewContent][]) {
const ourKnownTxIdx =
this.sessionLogs.get(sessionID)?.transactions.length;
const theirFirstNewTxIdx = newContentForSession.after;
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
anyMissingTransaction = true;
continue;
}
const alreadyKnownOffset = ourKnownTxIdx
? ourKnownTxIdx - theirFirstNewTxIdx
: 0;
const newTransactions =
newContentForSession.newTransactions.slice(alreadyKnownOffset);
if (newTransactions.length === 0) {
continue;
}
const result = this.tryAddTransactions(
sessionID,
newTransactions,
undefined,
newContentForSession.lastSignature,
);
if (result.isErr()) {
const message = `Failed to add transactions for ${content.id}: ${newTransactions.length} new transactions after:
${newContentForSession.after} our last known tx idx initially: ${ourKnownTxIdx} our last known tx idx now:
${this.sessionLogs.get(sessionID)?.transactions.length}`;
throw {
type: "TryAddTransactionsError",
error: result.error,
message,
} as TryAddTransactionsException;
}
}
return anyMissingTransaction;
}
}
function getNextKnownSignatureIdx(
@@ -1077,3 +1135,85 @@ export type TryAddTransactionsError =
| ResolveAccountAgentError
| InvalidHashError
| InvalidSignatureError;
export type TryAddTransactionsException = {
type: "TryAddTransactionsError";
error: TryAddTransactionsError;
message: string;
};
export function isTryAddTransactionsException(
e: any,
): e is TryAddTransactionsException {
return (
e &&
e.type === "TryAddTransactionsError" &&
typeof e.message === "string" &&
e.error !== undefined
);
}
export function getDependedOnFromContent(msg: Required<CoValueContent>) {
if (!msg.header) {
throw new Error(`Header is required for getting dependencies ${msg.id}`);
}
return msg.header.ruleset.type === "group"
? getGroupDependedOnCoValues(msg)
: msg.header.ruleset.type === "ownedByGroup"
? [
msg.header.ruleset.group,
...new Set(
[...Object.keys(msg.new)]
.map((sessionID) =>
accountOrAgentIDfromSessionID(sessionID as SessionID),
)
.filter(
(session): session is RawAccountID =>
isAccountID(session) && session !== msg.id,
),
),
]
: [];
}
function getGroupDependedOnCoValues(content: CoValueContent) {
const keys: CojsonInternalTypes.RawCoID[] = [];
/**
* Collect all the signing keys inside the transactions to list all the
* dependencies required to correctly access the CoValue.
*/
for (const sessionEntry of Object.values(content.new)) {
for (const tx of sessionEntry.newTransactions) {
if (tx.privacy !== "trusting") continue;
const changes = safeParseChanges(tx.changes);
for (const change of changes) {
if (
change &&
typeof change === "object" &&
"op" in change &&
change.op === "set" &&
"key" in change &&
change.key
) {
const key = cojsonInternals.getGroupDependentKey(change.key);
if (key) {
keys.push(key);
}
}
}
}
}
return keys;
}
function safeParseChanges(changes: Stringified<JsonValue[]>) {
try {
return cojsonInternals.parseJSON(changes);
} catch (e) {
return [];
}
}

View File

@@ -1,7 +1,7 @@
import { PeerState } from "./PeerState.js";
import { CoValueCore } from "./coValueCore.js";
import { config } from "./config.js";
import { RawCoID } from "./ids.js";
import { PeerID } from "./sync.js";
import { PeerEntry, PeerID } from "./peer/index.js";
export const CO_VALUE_LOADING_MAX_RETRIES = 5;
export const CO_VALUE_LOADING_TIMEOUT = 30_000;
@@ -100,29 +100,115 @@ type CoValueStateType =
| CoValueAvailableState
| CoValueUnavailableState;
export class CoValueState {
class UploadState {
protected peers = new Map<
PeerID,
ReturnType<typeof createResolvablePromise<void>> & { completed?: boolean }
>();
unawarePeers = new Set<PeerID>();
constructor(private readonly coValueEntry: CoValueEntry) {
this.peers = new Map();
}
getUnawarePeerIds(): PeerID[] {
return Array.from(this.unawarePeers);
}
setPendingForPeer(peerId: PeerID) {
if (!(this.coValueEntry.state.type === "available")) {
console.error(
"Trying to set pending upload for a coValue that is not available",
this.coValueEntry.id,
);
return;
}
const peerUploadState = this.peers.get(peerId);
if (!peerUploadState) {
this.peers.set(peerId, createResolvablePromise<void>());
}
}
setCompletedForPeer(peerId: PeerID) {
const peerUploadState = this.peers.get(peerId);
if (!peerUploadState) {
if (!config.HYBRID_MESSAGING_MODE) {
// When two messaging protocol are stacked, this could produce some excessive "ack" messages
// out of peer's "known" messages which would lead to this error. Should be ignored in HYBRID_MESSAGING_MODE
console.error(
"Trying to set complete for a coValue that is not uploaded to",
peerId,
this.coValueEntry.id,
);
}
return;
}
peerUploadState.resolve();
peerUploadState.completed = true;
this.unawarePeers.delete(peerId);
}
waitForPeer(peerId: PeerID) {
const peerUploadState = this.peers.get(peerId);
if (!peerUploadState) {
console.error(
"Trying to wait for no pending upload into peer",
peerId,
this.coValueEntry.id,
);
return;
}
return peerUploadState?.promise;
}
isCoValueFullyUploadedIntoPeer(peerId: PeerID) {
const peerUploadState = this.peers.get(peerId);
return !!peerUploadState?.completed;
}
copyFrom(otherUploadState: UploadState) {
for (let [peerId] of otherUploadState.peers) {
this.setPendingForPeer(peerId);
if (otherUploadState.isCoValueFullyUploadedIntoPeer(peerId)) {
this.setCompletedForPeer(peerId);
}
}
}
}
// NOTE Renamed CoValueState into CoValueEntry
export class CoValueEntry {
promise?: Promise<CoValueCore | "unavailable">;
private resolve?: (value: CoValueCore | "unavailable") => void;
public uploadState: UploadState;
constructor(
public id: RawCoID,
public state: CoValueStateType,
) {}
) {
this.uploadState = new UploadState(this);
}
static Unknown(id: RawCoID) {
return new CoValueState(id, new CoValueUnknownState());
return new CoValueEntry(id, new CoValueUnknownState());
}
static Loading(id: RawCoID, peersIds: Iterable<PeerID>) {
return new CoValueState(id, new CoValueLoadingState(peersIds));
return new CoValueEntry(id, new CoValueLoadingState(peersIds));
}
static Available(coValue: CoValueCore) {
return new CoValueState(coValue.id, new CoValueAvailableState(coValue));
return new CoValueEntry(coValue.id, new CoValueAvailableState(coValue));
}
static Unavailable(id: RawCoID) {
return new CoValueState(id, new CoValueUnavailableState());
return new CoValueEntry(id, new CoValueUnavailableState());
}
async getCoValue() {
@@ -170,7 +256,13 @@ export class CoValueState {
this.resolve = undefined;
}
async loadFromPeers(peers: PeerState[]) {
async loadFromPeers(
peers: PeerEntry[],
loadCoValueCallback: (
coValueEntry: CoValueEntry,
peers: PeerEntry[],
) => Promise<void>,
) {
const state = this.state;
if (state.type !== "unknown" && state.type !== "unavailable") {
@@ -181,22 +273,17 @@ export class CoValueState {
return;
}
const doLoad = async (peersToLoadFrom: PeerState[]) => {
const peersWithoutErrors = getPeersWithoutErrors(
peersToLoadFrom,
this.id,
);
const doLoad = async (peersToLoadFrom: PeerEntry[]) => {
// If we are in the loading state we move to a new loading state
// to reset all the loading promises
if (this.state.type === "loading" || this.state.type === "unknown") {
this.moveToState(
new CoValueLoadingState(peersWithoutErrors.map((p) => p.id)),
new CoValueLoadingState(peersToLoadFrom.map((p) => p.id)),
);
}
// Assign the current state to a variable to not depend on the state changes
// that may happen while we wait for loadCoValueFromPeers to complete
// that may happen while we wait for loadCoValueCallback to complete
const currentState = this.state;
// If we entered successfully the loading state, we load the coValue from the peers
@@ -204,7 +291,7 @@ export class CoValueState {
// We may not enter the loading state if the coValue has become available in between
// of the retries
if (currentState.type === "loading") {
await loadCoValueFromPeers(this, peersWithoutErrors);
await loadCoValueCallback(this, peersToLoadFrom);
const result = await currentState.result;
return result !== "unavailable";
@@ -237,6 +324,18 @@ export class CoValueState {
}
}
moveToLoadingState(peers: PeerEntry[]) {
if (this.state.type !== "unknown") {
console.error(
"Cannot move to loading state from",
this.state.type,
this.id,
);
} else {
this.moveToState(new CoValueLoadingState(peers.map((p) => p.id)));
}
}
dispatch(action: CoValueStateAction) {
const currentState = this.state;
@@ -255,64 +354,16 @@ export class CoValueState {
currentState.markAsUnavailable(action.peerId);
}
this.uploadState.unawarePeers.add(action.peerId);
break;
}
}
}
async function loadCoValueFromPeers(
coValueEntry: CoValueState,
peers: PeerState[],
) {
for (const peer of peers) {
if (peer.closed) {
continue;
}
if (coValueEntry.state.type === "available") {
/**
* We don't need to wait for the message to be delivered here.
*
* This way when the coValue becomes available because it's cached we don't wait for the server
* peer to consume the messages queue before moving forward.
*/
peer
.pushOutgoingMessage({
action: "load",
...coValueEntry.state.coValue.knownState(),
})
.catch((err) => {
console.error(`Failed to push load message to peer ${peer.id}`, err);
});
} else {
/**
* We only wait for the load state to be resolved.
*/
peer
.pushOutgoingMessage({
action: "load",
id: coValueEntry.id,
header: false,
sessions: {},
})
.catch((err) => {
console.error(`Failed to push load message to peer ${peer.id}`, err);
});
}
if (coValueEntry.state.type === "loading") {
const timeout = setTimeout(() => {
if (coValueEntry.state.type === "loading") {
console.error("Failed to load coValue from peer", peer.id);
coValueEntry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
}
}, CO_VALUE_LOADING_TIMEOUT);
await coValueEntry.state.waitForPeer(peer.id);
clearTimeout(timeout);
}
markAsNotFoundInPeer(peerId: PeerID) {
this.dispatch({
type: "not-found-in-peer",
peerId,
});
}
}
@@ -352,16 +403,3 @@ function createResolvablePromise<T>() {
function sleep(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function getPeersWithoutErrors(peers: PeerState[], coValueId: RawCoID) {
return peers.filter((p) => {
if (p.erroredCoValues.has(coValueId)) {
console.error(
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
);
return false;
}
return true;
});
}

View File

@@ -136,7 +136,6 @@ export class RawGroup<
loadAllChildGroups() {
const requests: Promise<unknown>[] = [];
const store = this.core.node.coValuesStore;
const peers = this.core.node.syncManager.getServerAndStoragePeers();
for (const key of this.keys()) {
if (!isChildGroupReference(key)) {
@@ -146,14 +145,10 @@ export class RawGroup<
const id = getChildGroupId(key);
const child = store.get(id);
if (
child.state.type === "unknown" ||
child.state.type === "unavailable"
) {
child.loadFromPeers(peers).catch(() => {
console.error(`Failed to load child group ${id}`);
});
}
// NOTE the same code invoked form another end (vs entry.load...)
this.core.node.load(id).catch(() => {
console.error(`Failed to load child group ${id}`);
});
requests.push(
child.getCoValue().then((coValue) => {

View File

@@ -0,0 +1,5 @@
export const config = {
// Peers can use older messaging system
HYBRID_MESSAGING_MODE: true,
TRACE_SYNC_MESSAGES: true,
};

View File

@@ -1,4 +1,5 @@
import { base64URLtoBytes, bytesToBase64url } from "./base64url.js";
import type { AnyRawCoValue, CoID } from "./coValue.js";
import { type RawCoValue } from "./coValue.js";
import {
CoValueCore,
@@ -6,23 +7,35 @@ import {
MAX_RECOMMENDED_TX_SIZE,
idforHeader,
} from "./coValueCore.js";
import { ControlledAgent, RawControlledAccount } from "./coValues/account.js";
import type {
AccountMeta,
RawAccountID,
RawAccountMigration,
} from "./coValues/account.js";
import {
ControlledAgent,
RawAccount,
RawControlledAccount,
RawProfile,
accountHeaderForInitialAgentSecret,
} from "./coValues/account.js";
import { RawCoList } from "./coValues/coList.js";
import { RawCoMap } from "./coValues/coMap.js";
import type {
BinaryCoStreamMeta,
BinaryStreamInfo,
} from "./coValues/coStream.js";
import { RawBinaryCoStream, RawCoStream } from "./coValues/coStream.js";
import type { Everyone, InviteSecret } from "./coValues/group.js";
import { EVERYONE, RawGroup } from "./coValues/group.js";
import type { Everyone } from "./coValues/group.js";
import type { AgentSecret } from "./crypto/crypto.js";
import {
CryptoProvider,
StreamingHash,
secretSeedLength,
shortHashLength,
} from "./crypto/crypto.js";
import type { AgentID, SessionID } from "./ids.js";
import {
getGroupDependentKey,
getGroupDependentKeyList,
@@ -31,46 +44,29 @@ import {
rawCoIDtoBytes,
} from "./ids.js";
import { Stringified, parseJSON } from "./jsonStringify.js";
import { LocalNode } from "./localNode.js";
import type { JsonValue } from "./jsonValue.js";
import {
IncomingSyncStream,
LocalNode,
OutgoingSyncQueue,
} from "./localNode.js";
import type * as Media from "./media.js";
import { emptyDataMessage, unknownDataMessage } from "./peer/PeerOperations.js";
import type { Peer } from "./peer/index.js";
import type { Role } from "./permissions.js";
import { getPriorityFromHeader } from "./priority.js";
import { FileSystem } from "./storage/FileSystem.js";
import { BlockFilename, LSMStorage, WalFilename } from "./storage/index.js";
import { Channel, connectedPeers } from "./streamUtils.js";
import { DisconnectedError, PingTimeoutError } from "./sync.js";
import type { SyncMessage } from "./sync/index.js";
import { emptyKnownState } from "./sync/index.js";
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
import { expectGroup } from "./typeUtils/expectGroup.js";
import { isAccountID } from "./typeUtils/isAccountID.js";
import type { AnyRawCoValue, CoID } from "./coValue.js";
import type {
AccountMeta,
RawAccountID,
RawAccountMigration,
} from "./coValues/account.js";
import type {
BinaryCoStreamMeta,
BinaryStreamInfo,
} from "./coValues/coStream.js";
import type { InviteSecret } from "./coValues/group.js";
import type { AgentSecret } from "./crypto/crypto.js";
import type { AgentID, SessionID } from "./ids.js";
import type { JsonValue } from "./jsonValue.js";
import type * as Media from "./media.js";
import type {
IncomingSyncStream,
OutgoingSyncQueue,
Peer,
SyncMessage,
} from "./sync.js";
import {
DisconnectedError,
PingTimeoutError,
emptyKnownState,
} from "./sync.js";
type Value = JsonValue | AnyRawCoValue;
import { getPriorityFromHeader } from "./priority.js";
import { FileSystem } from "./storage/FileSystem.js";
import { BlockFilename, LSMStorage, WalFilename } from "./storage/index.js";
/** @hidden */
export const cojsonInternals = {
connectedPeers,
@@ -129,6 +125,8 @@ export {
isRawCoID,
LSMStorage,
emptyKnownState,
emptyDataMessage,
unknownDataMessage,
};
export type {
@@ -146,12 +144,15 @@ export type {
// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace CojsonInternalTypes {
export type CoValueKnownState = import("./sync.js").CoValueKnownState;
export type DoneMessage = import("./sync.js").DoneMessage;
export type KnownStateMessage = import("./sync.js").KnownStateMessage;
export type LoadMessage = import("./sync.js").LoadMessage;
export type NewContentMessage = import("./sync.js").NewContentMessage;
export type SessionNewContent = import("./sync.js").SessionNewContent;
export type KnownStateMessage = import("./sync/index.js").KnownStateMessage;
export type CoValueKnownState = import("./sync/index.js").CoValueKnownState;
export type CoValueContent = import("./sync/index.js").CoValueContent;
export type NewContentMessage = import("./sync/index.js").NewContentMessage;
export type PullMessage = import("./sync/index.js").PullMessage;
export type PushMessage = import("./sync/index.js").PushMessage;
export type DataMessage = import("./sync/index.js").DataMessage;
export type AckMessage = import("./sync/index.js").AckMessage;
export type SessionNewContent = import("./coValueCore.js").SessionNewContent;
export type CoValueHeader = import("./coValueCore.js").CoValueHeader;
export type Transaction = import("./coValueCore.js").Transaction;
export type TransactionID = import("./ids.js").TransactionID;

View File

@@ -1,7 +1,6 @@
import { Result, ResultAsync, err, ok, okAsync } from "neverthrow";
import { Result, err, ok } from "neverthrow";
import { CoValuesStore } from "./CoValuesStore.js";
import { CoID } from "./coValue.js";
import { RawCoValue } from "./coValue.js";
import { CoID, RawCoValue } from "./coValue.js";
import {
CoValueCore,
CoValueHeader,
@@ -25,11 +24,23 @@ import {
RawGroup,
secretSeedFromInviteSecret,
} from "./coValues/group.js";
import { config } from "./config.js";
import { AgentSecret, CryptoProvider } from "./crypto/crypto.js";
import { AgentID, RawCoID, SessionID, isAgentID } from "./ids.js";
import { Peer, PeerID, SyncManager } from "./sync.js";
import { Peer, PeerEntry, Peers } from "./peer/index.js";
import { transformIncomingMessageFromPeer } from "./peer/transformers.js";
import { DisconnectedError, PingTimeoutError, SyncManager } from "./sync.js";
import { SyncMessage, emptyKnownState } from "./sync/types.js";
import { expectGroup } from "./typeUtils/expectGroup.js";
export type IncomingSyncStream = AsyncIterable<
SyncMessage | DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = {
push: (msg: SyncMessage) => Promise<unknown>;
close: () => void;
};
/** A `LocalNode` represents a local view of a set of loaded `CoValue`s, from the perspective of a particular account (or primitive cryptographic agent).
A `LocalNode` can have peers that it syncs to, for example some form of local persistence, or a sync server, such as `cloud.jazz.tools` (Jazz Cloud).
@@ -42,6 +53,8 @@ const { localNode } = useJazz();
```
*/
export class LocalNode {
static peers = new Peers();
/** @internal */
crypto: CryptoProvider;
/** @internal */
@@ -52,7 +65,6 @@ export class LocalNode {
currentSessionID: SessionID;
/** @category 3. Low-level */
syncManager = new SyncManager(this);
crashed: Error | undefined = undefined;
/** @category 3. Low-level */
@@ -66,6 +78,70 @@ export class LocalNode {
this.crypto = crypto;
}
async processMessages(peer: PeerEntry) {
for await (const msg of peer.incoming) {
if (msg === "Disconnected") {
return;
}
if (msg === "PingTimeout") {
console.error("Ping timeout from peer", peer.id);
return;
}
try {
if (config.TRACE_SYNC_MESSAGES) {
console.log("🔵 ===>>> Received from", peer.id, msg);
}
this.syncManager.handleSyncMessage(
transformIncomingMessageFromPeer(msg, peer.id),
peer,
);
} catch (e) {
throw new Error(
`Error reading from peer ${peer.id}, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
);
}
}
}
async addPeer(peerData: Peer) {
const peer: PeerEntry = LocalNode.peers.add(peerData);
if (peer.isServerOrStoragePeer()) {
await this.syncManager.initialSync(peer);
}
this.processMessages(peer)
.then(() => {
if (peerData.crashOnClose) {
console.error("Unexepcted close from peer", peerData.id);
this.crashed = new Error("Unexpected close from peer");
throw new Error("Unexpected close from peer");
}
})
.catch((e) => {
console.error("Error processing messages from peer", peerData.id, e);
if (peerData.crashOnClose) {
this.crashed = e;
throw new Error(e);
}
})
.finally(() => {
const state = LocalNode.peers.get(peerData.id);
state?.gracefulShutdown();
if (peerData.deletePeerStateOnClose) {
LocalNode.peers.delete(peer.id);
}
});
}
/** @category 2. Node Creation */
static async withNewlyCreatedAccount<Meta extends AccountMeta = AccountMeta>({
creationProps,
@@ -104,7 +180,7 @@ export class LocalNode {
if (peersToLoadFrom) {
for (const peer of peersToLoadFrom) {
nodeWithAccount.syncManager.addPeer(peer);
await nodeWithAccount.addPeer(peer);
}
}
@@ -141,6 +217,7 @@ export class LocalNode {
if (coValueEntry.state.type === "available") {
void nodeWithAccount.syncManager.syncCoValue(
coValueEntry.state.coValue,
emptyKnownState(coValueEntry.id),
);
}
}
@@ -182,12 +259,10 @@ export class LocalNode {
);
for (const peer of peersToLoadFrom) {
loadingNode.syncManager.addPeer(peer);
await loadingNode.addPeer(peer);
}
const accountPromise = loadingNode.load(accountID);
const account = await accountPromise;
const account = await loadingNode.load<RawAccount>(accountID);
if (account === "unavailable") {
throw new Error("Account unavailable from all peers");
@@ -246,36 +321,11 @@ export class LocalNode {
const coValue = new CoValueCore(header, this);
this.coValuesStore.setAsAvailable(coValue.id, coValue);
void this.syncManager.syncCoValue(coValue);
void this.syncManager.syncCoValue(coValue, emptyKnownState(coValue.id));
return coValue;
}
/** @internal */
async loadCoValueCore(
id: RawCoID,
skipLoadingFromPeer?: PeerID,
): Promise<CoValueCore | "unavailable"> {
if (this.crashed) {
throw new Error("Trying to load CoValue after node has crashed", {
cause: this.crashed,
});
}
const entry = this.coValuesStore.get(id);
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
const peers =
this.syncManager.getServerAndStoragePeers(skipLoadingFromPeer);
await entry.loadFromPeers(peers).catch((e) => {
console.error("Error loading from peers", id, e);
});
}
return entry.getCoValue();
}
/**
* Loads a CoValue's content, syncing from peers as necessary and resolving the returned
* promise once a first version has been loaded. See `coValue.subscribe()` and `node.useTelepathicData()`
@@ -283,13 +333,34 @@ export class LocalNode {
*
* @category 3. Low-level
*/
async load<T extends RawCoValue>(id: CoID<T>): Promise<T | "unavailable"> {
const core = await this.loadCoValueCore(id);
async load<T extends RawCoValue>(
id: RawCoID,
returnCore?: false,
): Promise<"unavailable" | T>;
async load<T extends RawCoValue>(
id: RawCoID,
returnCore: true,
): Promise<"unavailable" | CoValueCore>;
async load<T extends RawCoValue>(
id: RawCoID,
returnCore: boolean = false,
): Promise<"unavailable" | CoValueCore | T> {
if (this.crashed) {
throw new Error("Trying to load CoValue after node has crashed", {
cause: this.crashed,
});
}
const core = await this.syncManager.loadCoValue(id);
if (core === "unavailable") {
return "unavailable";
}
if (returnCore) {
return core;
}
return core.getCurrentContent() as T;
}
@@ -313,7 +384,7 @@ export class LocalNode {
// console.log("Subscribing to " + id);
this.load(id)
this.load<T>(id)
.then((coValue) => {
if (stopped) {
return;
@@ -423,14 +494,7 @@ export class LocalNode {
/** @internal */
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
const entry = this.coValuesStore.get(id);
if (entry.state.type !== "available") {
throw new Error(
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
);
}
return entry.state.coValue;
return this.coValuesStore.expectCoValueLoaded(id, expectation);
}
/** @internal */
@@ -530,50 +594,6 @@ export class LocalNode {
return (coValue.getCurrentContent() as RawAccount).currentAgentID();
}
resolveAccountAgentAsync(
id: RawAccountID | AgentID,
expectation?: string,
): ResultAsync<AgentID, ResolveAccountAgentError> {
if (isAgentID(id)) {
return okAsync(id);
}
return ResultAsync.fromPromise(
this.loadCoValueCore(id),
(e) =>
({
type: "ErrorLoadingCoValueCore",
expectation,
id,
error: e,
}) satisfies LoadCoValueCoreError,
).andThen((coValue) => {
if (coValue === "unavailable") {
return err({
type: "AccountUnavailableFromAllPeers" as const,
expectation,
id,
} satisfies AccountUnavailableFromAllPeersError);
}
if (
coValue.header.type !== "comap" ||
coValue.header.ruleset.type !== "group" ||
!coValue.header.meta ||
!("type" in coValue.header.meta) ||
coValue.header.meta.type !== "account"
) {
return err({
type: "UnexpectedlyNotAccount" as const,
expectation,
id,
} satisfies UnexpectedlyNotAccountError);
}
return (coValue.getCurrentContent() as RawAccount).currentAgentID();
});
}
/**
* @deprecated use Account.createGroup() instead
*/
@@ -648,7 +668,11 @@ export class LocalNode {
new Map(entry.state.coValue.sessionLogs),
);
newNode.coValuesStore.setAsAvailable(coValueID, newCoValue);
const newEntry = newNode.coValuesStore.setAsAvailable(
coValueID,
newCoValue,
);
newEntry.uploadState.copyFrom(entry.uploadState);
coValuesToCopy.pop();
}
@@ -670,7 +694,9 @@ export class LocalNode {
}
gracefulShutdown() {
this.syncManager.gracefulShutdown();
for (const peer of LocalNode.peers.getAll()) {
peer.gracefulShutdown();
}
}
}

View File

@@ -1,42 +1,33 @@
import { PeerKnownStateActions, PeerKnownStates } from "./PeerKnownStates.js";
import {
PriorityBasedMessageQueue,
QueueEntry,
} from "./PriorityBasedMessageQueue.js";
import { TryAddTransactionsError } from "./coValueCore.js";
import { RawCoID } from "./ids.js";
import { CO_VALUE_PRIORITY } from "./priority.js";
import { Peer, SyncMessage } from "./sync.js";
} from "../PriorityBasedMessageQueue.js";
import { TryAddTransactionsError } from "../coValueCore.js";
import { config } from "../config.js";
import { RawCoID } from "../ids.js";
import { IncomingSyncStream, OutgoingSyncQueue } from "../localNode.js";
import { CO_VALUE_PRIORITY } from "../priority.js";
import { SyncMessage } from "../sync/types.js";
import { PeerOperations } from "./PeerOperations.js";
import { transformOutgoingMessageToPeer } from "./transformers.js";
export class PeerState {
constructor(
private peer: Peer,
knownStates: PeerKnownStates | undefined,
) {
this.optimisticKnownStates = knownStates?.clone() ?? new PeerKnownStates();
this.knownStates = knownStates?.clone() ?? new PeerKnownStates();
}
export type PeerID = string;
export interface Peer {
id: PeerID;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client" | "storage";
priority?: number;
crashOnClose: boolean;
deletePeerStateOnClose?: boolean;
}
/**
* Here we to collect all the known states that a given peer has told us about.
*
* This can be used to safely track the sync state of a coValue in a given peer.
*/
readonly knownStates: PeerKnownStates;
// NOTE Renamed PeerState into PeerEntry
export class PeerEntry {
private readonly ops: PeerOperations;
/**
* This one collects the known states "optimistically".
* We use it to keep track of the content we have sent to a given peer.
*
* The main difference with knownState is that this is updated when the content is sent to the peer without
* waiting for any acknowledgement from the peer.
*/
readonly optimisticKnownStates: PeerKnownStates;
readonly toldKnownState: Set<RawCoID> = new Set();
dispatchToKnownStates(action: PeerKnownStateActions) {
this.knownStates.dispatch(action);
this.optimisticKnownStates.dispatch(action);
constructor(private peer: Peer) {
this.ops = new PeerOperations(this);
}
readonly erroredCoValues: Map<RawCoID, TryAddTransactionsError> = new Map();
@@ -57,6 +48,10 @@ export class PeerState {
return this.peer.crashOnClose;
}
get send() {
return this.ops;
}
shouldRetryUnavailableCoValues() {
return this.peer.role === "server";
}
@@ -96,16 +91,31 @@ export class PeerState {
this.processing = false;
}
pushOutgoingMessage(msg: SyncMessage) {
async pushOutgoingMessage(msg: SyncMessage) {
if (this.closed) {
return Promise.resolve();
}
const promise = this.queue.push(msg);
const transformedMessages = transformOutgoingMessageToPeer(msg, this.id);
if (config.TRACE_SYNC_MESSAGES) {
transformedMessages.map((msg) => {
console.log("🟢 <<<=== Sending to peer", this.id, msg);
});
}
void this.processQueue();
try {
return await Promise.all(
transformedMessages.map((msg_3) => {
const promise = this.queue.push(msg_3);
return promise;
void this.processQueue();
return promise;
}),
);
} catch (e) {
console.error("Error sending to peer", this.id, transformedMessages, e);
}
}
get incoming() {

View File

@@ -0,0 +1,159 @@
import { CoValueCore } from "../coValueCore.js";
import { RawCoID } from "../ids.js";
import {
CoValueKnownState,
DataMessage,
PushMessage,
SyncMessage,
} from "../sync/types.js";
import { PeerEntry } from "./PeerEntry.js";
export function emptyDataMessage(
id: RawCoID,
asDependencyOf?: RawCoID,
): DataMessage {
const message: DataMessage = {
id,
known: true,
header: undefined,
action: "data",
priority: 0,
new: {},
};
return asDependencyOf ? { ...message, asDependencyOf } : message;
}
export function unknownDataMessage(
id: RawCoID,
asDependencyOf?: RawCoID,
): DataMessage {
const message: DataMessage = {
id,
known: false,
header: undefined,
action: "data",
priority: 0,
new: {},
};
return asDependencyOf ? { ...message, asDependencyOf } : message;
}
/**
* The PeerOperations class centralizes the sending logic for the atomic synchronization operations:
* pull, push, ack, and data, implementing the protocol.
*/
export class PeerOperations {
constructor(private readonly peer: PeerEntry) {}
async pull({ knownState }: { knownState: CoValueKnownState }) {
if (this.peer.closed) return;
return this.peer.pushOutgoingMessage({
...knownState,
action: "pull",
});
}
async ack({ knownState }: { knownState: CoValueKnownState }) {
if (this.peer.closed) return;
return this.peer.pushOutgoingMessage({
...knownState,
action: "ack",
});
}
async push({
peerKnownState,
coValue,
}: { peerKnownState: CoValueKnownState; coValue: CoValueCore }) {
if (this.peer.closed) return;
return this.sendContent({
peerKnownState,
coValue,
action: "push",
});
}
async data({
peerKnownState,
coValue,
dependencies = [],
}: {
peerKnownState: CoValueKnownState;
coValue: CoValueCore | "empty" | "unknown";
dependencies?: CoValueCore[];
}) {
if (this.peer.closed) return;
if (coValue === "empty") {
return this.peer.pushOutgoingMessage(emptyDataMessage(peerKnownState.id));
}
if (coValue === "unknown") {
return this.peer.pushOutgoingMessage(
unknownDataMessage(peerKnownState.id),
);
}
const sendContentOrEmptyMessage = async (params: SendContentParamsType) => {
const sentContentPiecesNumber = await this.sendContent(params);
if (!sentContentPiecesNumber) {
void this.data({ peerKnownState, coValue: "empty" });
}
};
// send dependencies first
await Promise.all(
dependencies.map((depCoValue) =>
sendContentOrEmptyMessage({
peerKnownState,
coValue: depCoValue,
action: "data",
asDependencyOf: coValue.id,
}),
),
);
// Send new content pieces (possibly, in chunks) created after peerKnownState that passed in
return sendContentOrEmptyMessage({
peerKnownState,
coValue,
action: "data",
});
}
private async sendContent({
peerKnownState,
coValue,
action,
asDependencyOf,
}: SendContentParamsType): Promise<number> {
const newContentPieces = coValue.newContentSince(peerKnownState);
if (newContentPieces) {
for (const [_i, piece] of newContentPieces.entries()) {
let msg: SyncMessage;
if (action === "data") {
msg = { ...piece, action, known: true } as DataMessage;
} else {
msg = { ...piece, action } as PushMessage;
}
if (asDependencyOf) msg = { ...msg, asDependencyOf };
void this.peer.pushOutgoingMessage(msg);
}
}
return newContentPieces?.length || 0;
}
}
type SendContentParamsType = {
peerKnownState: CoValueKnownState;
coValue: CoValueCore;
action: "push" | "data";
asDependencyOf?: RawCoID;
};

View File

@@ -0,0 +1,75 @@
import { RawCoID } from "../ids.js";
import { Peer, PeerEntry, PeerID } from "./PeerEntry.js";
export class Peers {
private readonly peers: { [key: PeerID]: PeerEntry } = {};
add(peerData: Peer) {
const prevPeer = this.peers[peerData.id];
const peer = new PeerEntry(peerData);
this.peers[peerData.id] = peer;
if (prevPeer && !prevPeer.closed) {
prevPeer.gracefulShutdown();
}
return peer;
}
get(id: PeerID): PeerEntry | void {
if (this.peers[id]) {
return this.peers[id];
}
}
getMany(ids: PeerID[]): PeerEntry[] {
return this.getAll().filter((peer: PeerEntry) => ids.includes(peer.id));
}
getAll(): PeerEntry[] {
return Object.values(this.peers);
}
delete(id: PeerID) {
if (this.peers[id]) {
delete this.peers[id];
}
}
getInPriorityOrder({
excludedId,
}: { excludedId?: PeerID } = {}): PeerEntry[] {
return Object.values(this.peers)
.sort((a, b) => {
const aPriority = a.priority || 0;
const bPriority = b.priority || 0;
return bPriority - aPriority;
})
.filter((peer) => (excludedId ? peer.id !== excludedId : true));
}
getServerAndStorage({
excludedId,
includedId,
}: { excludedId?: PeerID; includedId?: PeerID } = {}): PeerEntry[] {
return this.getInPriorityOrder({ excludedId }).filter(
(peer) =>
peer.isServerOrStoragePeer() ||
(includedId ? peer.id === includedId : false),
);
}
}
export function getPeersWithoutErrors(peers: PeerEntry[], coValueId: RawCoID) {
return peers.filter((p) => {
if (p.erroredCoValues.has(coValueId)) {
console.error(
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
);
return false;
}
return true;
});
}

View File

@@ -0,0 +1,2 @@
export * from "./PeerEntry.js";
export * from "./Peers.js";

View File

@@ -0,0 +1,81 @@
import { unknownDataMessage } from "../exports.js";
import { SessionID } from "../ids.js";
import { CoValueContent, SyncMessage } from "../sync/types.js";
export const transformOutgoingMessageToPeer = (
msg: SyncMessage,
id: string,
): SyncMessage[] => {
if (!id.includes("cloud")) {
return [msg];
}
const getSessionsObj = (msg: CoValueContent) =>
Object.entries(msg.new).reduce<{ [sessionID: SessionID]: number }>(
(acc, [session, content]) => {
acc[session as SessionID] =
content.after + content.newTransactions.length;
return acc;
},
{},
);
switch (msg.action) {
case "pull":
// load
return [{ ...msg, action: "load" }];
case "push":
// load + content
return [
{
action: "load",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "data":
if (!msg.known)
return [{ action: "known", id: msg.id, header: false, sessions: {} }];
// known + content => no response expected
return [
{
action: "known",
id: msg.id,
header: true,
sessions: getSessionsObj(msg),
},
{ ...msg, action: "content" },
];
case "ack":
// known => no response expected
return [{ ...msg, action: "known" }];
default:
return [msg];
}
};
export const transformIncomingMessageFromPeer = (
msg: SyncMessage,
id: string,
): SyncMessage => {
if (!id.includes("cloud")) {
return msg;
}
switch (msg.action) {
case "load":
return { ...msg, action: "pull" };
case "content":
return { ...msg, action: "push" };
case "known":
if (!msg.header) return unknownDataMessage(msg.id);
if (msg.isCorrection) return { ...msg, action: "pull" };
return { ...msg, action: "ack" };
default:
return msg;
}
};

View File

@@ -1,7 +1,7 @@
import { MAX_RECOMMENDED_TX_SIZE } from "../coValueCore.js";
import { RawCoID, SessionID } from "../ids.js";
import { getPriorityFromHeader } from "../priority.js";
import { CoValueKnownState, NewContentMessage } from "../sync.js";
import { CoValueKnownState, NewContentMessage } from "../sync/types.js";
import { CoValueChunk } from "./index.js";
export function contentSinceChunk(

View File

@@ -2,14 +2,10 @@ import { CoID, RawCoValue } from "../coValue.js";
import { CoValueHeader, Transaction } from "../coValueCore.js";
import { Signature } from "../crypto/crypto.js";
import { RawCoID } from "../ids.js";
import { IncomingSyncStream, OutgoingSyncQueue } from "../localNode.js";
import { Peer } from "../peer/PeerEntry.js";
import { connectedPeers } from "../streamUtils.js";
import {
CoValueKnownState,
IncomingSyncStream,
NewContentMessage,
OutgoingSyncQueue,
Peer,
} from "../sync.js";
import { CoValueKnownState, NewContentMessage } from "../sync/types.js";
import {
BlockFilename,
FileSystem,
@@ -73,9 +69,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
if (msg.action === "done") {
return;
}
if (msg.action === "content") {
await this.handleNewContent(msg);

View File

@@ -1,5 +1,7 @@
import { Channel } from "queueueue";
import { Peer, PeerID, SyncMessage } from "./sync.js";
import { Peer, PeerID } from "./peer/PeerEntry.js";
import { SyncMessage } from "./sync/types.js";
export { Channel } from "queueueue";
export function connectedPeers(

View File

@@ -1,669 +1,85 @@
import { PeerState } from "./PeerState.js";
import { SyncStateSubscriptionManager } from "./SyncStateSubscriptionManager.js";
import { CoValueHeader, Transaction } from "./coValueCore.js";
import { CoValueCore } from "./coValueCore.js";
import { Signature } from "./crypto/crypto.js";
import { RawCoID, SessionID } from "./ids.js";
import { CoValueEntry } from "./coValueEntry.js";
import { RawCoID } from "./ids.js";
import { LocalNode } from "./localNode.js";
import { CoValuePriority } from "./priority.js";
export type CoValueKnownState = {
id: RawCoID;
header: boolean;
sessions: { [sessionID: SessionID]: number };
};
export function emptyKnownState(id: RawCoID): CoValueKnownState {
return {
id,
header: false,
sessions: {},
};
}
export type SyncMessage =
| LoadMessage
| KnownStateMessage
| NewContentMessage
| DoneMessage;
export type LoadMessage = {
action: "load";
} & CoValueKnownState;
export type KnownStateMessage = {
action: "known";
asDependencyOf?: RawCoID;
isCorrection?: boolean;
} & CoValueKnownState;
export type NewContentMessage = {
action: "content";
id: RawCoID;
header?: CoValueHeader;
priority: CoValuePriority;
new: {
[sessionID: SessionID]: SessionNewContent;
};
};
export type SessionNewContent = {
after: number;
newTransactions: Transaction[];
lastSignature: Signature;
};
export type DoneMessage = {
action: "done";
id: RawCoID;
};
export type PeerID = string;
import { PeerEntry, PeerID } from "./peer/index.js";
import { DependencyService } from "./sync/DependencyService.js";
import {
AckResponseHandler,
CoValueKnownState,
DataResponseHandler,
LoadService,
MessageHandlerInterface,
PullRequestHandler,
PushRequestHandler,
SyncMessage,
SyncService,
} from "./sync/index.js";
export type DisconnectedError = "Disconnected";
export type PingTimeoutError = "PingTimeout";
export type IncomingSyncStream = AsyncIterable<
SyncMessage | DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = {
push: (msg: SyncMessage) => Promise<unknown>;
close: () => void;
};
export interface Peer {
id: PeerID;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client" | "storage";
priority?: number;
crashOnClose: boolean;
deletePeerStateOnClose?: boolean;
}
export function combinedKnownStates(
stateA: CoValueKnownState,
stateB: CoValueKnownState,
): CoValueKnownState {
const sessionStates: CoValueKnownState["sessions"] = {};
const allSessions = new Set([
...Object.keys(stateA.sessions),
...Object.keys(stateB.sessions),
] as SessionID[]);
for (const sessionID of allSessions) {
const stateAValue = stateA.sessions[sessionID];
const stateBValue = stateB.sessions[sessionID];
sessionStates[sessionID] = Math.max(stateAValue || 0, stateBValue || 0);
}
return {
id: stateA.id,
header: stateA.header || stateB.header,
sessions: sessionStates,
};
}
export class SyncManager {
peers: { [key: PeerID]: PeerState } = {};
local: LocalNode;
requestedSyncs: {
[id: RawCoID]:
| { done: Promise<void>; nRequestsThisTick: number }
| undefined;
} = {};
private readonly loadService: LoadService;
private readonly syncService: SyncService;
private readonly pullRequestHandler: PullRequestHandler;
private readonly pushRequestHandler: PushRequestHandler;
private readonly ackResponseHandler: AckResponseHandler;
private readonly dataResponseHandler: DataResponseHandler;
private readonly dependencyService: DependencyService;
constructor(local: LocalNode) {
this.local = local;
this.syncStateSubscriptionManager = new SyncStateSubscriptionManager(this);
}
syncStateSubscriptionManager: SyncStateSubscriptionManager;
peersInPriorityOrder(): PeerState[] {
return Object.values(this.peers).sort((a, b) => {
const aPriority = a.priority || 0;
const bPriority = b.priority || 0;
return bPriority - aPriority;
});
}
getPeers(): PeerState[] {
return Object.values(this.peers);
}
getServerAndStoragePeers(excludePeerId?: PeerID): PeerState[] {
return this.peersInPriorityOrder().filter(
(peer) => peer.isServerOrStoragePeer() && peer.id !== excludePeerId,
);
}
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
if (peer.erroredCoValues.has(msg.id)) {
console.error(
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
);
return;
}
// TODO: validate
switch (msg.action) {
case "load":
return await this.handleLoad(msg, peer);
case "known":
if (msg.isCorrection) {
return await this.handleCorrection(msg, peer);
} else {
return await this.handleKnownState(msg, peer);
}
case "content":
// await new Promise<void>((resolve) => setTimeout(resolve, 0));
return await this.handleNewContent(msg, peer);
case "done":
return await this.handleUnsubscribe(msg);
default:
throw new Error(
`Unknown message type ${(msg as { action: "string" }).action}`,
);
}
}
async subscribeToIncludingDependencies(id: RawCoID, peer: PeerState) {
const entry = this.local.coValuesStore.get(id);
if (entry.state.type !== "available") {
entry.loadFromPeers([peer]).catch((e: unknown) => {
console.error("Error sending load", e);
});
return;
}
const coValue = entry.state.coValue;
for (const id of coValue.getDependedOnCoValues()) {
await this.subscribeToIncludingDependencies(id, peer);
}
if (!peer.toldKnownState.has(id)) {
peer.toldKnownState.add(id);
this.trySendToPeer(peer, {
action: "load",
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending load", e);
});
}
}
async tellUntoldKnownStateIncludingDependencies(
id: RawCoID,
peer: PeerState,
asDependencyOf?: RawCoID,
) {
const coValue = this.local.expectCoValueLoaded(id);
await Promise.all(
coValue
.getDependedOnCoValues()
.map((dependentCoID) =>
this.tellUntoldKnownStateIncludingDependencies(
dependentCoID,
peer,
asDependencyOf || id,
),
),
);
if (!peer.toldKnownState.has(id)) {
this.trySendToPeer(peer, {
action: "known",
asDependencyOf,
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending known state", e);
});
peer.toldKnownState.add(id);
}
}
async sendNewContentIncludingDependencies(id: RawCoID, peer: PeerState) {
const coValue = this.local.expectCoValueLoaded(id);
await Promise.all(
coValue
.getDependedOnCoValues()
.map((id) => this.sendNewContentIncludingDependencies(id, peer)),
);
const newContentPieces = coValue.newContentSince(
peer.optimisticKnownStates.get(id),
);
if (newContentPieces) {
const optimisticKnownStateBefore =
peer.optimisticKnownStates.get(id) || emptyKnownState(id);
const sendPieces = async () => {
let lastYield = performance.now();
for (const [_i, piece] of newContentPieces.entries()) {
// console.log(
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${
// newContentPieces.length
// } header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
this.trySendToPeer(peer, piece).catch((e: unknown) => {
console.error("Error sending content piece", e);
});
if (performance.now() - lastYield > 10) {
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
});
lastYield = performance.now();
}
}
};
sendPieces().catch((e) => {
console.error("Error sending new content piece, retrying", e);
peer.optimisticKnownStates.dispatch({
type: "SET",
id,
value: optimisticKnownStateBefore ?? emptyKnownState(id),
});
return this.sendNewContentIncludingDependencies(id, peer);
});
peer.optimisticKnownStates.dispatch({
type: "COMBINE_WITH",
id,
value: coValue.knownState(),
});
}
}
addPeer(peer: Peer) {
const prevPeer = this.peers[peer.id];
const peerState = new PeerState(peer, prevPeer?.knownStates);
this.peers[peer.id] = peerState;
if (prevPeer && !prevPeer.closed) {
prevPeer.gracefulShutdown();
}
const unsubscribeFromKnownStatesUpdates = peerState.knownStates.subscribe(
(id) => {
this.syncStateSubscriptionManager.triggerUpdate(peer.id, id);
this.syncService = new SyncService(
// onPushContent callback
({ entry, peerId }: { entry: CoValueEntry; peerId: PeerID }) => {
entry.uploadState.setPendingForPeer(peerId);
},
);
if (peerState.isServerOrStoragePeer()) {
const initialSync = async () => {
for (const entry of this.local.coValuesStore.getValues()) {
await this.subscribeToIncludingDependencies(entry.id, peerState);
this.loadService = new LoadService();
this.dependencyService = new DependencyService(this, this.loadService);
if (entry.state.type === "available") {
await this.sendNewContentIncludingDependencies(entry.id, peerState);
}
this.pullRequestHandler = new PullRequestHandler(this.loadService);
this.pushRequestHandler = new PushRequestHandler(
this.syncService,
// The reason for this ugly callback here is to avoid having the local node as a dependency in the handler,
// This should be removed after CoValueCore is decoupled from the local node instance
this.dependencyService,
);
if (!peerState.optimisticKnownStates.has(entry.id)) {
peerState.optimisticKnownStates.dispatch({
type: "SET_AS_EMPTY",
id: entry.id,
});
}
}
};
void initialSync();
}
this.ackResponseHandler = new AckResponseHandler(
// onPushContentAcknowledged callback
({ entry, peerId }: { entry: CoValueEntry; peerId: PeerID }) => {
entry.uploadState.setCompletedForPeer(peerId);
},
);
const processMessages = async () => {
for await (const msg of peerState.incoming) {
if (msg === "Disconnected") {
return;
}
if (msg === "PingTimeout") {
console.error("Ping timeout from peer", peer.id);
return;
}
try {
await this.handleSyncMessage(msg, peerState);
} catch (e) {
throw new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
);
}
}
};
processMessages()
.then(() => {
if (peer.crashOnClose) {
console.error("Unexepcted close from peer", peer.id);
this.local.crashed = new Error("Unexpected close from peer");
throw new Error("Unexpected close from peer");
}
})
.catch((e) => {
console.error("Error processing messages from peer", peer.id, e);
if (peer.crashOnClose) {
this.local.crashed = e;
throw new Error(e);
}
})
.finally(() => {
const state = this.peers[peer.id];
state?.gracefulShutdown();
unsubscribeFromKnownStatesUpdates();
if (peer.deletePeerStateOnClose) {
delete this.peers[peer.id];
}
});
this.dataResponseHandler = new DataResponseHandler(
this.dependencyService,
this.syncService,
);
}
trySendToPeer(peer: PeerState, msg: SyncMessage) {
return peer.pushOutgoingMessage(msg);
async initialSync(peer: PeerEntry) {
return this.syncService.initialSync(peer, this.local.coValuesStore);
}
async handleLoad(msg: LoadMessage, peer: PeerState) {
peer.dispatchToKnownStates({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
const entry = this.local.coValuesStore.get(msg.id);
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
if (eligiblePeers.length === 0) {
// If the load request contains a header or any session data
// and we don't have any eligible peers to load the coValue from
// we try to load it from the sender because it is the only place
// where we can get informations about the coValue
if (msg.header || Object.keys(msg.sessions).length > 0) {
entry.loadFromPeers([peer]).catch((e) => {
console.error("Error loading coValue in handleLoad", e);
});
}
return;
} else {
this.local.loadCoValueCore(msg.id, peer.id).catch((e) => {
console.error("Error loading coValue in handleLoad", e);
});
}
}
if (entry.state.type === "loading") {
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
// in a new task, otherwise we might block further incoming content messages that would
// resolve the CoValue as available. This can happen when we receive fresh
// content from a client, but we are a server with our own upstream server(s)
entry
.getCoValue()
.then(async (value) => {
if (value === "unavailable") {
peer.dispatchToKnownStates({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
peer.toldKnownState.add(msg.id);
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
});
return;
}
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
await this.sendNewContentIncludingDependencies(msg.id, peer);
})
.catch((e) => {
console.error("Error loading coValue in handleLoad loading state", e);
});
}
if (entry.state.type === "available") {
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
await this.sendNewContentIncludingDependencies(msg.id, peer);
}
}
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
const entry = this.local.coValuesStore.get(msg.id);
peer.dispatchToKnownStates({
type: "COMBINE_WITH",
id: msg.id,
value: knownStateIn(msg),
});
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
if (msg.asDependencyOf) {
const dependencyEntry = this.local.coValuesStore.get(
msg.asDependencyOf,
);
if (
dependencyEntry.state.type === "available" ||
dependencyEntry.state.type === "loading"
) {
this.local
.loadCoValueCore(
msg.id,
peer.role === "storage" ? undefined : peer.id,
)
.catch((e) => {
console.error(
`Error loading coValue ${msg.id} to create loading state, as dependency of ${msg.asDependencyOf}`,
e,
);
});
}
}
}
// The header is a boolean value that tells us if the other peer do have information about the header.
// If it's false in this point it means that the coValue is unavailable on the other peer.
if (entry.state.type !== "available") {
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
if (!availableOnPeer) {
entry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
}
return;
}
if (entry.state.type === "available") {
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
await this.sendNewContentIncludingDependencies(msg.id, peer);
}
}
async handleNewContent(msg: NewContentMessage, peer: PeerState) {
const entry = this.local.coValuesStore.get(msg.id);
let coValue: CoValueCore;
if (entry.state.type !== "available") {
if (!msg.header) {
console.error("Expected header to be sent in first message");
return;
}
peer.dispatchToKnownStates({
type: "UPDATE_HEADER",
id: msg.id,
header: true,
});
coValue = new CoValueCore(msg.header, this.local);
entry.dispatch({
type: "available",
coValue,
});
} else {
coValue = entry.state.coValue;
}
let invalidStateAssumed = false;
for (const [sessionID, newContentForSession] of Object.entries(msg.new) as [
SessionID,
SessionNewContent,
][]) {
const ourKnownTxIdx =
coValue.sessionLogs.get(sessionID)?.transactions.length;
const theirFirstNewTxIdx = newContentForSession.after;
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
invalidStateAssumed = true;
continue;
}
const alreadyKnownOffset = ourKnownTxIdx
? ourKnownTxIdx - theirFirstNewTxIdx
: 0;
const newTransactions =
newContentForSession.newTransactions.slice(alreadyKnownOffset);
if (newTransactions.length === 0) {
continue;
}
const before = performance.now();
// eslint-disable-next-line neverthrow/must-use-result
const result = coValue.tryAddTransactions(
sessionID,
newTransactions,
undefined,
newContentForSession.lastSignature,
);
const after = performance.now();
if (after - before > 80) {
const totalTxLength = newTransactions
.map((t) =>
t.privacy === "private"
? t.encryptedChanges.length
: t.changes.length,
)
.reduce((a, b) => a + b, 0);
console.log(
`Adding incoming transactions took ${(after - before).toFixed(
2,
)}ms for ${totalTxLength} bytes = bandwidth: ${(
(1000 * totalTxLength) / (after - before) / (1024 * 1024)
).toFixed(2)} MB/s`,
);
}
// const theirTotalnTxs = Object.values(
// peer.optimisticKnownStates[msg.id]?.sessions || {},
// ).reduce((sum, nTxs) => sum + nTxs, 0);
// const ourTotalnTxs = [...coValue.sessionLogs.values()].reduce(
// (sum, session) => sum + session.transactions.length,
// 0,
// );
if (result.isErr()) {
console.error(
"Failed to add transactions from",
peer.id,
result.error,
msg.id,
newTransactions.length + " new transactions",
"after: " + newContentForSession.after,
"our last known tx idx initially: " + ourKnownTxIdx,
"our last known tx idx now: " +
coValue.sessionLogs.get(sessionID)?.transactions.length,
);
peer.erroredCoValues.set(msg.id, result.error);
continue;
}
peer.dispatchToKnownStates({
type: "UPDATE_SESSION_COUNTER",
id: msg.id,
sessionId: sessionID,
value:
newContentForSession.after +
newContentForSession.newTransactions.length,
});
}
if (invalidStateAssumed) {
this.trySendToPeer(peer, {
action: "known",
isCorrection: true,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state correction", e);
});
} else {
/**
* We are sending a known state message to the peer to acknowledge the
* receipt of the new content.
*
* This way the sender knows that the content has been received and applied
* and can update their peer's knownState accordingly.
*/
this.trySendToPeer(peer, {
action: "known",
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending known state", e);
});
}
/**
* We do send a correction/ack message before syncing to give an immediate
* response to the peers that are waiting for confirmation that a coValue is
* fully synced
*/
await this.syncCoValue(coValue);
}
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
peer.dispatchToKnownStates({
type: "SET",
id: msg.id,
value: knownStateIn(msg),
});
return this.sendNewContentIncludingDependencies(msg.id, peer);
}
handleUnsubscribe(_msg: DoneMessage) {
throw new Error("Method not implemented.");
}
async syncCoValue(coValue: CoValueCore) {
async syncCoValue(
coValue: CoValueCore,
peersKnownState: CoValueKnownState,
peers?: PeerEntry[],
) {
if (this.requestedSyncs[coValue.id]) {
this.requestedSyncs[coValue.id]!.nRequestsThisTick++;
return this.requestedSyncs[coValue.id]!.done;
@@ -671,83 +87,66 @@ export class SyncManager {
const done = new Promise<void>((resolve) => {
queueMicrotask(async () => {
delete this.requestedSyncs[coValue.id];
// if (entry.nRequestsThisTick >= 2) {
// console.log("Syncing", coValue.id, "for", entry.nRequestsThisTick, "requests");
// }
await this.actuallySyncCoValue(coValue);
const entry = this.local.coValuesStore.get(coValue.id);
await this.syncService.syncCoValue(entry, peersKnownState, peers);
resolve();
});
});
const entry = {
this.requestedSyncs[coValue.id] = {
done,
nRequestsThisTick: 1,
};
this.requestedSyncs[coValue.id] = entry;
return done;
}
}
async actuallySyncCoValue(coValue: CoValueCore) {
// let blockingSince = performance.now();
for (const peer of this.peersInPriorityOrder()) {
if (peer.closed) continue;
if (peer.erroredCoValues.has(coValue.id)) continue;
// if (performance.now() - blockingSince > 5) {
// await new Promise<void>((resolve) => {
// setTimeout(resolve, 0);
// });
// blockingSince = performance.now();
// }
if (peer.optimisticKnownStates.has(coValue.id)) {
await this.tellUntoldKnownStateIncludingDependencies(coValue.id, peer);
await this.sendNewContentIncludingDependencies(coValue.id, peer);
} else if (peer.isServerOrStoragePeer()) {
await this.subscribeToIncludingDependencies(coValue.id, peer);
await this.sendNewContentIncludingDependencies(coValue.id, peer);
}
}
async loadCoValue(id: RawCoID): Promise<CoValueCore | "unavailable"> {
const entry = this.local.coValuesStore.get(id);
return this.loadService.loadCoValue(entry);
}
for (const peer of this.getPeers()) {
this.syncStateSubscriptionManager.triggerUpdate(peer.id, coValue.id);
handleSyncMessage(msg: SyncMessage, peer: PeerEntry) {
if (peer.erroredCoValues.has(msg.id)) {
console.error(
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
);
return;
}
const entry = this.local.coValuesStore.get(msg.id);
let handler: MessageHandlerInterface;
switch (msg.action) {
case "data":
handler = this.dataResponseHandler;
break;
case "push":
handler = this.pushRequestHandler;
break;
case "pull":
handler = this.pullRequestHandler;
break;
case "ack":
handler = this.ackResponseHandler;
break;
default:
throw new Error(
`Unknown message type ${(msg as unknown as { action: "string" }).action}`,
);
}
return handler.handle({ msg, peer, entry });
}
async waitForUploadIntoPeer(peerId: PeerID, id: RawCoID) {
const isAlreadyUploaded =
this.syncStateSubscriptionManager.getIsCoValueFullyUploadedIntoPeer(
peerId,
id,
);
const entry = this.local.coValuesStore.get(id);
if (!entry) {
throw new Error(`Unknown coValue ${id}`);
}
if (isAlreadyUploaded) {
if (entry.uploadState.isCoValueFullyUploadedIntoPeer(peerId)) {
return true;
}
return new Promise((resolve) => {
const unsubscribe =
this.syncStateSubscriptionManager.subscribeToPeerUpdates(
peerId,
(knownState, syncState) => {
if (syncState.isUploaded && knownState.id === id) {
resolve(true);
unsubscribe?.();
}
},
);
});
}
gracefulShutdown() {
for (const peer of Object.values(this.peers)) {
peer.gracefulShutdown();
}
return entry.uploadState.waitForPeer(peerId);
}
}
function knownStateIn(msg: LoadMessage | KnownStateMessage) {
return {
id: msg.id,
header: msg.header,
sessions: msg.sessions,
};
}

View File

@@ -0,0 +1,29 @@
import { ParallelQueueRunner } from "../utils/parallelQueueRunner.js";
import { MessageHandlerInput, MessageHandlerInterface } from "./types.js";
export abstract class AbstractMessageHandler
implements MessageHandlerInterface
{
private readonly queuesRunner = new ParallelQueueRunner();
handle({ msg, peer, entry }: MessageHandlerInput) {
this.queuesRunner.defferPer(msg.id, () =>
this.routeMessage({ msg, peer, entry }),
);
}
protected routeMessage({ msg, peer, entry }: MessageHandlerInput) {
switch (entry.state.type) {
case "available":
return this.handleAvailable({ msg, peer, entry });
case "loading":
return this.handleLoading({ msg, peer, entry });
case "unknown":
case "unavailable":
return this.handleUnknown({ msg, peer, entry });
}
}
abstract handleAvailable(input: MessageHandlerInput): Promise<unknown>;
abstract handleLoading(input: MessageHandlerInput): Promise<unknown>;
abstract handleUnknown(input: MessageHandlerInput): Promise<unknown>;
}

View File

@@ -0,0 +1,46 @@
import { CoValueEntry } from "../coValueEntry.js";
import { PeerEntry, PeerID } from "../peer/PeerEntry.js";
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
import { AckMessage } from "./types.js";
export type AckMessageHandlerInput = {
msg: AckMessage;
peer: PeerEntry;
entry: CoValueEntry;
};
export class AckResponseHandler extends AbstractMessageHandler {
constructor(
private onPushContentAcknowledged?: ({
entry,
peerId,
}: { entry: CoValueEntry; peerId: PeerID }) => void,
) {
super();
}
async handleAvailable(input: AckMessageHandlerInput) {
if (this.onPushContentAcknowledged) {
this.onPushContentAcknowledged({
entry: input.entry,
peerId: input.peer.id,
});
}
}
async handleLoading(input: AckMessageHandlerInput) {
console.error(
"Unexpected loading state. Ack message is a response to a push request and should not be received for loading coValue.",
input.msg.id,
input.peer.id,
);
}
async handleUnknown(input: AckMessageHandlerInput) {
console.error(
"Unexpected unavailable state. Ack message is a response to a push request and should not be received for unavailable coValue.",
input.msg.id,
input.peer.id,
);
}
}

View File

@@ -0,0 +1,129 @@
import { isTryAddTransactionsException } from "../coValueCore.js";
import { CoValueAvailableState } from "../coValueEntry.js";
import { LocalNode } from "../exports.js";
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
import { DependencyService } from "./DependencyService.js";
import { SyncService } from "./SyncService.js";
import { DataMessageHandlerInput, emptyKnownState } from "./types.js";
/**
* "Data" is a response to our "pull" message. It's always some data we asked for, initially.
* It's a terminal message which must not be responded to.
* At this stage the coValue state is considered synced between the peer and the node.
*/
export class DataResponseHandler extends AbstractMessageHandler {
constructor(
private readonly dependencyService: DependencyService,
private readonly syncService: SyncService,
) {
super();
}
async handleAvailable(input: DataMessageHandlerInput): Promise<void> {
const { msg, entry, peer } = input;
await this.dependencyService.loadUnknownDependencies(input);
if (!msg.known) {
// Send coValue to the peer if not known by the peer but available on our side
return this.syncService.syncCoValue(entry, emptyKnownState(msg.id), [
peer,
]);
}
this.addData(input);
// Push data to peers which are not aware of the coValue,
// they are preserved in entry.uploadState after being marked as 'not-found-in-peer'
const unawarePeerIds = entry.uploadState.getUnawarePeerIds();
if (unawarePeerIds.length) {
void this.syncService.syncCoValue(
entry,
emptyKnownState(msg.id),
LocalNode.peers.getMany(unawarePeerIds),
);
}
}
async handleLoading(input: DataMessageHandlerInput) {
const { peer, msg, entry } = input;
// not known by peer
if (!msg.known) {
entry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
return;
}
if (!msg.header) {
console.error(
"Unexpected empty header in message. Data message is a response to a pull request and should be received for available coValue or include the full header.",
msg.id,
peer.id,
);
return;
}
await this.dependencyService.MakeAvailableWithDependencies(input);
return this.routeMessage(input);
}
async handleUnknown(input: DataMessageHandlerInput) {
const { peer, msg, entry } = input;
if (!msg.known) {
input.entry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
});
return;
}
if (!msg.asDependencyOf) {
console.error(
"Unexpected coValue unavailable state in DataResponseHandler",
peer.id,
msg.id,
);
}
entry.moveToLoadingState([peer]);
return this.routeMessage(input);
}
addData(input: DataMessageHandlerInput) {
const { peer, msg, entry } = input;
const { coValue } = entry.state as CoValueAvailableState;
try {
const anyMissedTransaction = coValue.addNewContent(msg);
if (anyMissedTransaction) {
console.error(
"Unexpected missed transactions in data message",
peer.id,
msg,
);
return false;
}
} catch (e) {
if (isTryAddTransactionsException(e)) {
const { message, error } = e;
console.error(peer.id, message, error);
peer.erroredCoValues.set(msg.id, error);
} else {
console.error("Unknown error", peer.id, e);
}
return false;
}
return true;
}
}

View File

@@ -0,0 +1,93 @@
import {
CoValueCore,
CoValueHeader,
getDependedOnFromContent,
} from "../coValueCore.js";
import { CoValueAvailableState, CoValueEntry } from "../coValueEntry.js";
import { SyncManager } from "../sync.js";
import { LoadService } from "./LoadService.js";
import {
CoValueContent,
DataMessageHandlerInput,
PushMessageHandlerInput,
} from "./types.js";
export class DependencyService {
constructor(
private syncManager: SyncManager,
private loadService: LoadService,
) {}
private async getUnknownDependencies(
input: DataMessageHandlerInput | PushMessageHandlerInput,
) {
const { msg, entry } = input;
const isAvailable = entry.state.type === "available";
if (!msg.header && !isAvailable) {
throw new Error(`Cannot get dependencies without header ${msg.id}`);
}
const availableCoValue = isAvailable
? (entry.state as CoValueAvailableState).coValue
: null;
const header = availableCoValue ? availableCoValue.header : msg.header;
const dependencies = new Set([
...getDependedOnFromContent({
...msg,
header,
} as Required<CoValueContent>),
...(availableCoValue ? availableCoValue.getDependedOnCoValues() : []),
]);
const unknownDependencies: CoValueEntry[] = [];
for (const id of dependencies) {
const entry = this.syncManager.local.coValuesStore.get(id);
if (entry.state.type === "loading") {
await entry.getCoValue();
}
if (entry.state.type !== "available") {
unknownDependencies.push(entry);
}
}
return unknownDependencies;
}
async loadUnknownDependencies(
input: DataMessageHandlerInput | PushMessageHandlerInput,
) {
const unknownDependencies = await this.getUnknownDependencies(input);
// load dependencies one by one as they can depend on each other
for await (const dependency of unknownDependencies) {
await this.loadService.loadCoValue(dependency);
}
}
private createCoValue(header: CoValueHeader) {
return new CoValueCore(header, this.syncManager.local);
}
async MakeAvailableWithDependencies(
input: PushMessageHandlerInput | DataMessageHandlerInput,
) {
if (!input.msg.header) {
throw new Error(`Empty header for ${input.msg.id}`);
}
if (input.entry.state.type === "available") {
throw new Error(
`CoValue is already available, requested to make available for ${input.msg.id}`,
);
}
await this.loadUnknownDependencies(input);
const coValue = this.createCoValue(input.msg.header);
input.entry.dispatch({
type: "available",
coValue,
});
}
}

View File

@@ -0,0 +1,66 @@
import { CoValueCore } from "../coValueCore.js";
import { CO_VALUE_LOADING_TIMEOUT, CoValueEntry } from "../coValueEntry.js";
import { LocalNode } from "../exports.js";
import { PeerEntry, getPeersWithoutErrors } from "../peer/index.js";
import { emptyKnownState } from "./types.js";
export class LoadService {
constructor() {}
/**
* Sends "pull" request to peers to load/update the coValue state and request to subscribe to peer's updates if have not
*
* @param entry
* @param peerToLoadFrom - Required peer to send the request to
*/
async loadCoValue(
entry: CoValueEntry,
peerToLoadFrom?: PeerEntry,
): Promise<CoValueCore | "unavailable"> {
const peers = peerToLoadFrom
? [peerToLoadFrom]
: LocalNode.peers.getServerAndStorage();
try {
await entry.loadFromPeers(
getPeersWithoutErrors(peers, entry.id),
loadCoValueFromPeers,
);
} catch (e) {
console.error("Error loading from peers", entry.id, e);
}
return entry.getCoValue();
}
}
async function loadCoValueFromPeers(
coValueEntry: CoValueEntry,
peers: PeerEntry[],
) {
for await (const peer of peers) {
if (coValueEntry.state.type === "available") {
await peer.send.pull({
knownState: coValueEntry.state.coValue.knownState(),
});
} else {
await peer.send.pull({ knownState: emptyKnownState(coValueEntry.id) });
}
if (coValueEntry.state.type === "loading") {
const timeout = setTimeout(() => {
if (coValueEntry.state.type === "loading") {
console.error(
"Failed to load coValue from peer",
peer.id,
coValueEntry.id,
);
coValueEntry.markAsNotFoundInPeer(peer.id);
}
}, CO_VALUE_LOADING_TIMEOUT);
await coValueEntry.state.waitForPeer(peer.id);
clearTimeout(timeout);
}
}
}

View File

@@ -0,0 +1,57 @@
import { CoValueAvailableState, CoValueEntry } from "../coValueEntry.js";
import { PeerEntry } from "../peer/index.js";
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
import { LoadService } from "./LoadService.js";
import { PullMessage } from "./types.js";
export type PullMessageHandlerInput = {
msg: PullMessage;
peer: PeerEntry;
entry: CoValueEntry;
};
/**
* "Pull" request must be followed by "data" message response according to the protocol:
* - Sends new content if it exists.
* - Sends an empty data message otherwise.
* - Sends an empty data message with `{ known: false }` in the message if the `coValue` is unknown by local node.
*
* Handler initiates a new "pull" requests to load the coValue from peers if it is not known by the node.
*/
export class PullRequestHandler extends AbstractMessageHandler {
constructor(private readonly loadService: LoadService) {
super();
}
async handleAvailable(input: PullMessageHandlerInput): Promise<unknown> {
const { msg, peer, entry } = input;
const { coValue } = entry.state as CoValueAvailableState;
return peer.send.data({
peerKnownState: msg,
coValue,
});
}
async handleLoading(input: PullMessageHandlerInput): Promise<unknown> {
// We need to wait for the CoValue to be loaded that would resolve the CoValue as available.
await input.entry.getCoValue();
return this.routeMessage(input);
}
async handleUnknown(input: PullMessageHandlerInput): Promise<unknown> {
const { msg, peer, entry } = input;
// Initiate a new PULL flow
// If the coValue is known by peer then we try to load it from the sender as well
if (msg.header) {
void this.loadService.loadCoValue(entry, peer);
}
return peer.send.data({
peerKnownState: msg,
coValue: "unknown",
});
}
}

View File

@@ -0,0 +1,83 @@
import { CoValueCore, isTryAddTransactionsException } from "../coValueCore.js";
import { CoValueAvailableState } from "../coValueEntry.js";
import { LocalNode } from "../exports.js";
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
import { DependencyService } from "./DependencyService.js";
import { SyncService } from "./SyncService.js";
import { PushMessageHandlerInput, emptyKnownState } from "./types.js";
export class PushRequestHandler extends AbstractMessageHandler {
constructor(
protected readonly syncService: SyncService,
protected readonly dependencyService: DependencyService,
) {
super();
}
async handleAvailable(input: PushMessageHandlerInput): Promise<unknown> {
const { coValue } = input.entry.state as CoValueAvailableState;
await this.dependencyService.loadUnknownDependencies(input);
return this.addData(coValue, input);
}
async handleUnknown(input: PushMessageHandlerInput) {
const { msg, entry, peer } = input;
if (!msg.header) {
console.error(`Unexpected unavailable state for coValue ${input.msg.id}`);
}
entry.moveToLoadingState([peer]);
return this.routeMessage(input);
}
async handleLoading(input: PushMessageHandlerInput) {
if (!input.msg.header) {
console.error(`Unexpected loading state for coValue ${input.msg.id}`);
return;
}
await this.dependencyService.MakeAvailableWithDependencies(input);
return this.routeMessage(input);
}
private async addData(coValue: CoValueCore, input: PushMessageHandlerInput) {
const { msg, peer, entry } = input;
const knownState = coValue.knownState();
const isEmptyKnownState =
!knownState.header ||
!knownState.sessions ||
!Object.keys(knownState.sessions).length;
const assumedPeerKnownState = isEmptyKnownState
? emptyKnownState(knownState.id)
: { ...knownState };
try {
const anyMissedTransaction = coValue.addNewContent(msg);
anyMissedTransaction
? await peer.send.pull({ knownState: coValue.knownState() })
: await peer.send.ack({ knownState: coValue.knownState() });
} catch (e) {
if (isTryAddTransactionsException(e)) {
const { message, error } = e;
console.error(peer.id, message, error);
peer.erroredCoValues.set(msg.id, error);
} else {
console.error("Unknown error", peer.id, e);
}
return;
}
const peers = LocalNode.peers.getInPriorityOrder({
excludedId: peer.id,
});
await this.syncService.syncCoValue(entry, assumedPeerKnownState, peers);
}
}

View File

@@ -0,0 +1,68 @@
import { CoValuesStore } from "../CoValuesStore.js";
import { CoValueEntry } from "../coValueEntry.js";
import { LocalNode } from "../exports.js";
import { PeerEntry, PeerID } from "../peer/index.js";
import { CoValueKnownState, emptyKnownState } from "./types.js";
export class SyncService {
constructor(
private readonly onPushContent?: ({
entry,
peerId,
}: { entry: CoValueEntry; peerId: PeerID }) => void,
) {}
/**
* Sends "push" request to peers to broadcast all known coValues state
* and request to subscribe to those coValues updates (if have not)
*/
async initialSync(
peer: PeerEntry,
coValuesStore: CoValuesStore,
): Promise<void> {
const ids = coValuesStore.getOrderedIds();
for (const id of ids) {
const coValue = coValuesStore.expectCoValueLoaded(id);
// Previously we used to send load + content, see transformOutgoingMessageToPeer()
await peer.send.push({
peerKnownState: emptyKnownState(id),
coValue,
});
// TODO should be moved inside peer.send.push
if (this.onPushContent) {
const entry = coValuesStore.get(coValue.id);
this.onPushContent({ entry, peerId: peer.id });
}
}
}
/**
* Sends "push" request to peers to broadcast the new known coValue state and request to subscribe to updates if have not
*/
async syncCoValue(
entry: CoValueEntry,
peerKnownState: CoValueKnownState,
peers?: PeerEntry[],
) {
if (entry.state.type !== "available") {
throw new Error(`Can't sync unavailable coValue ${peerKnownState.id}`);
}
const peersToSync = peers || LocalNode.peers.getInPriorityOrder();
for (const peer of peersToSync) {
if (peer.erroredCoValues.has(entry.id)) continue;
await peer.send.push({
peerKnownState,
coValue: entry.state.coValue,
});
// TODO should be moved inside peer.send.push
if (this.onPushContent) {
this.onPushContent({ entry, peerId: peer.id });
}
}
}
}

View File

@@ -0,0 +1,7 @@
export * from "./types.js";
export * from "./DataResponseHandler.js";
export * from "./PushRequestHandler.js";
export * from "./PullRequestHandler.js";
export * from "./AckResponseHandler.js";
export * from "./SyncService.js";
export * from "./LoadService.js";

View File

@@ -0,0 +1,94 @@
import { CoValueHeader, SessionNewContent } from "../coValueCore.js";
import { CoValueEntry } from "../coValueEntry.js";
import { RawCoID, SessionID } from "../ids.js";
import { PeerEntry } from "../peer/PeerEntry.js";
import { CoValuePriority } from "../priority.js";
export type CoValueKnownState = {
id: RawCoID;
// Is coValue known by peer
header: boolean;
// Number of known sessions
sessions: { [sessionID: SessionID]: number };
};
export function emptyKnownState(id: RawCoID): CoValueKnownState {
return {
id,
header: false,
sessions: {},
};
}
export type SyncMessage =
| LoadMessage
| KnownStateMessage
| NewContentMessage
| PullMessage
| PushMessage
| AckMessage
| DataMessage;
export type LoadMessage = {
action: "load";
} & CoValueKnownState;
export type PullMessage = {
action: "pull";
} & CoValueKnownState;
export type KnownStateMessage = {
action: "known";
asDependencyOf?: RawCoID;
isCorrection?: boolean;
} & CoValueKnownState;
export type AckMessage = {
action: "ack";
} & CoValueKnownState;
export type CoValueContent = {
id: RawCoID;
header?: CoValueHeader;
priority: CoValuePriority;
new: {
[sessionID: SessionID]: SessionNewContent;
};
};
export type NewContentMessage = {
action: "content";
} & CoValueContent;
export type DataMessage = {
known: boolean;
action: "data";
asDependencyOf?: RawCoID;
} & CoValueContent;
export type PushMessage = {
action: "push";
asDependencyOf?: RawCoID;
} & CoValueContent;
export type MessageHandlerInput = {
msg: SyncMessage;
peer: PeerEntry;
entry: CoValueEntry;
};
export type PushMessageHandlerInput = {
msg: PushMessage;
peer: PeerEntry;
entry: CoValueEntry;
};
export type DataMessageHandlerInput = {
msg: DataMessage;
peer: PeerEntry;
entry: CoValueEntry;
};
export interface MessageHandlerInterface {
handle({ msg, peer, entry }: MessageHandlerInput): void;
}

View File

@@ -1,7 +1,8 @@
import { describe, expect, test, vi } from "vitest";
import { PeerKnownStates } from "../PeerKnownStates.js";
import { RawCoID, SessionID } from "../ids.js";
import { CoValueKnownState, emptyKnownState } from "../sync.js";
import { CoValueKnownState, emptyKnownState } from "../sync/types.js";
describe("PeerKnownStates", () => {
test("should set and get a known state", () => {

View File

@@ -1,8 +1,10 @@
import { describe, expect, test, vi } from "vitest";
import { PeerKnownStateActions } from "../PeerKnownStates.js";
import { PeerState } from "../PeerState.js";
import { Peer } from "../localNode.js";
import { PeerEntry } from "../peer/PeerEntry.js";
import { CO_VALUE_PRIORITY } from "../priority.js";
import { Peer, SyncMessage } from "../sync.js";
import { SyncMessage } from "../sync/types.js";
function setup() {
const mockPeer: Peer = {
@@ -16,7 +18,7 @@ function setup() {
close: vi.fn(),
},
};
const peerState = new PeerState(mockPeer, undefined);
const peerState = new PeerEntry(mockPeer, undefined);
return { mockPeer, peerState };
}
@@ -160,7 +162,7 @@ describe("PeerState", () => {
};
peerState.dispatchToKnownStates(action);
const newPeerState = new PeerState(mockPeer, peerState.knownStates);
const newPeerState = new PeerEntry(mockPeer, peerState.knownStates);
expect(newPeerState.knownStates).toEqual(peerState.knownStates);
expect(newPeerState.optimisticKnownStates).toEqual(peerState.knownStates);

View File

@@ -1,7 +1,8 @@
import { describe, expect, test } from "vitest";
import { PriorityBasedMessageQueue } from "../PriorityBasedMessageQueue.js";
import { CO_VALUE_PRIORITY } from "../priority.js";
import { SyncMessage } from "../sync.js";
import { SyncMessage } from "../sync/types.js";
function setup() {
const queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.MEDIUM);
@@ -67,9 +68,9 @@ describe("PriorityBasedMessageQueue", () => {
void queue.push(mediumPriorityMsg);
void queue.push(highPriorityMsg);
expect(queue.pull()?.msg).toEqual(highPriorityMsg);
expect(queue.pull()?.msg).toEqual(mediumPriorityMsg);
expect(queue.pull()?.msg).toEqual(lowPriorityMsg);
expect(queue.pull()?.content).toEqual(highPriorityMsg);
expect(queue.pull()?.content).toEqual(mediumPriorityMsg);
expect(queue.pull()?.content).toEqual(lowPriorityMsg);
});
test("should return undefined when pulling from empty queue", () => {

View File

@@ -4,7 +4,7 @@ import {
PeerSyncStateListenerCallback,
} from "../SyncStateSubscriptionManager.js";
import { connectedPeers } from "../streamUtils.js";
import { emptyKnownState } from "../sync.js";
import { emptyKnownState } from "../sync/types.js";
import { createTestNode, waitFor } from "./testUtils.js";
describe("SyncStateSubscriptionManager", () => {

View File

@@ -59,7 +59,7 @@ test("Can create account with one node, and then load it on another", async () =
console.log("After connected peers");
node.syncManager.addPeer(node2asPeer);
node.addPeer(node2asPeer);
const node2 = await LocalNode.withLoadedAccount({
accountID,

View File

@@ -1,15 +1,16 @@
import { describe, expect, test, vi } from "vitest";
import { PeerState } from "../PeerState";
import { CoValueCore } from "../coValueCore";
import { CO_VALUE_LOADING_MAX_RETRIES, CoValueState } from "../coValueState";
import { CO_VALUE_LOADING_MAX_RETRIES, CoValueEntry } from "../coValueEntry.js";
import { RawCoID } from "../ids";
import { Peer } from "../sync";
import { PeerEntry } from "../peer/PeerEntry.js";
import { Peer } from "../localNode.js";
describe("CoValueState", () => {
const mockCoValueId = "co_test123" as RawCoID;
test("should create unknown state", () => {
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
expect(state.id).toBe(mockCoValueId);
expect(state.state.type).toBe("unknown");
@@ -17,7 +18,7 @@ describe("CoValueState", () => {
test("should create loading state", () => {
const peerIds = ["peer1", "peer2"];
const state = CoValueState.Loading(mockCoValueId, peerIds);
const state = CoValueEntry.Loading(mockCoValueId, peerIds);
expect(state.id).toBe(mockCoValueId);
expect(state.state.type).toBe("loading");
@@ -25,7 +26,7 @@ describe("CoValueState", () => {
test("should create available state", async () => {
const mockCoValue = createMockCoValueCore(mockCoValueId);
const state = CoValueState.Available(mockCoValue);
const state = CoValueEntry.Available(mockCoValue);
expect(state.id).toBe(mockCoValueId);
expect(state.state.type).toBe("available");
@@ -35,7 +36,7 @@ describe("CoValueState", () => {
test("should handle found action", async () => {
const mockCoValue = createMockCoValueCore(mockCoValueId);
const state = CoValueState.Loading(mockCoValueId, ["peer1", "peer2"]);
const state = CoValueEntry.Loading(mockCoValueId, ["peer1", "peer2"]);
const stateValuePromise = state.getCoValue();
@@ -50,7 +51,7 @@ describe("CoValueState", () => {
});
test("should ignore actions when not in loading state", () => {
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
state.dispatch({
type: "not-found-in-peer",
@@ -87,9 +88,9 @@ describe("CoValueState", () => {
});
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
@@ -140,9 +141,9 @@ describe("CoValueState", () => {
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
@@ -189,9 +190,9 @@ describe("CoValueState", () => {
});
},
);
const mockPeers = [peer1, peer2] as unknown as PeerState[];
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
@@ -239,9 +240,9 @@ describe("CoValueState", () => {
},
);
const mockPeers = [peer1] as unknown as PeerState[];
const mockPeers = [peer1] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
@@ -273,9 +274,9 @@ describe("CoValueState", () => {
},
);
const mockPeers = [peer1] as unknown as PeerState[];
const mockPeers = [peer1] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
@@ -322,9 +323,9 @@ describe("CoValueState", () => {
},
);
const mockPeers = [peer1] as unknown as PeerState[];
const mockPeers = [peer1] as unknown as PeerEntry[];
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers(mockPeers);
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
@@ -369,7 +370,7 @@ describe("CoValueState", () => {
},
);
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1, peer2]);
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
@@ -418,7 +419,7 @@ describe("CoValueState", () => {
peer1.closed = true;
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1, peer2]);
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
@@ -446,7 +447,7 @@ describe("CoValueState", () => {
async () => {},
);
const state = CoValueState.Unknown(mockCoValueId);
const state = CoValueEntry.Unknown(mockCoValueId);
const loadPromise = state.loadFromPeers([peer1]);
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES * 2; i++) {
@@ -467,7 +468,7 @@ function createMockPeerState(
peer: Partial<Peer>,
pushFn = () => Promise.resolve(),
) {
const peerState = new PeerState(
const peerState = new PeerEntry(
{
id: "peer",
role: "server",

View File

@@ -9,7 +9,7 @@ import { stableStringify } from "../jsonStringify.js";
import { LocalNode } from "../localNode.js";
import { getPriorityFromHeader } from "../priority.js";
import { connectedPeers, newQueuePair } from "../streamUtils.js";
import { SyncMessage } from "../sync.js";
import { SyncMessage } from "../sync/types.js";
import {
createTestNode,
randomAnonymousAccountAndSessionID,
@@ -32,7 +32,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -113,7 +113,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -188,7 +188,7 @@ test("After subscribing, node sends own known state and new txs to peer", async
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -304,7 +304,7 @@ test("Client replies with known new content to tellKnownState from server", asyn
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -378,7 +378,7 @@ test("No matter the optimistic known state, node respects invalid known state me
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -478,7 +478,7 @@ test("If we add a peer, but it never subscribes to a coValue, it won't get any m
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -514,7 +514,7 @@ test.todo(
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -587,7 +587,7 @@ test.skip("If we add a server peer, newly created coValues are auto-subscribed t
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -643,7 +643,7 @@ test("When we connect a new server peer, we try to sync all existing coValues to
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -679,7 +679,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe
const [outRx, outTx] = newQueuePair();
const outRxQ = outRx[Symbol.asyncIterator]();
node.syncManager.addPeer({
node.addPeer({
id: "test",
incoming: inRx,
outgoing: outTx,
@@ -718,7 +718,7 @@ test.skip("When replaying creation and transactions of a coValue as new content,
const [outRx1, outTx1] = newQueuePair();
const outRxQ1 = outRx1[Symbol.asyncIterator]();
node1.syncManager.addPeer({
node1.addPeer({
id: "test2",
incoming: inRx1,
outgoing: outTx1,
@@ -736,7 +736,7 @@ test.skip("When replaying creation and transactions of a coValue as new content,
const [outRx2, outTx2] = newQueuePair();
const outRxQ2 = outRx2[Symbol.asyncIterator]();
node2.syncManager.addPeer({
node2.addPeer({
id: "test1",
incoming: inRx2,
outgoing: outTx2,
@@ -879,8 +879,8 @@ test("Can sync a coValue through a server to another client", async () => {
},
);
client1.syncManager.addPeer(serverAsPeerForClient1);
server.syncManager.addPeer(client1AsPeer);
client1.addPeer(serverAsPeerForClient1);
server.addPeer(client1AsPeer);
const client2 = new LocalNode(
admin,
@@ -898,8 +898,8 @@ test("Can sync a coValue through a server to another client", async () => {
},
);
client2.syncManager.addPeer(serverAsPeerForClient2);
server.syncManager.addPeer(client2AsPeer);
client2.addPeer(serverAsPeerForClient2);
server.addPeer(client2AsPeer);
const mapOnClient2 = await client2.loadCoValueCore(map.core.id);
if (mapOnClient2 === "unavailable") {
@@ -931,8 +931,8 @@ test("Can sync a coValue with private transactions through a server to another c
peer2role: "client",
});
client1.syncManager.addPeer(serverAsPeer);
server.syncManager.addPeer(client1AsPeer);
client1.addPeer(serverAsPeer);
server.addPeer(client1AsPeer);
const client2 = new LocalNode(
admin,
@@ -950,8 +950,8 @@ test("Can sync a coValue with private transactions through a server to another c
},
);
client2.syncManager.addPeer(serverAsOtherPeer);
server.syncManager.addPeer(client2AsPeer);
client2.addPeer(serverAsOtherPeer);
server.addPeer(client2AsPeer);
const mapOnClient2 = await client2.loadCoValueCore(map.core.id);
if (mapOnClient2 === "unavailable") {
@@ -1098,13 +1098,13 @@ test("If we start loading a coValue before connecting to a peer that has it, it
// trace: true,
});
node1.syncManager.addPeer(node2asPeer);
node1.addPeer(node2asPeer);
const mapOnNode2Promise = node2.loadCoValueCore(map.core.id);
expect(node2.coValuesStore.get(map.core.id).state.type).toEqual("unknown");
node2.syncManager.addPeer(node1asPeer);
node2.addPeer(node1asPeer);
const mapOnNode2 = await mapOnNode2Promise;
if (mapOnNode2 === "unavailable") {
@@ -1190,8 +1190,8 @@ describe("sync - extra tests", () => {
peer2role: "client",
});
node1.syncManager.addPeer(node2AsPeer);
node2.syncManager.addPeer(node1AsPeer);
node1.addPeer(node2AsPeer);
node2.addPeer(node1AsPeer);
// Wait for initial sync
await new Promise((resolve) => setTimeout(resolve, 100));
@@ -1486,12 +1486,12 @@ describe("sync - extra tests", () => {
},
);
node1.syncManager.addPeer(node2AsPeerFor1);
node1.syncManager.addPeer(node3AsPeerFor1);
node2.syncManager.addPeer(node1AsPeerFor2);
node2.syncManager.addPeer(node3AsPeerFor2);
node3.syncManager.addPeer(node1AsPeerFor3);
node3.syncManager.addPeer(node2AsPeerFor3);
node1.addPeer(node2AsPeerFor1);
node1.addPeer(node3AsPeerFor1);
node2.addPeer(node1AsPeerFor2);
node2.addPeer(node3AsPeerFor2);
node3.addPeer(node1AsPeerFor3);
node3.addPeer(node2AsPeerFor3);
// Wait for initial sync
await new Promise((resolve) => setTimeout(resolve, 100));
@@ -1587,10 +1587,10 @@ describe("sync - extra tests", () => {
},
);
node1.syncManager.addPeer(newNode3AsPeerFor1);
node2.syncManager.addPeer(newNode3AsPeerFor2);
node3.syncManager.addPeer(newNode1AsPeerFor3);
node3.syncManager.addPeer(newNode2AsPeerFor3);
node1.addPeer(newNode3AsPeerFor1);
node2.addPeer(newNode3AsPeerFor2);
node3.addPeer(newNode1AsPeerFor3);
node3.addPeer(newNode2AsPeerFor3);
// Wait for re-sync
await new Promise((resolve) => setTimeout(resolve, 200));

View File

@@ -1,11 +1,10 @@
import { expect } from "vitest";
import { ControlledAgent } from "../coValues/account.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { CoID, RawCoValue } from "../exports.js";
import { CoID, Peer, RawCoValue } from "../exports.js";
import { SessionID } from "../ids.js";
import { LocalNode } from "../localNode.js";
import { connectedPeers } from "../streamUtils.js";
import { Peer } from "../sync.js";
import { expectGroup } from "../typeUtils/expectGroup.js";
const Crypto = await WasmCrypto.create();
@@ -98,12 +97,12 @@ export function createThreeConnectedNodes(
},
);
node1.syncManager.addPeer(node1ToNode2Peer);
node1.syncManager.addPeer(node1ToNode3Peer);
node2.syncManager.addPeer(node2ToNode1Peer);
node2.syncManager.addPeer(node2ToNode3Peer);
node3.syncManager.addPeer(node3ToNode1Peer);
node3.syncManager.addPeer(node3ToNode2Peer);
node1.addPeer(node1ToNode2Peer);
node1.addPeer(node1ToNode3Peer);
node2.addPeer(node2ToNode1Peer);
node2.addPeer(node2ToNode3Peer);
node3.addPeer(node3ToNode1Peer);
node3.addPeer(node3ToNode2Peer);
return {
node1,

View File

@@ -0,0 +1,59 @@
import { RawCoID } from "../ids.js";
type DeferredFn = () => Promise<unknown>;
export class ParallelQueueRunner {
private queueIds: Map<RawCoID, { queue: DeferredFn[]; locked: boolean }> =
new Map();
defferPer(queueId: RawCoID, fn: () => Promise<unknown>) {
const item = this.queueIds.get(queueId);
if (item) {
item.queue.push(fn);
} else {
this.queueIds.set(queueId, { queue: [fn], locked: false });
}
void this.processQueue(queueId);
}
private async processQueue(queueId: RawCoID) {
const queueEntry = this.queueIds.get(queueId)!;
if (queueEntry.locked) return;
queueEntry.locked = true;
while (queueEntry.queue.length) {
try {
await queueEntry.queue.shift()!();
} catch (e) {
console.error(`Error while processing queue for ${queueId} ${e}`);
}
}
queueEntry.locked = false;
}
}
// Test it
// function sleep(ms: number) {
// return new Promise((resolve) => setTimeout(resolve, ms));
// }
//
// function test(id: RawCoID, msg: string, ms: number) {
// queue.deffer(id, async () => {
// console.log(id, msg, "start");
// await sleep(ms);
// console.log(id, msg, "end");
// });
// }
//
// const queue = new QueueRunner();
//
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "1", 400);
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "2", 300);
// test("111", "1", 200);
// test("111", "2", 200);
// test("111", "3", 200);
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "3", 200);
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "4", 200);
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "5", 200);

View File

@@ -71,7 +71,7 @@ export async function createJazzBrowserContext<Acc extends Account>(
options.reconnectionTimeout,
(peer) => {
if (node) {
node.syncManager.addPeer(peer);
node.addPeer(peer);
}
},
);

View File

@@ -54,7 +54,7 @@ export async function startWorker<Acc extends Account>({
});
setInterval(async () => {
if (!worker._raw.core.node.syncManager.peers["upstream"]) {
if (!worker._raw.core.node.peers.get("upstream")) {
console.log(new Date(), "Reconnecting to upstream " + peer);
const wsPeer: Peer = createWebSocketPeer({
@@ -63,7 +63,7 @@ export async function startWorker<Acc extends Account>({
role: "server",
});
worker._raw.core.node.syncManager.addPeer(wsPeer);
worker._raw.core.node.addPeer(wsPeer);
}
}, 5000);

View File

@@ -102,9 +102,7 @@ export async function createJazzRNContext<Acc extends Account>(
async function websocketReconnectLoop() {
while (shouldTryToReconnect) {
if (
Object.keys(node.syncManager.peers).some((peerId) =>
peerId.includes(options.peer),
)
Object.keys(node.peers).some((peerId) => peerId.includes(options.peer))
) {
// TODO: this might drain battery, use listeners instead
await new Promise((resolve) => setTimeout(resolve, 100));
@@ -130,7 +128,7 @@ export async function createJazzRNContext<Acc extends Account>(
);
});
node.syncManager.addPeer(
node.addPeer(
createWebSocketPeer({
websocket: new WebSocket(options.peer),
id: options.peer + "@" + new Date().toISOString(),

View File

@@ -1,9 +1,7 @@
import { CoValueCore, Profile } from "cojson";
import { emptyKnownState } from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import {
Account,
CoMap,
Peer,
WasmCrypto,
createJazzContext,
isControlledAccount,
@@ -41,8 +39,11 @@ export const createWorkerAccount = async ({
const syncManager = account._raw.core.node.syncManager;
await Promise.all([
syncManager.syncCoValue(accountCoValue),
syncManager.syncCoValue(accountProfileCoValue),
syncManager.syncCoValue(accountCoValue, emptyKnownState(accountCoValue.id)),
syncManager.syncCoValue(
accountProfileCoValue,
emptyKnownState(accountProfileCoValue.id),
),
]);
await Promise.race([

View File

@@ -4,7 +4,7 @@ import { WebSocketServer } from "ws";
import { mkdir } from "node:fs/promises";
import { dirname } from "node:path";
import { SQLiteStorage } from "cojson-storage-sqlite";
import { SQLiteNode, SQLiteStorage } from "cojson-storage-sqlite";
import { createWebSocketPeer } from "cojson-transport-ws";
export const startSyncServer = async ({
@@ -38,9 +38,10 @@ export const startSyncServer = async ({
if (!inMemory) {
await mkdir(dirname(db), { recursive: true });
SQLiteNode.USE_PROTOCOL2 = true;
const storage = await SQLiteStorage.asPeer({ filename: db });
localNode.syncManager.addPeer(storage);
localNode.addPeer(storage);
}
wss.on("connection", function connection(ws, req) {
@@ -66,7 +67,7 @@ export const startSyncServer = async ({
const clientId = clientAddress + "@" + new Date().toISOString();
localNode.syncManager.addPeer(
localNode.addPeer(
createWebSocketPeer({
id: clientId,
role: "client",

View File

@@ -204,7 +204,7 @@ export class Account extends CoValueBase implements CoValue {
{ peer1role: "server", peer2role: "client" },
);
as._raw.core.node.syncManager.addPeer(connectedPeers[1]);
as._raw.core.node.addPeer(connectedPeers[1]);
return this.create<A>({
creationProps: options.creationProps,

View File

@@ -265,7 +265,7 @@ export async function createAnonymousJazzContext({
);
for (const peer of peersToLoadFrom) {
node.syncManager.addPeer(peer);
node.addPeer(peer);
}
return {

View File

@@ -1,4 +1,4 @@
import type { RawCoValue } from "cojson";
import { RawCoValue } from "cojson";
import type {
Account,
AnonymousJazzAgent,
@@ -86,7 +86,7 @@ export class SubscriptionScope<Root extends CoValue> {
this.subscriber._type === "Account"
? this.subscriber._raw.core.node
: this.subscriber.node;
void node.loadCoValueCore(accessedOrSetId).then((core) => {
void node.load(accessedOrSetId, true).then((core) => {
if (loadingEntry.state === "loading" && loadingEntry.immediatelyUnsub) {
return;
}

View File

@@ -92,7 +92,7 @@ describe("CoFeed resolution", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,
@@ -174,7 +174,7 @@ describe("CoFeed resolution", async () => {
peer2role: "client",
});
me._raw.core.node.syncManager.addPeer(secondAsPeer);
me._raw.core.node.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -305,7 +305,7 @@ describe("FileStream loading & Subscription", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondAsPeer);
me._raw.core.node.addPeer(secondAsPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,
@@ -333,7 +333,7 @@ describe("FileStream loading & Subscription", async () => {
peer1role: "server",
peer2role: "client",
});
me._raw.core.node.syncManager.addPeer(secondAsPeer);
me._raw.core.node.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}

View File

@@ -155,7 +155,7 @@ describe("CoList resolution", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,
@@ -226,7 +226,7 @@ describe("CoList resolution", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,

View File

@@ -382,7 +382,7 @@ describe("CoMap resolution", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,
@@ -453,7 +453,7 @@ describe("CoMap resolution", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondAsPeer);
me._raw.core.node.addPeer(secondAsPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,

View File

@@ -48,7 +48,7 @@ describe("Deep loading with depth arg", async () => {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,
@@ -260,7 +260,7 @@ test("Deep loading a record-like coMap", async () => {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,

View File

@@ -25,7 +25,7 @@ export async function setupAccount() {
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
me._raw.core.node.addPeer(secondPeer);
const { account: meOnSecondPeer } = await createJazzContext({
auth: fixedCredentialsAuth({
accountID: me.id,