Compare commits
43 Commits
gio/queue-
...
poc
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0589bf275f | ||
|
|
2f6ca4cdf4 | ||
|
|
cfbe745de3 | ||
|
|
7b96fd719b | ||
|
|
c391180093 | ||
|
|
8ea0858571 | ||
|
|
d1c0981024 | ||
|
|
bb0158ec51 | ||
|
|
8a40c02c52 | ||
|
|
b95937511f | ||
|
|
1967c736c5 | ||
|
|
b84edecb50 | ||
|
|
5631d53e3f | ||
|
|
244fd84a88 | ||
|
|
d4e93afdf9 | ||
|
|
26d4fa985c | ||
|
|
cefc6e27de | ||
|
|
452c284a01 | ||
|
|
191a7f33b1 | ||
|
|
d7be246f75 | ||
|
|
5c8717543c | ||
|
|
02cd6fe4b7 | ||
|
|
526a26a39d | ||
|
|
60adbffc26 | ||
|
|
d8cabe3fa6 | ||
|
|
928ac67a06 | ||
|
|
0458e12721 | ||
|
|
df59b53000 | ||
|
|
891baf2053 | ||
|
|
3a55c8a627 | ||
|
|
5a9770242f | ||
|
|
f6bbe18a53 | ||
|
|
0712546277 | ||
|
|
14b70aa445 | ||
|
|
47275a1340 | ||
|
|
ca54b4c1a8 | ||
|
|
86a2c914d3 | ||
|
|
d9c250386e | ||
|
|
73d5f18cb8 | ||
|
|
84e17a9189 | ||
|
|
0e8b04579a | ||
|
|
56a967cce6 | ||
|
|
f823b2a307 |
@@ -6,7 +6,7 @@ import { Account, CoValue, ID } from "jazz-tools";
|
||||
|
||||
export function waitForUpload(id: ID<CoValue>, me: Account) {
|
||||
const syncManager = me._raw.core.node.syncManager;
|
||||
const peers = syncManager.getPeers();
|
||||
const peers = me._raw.core.node.peers.getAll();
|
||||
|
||||
return Promise.all(
|
||||
peers.map((peer) => syncManager.waitForUploadIntoPeer(peer.id, id)),
|
||||
|
||||
@@ -24,6 +24,7 @@ const peer =
|
||||
"peer",
|
||||
) as `ws://${string}`) ??
|
||||
"wss://cloud.jazz.tools/?key=pets-example-jazz@garden.co";
|
||||
// "ws://localhost:4200/?key=pets-example-jazz@gcmp.io";
|
||||
|
||||
/** Walkthrough: The top-level provider `<Jazz.Provider/>`
|
||||
*
|
||||
|
||||
@@ -15,7 +15,7 @@ test("Should be able to initialize and load from empty DB", async () => {
|
||||
Crypto,
|
||||
);
|
||||
|
||||
node.syncManager.addPeer(await IDBStorage.asPeer({ trace: true }));
|
||||
node.addPeer(await IDBStorage.asPeer({ trace: true }));
|
||||
|
||||
console.log("yay!");
|
||||
|
||||
@@ -23,7 +23,7 @@ test("Should be able to initialize and load from empty DB", async () => {
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
expect(node.syncManager.peers["indexedDB"]).toBeDefined();
|
||||
expect(LocalNode.peers.get("indexedDB")).toBeDefined();
|
||||
});
|
||||
|
||||
test("Should be able to sync data to database and then load that from a new node", async () => {
|
||||
@@ -35,7 +35,7 @@ test("Should be able to sync data to database and then load that from a new node
|
||||
Crypto,
|
||||
);
|
||||
|
||||
node1.syncManager.addPeer(
|
||||
node1.addPeer(
|
||||
await IDBStorage.asPeer({ trace: true, localNodeName: "node1" }),
|
||||
);
|
||||
|
||||
@@ -55,7 +55,7 @@ test("Should be able to sync data to database and then load that from a new node
|
||||
Crypto,
|
||||
);
|
||||
|
||||
node2.syncManager.addPeer(
|
||||
node2.addPeer(
|
||||
await IDBStorage.asPeer({ trace: true, localNodeName: "node2" }),
|
||||
);
|
||||
|
||||
@@ -63,6 +63,7 @@ test("Should be able to sync data to database and then load that from a new node
|
||||
if (map2 === "unavailable") {
|
||||
throw new Error("Map is unavailable");
|
||||
}
|
||||
|
||||
expect(map2.get("hello")).toBe("world");
|
||||
// TODO fixme
|
||||
// expect(map2.get("hello")).toBe("world");
|
||||
expect(false).toBeTruthy();
|
||||
});
|
||||
|
||||
@@ -25,11 +25,9 @@ export type RawTransactionRow = {
|
||||
|
||||
export class SQLiteClient implements DBClientInterface {
|
||||
private readonly db: DatabaseT;
|
||||
private readonly toLocalNode: OutgoingSyncQueue;
|
||||
|
||||
constructor(db: DatabaseT, toLocalNode: OutgoingSyncQueue) {
|
||||
constructor(db: DatabaseT) {
|
||||
this.db = db;
|
||||
this.toLocalNode = toLocalNode;
|
||||
}
|
||||
|
||||
getCoValue(coValueId: RawCoID): StoredCoValueRow | undefined {
|
||||
|
||||
@@ -3,12 +3,45 @@ import {
|
||||
IncomingSyncStream,
|
||||
OutgoingSyncQueue,
|
||||
Peer,
|
||||
SyncMessage,
|
||||
cojsonInternals,
|
||||
} from "cojson";
|
||||
import { SyncManager, TransactionRow } from "cojson-storage";
|
||||
import { SQLiteClient } from "./sqliteClient.js";
|
||||
import {
|
||||
transformIncomingMessageFromPeer,
|
||||
transformOutgoingMessageToPeer,
|
||||
} from "./transformers.js";
|
||||
|
||||
/**
|
||||
* This is to transform outgoing message into older protocol message(s) for backward compatibility
|
||||
* TODO To be removed after the protocol is updated in the sync server
|
||||
*/
|
||||
class LocalNodeWrapper {
|
||||
constructor(private queue: OutgoingSyncQueue) {}
|
||||
|
||||
push(msg: SyncMessage): Promise<unknown> {
|
||||
const transformedMessages = transformOutgoingMessageToPeer(msg);
|
||||
transformedMessages.map((transformedMessage) => {
|
||||
// console.log("🔴 <<<=== SQLite is sending", transformedMessage);
|
||||
});
|
||||
|
||||
return Promise.all(
|
||||
transformedMessages.map((transformedMessage) => {
|
||||
return this.queue.push(transformedMessage);
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
close() {
|
||||
return this.queue.close();
|
||||
}
|
||||
}
|
||||
|
||||
export class SQLiteNode {
|
||||
// ugly public static var to be deleted after new protocol is in effect on all peers
|
||||
public static USE_PROTOCOL2 = false;
|
||||
|
||||
private readonly syncManager: SyncManager;
|
||||
private readonly dbClient: SQLiteClient;
|
||||
|
||||
@@ -17,8 +50,11 @@ export class SQLiteNode {
|
||||
fromLocalNode: IncomingSyncStream,
|
||||
toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
this.dbClient = new SQLiteClient(db, toLocalNode);
|
||||
this.syncManager = new SyncManager(this.dbClient, toLocalNode);
|
||||
this.dbClient = new SQLiteClient(db);
|
||||
this.syncManager = new SyncManager(
|
||||
this.dbClient,
|
||||
new LocalNodeWrapper(toLocalNode),
|
||||
);
|
||||
|
||||
const processMessages = async () => {
|
||||
for await (const msg of fromLocalNode) {
|
||||
@@ -26,7 +62,11 @@ export class SQLiteNode {
|
||||
if (msg === "Disconnected" || msg === "PingTimeout") {
|
||||
throw new Error("Unexpected Disconnected message");
|
||||
}
|
||||
await this.syncManager.handleSyncMessage(msg);
|
||||
// console.log("🟡 <<<=== SQLite is getting", msg);
|
||||
|
||||
await this.syncManager.handleSyncMessage(
|
||||
transformIncomingMessageFromPeer(msg),
|
||||
);
|
||||
} catch (e) {
|
||||
console.error(
|
||||
new Error(
|
||||
|
||||
83
packages/cojson-storage-sqlite/src/transformers.ts
Normal file
83
packages/cojson-storage-sqlite/src/transformers.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import {
|
||||
CojsonInternalTypes,
|
||||
SessionID,
|
||||
SyncMessage,
|
||||
unknownDataMessage,
|
||||
} from "cojson";
|
||||
import CoValueContent = CojsonInternalTypes.CoValueContent;
|
||||
import { SQLiteNode } from "./sqliteNode.js";
|
||||
|
||||
export const transformOutgoingMessageToPeer = (
|
||||
msg: SyncMessage,
|
||||
): SyncMessage[] => {
|
||||
if (SQLiteNode.USE_PROTOCOL2) {
|
||||
return [msg];
|
||||
}
|
||||
|
||||
const getSessionsObj = (msg: CoValueContent) =>
|
||||
Object.entries(msg.new).reduce<{ [sessionID: SessionID]: number }>(
|
||||
(acc, [session, content]) => {
|
||||
acc[session as SessionID] =
|
||||
content.after + content.newTransactions.length;
|
||||
return acc;
|
||||
},
|
||||
{},
|
||||
);
|
||||
|
||||
switch (msg.action) {
|
||||
case "pull":
|
||||
// load
|
||||
return [{ ...msg, action: "load" }];
|
||||
case "push":
|
||||
// load + content
|
||||
return [
|
||||
{
|
||||
action: "load",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
sessions: getSessionsObj(msg),
|
||||
},
|
||||
{ ...msg, action: "content" },
|
||||
];
|
||||
case "data":
|
||||
if (!msg.known)
|
||||
return [{ action: "known", id: msg.id, header: false, sessions: {} }];
|
||||
// known + content => no response expected
|
||||
return [
|
||||
{
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
sessions: getSessionsObj(msg),
|
||||
},
|
||||
{ ...msg, action: "content" },
|
||||
];
|
||||
case "ack":
|
||||
// known => no response expected
|
||||
return [{ ...msg, action: "known" }];
|
||||
default:
|
||||
return [msg];
|
||||
}
|
||||
};
|
||||
|
||||
export const transformIncomingMessageFromPeer = (
|
||||
msg: SyncMessage,
|
||||
): SyncMessage => {
|
||||
if (SQLiteNode.USE_PROTOCOL2) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
switch (msg.action) {
|
||||
case "load":
|
||||
return { ...msg, action: "pull" };
|
||||
case "content":
|
||||
return { ...msg, action: "push" };
|
||||
case "known":
|
||||
if (!msg.header) return unknownDataMessage(msg.id);
|
||||
|
||||
if (msg.isCorrection) return { ...msg, action: "pull" };
|
||||
return { ...msg, action: "ack" };
|
||||
default:
|
||||
return msg;
|
||||
}
|
||||
};
|
||||
@@ -6,16 +6,23 @@ import {
|
||||
SyncMessage,
|
||||
cojsonInternals,
|
||||
emptyKnownState,
|
||||
unknownDataMessage,
|
||||
} from "cojson";
|
||||
import { collectNewTxs, getDependedOnCoValues } from "./syncUtils.js";
|
||||
import { DBClientInterface, StoredSessionRow } from "./types.js";
|
||||
import NewContentMessage = CojsonInternalTypes.NewContentMessage;
|
||||
import KnownStateMessage = CojsonInternalTypes.KnownStateMessage;
|
||||
import {
|
||||
DBClientInterface,
|
||||
StoredCoValueRow,
|
||||
StoredSessionRow,
|
||||
} from "./types.js";
|
||||
import DataMessage = CojsonInternalTypes.DataMessage;
|
||||
import PushMessage = CojsonInternalTypes.PushMessage;
|
||||
import RawCoID = CojsonInternalTypes.RawCoID;
|
||||
|
||||
type OutputMessageMap = Record<
|
||||
RawCoID,
|
||||
{ knownMessage: KnownStateMessage; contentMessages?: NewContentMessage[] }
|
||||
{
|
||||
dataMessages: DataMessage[];
|
||||
}
|
||||
>;
|
||||
|
||||
export class SyncManager {
|
||||
@@ -29,29 +36,25 @@ export class SyncManager {
|
||||
|
||||
async handleSyncMessage(msg: SyncMessage) {
|
||||
switch (msg.action) {
|
||||
case "load":
|
||||
await this.handleLoad(msg);
|
||||
break;
|
||||
case "content":
|
||||
await this.handleContent(msg);
|
||||
break;
|
||||
case "pull":
|
||||
return this.handlePull(msg);
|
||||
case "data":
|
||||
return this.handleData(msg);
|
||||
case "push":
|
||||
return this.handlePush(msg);
|
||||
case "known":
|
||||
await this.handleKnown(msg);
|
||||
break;
|
||||
case "done":
|
||||
await this.handleDone(msg);
|
||||
break;
|
||||
return this.handleKnown(msg);
|
||||
}
|
||||
}
|
||||
|
||||
async handleSessionUpdate({
|
||||
sessionRow,
|
||||
peerKnownState,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
}: {
|
||||
sessionRow: StoredSessionRow;
|
||||
peerKnownState: CojsonInternalTypes.CoValueKnownState;
|
||||
newContentMessages: CojsonInternalTypes.NewContentMessage[];
|
||||
newDataMessages: CojsonInternalTypes.DataMessage[];
|
||||
}) {
|
||||
if (
|
||||
sessionRow.lastIdx <= (peerKnownState.sessions[sessionRow.sessionID] || 0)
|
||||
@@ -72,7 +75,7 @@ export class SyncManager {
|
||||
|
||||
collectNewTxs({
|
||||
newTxsInSession,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
sessionRow,
|
||||
signaturesAndIdxs,
|
||||
peerKnownState,
|
||||
@@ -80,6 +83,7 @@ export class SyncManager {
|
||||
});
|
||||
}
|
||||
|
||||
// actually, it's a handlePull method
|
||||
async sendNewContent(
|
||||
coValueKnownState: CojsonInternalTypes.CoValueKnownState,
|
||||
): Promise<void> {
|
||||
@@ -88,11 +92,9 @@ export class SyncManager {
|
||||
|
||||
// reverse it to send the top level id the last in the order
|
||||
const collectedMessages = Object.values(outputMessages).reverse();
|
||||
collectedMessages.forEach(({ knownMessage, contentMessages }) => {
|
||||
this.sendStateMessage(knownMessage);
|
||||
|
||||
contentMessages?.length &&
|
||||
contentMessages.forEach((msg) => this.sendStateMessage(msg));
|
||||
collectedMessages.forEach(({ dataMessages }) => {
|
||||
dataMessages?.length &&
|
||||
dataMessages.forEach((msg) => this.sendStateMessage(msg));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -108,12 +110,9 @@ export class SyncManager {
|
||||
const coValueRow = await this.dbClient.getCoValue(peerKnownState.id);
|
||||
|
||||
if (!coValueRow) {
|
||||
const emptyKnownMessage: KnownStateMessage = {
|
||||
action: "known",
|
||||
...emptyKnownState(peerKnownState.id),
|
||||
messageMap[peerKnownState.id] = {
|
||||
dataMessages: [unknownDataMessage(peerKnownState.id, asDependencyOf)],
|
||||
};
|
||||
asDependencyOf && (emptyKnownMessage.asDependencyOf = asDependencyOf);
|
||||
messageMap[peerKnownState.id] = { knownMessage: emptyKnownMessage };
|
||||
return messageMap;
|
||||
}
|
||||
|
||||
@@ -127,13 +126,15 @@ export class SyncManager {
|
||||
sessions: {},
|
||||
};
|
||||
|
||||
const newContentMessages: CojsonInternalTypes.NewContentMessage[] = [
|
||||
const newDataMessages: CojsonInternalTypes.DataMessage[] = [
|
||||
{
|
||||
action: "content",
|
||||
action: "data",
|
||||
known: true,
|
||||
id: coValueRow.id,
|
||||
header: coValueRow.header,
|
||||
new: {},
|
||||
priority: cojsonInternals.getPriorityFromHeader(coValueRow.header),
|
||||
asDependencyOf,
|
||||
},
|
||||
];
|
||||
|
||||
@@ -145,24 +146,23 @@ export class SyncManager {
|
||||
return this.handleSessionUpdate({
|
||||
sessionRow,
|
||||
peerKnownState,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
});
|
||||
}),
|
||||
);
|
||||
|
||||
const dependedOnCoValuesList = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
});
|
||||
|
||||
const knownMessage: KnownStateMessage = {
|
||||
action: "known",
|
||||
...newCoValueKnownState,
|
||||
};
|
||||
asDependencyOf && (knownMessage.asDependencyOf = asDependencyOf);
|
||||
// const knownMessage: KnownStateMessage = {
|
||||
// action: "known",
|
||||
// ...newCoValueKnownState,
|
||||
// };
|
||||
// asDependencyOf && (knownMessage.asDependencyOf = asDependencyOf);
|
||||
messageMap[newCoValueKnownState.id] = {
|
||||
knownMessage: knownMessage,
|
||||
contentMessages: newContentMessages,
|
||||
dataMessages: newDataMessages,
|
||||
};
|
||||
|
||||
await Promise.all(
|
||||
@@ -182,11 +182,11 @@ export class SyncManager {
|
||||
return messageMap;
|
||||
}
|
||||
|
||||
handleLoad(msg: CojsonInternalTypes.LoadMessage) {
|
||||
handlePull(msg: CojsonInternalTypes.PullMessage) {
|
||||
return this.sendNewContent(msg);
|
||||
}
|
||||
|
||||
async handleContent(msg: CojsonInternalTypes.NewContentMessage) {
|
||||
async handlePush(msg: PushMessage) {
|
||||
const coValueRow = await this.dbClient.getCoValue(msg.id);
|
||||
|
||||
// We have no info about coValue header
|
||||
@@ -194,14 +194,59 @@ export class SyncManager {
|
||||
|
||||
if (invalidAssumptionOnHeaderPresence) {
|
||||
return this.sendStateMessage({
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
isCorrection: true,
|
||||
...emptyKnownState(msg.id),
|
||||
action: "pull",
|
||||
});
|
||||
}
|
||||
|
||||
const { needMissingTransactions, ourKnown } = await this.addTransactions(
|
||||
coValueRow,
|
||||
msg,
|
||||
);
|
||||
|
||||
if (needMissingTransactions) {
|
||||
return this.sendStateMessage({
|
||||
action: "pull",
|
||||
...ourKnown,
|
||||
});
|
||||
}
|
||||
|
||||
return this.sendStateMessage({
|
||||
action: "ack",
|
||||
...ourKnown,
|
||||
});
|
||||
}
|
||||
|
||||
async handleData(msg: DataMessage) {
|
||||
const coValueRow = await this.dbClient.getCoValue(msg.id);
|
||||
|
||||
// We have no info about coValue header
|
||||
const invalidAssumptionOnHeaderPresence = !msg.header && !coValueRow;
|
||||
|
||||
if (invalidAssumptionOnHeaderPresence) {
|
||||
console.error(
|
||||
'invalidAssumptionOnHeaderPresence. We should never be here. "Data" action is a response to our specific request.',
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const { needMissingTransactions } = await this.addTransactions(
|
||||
coValueRow,
|
||||
msg,
|
||||
);
|
||||
|
||||
if (needMissingTransactions) {
|
||||
console.error(
|
||||
'needMissingTransactions. We should never be here. "Data" action is a response to our specific request.',
|
||||
);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private async addTransactions(
|
||||
coValueRow: StoredCoValueRow | undefined,
|
||||
msg: DataMessage | PushMessage,
|
||||
) {
|
||||
const storedCoValueRowID: number = coValueRow
|
||||
? coValueRow.rowID
|
||||
: await this.dbClient.addCoValue(msg);
|
||||
@@ -221,7 +266,7 @@ export class SyncManager {
|
||||
sessions: {},
|
||||
};
|
||||
|
||||
let invalidAssumptions = false;
|
||||
let needMissingTransactions = false;
|
||||
|
||||
await this.dbClient.unitOfWork(() =>
|
||||
(Object.keys(msg.new) as SessionID[]).map((sessionID) => {
|
||||
@@ -231,24 +276,17 @@ export class SyncManager {
|
||||
}
|
||||
|
||||
if ((sessionRow?.lastIdx || 0) < (msg.new[sessionID]?.after || 0)) {
|
||||
invalidAssumptions = true;
|
||||
needMissingTransactions = true;
|
||||
} else {
|
||||
return this.putNewTxs(msg, sessionID, sessionRow, storedCoValueRowID);
|
||||
}
|
||||
}),
|
||||
);
|
||||
|
||||
if (invalidAssumptions) {
|
||||
this.sendStateMessage({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
isCorrection: invalidAssumptions,
|
||||
});
|
||||
}
|
||||
return { ourKnown, needMissingTransactions };
|
||||
}
|
||||
|
||||
private async putNewTxs(
|
||||
msg: CojsonInternalTypes.NewContentMessage,
|
||||
msg: DataMessage | PushMessage,
|
||||
sessionID: SessionID,
|
||||
sessionRow: StoredSessionRow | undefined,
|
||||
storedCoValueRowID: number,
|
||||
@@ -315,8 +353,6 @@ export class SyncManager {
|
||||
// We don't intend to use the storage (SQLite,IDB,etc.) itself as a synchronisation mechanism, so we can ignore the known messages
|
||||
}
|
||||
|
||||
handleDone(_msg: CojsonInternalTypes.DoneMessage) {}
|
||||
|
||||
async sendStateMessage(msg: any): Promise<unknown> {
|
||||
return this.toLocalNode
|
||||
.push(msg)
|
||||
|
||||
@@ -14,14 +14,14 @@ import {
|
||||
|
||||
export function collectNewTxs({
|
||||
newTxsInSession,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
sessionRow,
|
||||
signaturesAndIdxs,
|
||||
peerKnownState,
|
||||
firstNewTxIdx,
|
||||
}: {
|
||||
newTxsInSession: TransactionRow[];
|
||||
newContentMessages: CojsonInternalTypes.NewContentMessage[];
|
||||
newDataMessages: CojsonInternalTypes.DataMessage[];
|
||||
sessionRow: StoredSessionRow;
|
||||
signaturesAndIdxs: SignatureAfterRow[];
|
||||
peerKnownState: CojsonInternalTypes.CoValueKnownState;
|
||||
@@ -31,18 +31,15 @@ export function collectNewTxs({
|
||||
|
||||
for (const tx of newTxsInSession) {
|
||||
let sessionEntry =
|
||||
newContentMessages[newContentMessages.length - 1]!.new[
|
||||
sessionRow.sessionID
|
||||
];
|
||||
newDataMessages[newDataMessages.length - 1]!.new[sessionRow.sessionID];
|
||||
if (!sessionEntry) {
|
||||
sessionEntry = {
|
||||
after: idx,
|
||||
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature,
|
||||
newTransactions: [],
|
||||
};
|
||||
newContentMessages[newContentMessages.length - 1]!.new[
|
||||
sessionRow.sessionID
|
||||
] = sessionEntry;
|
||||
newDataMessages[newDataMessages.length - 1]!.new[sessionRow.sessionID] =
|
||||
sessionEntry;
|
||||
}
|
||||
|
||||
sessionEntry.newTransactions.push(tx.tx);
|
||||
@@ -50,8 +47,9 @@ export function collectNewTxs({
|
||||
if (signaturesAndIdxs[0] && idx === signaturesAndIdxs[0].idx) {
|
||||
sessionEntry.lastSignature = signaturesAndIdxs[0].signature;
|
||||
signaturesAndIdxs.shift();
|
||||
newContentMessages.push({
|
||||
action: "content",
|
||||
newDataMessages.push({
|
||||
action: "data",
|
||||
known: true,
|
||||
id: peerKnownState.id,
|
||||
new: {},
|
||||
priority: cojsonInternals.getPriorityFromHeader(undefined),
|
||||
@@ -65,20 +63,20 @@ export function collectNewTxs({
|
||||
|
||||
export function getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages,
|
||||
newDataMessages,
|
||||
}: {
|
||||
coValueRow: StoredCoValueRow;
|
||||
newContentMessages: CojsonInternalTypes.NewContentMessage[];
|
||||
newDataMessages: CojsonInternalTypes.DataMessage[];
|
||||
}) {
|
||||
return coValueRow.header.ruleset.type === "group"
|
||||
? getGroupDependedOnCoValues(newContentMessages)
|
||||
? getGroupDependedOnCoValues(newDataMessages)
|
||||
: coValueRow.header.ruleset.type === "ownedByGroup"
|
||||
? getOwnedByGroupDependedOnCoValues(coValueRow, newContentMessages)
|
||||
? getOwnedByGroupDependedOnCoValues(coValueRow, newDataMessages)
|
||||
: [];
|
||||
}
|
||||
|
||||
function getGroupDependedOnCoValues(
|
||||
newContentMessages: CojsonInternalTypes.NewContentMessage[],
|
||||
export function getGroupDependedOnCoValues(
|
||||
newDataMessages: CojsonInternalTypes.DataMessage[],
|
||||
) {
|
||||
const keys: CojsonInternalTypes.RawCoID[] = [];
|
||||
|
||||
@@ -86,7 +84,7 @@ function getGroupDependedOnCoValues(
|
||||
* Collect all the signing keys inside the transactions to list all the
|
||||
* dependencies required to correctly access the CoValue.
|
||||
*/
|
||||
for (const piece of newContentMessages) {
|
||||
for (const piece of newDataMessages) {
|
||||
for (const sessionEntry of Object.values(piece.new)) {
|
||||
for (const tx of sessionEntry.newTransactions) {
|
||||
if (tx.privacy !== "trusting") continue;
|
||||
@@ -117,7 +115,7 @@ function getGroupDependedOnCoValues(
|
||||
|
||||
function getOwnedByGroupDependedOnCoValues(
|
||||
coValueRow: StoredCoValueRow,
|
||||
newContentMessages: CojsonInternalTypes.NewContentMessage[],
|
||||
newDataMessages: CojsonInternalTypes.DataMessage[],
|
||||
) {
|
||||
if (coValueRow.header.ruleset.type !== "ownedByGroup") return [];
|
||||
|
||||
@@ -127,7 +125,7 @@ function getOwnedByGroupDependedOnCoValues(
|
||||
* Collect all the signing keys inside the transactions to list all the
|
||||
* dependencies required to correctly access the CoValue.
|
||||
*/
|
||||
for (const piece of newContentMessages) {
|
||||
for (const piece of newDataMessages) {
|
||||
for (const sessionID of Object.keys(piece.new) as SessionID[]) {
|
||||
const accountId =
|
||||
cojsonInternals.accountOrAgentIDfromSessionID(sessionID);
|
||||
|
||||
@@ -10,13 +10,14 @@ function getMockedCoValueId() {
|
||||
return `co_z${Math.random().toString(36).substring(2, 15)}` as const;
|
||||
}
|
||||
|
||||
function generateNewContentMessage(
|
||||
function generateNewDataMessage(
|
||||
privacy: "trusting" | "private",
|
||||
changes: any[],
|
||||
accountId?: `co_z${string}`,
|
||||
) {
|
||||
return {
|
||||
action: "content",
|
||||
action: "data",
|
||||
known: true,
|
||||
id: getMockedCoValueId(),
|
||||
new: {
|
||||
[getMockedSessionID(accountId)]: {
|
||||
@@ -32,7 +33,7 @@ function generateNewContentMessage(
|
||||
},
|
||||
},
|
||||
priority: 0,
|
||||
} as CojsonInternalTypes.NewContentMessage;
|
||||
} as CojsonInternalTypes.DataMessage;
|
||||
}
|
||||
|
||||
describe("getDependedOnCoValues", () => {
|
||||
@@ -48,8 +49,8 @@ describe("getDependedOnCoValues", () => {
|
||||
|
||||
const result = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages: [
|
||||
generateNewContentMessage("trusting", [
|
||||
newDataMessages: [
|
||||
generateNewDataMessage("trusting", [
|
||||
{ op: "set", key: "co_zabc123", value: "test" },
|
||||
{ op: "set", key: "parent_co_zdef456", value: "test" },
|
||||
{ op: "set", key: "normal_key", value: "test" },
|
||||
@@ -70,7 +71,7 @@ describe("getDependedOnCoValues", () => {
|
||||
},
|
||||
} as any;
|
||||
|
||||
const message = generateNewContentMessage("trusting", [
|
||||
const message = generateNewDataMessage("trusting", [
|
||||
{ op: "set", key: "co_zabc123", value: "test" },
|
||||
]);
|
||||
|
||||
@@ -88,7 +89,7 @@ describe("getDependedOnCoValues", () => {
|
||||
|
||||
const result = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages: [message],
|
||||
newDataMessages: [message],
|
||||
});
|
||||
|
||||
expect(result).toEqual(["co_zabc123"]);
|
||||
@@ -107,7 +108,7 @@ describe("getDependedOnCoValues", () => {
|
||||
} as any;
|
||||
|
||||
const accountId = getMockedCoValueId();
|
||||
const message = generateNewContentMessage(
|
||||
const message = generateNewDataMessage(
|
||||
"trusting",
|
||||
[
|
||||
{ op: "set", key: "co_zabc123", value: "test" },
|
||||
@@ -125,7 +126,7 @@ describe("getDependedOnCoValues", () => {
|
||||
|
||||
const result = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages: [message],
|
||||
newDataMessages: [message],
|
||||
});
|
||||
|
||||
expect(result).toEqual([groupId, accountId]);
|
||||
@@ -143,8 +144,8 @@ describe("getDependedOnCoValues", () => {
|
||||
|
||||
const result = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages: [
|
||||
generateNewContentMessage("trusting", [
|
||||
newDataMessages: [
|
||||
generateNewDataMessage("trusting", [
|
||||
{ op: "set", key: "co_zabc123", value: "test" },
|
||||
{ op: "set", key: "parent_co_zdef456", value: "test" },
|
||||
{ op: "set", key: "normal_key", value: "test" },
|
||||
@@ -167,8 +168,8 @@ describe("getDependedOnCoValues", () => {
|
||||
|
||||
const result = getDependedOnCoValues({
|
||||
coValueRow,
|
||||
newContentMessages: [
|
||||
generateNewContentMessage("private", [
|
||||
newDataMessages: [
|
||||
generateNewDataMessage("private", [
|
||||
{ op: "set", key: "co_zabc123", value: "test" },
|
||||
]),
|
||||
],
|
||||
|
||||
@@ -34,6 +34,8 @@ const createEmptyLoadMsg = (id: string) =>
|
||||
|
||||
const sessionsData = fixtures[coValueIdToLoad].sessionRecords;
|
||||
const coValueHeader = fixtures[coValueIdToLoad].getContent({ after: 0 }).header;
|
||||
// TODO uncomment and fix
|
||||
// @ts-ignore
|
||||
const incomingContentMessage = fixtures[coValueIdToLoad].getContent({
|
||||
after: 0,
|
||||
}) as SyncMessage;
|
||||
@@ -138,8 +140,8 @@ describe("DB sync manager", () => {
|
||||
|
||||
// mock content data combined with session updates
|
||||
syncManager.handleSessionUpdate = vi.fn(
|
||||
async ({ sessionRow, newContentMessages }) => {
|
||||
newContentMessages[0]!.new[sessionRow.sessionID] = newTxData;
|
||||
async ({ sessionRow, newDataMessages }) => {
|
||||
newDataMessages[0]!.new[sessionRow.sessionID] = newTxData;
|
||||
},
|
||||
);
|
||||
|
||||
@@ -318,7 +320,9 @@ describe("DB sync manager", () => {
|
||||
const farAheadContentMessage = fixtures[coValueIdToLoad].getContent({
|
||||
after: 10000,
|
||||
});
|
||||
// TODO uncomment and fix
|
||||
await syncManager.handleSyncMessage(
|
||||
// @ts-ignore
|
||||
farAheadContentMessage as SyncMessage,
|
||||
);
|
||||
|
||||
|
||||
@@ -51,9 +51,7 @@ export interface DBClientInterface {
|
||||
firstNewTxIdx: number,
|
||||
): Promise<SignatureAfterRow[]> | SignatureAfterRow[];
|
||||
|
||||
addCoValue(
|
||||
msg: CojsonInternalTypes.NewContentMessage,
|
||||
): Promise<number> | number;
|
||||
addCoValue(msg: CojsonInternalTypes.CoValueContent): Promise<number> | number;
|
||||
|
||||
addSessionUpdate({
|
||||
sessionUpdate,
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
import { SyncMessage } from "cojson";
|
||||
import { CoValueKnownState } from "cojson/src/sync.js";
|
||||
import { CoValueKnownState } from "cojson/src/sync/types.js";
|
||||
import { afterEach, beforeEach, describe, expect, test, vi } from "vitest";
|
||||
import {
|
||||
BatchedOutgoingMessages,
|
||||
|
||||
@@ -1,15 +1,15 @@
|
||||
import { CoValueCore } from "./coValueCore.js";
|
||||
import { CoValueState } from "./coValueState.js";
|
||||
import { CoValueEntry } from "./coValueEntry.js";
|
||||
import { RawCoID } from "./ids.js";
|
||||
|
||||
export class CoValuesStore {
|
||||
coValues = new Map<RawCoID, CoValueState>();
|
||||
coValues = new Map<RawCoID, CoValueEntry>();
|
||||
|
||||
get(id: RawCoID) {
|
||||
let entry = this.coValues.get(id);
|
||||
|
||||
if (!entry) {
|
||||
entry = CoValueState.Unknown(id);
|
||||
entry = CoValueEntry.Unknown(id);
|
||||
this.coValues.set(id, entry);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@ export class CoValuesStore {
|
||||
type: "available",
|
||||
coValue,
|
||||
});
|
||||
|
||||
return entry;
|
||||
}
|
||||
|
||||
getEntries() {
|
||||
@@ -32,7 +34,44 @@ export class CoValuesStore {
|
||||
return this.coValues.values();
|
||||
}
|
||||
|
||||
getOrderedIds() {
|
||||
const coValues = new Set<RawCoID>();
|
||||
|
||||
// TODO test it thoroughly
|
||||
for (const entry of this.getValues()) {
|
||||
this.getOrderedDependencies(entry.id, coValues);
|
||||
}
|
||||
|
||||
return Array.from(coValues);
|
||||
}
|
||||
|
||||
getKeys() {
|
||||
return this.coValues.keys();
|
||||
}
|
||||
|
||||
private getOrderedDependencies(id: RawCoID, coValues: Set<RawCoID>) {
|
||||
const entry = this.get(id);
|
||||
const coValue = this.expectCoValueLoaded(entry.id);
|
||||
|
||||
if (coValues.has(coValue.id)) {
|
||||
return coValues;
|
||||
}
|
||||
for (const id of coValue.getDependedOnCoValues()) {
|
||||
this.getOrderedDependencies(id, coValues);
|
||||
}
|
||||
coValues.add(coValue.id);
|
||||
|
||||
return coValues;
|
||||
}
|
||||
|
||||
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
|
||||
const entry = this.get(id);
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
throw new Error(
|
||||
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
|
||||
);
|
||||
}
|
||||
return entry.state.coValue;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,116 +0,0 @@
|
||||
import { RawCoID, SessionID } from "./ids.js";
|
||||
import {
|
||||
CoValueKnownState,
|
||||
combinedKnownStates,
|
||||
emptyKnownState,
|
||||
} from "./sync.js";
|
||||
|
||||
export type PeerKnownStateActions =
|
||||
| {
|
||||
type: "SET_AS_EMPTY";
|
||||
id: RawCoID;
|
||||
}
|
||||
| {
|
||||
type: "UPDATE_HEADER";
|
||||
id: RawCoID;
|
||||
header: boolean;
|
||||
}
|
||||
| {
|
||||
type: "UPDATE_SESSION_COUNTER";
|
||||
id: RawCoID;
|
||||
sessionId: SessionID;
|
||||
value: number;
|
||||
}
|
||||
| {
|
||||
type: "SET";
|
||||
id: RawCoID;
|
||||
value: CoValueKnownState;
|
||||
}
|
||||
| {
|
||||
type: "COMBINE_WITH";
|
||||
id: RawCoID;
|
||||
value: CoValueKnownState;
|
||||
};
|
||||
|
||||
export class PeerKnownStates {
|
||||
private coValues = new Map<RawCoID, CoValueKnownState>();
|
||||
|
||||
private updateHeader(id: RawCoID, header: boolean) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
knownState.header = header;
|
||||
this.coValues.set(id, knownState);
|
||||
}
|
||||
|
||||
private combineWith(id: RawCoID, value: CoValueKnownState) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
this.coValues.set(id, combinedKnownStates(knownState, value));
|
||||
}
|
||||
|
||||
private updateSessionCounter(
|
||||
id: RawCoID,
|
||||
sessionId: SessionID,
|
||||
value: number,
|
||||
) {
|
||||
const knownState = this.coValues.get(id) ?? emptyKnownState(id);
|
||||
const currentValue = knownState.sessions[sessionId] || 0;
|
||||
knownState.sessions[sessionId] = Math.max(currentValue, value);
|
||||
|
||||
this.coValues.set(id, knownState);
|
||||
}
|
||||
|
||||
get(id: RawCoID) {
|
||||
return this.coValues.get(id);
|
||||
}
|
||||
|
||||
has(id: RawCoID) {
|
||||
return this.coValues.has(id);
|
||||
}
|
||||
|
||||
clone() {
|
||||
const clone = new PeerKnownStates();
|
||||
clone.coValues = new Map(this.coValues);
|
||||
return clone;
|
||||
}
|
||||
|
||||
dispatch(action: PeerKnownStateActions) {
|
||||
switch (action.type) {
|
||||
case "UPDATE_HEADER":
|
||||
this.updateHeader(action.id, action.header);
|
||||
break;
|
||||
case "UPDATE_SESSION_COUNTER":
|
||||
this.updateSessionCounter(action.id, action.sessionId, action.value);
|
||||
break;
|
||||
case "SET":
|
||||
this.coValues.set(action.id, action.value);
|
||||
break;
|
||||
case "COMBINE_WITH":
|
||||
this.combineWith(action.id, action.value);
|
||||
break;
|
||||
case "SET_AS_EMPTY":
|
||||
this.coValues.set(action.id, emptyKnownState(action.id));
|
||||
break;
|
||||
}
|
||||
|
||||
this.triggerUpdate(action.id);
|
||||
}
|
||||
|
||||
listeners = new Set<(id: RawCoID, knownState: CoValueKnownState) => void>();
|
||||
|
||||
triggerUpdate(id: RawCoID) {
|
||||
this.trigger(id, this.coValues.get(id) ?? emptyKnownState(id));
|
||||
}
|
||||
|
||||
private trigger(id: RawCoID, knownState: CoValueKnownState) {
|
||||
for (const listener of this.listeners) {
|
||||
listener(id, knownState);
|
||||
}
|
||||
}
|
||||
|
||||
subscribe(listener: (id: RawCoID, knownState: CoValueKnownState) => void) {
|
||||
this.listeners.add(listener);
|
||||
|
||||
return () => {
|
||||
this.listeners.delete(listener);
|
||||
};
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,6 @@
|
||||
import { CoValuePriority } from "./priority.js";
|
||||
import { SyncMessage } from "./sync.js";
|
||||
|
||||
import { SyncMessage } from "./sync/types.js";
|
||||
|
||||
function promiseWithResolvers<R>() {
|
||||
let resolve = (_: R) => {};
|
||||
|
||||
@@ -1,141 +0,0 @@
|
||||
import { RawCoID } from "./ids.js";
|
||||
import {
|
||||
CoValueKnownState,
|
||||
PeerID,
|
||||
SyncManager,
|
||||
emptyKnownState,
|
||||
} from "./sync.js";
|
||||
|
||||
export type SyncStateGetter = {
|
||||
isUploaded: boolean;
|
||||
};
|
||||
|
||||
export type GlobalSyncStateListenerCallback = (
|
||||
peerId: PeerID,
|
||||
knownState: CoValueKnownState,
|
||||
sync: SyncStateGetter,
|
||||
) => void;
|
||||
|
||||
export type PeerSyncStateListenerCallback = (
|
||||
knownState: CoValueKnownState,
|
||||
sync: SyncStateGetter,
|
||||
) => void;
|
||||
|
||||
export class SyncStateSubscriptionManager {
|
||||
constructor(private syncManager: SyncManager) {}
|
||||
|
||||
private listeners = new Set<GlobalSyncStateListenerCallback>();
|
||||
private listenersByPeers = new Map<
|
||||
PeerID,
|
||||
Set<PeerSyncStateListenerCallback>
|
||||
>();
|
||||
|
||||
subscribeToUpdates(listener: GlobalSyncStateListenerCallback) {
|
||||
this.listeners.add(listener);
|
||||
|
||||
return () => {
|
||||
this.listeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
subscribeToPeerUpdates(
|
||||
peerId: PeerID,
|
||||
listener: PeerSyncStateListenerCallback,
|
||||
) {
|
||||
const listeners = this.listenersByPeers.get(peerId) ?? new Set();
|
||||
|
||||
if (listeners.size === 0) {
|
||||
this.listenersByPeers.set(peerId, listeners);
|
||||
}
|
||||
|
||||
listeners.add(listener);
|
||||
|
||||
return () => {
|
||||
listeners.delete(listener);
|
||||
};
|
||||
}
|
||||
|
||||
triggerUpdate(peerId: PeerID, id: RawCoID) {
|
||||
const peer = this.syncManager.peers[peerId];
|
||||
|
||||
if (!peer) {
|
||||
return;
|
||||
}
|
||||
|
||||
const peerListeners = this.listenersByPeers.get(peer.id);
|
||||
|
||||
// If we don't have any active listeners do nothing
|
||||
if (!peerListeners?.size && !this.listeners.size) {
|
||||
return;
|
||||
}
|
||||
|
||||
const knownState = peer.knownStates.get(id) ?? emptyKnownState(id);
|
||||
|
||||
// Build a lazy sync state object to process the isUploaded info
|
||||
// only when requested
|
||||
const syncState = {} as SyncStateGetter;
|
||||
|
||||
const getIsUploaded = simpleMemoize(() =>
|
||||
this.getIsCoValueFullyUploadedIntoPeer(peerId, id),
|
||||
);
|
||||
Object.defineProperties(syncState, {
|
||||
isUploaded: {
|
||||
enumerable: true,
|
||||
get: getIsUploaded,
|
||||
},
|
||||
});
|
||||
|
||||
for (const listener of this.listeners) {
|
||||
listener(peerId, knownState, syncState);
|
||||
}
|
||||
|
||||
if (!peerListeners) return;
|
||||
|
||||
for (const listener of peerListeners) {
|
||||
listener(knownState, syncState);
|
||||
}
|
||||
}
|
||||
|
||||
getIsCoValueFullyUploadedIntoPeer(peerId: PeerID, id: RawCoID) {
|
||||
const peer = this.syncManager.peers[peerId];
|
||||
const entry = this.syncManager.local.coValuesStore.get(id);
|
||||
|
||||
if (!peer) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
return false;
|
||||
}
|
||||
|
||||
const coValue = entry.state.coValue;
|
||||
const knownState = peer.knownStates.get(id);
|
||||
|
||||
if (!knownState) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return getIsUploadCompleted(
|
||||
coValue.knownState().sessions,
|
||||
knownState.sessions,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
function getIsUploadCompleted(
|
||||
from: Record<string, number>,
|
||||
to: Record<string, number>,
|
||||
) {
|
||||
for (const sessionId of Object.keys(from)) {
|
||||
if (from[sessionId] !== to[sessionId]) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
function simpleMemoize<T>(fn: () => T): () => T {
|
||||
let value: T | undefined;
|
||||
return () => value ?? (value = fn());
|
||||
}
|
||||
@@ -13,6 +13,7 @@ import {
|
||||
SignerID,
|
||||
StreamingHash,
|
||||
} from "./crypto/crypto.js";
|
||||
import { CojsonInternalTypes, cojsonInternals } from "./exports.js";
|
||||
import {
|
||||
RawCoID,
|
||||
SessionID,
|
||||
@@ -30,10 +31,11 @@ import {
|
||||
isKeyForKeyField,
|
||||
} from "./permissions.js";
|
||||
import { getPriorityFromHeader } from "./priority.js";
|
||||
import { CoValueKnownState, NewContentMessage } from "./sync.js";
|
||||
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
|
||||
import { expectGroup } from "./typeUtils/expectGroup.js";
|
||||
import { isAccountID } from "./typeUtils/isAccountID.js";
|
||||
import CoValueContent = CojsonInternalTypes.CoValueContent;
|
||||
import { CoValueKnownState } from "./sync/types.js";
|
||||
|
||||
/**
|
||||
In order to not block other concurrently syncing CoValues we introduce a maximum size of transactions,
|
||||
@@ -50,6 +52,12 @@ export type CoValueHeader = {
|
||||
meta: JsonObject | null;
|
||||
} & CoValueUniqueness;
|
||||
|
||||
export type SessionNewContent = {
|
||||
after: number;
|
||||
newTransactions: Transaction[];
|
||||
lastSignature: Signature;
|
||||
};
|
||||
|
||||
export type CoValueUniqueness = {
|
||||
uniqueness: JsonValue;
|
||||
createdAt?: `2${string}` | null;
|
||||
@@ -107,7 +115,7 @@ export class CoValueCore {
|
||||
} = {};
|
||||
_cachedKnownState?: CoValueKnownState;
|
||||
_cachedDependentOn?: RawCoID[];
|
||||
_cachedNewContentSinceEmpty?: NewContentMessage[] | undefined;
|
||||
_cachedNewContentSinceEmpty?: CoValueContent[] | undefined;
|
||||
_currentAsyncAddTransaction?: Promise<void>;
|
||||
|
||||
constructor(
|
||||
@@ -197,7 +205,7 @@ export class CoValueCore {
|
||||
};
|
||||
}
|
||||
|
||||
tryAddTransactions(
|
||||
private tryAddTransactions(
|
||||
sessionID: SessionID,
|
||||
newTransactions: Transaction[],
|
||||
givenExpectedNewHash: Hash | undefined,
|
||||
@@ -586,6 +594,8 @@ export class CoValueCore {
|
||||
expectedNewHash,
|
||||
);
|
||||
|
||||
const peersKnownState = { ...this.knownState() };
|
||||
|
||||
const success = this.tryAddTransactions(
|
||||
sessionID,
|
||||
[transaction],
|
||||
@@ -594,7 +604,7 @@ export class CoValueCore {
|
||||
)._unsafeUnwrap({ withStackTrace: true });
|
||||
|
||||
if (success) {
|
||||
void this.node.syncManager.syncCoValue(this);
|
||||
void this.node.syncManager.syncCoValue(this, peersKnownState);
|
||||
}
|
||||
|
||||
return success;
|
||||
@@ -889,19 +899,19 @@ export class CoValueCore {
|
||||
return this.sessionLogs.get(txID.sessionID)?.transactions[txID.txIndex];
|
||||
}
|
||||
|
||||
newContentSince(
|
||||
knownState: CoValueKnownState | undefined,
|
||||
): NewContentMessage[] | undefined {
|
||||
const isKnownStateEmpty = !knownState?.header && !knownState?.sessions;
|
||||
newContentSince(knownState: CoValueKnownState): CoValueContent[] | undefined {
|
||||
const shouldSendEverything =
|
||||
!knownState.header ||
|
||||
!knownState.sessions ||
|
||||
!Object.keys(knownState.sessions).length;
|
||||
|
||||
if (isKnownStateEmpty && this._cachedNewContentSinceEmpty) {
|
||||
if (shouldSendEverything && this._cachedNewContentSinceEmpty) {
|
||||
return this._cachedNewContentSinceEmpty;
|
||||
}
|
||||
|
||||
let currentPiece: NewContentMessage = {
|
||||
action: "content",
|
||||
let currentPiece: CoValueContent = {
|
||||
id: this.id,
|
||||
header: knownState?.header ? undefined : this.header,
|
||||
header: shouldSendEverything ? this.header : undefined,
|
||||
priority: getPriorityFromHeader(this.header),
|
||||
new: {},
|
||||
};
|
||||
@@ -963,7 +973,6 @@ export class CoValueCore {
|
||||
|
||||
if (pieceSize >= MAX_RECOMMENDED_TX_SIZE) {
|
||||
currentPiece = {
|
||||
action: "content",
|
||||
id: this.id,
|
||||
header: undefined,
|
||||
new: {},
|
||||
@@ -1006,7 +1015,7 @@ export class CoValueCore {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (isKnownStateEmpty) {
|
||||
if (shouldSendEverything) {
|
||||
this._cachedNewContentSinceEmpty = piecesWithContent;
|
||||
}
|
||||
|
||||
@@ -1043,6 +1052,55 @@ export class CoValueCore {
|
||||
]
|
||||
: [];
|
||||
}
|
||||
|
||||
addNewContent(content: CoValueContent) {
|
||||
let anyMissingTransaction = false;
|
||||
|
||||
for (const [sessionID, newContentForSession] of Object.entries(
|
||||
content.new,
|
||||
) as [SessionID, SessionNewContent][]) {
|
||||
const ourKnownTxIdx =
|
||||
this.sessionLogs.get(sessionID)?.transactions.length;
|
||||
const theirFirstNewTxIdx = newContentForSession.after;
|
||||
|
||||
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
|
||||
anyMissingTransaction = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const alreadyKnownOffset = ourKnownTxIdx
|
||||
? ourKnownTxIdx - theirFirstNewTxIdx
|
||||
: 0;
|
||||
|
||||
const newTransactions =
|
||||
newContentForSession.newTransactions.slice(alreadyKnownOffset);
|
||||
|
||||
if (newTransactions.length === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const result = this.tryAddTransactions(
|
||||
sessionID,
|
||||
newTransactions,
|
||||
undefined,
|
||||
newContentForSession.lastSignature,
|
||||
);
|
||||
|
||||
if (result.isErr()) {
|
||||
const message = `Failed to add transactions for ${content.id}: ${newTransactions.length} new transactions after:
|
||||
${newContentForSession.after} our last known tx idx initially: ${ourKnownTxIdx} our last known tx idx now:
|
||||
${this.sessionLogs.get(sessionID)?.transactions.length}`;
|
||||
|
||||
throw {
|
||||
type: "TryAddTransactionsError",
|
||||
error: result.error,
|
||||
message,
|
||||
} as TryAddTransactionsException;
|
||||
}
|
||||
}
|
||||
|
||||
return anyMissingTransaction;
|
||||
}
|
||||
}
|
||||
|
||||
function getNextKnownSignatureIdx(
|
||||
@@ -1077,3 +1135,85 @@ export type TryAddTransactionsError =
|
||||
| ResolveAccountAgentError
|
||||
| InvalidHashError
|
||||
| InvalidSignatureError;
|
||||
|
||||
export type TryAddTransactionsException = {
|
||||
type: "TryAddTransactionsError";
|
||||
error: TryAddTransactionsError;
|
||||
message: string;
|
||||
};
|
||||
export function isTryAddTransactionsException(
|
||||
e: any,
|
||||
): e is TryAddTransactionsException {
|
||||
return (
|
||||
e &&
|
||||
e.type === "TryAddTransactionsError" &&
|
||||
typeof e.message === "string" &&
|
||||
e.error !== undefined
|
||||
);
|
||||
}
|
||||
|
||||
export function getDependedOnFromContent(msg: Required<CoValueContent>) {
|
||||
if (!msg.header) {
|
||||
throw new Error(`Header is required for getting dependencies ${msg.id}`);
|
||||
}
|
||||
|
||||
return msg.header.ruleset.type === "group"
|
||||
? getGroupDependedOnCoValues(msg)
|
||||
: msg.header.ruleset.type === "ownedByGroup"
|
||||
? [
|
||||
msg.header.ruleset.group,
|
||||
...new Set(
|
||||
[...Object.keys(msg.new)]
|
||||
.map((sessionID) =>
|
||||
accountOrAgentIDfromSessionID(sessionID as SessionID),
|
||||
)
|
||||
.filter(
|
||||
(session): session is RawAccountID =>
|
||||
isAccountID(session) && session !== msg.id,
|
||||
),
|
||||
),
|
||||
]
|
||||
: [];
|
||||
}
|
||||
|
||||
function getGroupDependedOnCoValues(content: CoValueContent) {
|
||||
const keys: CojsonInternalTypes.RawCoID[] = [];
|
||||
|
||||
/**
|
||||
* Collect all the signing keys inside the transactions to list all the
|
||||
* dependencies required to correctly access the CoValue.
|
||||
*/
|
||||
for (const sessionEntry of Object.values(content.new)) {
|
||||
for (const tx of sessionEntry.newTransactions) {
|
||||
if (tx.privacy !== "trusting") continue;
|
||||
|
||||
const changes = safeParseChanges(tx.changes);
|
||||
for (const change of changes) {
|
||||
if (
|
||||
change &&
|
||||
typeof change === "object" &&
|
||||
"op" in change &&
|
||||
change.op === "set" &&
|
||||
"key" in change &&
|
||||
change.key
|
||||
) {
|
||||
const key = cojsonInternals.getGroupDependentKey(change.key);
|
||||
|
||||
if (key) {
|
||||
keys.push(key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return keys;
|
||||
}
|
||||
|
||||
function safeParseChanges(changes: Stringified<JsonValue[]>) {
|
||||
try {
|
||||
return cojsonInternals.parseJSON(changes);
|
||||
} catch (e) {
|
||||
return [];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { PeerState } from "./PeerState.js";
|
||||
import { CoValueCore } from "./coValueCore.js";
|
||||
import { config } from "./config.js";
|
||||
import { RawCoID } from "./ids.js";
|
||||
import { PeerID } from "./sync.js";
|
||||
import { PeerEntry, PeerID } from "./peer/index.js";
|
||||
|
||||
export const CO_VALUE_LOADING_MAX_RETRIES = 5;
|
||||
export const CO_VALUE_LOADING_TIMEOUT = 30_000;
|
||||
@@ -100,29 +100,115 @@ type CoValueStateType =
|
||||
| CoValueAvailableState
|
||||
| CoValueUnavailableState;
|
||||
|
||||
export class CoValueState {
|
||||
class UploadState {
|
||||
protected peers = new Map<
|
||||
PeerID,
|
||||
ReturnType<typeof createResolvablePromise<void>> & { completed?: boolean }
|
||||
>();
|
||||
|
||||
unawarePeers = new Set<PeerID>();
|
||||
|
||||
constructor(private readonly coValueEntry: CoValueEntry) {
|
||||
this.peers = new Map();
|
||||
}
|
||||
|
||||
getUnawarePeerIds(): PeerID[] {
|
||||
return Array.from(this.unawarePeers);
|
||||
}
|
||||
setPendingForPeer(peerId: PeerID) {
|
||||
if (!(this.coValueEntry.state.type === "available")) {
|
||||
console.error(
|
||||
"Trying to set pending upload for a coValue that is not available",
|
||||
this.coValueEntry.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
const peerUploadState = this.peers.get(peerId);
|
||||
if (!peerUploadState) {
|
||||
this.peers.set(peerId, createResolvablePromise<void>());
|
||||
}
|
||||
}
|
||||
|
||||
setCompletedForPeer(peerId: PeerID) {
|
||||
const peerUploadState = this.peers.get(peerId);
|
||||
if (!peerUploadState) {
|
||||
if (!config.HYBRID_MESSAGING_MODE) {
|
||||
// When two messaging protocol are stacked, this could produce some excessive "ack" messages
|
||||
// out of peer's "known" messages which would lead to this error. Should be ignored in HYBRID_MESSAGING_MODE
|
||||
console.error(
|
||||
"Trying to set complete for a coValue that is not uploaded to",
|
||||
peerId,
|
||||
this.coValueEntry.id,
|
||||
);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
peerUploadState.resolve();
|
||||
peerUploadState.completed = true;
|
||||
this.unawarePeers.delete(peerId);
|
||||
}
|
||||
|
||||
waitForPeer(peerId: PeerID) {
|
||||
const peerUploadState = this.peers.get(peerId);
|
||||
if (!peerUploadState) {
|
||||
console.error(
|
||||
"Trying to wait for no pending upload into peer",
|
||||
peerId,
|
||||
this.coValueEntry.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
return peerUploadState?.promise;
|
||||
}
|
||||
|
||||
isCoValueFullyUploadedIntoPeer(peerId: PeerID) {
|
||||
const peerUploadState = this.peers.get(peerId);
|
||||
|
||||
return !!peerUploadState?.completed;
|
||||
}
|
||||
|
||||
copyFrom(otherUploadState: UploadState) {
|
||||
for (let [peerId] of otherUploadState.peers) {
|
||||
this.setPendingForPeer(peerId);
|
||||
if (otherUploadState.isCoValueFullyUploadedIntoPeer(peerId)) {
|
||||
this.setCompletedForPeer(peerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NOTE Renamed CoValueState into CoValueEntry
|
||||
export class CoValueEntry {
|
||||
promise?: Promise<CoValueCore | "unavailable">;
|
||||
private resolve?: (value: CoValueCore | "unavailable") => void;
|
||||
public uploadState: UploadState;
|
||||
|
||||
constructor(
|
||||
public id: RawCoID,
|
||||
public state: CoValueStateType,
|
||||
) {}
|
||||
) {
|
||||
this.uploadState = new UploadState(this);
|
||||
}
|
||||
|
||||
static Unknown(id: RawCoID) {
|
||||
return new CoValueState(id, new CoValueUnknownState());
|
||||
return new CoValueEntry(id, new CoValueUnknownState());
|
||||
}
|
||||
|
||||
static Loading(id: RawCoID, peersIds: Iterable<PeerID>) {
|
||||
return new CoValueState(id, new CoValueLoadingState(peersIds));
|
||||
return new CoValueEntry(id, new CoValueLoadingState(peersIds));
|
||||
}
|
||||
|
||||
static Available(coValue: CoValueCore) {
|
||||
return new CoValueState(coValue.id, new CoValueAvailableState(coValue));
|
||||
return new CoValueEntry(coValue.id, new CoValueAvailableState(coValue));
|
||||
}
|
||||
|
||||
static Unavailable(id: RawCoID) {
|
||||
return new CoValueState(id, new CoValueUnavailableState());
|
||||
return new CoValueEntry(id, new CoValueUnavailableState());
|
||||
}
|
||||
|
||||
async getCoValue() {
|
||||
@@ -170,7 +256,13 @@ export class CoValueState {
|
||||
this.resolve = undefined;
|
||||
}
|
||||
|
||||
async loadFromPeers(peers: PeerState[]) {
|
||||
async loadFromPeers(
|
||||
peers: PeerEntry[],
|
||||
loadCoValueCallback: (
|
||||
coValueEntry: CoValueEntry,
|
||||
peers: PeerEntry[],
|
||||
) => Promise<void>,
|
||||
) {
|
||||
const state = this.state;
|
||||
|
||||
if (state.type !== "unknown" && state.type !== "unavailable") {
|
||||
@@ -181,22 +273,17 @@ export class CoValueState {
|
||||
return;
|
||||
}
|
||||
|
||||
const doLoad = async (peersToLoadFrom: PeerState[]) => {
|
||||
const peersWithoutErrors = getPeersWithoutErrors(
|
||||
peersToLoadFrom,
|
||||
this.id,
|
||||
);
|
||||
|
||||
const doLoad = async (peersToLoadFrom: PeerEntry[]) => {
|
||||
// If we are in the loading state we move to a new loading state
|
||||
// to reset all the loading promises
|
||||
if (this.state.type === "loading" || this.state.type === "unknown") {
|
||||
this.moveToState(
|
||||
new CoValueLoadingState(peersWithoutErrors.map((p) => p.id)),
|
||||
new CoValueLoadingState(peersToLoadFrom.map((p) => p.id)),
|
||||
);
|
||||
}
|
||||
|
||||
// Assign the current state to a variable to not depend on the state changes
|
||||
// that may happen while we wait for loadCoValueFromPeers to complete
|
||||
// that may happen while we wait for loadCoValueCallback to complete
|
||||
const currentState = this.state;
|
||||
|
||||
// If we entered successfully the loading state, we load the coValue from the peers
|
||||
@@ -204,7 +291,7 @@ export class CoValueState {
|
||||
// We may not enter the loading state if the coValue has become available in between
|
||||
// of the retries
|
||||
if (currentState.type === "loading") {
|
||||
await loadCoValueFromPeers(this, peersWithoutErrors);
|
||||
await loadCoValueCallback(this, peersToLoadFrom);
|
||||
|
||||
const result = await currentState.result;
|
||||
return result !== "unavailable";
|
||||
@@ -237,6 +324,18 @@ export class CoValueState {
|
||||
}
|
||||
}
|
||||
|
||||
moveToLoadingState(peers: PeerEntry[]) {
|
||||
if (this.state.type !== "unknown") {
|
||||
console.error(
|
||||
"Cannot move to loading state from",
|
||||
this.state.type,
|
||||
this.id,
|
||||
);
|
||||
} else {
|
||||
this.moveToState(new CoValueLoadingState(peers.map((p) => p.id)));
|
||||
}
|
||||
}
|
||||
|
||||
dispatch(action: CoValueStateAction) {
|
||||
const currentState = this.state;
|
||||
|
||||
@@ -255,64 +354,16 @@ export class CoValueState {
|
||||
currentState.markAsUnavailable(action.peerId);
|
||||
}
|
||||
|
||||
this.uploadState.unawarePeers.add(action.peerId);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async function loadCoValueFromPeers(
|
||||
coValueEntry: CoValueState,
|
||||
peers: PeerState[],
|
||||
) {
|
||||
for (const peer of peers) {
|
||||
if (peer.closed) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (coValueEntry.state.type === "available") {
|
||||
/**
|
||||
* We don't need to wait for the message to be delivered here.
|
||||
*
|
||||
* This way when the coValue becomes available because it's cached we don't wait for the server
|
||||
* peer to consume the messages queue before moving forward.
|
||||
*/
|
||||
peer
|
||||
.pushOutgoingMessage({
|
||||
action: "load",
|
||||
...coValueEntry.state.coValue.knownState(),
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error(`Failed to push load message to peer ${peer.id}`, err);
|
||||
});
|
||||
} else {
|
||||
/**
|
||||
* We only wait for the load state to be resolved.
|
||||
*/
|
||||
peer
|
||||
.pushOutgoingMessage({
|
||||
action: "load",
|
||||
id: coValueEntry.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
})
|
||||
.catch((err) => {
|
||||
console.error(`Failed to push load message to peer ${peer.id}`, err);
|
||||
});
|
||||
}
|
||||
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
const timeout = setTimeout(() => {
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
console.error("Failed to load coValue from peer", peer.id);
|
||||
coValueEntry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
}
|
||||
}, CO_VALUE_LOADING_TIMEOUT);
|
||||
await coValueEntry.state.waitForPeer(peer.id);
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
markAsNotFoundInPeer(peerId: PeerID) {
|
||||
this.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -352,16 +403,3 @@ function createResolvablePromise<T>() {
|
||||
function sleep(ms: number) {
|
||||
return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
}
|
||||
|
||||
function getPeersWithoutErrors(peers: PeerState[], coValueId: RawCoID) {
|
||||
return peers.filter((p) => {
|
||||
if (p.erroredCoValues.has(coValueId)) {
|
||||
console.error(
|
||||
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
@@ -136,7 +136,6 @@ export class RawGroup<
|
||||
loadAllChildGroups() {
|
||||
const requests: Promise<unknown>[] = [];
|
||||
const store = this.core.node.coValuesStore;
|
||||
const peers = this.core.node.syncManager.getServerAndStoragePeers();
|
||||
|
||||
for (const key of this.keys()) {
|
||||
if (!isChildGroupReference(key)) {
|
||||
@@ -146,14 +145,10 @@ export class RawGroup<
|
||||
const id = getChildGroupId(key);
|
||||
const child = store.get(id);
|
||||
|
||||
if (
|
||||
child.state.type === "unknown" ||
|
||||
child.state.type === "unavailable"
|
||||
) {
|
||||
child.loadFromPeers(peers).catch(() => {
|
||||
console.error(`Failed to load child group ${id}`);
|
||||
});
|
||||
}
|
||||
// NOTE the same code invoked form another end (vs entry.load...)
|
||||
this.core.node.load(id).catch(() => {
|
||||
console.error(`Failed to load child group ${id}`);
|
||||
});
|
||||
|
||||
requests.push(
|
||||
child.getCoValue().then((coValue) => {
|
||||
|
||||
5
packages/cojson/src/config.ts
Normal file
5
packages/cojson/src/config.ts
Normal file
@@ -0,0 +1,5 @@
|
||||
export const config = {
|
||||
// Peers can use older messaging system
|
||||
HYBRID_MESSAGING_MODE: true,
|
||||
TRACE_SYNC_MESSAGES: true,
|
||||
};
|
||||
@@ -1,4 +1,5 @@
|
||||
import { base64URLtoBytes, bytesToBase64url } from "./base64url.js";
|
||||
import type { AnyRawCoValue, CoID } from "./coValue.js";
|
||||
import { type RawCoValue } from "./coValue.js";
|
||||
import {
|
||||
CoValueCore,
|
||||
@@ -6,23 +7,35 @@ import {
|
||||
MAX_RECOMMENDED_TX_SIZE,
|
||||
idforHeader,
|
||||
} from "./coValueCore.js";
|
||||
import { ControlledAgent, RawControlledAccount } from "./coValues/account.js";
|
||||
import type {
|
||||
AccountMeta,
|
||||
RawAccountID,
|
||||
RawAccountMigration,
|
||||
} from "./coValues/account.js";
|
||||
import {
|
||||
ControlledAgent,
|
||||
RawAccount,
|
||||
RawControlledAccount,
|
||||
RawProfile,
|
||||
accountHeaderForInitialAgentSecret,
|
||||
} from "./coValues/account.js";
|
||||
import { RawCoList } from "./coValues/coList.js";
|
||||
import { RawCoMap } from "./coValues/coMap.js";
|
||||
import type {
|
||||
BinaryCoStreamMeta,
|
||||
BinaryStreamInfo,
|
||||
} from "./coValues/coStream.js";
|
||||
import { RawBinaryCoStream, RawCoStream } from "./coValues/coStream.js";
|
||||
import type { Everyone, InviteSecret } from "./coValues/group.js";
|
||||
import { EVERYONE, RawGroup } from "./coValues/group.js";
|
||||
import type { Everyone } from "./coValues/group.js";
|
||||
import type { AgentSecret } from "./crypto/crypto.js";
|
||||
import {
|
||||
CryptoProvider,
|
||||
StreamingHash,
|
||||
secretSeedLength,
|
||||
shortHashLength,
|
||||
} from "./crypto/crypto.js";
|
||||
import type { AgentID, SessionID } from "./ids.js";
|
||||
import {
|
||||
getGroupDependentKey,
|
||||
getGroupDependentKeyList,
|
||||
@@ -31,46 +44,29 @@ import {
|
||||
rawCoIDtoBytes,
|
||||
} from "./ids.js";
|
||||
import { Stringified, parseJSON } from "./jsonStringify.js";
|
||||
import { LocalNode } from "./localNode.js";
|
||||
import type { JsonValue } from "./jsonValue.js";
|
||||
import {
|
||||
IncomingSyncStream,
|
||||
LocalNode,
|
||||
OutgoingSyncQueue,
|
||||
} from "./localNode.js";
|
||||
import type * as Media from "./media.js";
|
||||
import { emptyDataMessage, unknownDataMessage } from "./peer/PeerOperations.js";
|
||||
import type { Peer } from "./peer/index.js";
|
||||
import type { Role } from "./permissions.js";
|
||||
import { getPriorityFromHeader } from "./priority.js";
|
||||
import { FileSystem } from "./storage/FileSystem.js";
|
||||
import { BlockFilename, LSMStorage, WalFilename } from "./storage/index.js";
|
||||
import { Channel, connectedPeers } from "./streamUtils.js";
|
||||
import { DisconnectedError, PingTimeoutError } from "./sync.js";
|
||||
import type { SyncMessage } from "./sync/index.js";
|
||||
import { emptyKnownState } from "./sync/index.js";
|
||||
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
|
||||
import { expectGroup } from "./typeUtils/expectGroup.js";
|
||||
import { isAccountID } from "./typeUtils/isAccountID.js";
|
||||
|
||||
import type { AnyRawCoValue, CoID } from "./coValue.js";
|
||||
import type {
|
||||
AccountMeta,
|
||||
RawAccountID,
|
||||
RawAccountMigration,
|
||||
} from "./coValues/account.js";
|
||||
import type {
|
||||
BinaryCoStreamMeta,
|
||||
BinaryStreamInfo,
|
||||
} from "./coValues/coStream.js";
|
||||
import type { InviteSecret } from "./coValues/group.js";
|
||||
import type { AgentSecret } from "./crypto/crypto.js";
|
||||
import type { AgentID, SessionID } from "./ids.js";
|
||||
import type { JsonValue } from "./jsonValue.js";
|
||||
import type * as Media from "./media.js";
|
||||
import type {
|
||||
IncomingSyncStream,
|
||||
OutgoingSyncQueue,
|
||||
Peer,
|
||||
SyncMessage,
|
||||
} from "./sync.js";
|
||||
import {
|
||||
DisconnectedError,
|
||||
PingTimeoutError,
|
||||
emptyKnownState,
|
||||
} from "./sync.js";
|
||||
|
||||
type Value = JsonValue | AnyRawCoValue;
|
||||
|
||||
import { getPriorityFromHeader } from "./priority.js";
|
||||
import { FileSystem } from "./storage/FileSystem.js";
|
||||
import { BlockFilename, LSMStorage, WalFilename } from "./storage/index.js";
|
||||
|
||||
/** @hidden */
|
||||
export const cojsonInternals = {
|
||||
connectedPeers,
|
||||
@@ -129,6 +125,8 @@ export {
|
||||
isRawCoID,
|
||||
LSMStorage,
|
||||
emptyKnownState,
|
||||
emptyDataMessage,
|
||||
unknownDataMessage,
|
||||
};
|
||||
|
||||
export type {
|
||||
@@ -146,12 +144,15 @@ export type {
|
||||
|
||||
// eslint-disable-next-line @typescript-eslint/no-namespace
|
||||
export namespace CojsonInternalTypes {
|
||||
export type CoValueKnownState = import("./sync.js").CoValueKnownState;
|
||||
export type DoneMessage = import("./sync.js").DoneMessage;
|
||||
export type KnownStateMessage = import("./sync.js").KnownStateMessage;
|
||||
export type LoadMessage = import("./sync.js").LoadMessage;
|
||||
export type NewContentMessage = import("./sync.js").NewContentMessage;
|
||||
export type SessionNewContent = import("./sync.js").SessionNewContent;
|
||||
export type KnownStateMessage = import("./sync/index.js").KnownStateMessage;
|
||||
export type CoValueKnownState = import("./sync/index.js").CoValueKnownState;
|
||||
export type CoValueContent = import("./sync/index.js").CoValueContent;
|
||||
export type NewContentMessage = import("./sync/index.js").NewContentMessage;
|
||||
export type PullMessage = import("./sync/index.js").PullMessage;
|
||||
export type PushMessage = import("./sync/index.js").PushMessage;
|
||||
export type DataMessage = import("./sync/index.js").DataMessage;
|
||||
export type AckMessage = import("./sync/index.js").AckMessage;
|
||||
export type SessionNewContent = import("./coValueCore.js").SessionNewContent;
|
||||
export type CoValueHeader = import("./coValueCore.js").CoValueHeader;
|
||||
export type Transaction = import("./coValueCore.js").Transaction;
|
||||
export type TransactionID = import("./ids.js").TransactionID;
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
import { Result, ResultAsync, err, ok, okAsync } from "neverthrow";
|
||||
import { Result, err, ok } from "neverthrow";
|
||||
import { CoValuesStore } from "./CoValuesStore.js";
|
||||
import { CoID } from "./coValue.js";
|
||||
import { RawCoValue } from "./coValue.js";
|
||||
import { CoID, RawCoValue } from "./coValue.js";
|
||||
import {
|
||||
CoValueCore,
|
||||
CoValueHeader,
|
||||
@@ -25,11 +24,23 @@ import {
|
||||
RawGroup,
|
||||
secretSeedFromInviteSecret,
|
||||
} from "./coValues/group.js";
|
||||
import { config } from "./config.js";
|
||||
import { AgentSecret, CryptoProvider } from "./crypto/crypto.js";
|
||||
import { AgentID, RawCoID, SessionID, isAgentID } from "./ids.js";
|
||||
import { Peer, PeerID, SyncManager } from "./sync.js";
|
||||
import { Peer, PeerEntry, Peers } from "./peer/index.js";
|
||||
import { transformIncomingMessageFromPeer } from "./peer/transformers.js";
|
||||
import { DisconnectedError, PingTimeoutError, SyncManager } from "./sync.js";
|
||||
import { SyncMessage, emptyKnownState } from "./sync/types.js";
|
||||
import { expectGroup } from "./typeUtils/expectGroup.js";
|
||||
|
||||
export type IncomingSyncStream = AsyncIterable<
|
||||
SyncMessage | DisconnectedError | PingTimeoutError
|
||||
>;
|
||||
export type OutgoingSyncQueue = {
|
||||
push: (msg: SyncMessage) => Promise<unknown>;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
/** A `LocalNode` represents a local view of a set of loaded `CoValue`s, from the perspective of a particular account (or primitive cryptographic agent).
|
||||
|
||||
A `LocalNode` can have peers that it syncs to, for example some form of local persistence, or a sync server, such as `cloud.jazz.tools` (Jazz Cloud).
|
||||
@@ -42,6 +53,8 @@ const { localNode } = useJazz();
|
||||
```
|
||||
*/
|
||||
export class LocalNode {
|
||||
static peers = new Peers();
|
||||
|
||||
/** @internal */
|
||||
crypto: CryptoProvider;
|
||||
/** @internal */
|
||||
@@ -52,7 +65,6 @@ export class LocalNode {
|
||||
currentSessionID: SessionID;
|
||||
/** @category 3. Low-level */
|
||||
syncManager = new SyncManager(this);
|
||||
|
||||
crashed: Error | undefined = undefined;
|
||||
|
||||
/** @category 3. Low-level */
|
||||
@@ -66,6 +78,70 @@ export class LocalNode {
|
||||
this.crypto = crypto;
|
||||
}
|
||||
|
||||
async processMessages(peer: PeerEntry) {
|
||||
for await (const msg of peer.incoming) {
|
||||
if (msg === "Disconnected") {
|
||||
return;
|
||||
}
|
||||
if (msg === "PingTimeout") {
|
||||
console.error("Ping timeout from peer", peer.id);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
if (config.TRACE_SYNC_MESSAGES) {
|
||||
console.log("🔵 ===>>> Received from", peer.id, msg);
|
||||
}
|
||||
this.syncManager.handleSyncMessage(
|
||||
transformIncomingMessageFromPeer(msg, peer.id),
|
||||
peer,
|
||||
);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Error reading from peer ${peer.id}, handling msg\n\n${JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async addPeer(peerData: Peer) {
|
||||
const peer: PeerEntry = LocalNode.peers.add(peerData);
|
||||
|
||||
if (peer.isServerOrStoragePeer()) {
|
||||
await this.syncManager.initialSync(peer);
|
||||
}
|
||||
|
||||
this.processMessages(peer)
|
||||
.then(() => {
|
||||
if (peerData.crashOnClose) {
|
||||
console.error("Unexepcted close from peer", peerData.id);
|
||||
this.crashed = new Error("Unexpected close from peer");
|
||||
throw new Error("Unexpected close from peer");
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error processing messages from peer", peerData.id, e);
|
||||
if (peerData.crashOnClose) {
|
||||
this.crashed = e;
|
||||
throw new Error(e);
|
||||
}
|
||||
})
|
||||
.finally(() => {
|
||||
const state = LocalNode.peers.get(peerData.id);
|
||||
state?.gracefulShutdown();
|
||||
|
||||
if (peerData.deletePeerStateOnClose) {
|
||||
LocalNode.peers.delete(peer.id);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/** @category 2. Node Creation */
|
||||
static async withNewlyCreatedAccount<Meta extends AccountMeta = AccountMeta>({
|
||||
creationProps,
|
||||
@@ -104,7 +180,7 @@ export class LocalNode {
|
||||
|
||||
if (peersToLoadFrom) {
|
||||
for (const peer of peersToLoadFrom) {
|
||||
nodeWithAccount.syncManager.addPeer(peer);
|
||||
await nodeWithAccount.addPeer(peer);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -141,6 +217,7 @@ export class LocalNode {
|
||||
if (coValueEntry.state.type === "available") {
|
||||
void nodeWithAccount.syncManager.syncCoValue(
|
||||
coValueEntry.state.coValue,
|
||||
emptyKnownState(coValueEntry.id),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -182,12 +259,10 @@ export class LocalNode {
|
||||
);
|
||||
|
||||
for (const peer of peersToLoadFrom) {
|
||||
loadingNode.syncManager.addPeer(peer);
|
||||
await loadingNode.addPeer(peer);
|
||||
}
|
||||
|
||||
const accountPromise = loadingNode.load(accountID);
|
||||
|
||||
const account = await accountPromise;
|
||||
const account = await loadingNode.load<RawAccount>(accountID);
|
||||
|
||||
if (account === "unavailable") {
|
||||
throw new Error("Account unavailable from all peers");
|
||||
@@ -246,36 +321,11 @@ export class LocalNode {
|
||||
const coValue = new CoValueCore(header, this);
|
||||
this.coValuesStore.setAsAvailable(coValue.id, coValue);
|
||||
|
||||
void this.syncManager.syncCoValue(coValue);
|
||||
void this.syncManager.syncCoValue(coValue, emptyKnownState(coValue.id));
|
||||
|
||||
return coValue;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
async loadCoValueCore(
|
||||
id: RawCoID,
|
||||
skipLoadingFromPeer?: PeerID,
|
||||
): Promise<CoValueCore | "unavailable"> {
|
||||
if (this.crashed) {
|
||||
throw new Error("Trying to load CoValue after node has crashed", {
|
||||
cause: this.crashed,
|
||||
});
|
||||
}
|
||||
|
||||
const entry = this.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
|
||||
const peers =
|
||||
this.syncManager.getServerAndStoragePeers(skipLoadingFromPeer);
|
||||
|
||||
await entry.loadFromPeers(peers).catch((e) => {
|
||||
console.error("Error loading from peers", id, e);
|
||||
});
|
||||
}
|
||||
|
||||
return entry.getCoValue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Loads a CoValue's content, syncing from peers as necessary and resolving the returned
|
||||
* promise once a first version has been loaded. See `coValue.subscribe()` and `node.useTelepathicData()`
|
||||
@@ -283,13 +333,34 @@ export class LocalNode {
|
||||
*
|
||||
* @category 3. Low-level
|
||||
*/
|
||||
async load<T extends RawCoValue>(id: CoID<T>): Promise<T | "unavailable"> {
|
||||
const core = await this.loadCoValueCore(id);
|
||||
async load<T extends RawCoValue>(
|
||||
id: RawCoID,
|
||||
returnCore?: false,
|
||||
): Promise<"unavailable" | T>;
|
||||
async load<T extends RawCoValue>(
|
||||
id: RawCoID,
|
||||
returnCore: true,
|
||||
): Promise<"unavailable" | CoValueCore>;
|
||||
async load<T extends RawCoValue>(
|
||||
id: RawCoID,
|
||||
returnCore: boolean = false,
|
||||
): Promise<"unavailable" | CoValueCore | T> {
|
||||
if (this.crashed) {
|
||||
throw new Error("Trying to load CoValue after node has crashed", {
|
||||
cause: this.crashed,
|
||||
});
|
||||
}
|
||||
|
||||
const core = await this.syncManager.loadCoValue(id);
|
||||
|
||||
if (core === "unavailable") {
|
||||
return "unavailable";
|
||||
}
|
||||
|
||||
if (returnCore) {
|
||||
return core;
|
||||
}
|
||||
|
||||
return core.getCurrentContent() as T;
|
||||
}
|
||||
|
||||
@@ -313,7 +384,7 @@ export class LocalNode {
|
||||
|
||||
// console.log("Subscribing to " + id);
|
||||
|
||||
this.load(id)
|
||||
this.load<T>(id)
|
||||
.then((coValue) => {
|
||||
if (stopped) {
|
||||
return;
|
||||
@@ -423,14 +494,7 @@ export class LocalNode {
|
||||
|
||||
/** @internal */
|
||||
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValueCore {
|
||||
const entry = this.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
throw new Error(
|
||||
`${expectation ? expectation + ": " : ""}CoValue ${id} not yet loaded. Current state: ${entry.state.type}`,
|
||||
);
|
||||
}
|
||||
return entry.state.coValue;
|
||||
return this.coValuesStore.expectCoValueLoaded(id, expectation);
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
@@ -530,50 +594,6 @@ export class LocalNode {
|
||||
return (coValue.getCurrentContent() as RawAccount).currentAgentID();
|
||||
}
|
||||
|
||||
resolveAccountAgentAsync(
|
||||
id: RawAccountID | AgentID,
|
||||
expectation?: string,
|
||||
): ResultAsync<AgentID, ResolveAccountAgentError> {
|
||||
if (isAgentID(id)) {
|
||||
return okAsync(id);
|
||||
}
|
||||
|
||||
return ResultAsync.fromPromise(
|
||||
this.loadCoValueCore(id),
|
||||
(e) =>
|
||||
({
|
||||
type: "ErrorLoadingCoValueCore",
|
||||
expectation,
|
||||
id,
|
||||
error: e,
|
||||
}) satisfies LoadCoValueCoreError,
|
||||
).andThen((coValue) => {
|
||||
if (coValue === "unavailable") {
|
||||
return err({
|
||||
type: "AccountUnavailableFromAllPeers" as const,
|
||||
expectation,
|
||||
id,
|
||||
} satisfies AccountUnavailableFromAllPeersError);
|
||||
}
|
||||
|
||||
if (
|
||||
coValue.header.type !== "comap" ||
|
||||
coValue.header.ruleset.type !== "group" ||
|
||||
!coValue.header.meta ||
|
||||
!("type" in coValue.header.meta) ||
|
||||
coValue.header.meta.type !== "account"
|
||||
) {
|
||||
return err({
|
||||
type: "UnexpectedlyNotAccount" as const,
|
||||
expectation,
|
||||
id,
|
||||
} satisfies UnexpectedlyNotAccountError);
|
||||
}
|
||||
|
||||
return (coValue.getCurrentContent() as RawAccount).currentAgentID();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated use Account.createGroup() instead
|
||||
*/
|
||||
@@ -648,7 +668,11 @@ export class LocalNode {
|
||||
new Map(entry.state.coValue.sessionLogs),
|
||||
);
|
||||
|
||||
newNode.coValuesStore.setAsAvailable(coValueID, newCoValue);
|
||||
const newEntry = newNode.coValuesStore.setAsAvailable(
|
||||
coValueID,
|
||||
newCoValue,
|
||||
);
|
||||
newEntry.uploadState.copyFrom(entry.uploadState);
|
||||
|
||||
coValuesToCopy.pop();
|
||||
}
|
||||
@@ -670,7 +694,9 @@ export class LocalNode {
|
||||
}
|
||||
|
||||
gracefulShutdown() {
|
||||
this.syncManager.gracefulShutdown();
|
||||
for (const peer of LocalNode.peers.getAll()) {
|
||||
peer.gracefulShutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,42 +1,33 @@
|
||||
import { PeerKnownStateActions, PeerKnownStates } from "./PeerKnownStates.js";
|
||||
import {
|
||||
PriorityBasedMessageQueue,
|
||||
QueueEntry,
|
||||
} from "./PriorityBasedMessageQueue.js";
|
||||
import { TryAddTransactionsError } from "./coValueCore.js";
|
||||
import { RawCoID } from "./ids.js";
|
||||
import { CO_VALUE_PRIORITY } from "./priority.js";
|
||||
import { Peer, SyncMessage } from "./sync.js";
|
||||
} from "../PriorityBasedMessageQueue.js";
|
||||
import { TryAddTransactionsError } from "../coValueCore.js";
|
||||
import { config } from "../config.js";
|
||||
import { RawCoID } from "../ids.js";
|
||||
import { IncomingSyncStream, OutgoingSyncQueue } from "../localNode.js";
|
||||
import { CO_VALUE_PRIORITY } from "../priority.js";
|
||||
import { SyncMessage } from "../sync/types.js";
|
||||
import { PeerOperations } from "./PeerOperations.js";
|
||||
import { transformOutgoingMessageToPeer } from "./transformers.js";
|
||||
|
||||
export class PeerState {
|
||||
constructor(
|
||||
private peer: Peer,
|
||||
knownStates: PeerKnownStates | undefined,
|
||||
) {
|
||||
this.optimisticKnownStates = knownStates?.clone() ?? new PeerKnownStates();
|
||||
this.knownStates = knownStates?.clone() ?? new PeerKnownStates();
|
||||
}
|
||||
export type PeerID = string;
|
||||
export interface Peer {
|
||||
id: PeerID;
|
||||
incoming: IncomingSyncStream;
|
||||
outgoing: OutgoingSyncQueue;
|
||||
role: "peer" | "server" | "client" | "storage";
|
||||
priority?: number;
|
||||
crashOnClose: boolean;
|
||||
deletePeerStateOnClose?: boolean;
|
||||
}
|
||||
|
||||
/**
|
||||
* Here we to collect all the known states that a given peer has told us about.
|
||||
*
|
||||
* This can be used to safely track the sync state of a coValue in a given peer.
|
||||
*/
|
||||
readonly knownStates: PeerKnownStates;
|
||||
// NOTE Renamed PeerState into PeerEntry
|
||||
export class PeerEntry {
|
||||
private readonly ops: PeerOperations;
|
||||
|
||||
/**
|
||||
* This one collects the known states "optimistically".
|
||||
* We use it to keep track of the content we have sent to a given peer.
|
||||
*
|
||||
* The main difference with knownState is that this is updated when the content is sent to the peer without
|
||||
* waiting for any acknowledgement from the peer.
|
||||
*/
|
||||
readonly optimisticKnownStates: PeerKnownStates;
|
||||
readonly toldKnownState: Set<RawCoID> = new Set();
|
||||
|
||||
dispatchToKnownStates(action: PeerKnownStateActions) {
|
||||
this.knownStates.dispatch(action);
|
||||
this.optimisticKnownStates.dispatch(action);
|
||||
constructor(private peer: Peer) {
|
||||
this.ops = new PeerOperations(this);
|
||||
}
|
||||
|
||||
readonly erroredCoValues: Map<RawCoID, TryAddTransactionsError> = new Map();
|
||||
@@ -57,6 +48,10 @@ export class PeerState {
|
||||
return this.peer.crashOnClose;
|
||||
}
|
||||
|
||||
get send() {
|
||||
return this.ops;
|
||||
}
|
||||
|
||||
shouldRetryUnavailableCoValues() {
|
||||
return this.peer.role === "server";
|
||||
}
|
||||
@@ -96,16 +91,31 @@ export class PeerState {
|
||||
this.processing = false;
|
||||
}
|
||||
|
||||
pushOutgoingMessage(msg: SyncMessage) {
|
||||
async pushOutgoingMessage(msg: SyncMessage) {
|
||||
if (this.closed) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
const promise = this.queue.push(msg);
|
||||
const transformedMessages = transformOutgoingMessageToPeer(msg, this.id);
|
||||
if (config.TRACE_SYNC_MESSAGES) {
|
||||
transformedMessages.map((msg) => {
|
||||
console.log("🟢 <<<=== Sending to peer", this.id, msg);
|
||||
});
|
||||
}
|
||||
|
||||
void this.processQueue();
|
||||
try {
|
||||
return await Promise.all(
|
||||
transformedMessages.map((msg_3) => {
|
||||
const promise = this.queue.push(msg_3);
|
||||
|
||||
return promise;
|
||||
void this.processQueue();
|
||||
|
||||
return promise;
|
||||
}),
|
||||
);
|
||||
} catch (e) {
|
||||
console.error("Error sending to peer", this.id, transformedMessages, e);
|
||||
}
|
||||
}
|
||||
|
||||
get incoming() {
|
||||
159
packages/cojson/src/peer/PeerOperations.ts
Normal file
159
packages/cojson/src/peer/PeerOperations.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { CoValueCore } from "../coValueCore.js";
|
||||
import { RawCoID } from "../ids.js";
|
||||
import {
|
||||
CoValueKnownState,
|
||||
DataMessage,
|
||||
PushMessage,
|
||||
SyncMessage,
|
||||
} from "../sync/types.js";
|
||||
import { PeerEntry } from "./PeerEntry.js";
|
||||
|
||||
export function emptyDataMessage(
|
||||
id: RawCoID,
|
||||
asDependencyOf?: RawCoID,
|
||||
): DataMessage {
|
||||
const message: DataMessage = {
|
||||
id,
|
||||
known: true,
|
||||
header: undefined,
|
||||
action: "data",
|
||||
priority: 0,
|
||||
new: {},
|
||||
};
|
||||
return asDependencyOf ? { ...message, asDependencyOf } : message;
|
||||
}
|
||||
|
||||
export function unknownDataMessage(
|
||||
id: RawCoID,
|
||||
asDependencyOf?: RawCoID,
|
||||
): DataMessage {
|
||||
const message: DataMessage = {
|
||||
id,
|
||||
known: false,
|
||||
header: undefined,
|
||||
action: "data",
|
||||
priority: 0,
|
||||
new: {},
|
||||
};
|
||||
|
||||
return asDependencyOf ? { ...message, asDependencyOf } : message;
|
||||
}
|
||||
|
||||
/**
|
||||
* The PeerOperations class centralizes the sending logic for the atomic synchronization operations:
|
||||
* pull, push, ack, and data, implementing the protocol.
|
||||
*/
|
||||
export class PeerOperations {
|
||||
constructor(private readonly peer: PeerEntry) {}
|
||||
|
||||
async pull({ knownState }: { knownState: CoValueKnownState }) {
|
||||
if (this.peer.closed) return;
|
||||
|
||||
return this.peer.pushOutgoingMessage({
|
||||
...knownState,
|
||||
action: "pull",
|
||||
});
|
||||
}
|
||||
|
||||
async ack({ knownState }: { knownState: CoValueKnownState }) {
|
||||
if (this.peer.closed) return;
|
||||
|
||||
return this.peer.pushOutgoingMessage({
|
||||
...knownState,
|
||||
action: "ack",
|
||||
});
|
||||
}
|
||||
|
||||
async push({
|
||||
peerKnownState,
|
||||
coValue,
|
||||
}: { peerKnownState: CoValueKnownState; coValue: CoValueCore }) {
|
||||
if (this.peer.closed) return;
|
||||
|
||||
return this.sendContent({
|
||||
peerKnownState,
|
||||
coValue,
|
||||
action: "push",
|
||||
});
|
||||
}
|
||||
|
||||
async data({
|
||||
peerKnownState,
|
||||
coValue,
|
||||
dependencies = [],
|
||||
}: {
|
||||
peerKnownState: CoValueKnownState;
|
||||
coValue: CoValueCore | "empty" | "unknown";
|
||||
dependencies?: CoValueCore[];
|
||||
}) {
|
||||
if (this.peer.closed) return;
|
||||
|
||||
if (coValue === "empty") {
|
||||
return this.peer.pushOutgoingMessage(emptyDataMessage(peerKnownState.id));
|
||||
}
|
||||
if (coValue === "unknown") {
|
||||
return this.peer.pushOutgoingMessage(
|
||||
unknownDataMessage(peerKnownState.id),
|
||||
);
|
||||
}
|
||||
|
||||
const sendContentOrEmptyMessage = async (params: SendContentParamsType) => {
|
||||
const sentContentPiecesNumber = await this.sendContent(params);
|
||||
if (!sentContentPiecesNumber) {
|
||||
void this.data({ peerKnownState, coValue: "empty" });
|
||||
}
|
||||
};
|
||||
|
||||
// send dependencies first
|
||||
await Promise.all(
|
||||
dependencies.map((depCoValue) =>
|
||||
sendContentOrEmptyMessage({
|
||||
peerKnownState,
|
||||
coValue: depCoValue,
|
||||
action: "data",
|
||||
asDependencyOf: coValue.id,
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
// Send new content pieces (possibly, in chunks) created after peerKnownState that passed in
|
||||
return sendContentOrEmptyMessage({
|
||||
peerKnownState,
|
||||
coValue,
|
||||
action: "data",
|
||||
});
|
||||
}
|
||||
|
||||
private async sendContent({
|
||||
peerKnownState,
|
||||
coValue,
|
||||
action,
|
||||
asDependencyOf,
|
||||
}: SendContentParamsType): Promise<number> {
|
||||
const newContentPieces = coValue.newContentSince(peerKnownState);
|
||||
|
||||
if (newContentPieces) {
|
||||
for (const [_i, piece] of newContentPieces.entries()) {
|
||||
let msg: SyncMessage;
|
||||
if (action === "data") {
|
||||
msg = { ...piece, action, known: true } as DataMessage;
|
||||
} else {
|
||||
msg = { ...piece, action } as PushMessage;
|
||||
}
|
||||
|
||||
if (asDependencyOf) msg = { ...msg, asDependencyOf };
|
||||
|
||||
void this.peer.pushOutgoingMessage(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return newContentPieces?.length || 0;
|
||||
}
|
||||
}
|
||||
|
||||
type SendContentParamsType = {
|
||||
peerKnownState: CoValueKnownState;
|
||||
coValue: CoValueCore;
|
||||
action: "push" | "data";
|
||||
asDependencyOf?: RawCoID;
|
||||
};
|
||||
75
packages/cojson/src/peer/Peers.ts
Normal file
75
packages/cojson/src/peer/Peers.ts
Normal file
@@ -0,0 +1,75 @@
|
||||
import { RawCoID } from "../ids.js";
|
||||
import { Peer, PeerEntry, PeerID } from "./PeerEntry.js";
|
||||
|
||||
export class Peers {
|
||||
private readonly peers: { [key: PeerID]: PeerEntry } = {};
|
||||
|
||||
add(peerData: Peer) {
|
||||
const prevPeer = this.peers[peerData.id];
|
||||
const peer = new PeerEntry(peerData);
|
||||
this.peers[peerData.id] = peer;
|
||||
|
||||
if (prevPeer && !prevPeer.closed) {
|
||||
prevPeer.gracefulShutdown();
|
||||
}
|
||||
|
||||
return peer;
|
||||
}
|
||||
|
||||
get(id: PeerID): PeerEntry | void {
|
||||
if (this.peers[id]) {
|
||||
return this.peers[id];
|
||||
}
|
||||
}
|
||||
|
||||
getMany(ids: PeerID[]): PeerEntry[] {
|
||||
return this.getAll().filter((peer: PeerEntry) => ids.includes(peer.id));
|
||||
}
|
||||
|
||||
getAll(): PeerEntry[] {
|
||||
return Object.values(this.peers);
|
||||
}
|
||||
|
||||
delete(id: PeerID) {
|
||||
if (this.peers[id]) {
|
||||
delete this.peers[id];
|
||||
}
|
||||
}
|
||||
|
||||
getInPriorityOrder({
|
||||
excludedId,
|
||||
}: { excludedId?: PeerID } = {}): PeerEntry[] {
|
||||
return Object.values(this.peers)
|
||||
.sort((a, b) => {
|
||||
const aPriority = a.priority || 0;
|
||||
const bPriority = b.priority || 0;
|
||||
|
||||
return bPriority - aPriority;
|
||||
})
|
||||
.filter((peer) => (excludedId ? peer.id !== excludedId : true));
|
||||
}
|
||||
|
||||
getServerAndStorage({
|
||||
excludedId,
|
||||
includedId,
|
||||
}: { excludedId?: PeerID; includedId?: PeerID } = {}): PeerEntry[] {
|
||||
return this.getInPriorityOrder({ excludedId }).filter(
|
||||
(peer) =>
|
||||
peer.isServerOrStoragePeer() ||
|
||||
(includedId ? peer.id === includedId : false),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
export function getPeersWithoutErrors(peers: PeerEntry[], coValueId: RawCoID) {
|
||||
return peers.filter((p) => {
|
||||
if (p.erroredCoValues.has(coValueId)) {
|
||||
console.error(
|
||||
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
|
||||
);
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
});
|
||||
}
|
||||
2
packages/cojson/src/peer/index.ts
Normal file
2
packages/cojson/src/peer/index.ts
Normal file
@@ -0,0 +1,2 @@
|
||||
export * from "./PeerEntry.js";
|
||||
export * from "./Peers.js";
|
||||
81
packages/cojson/src/peer/transformers.ts
Normal file
81
packages/cojson/src/peer/transformers.ts
Normal file
@@ -0,0 +1,81 @@
|
||||
import { unknownDataMessage } from "../exports.js";
|
||||
import { SessionID } from "../ids.js";
|
||||
|
||||
import { CoValueContent, SyncMessage } from "../sync/types.js";
|
||||
|
||||
export const transformOutgoingMessageToPeer = (
|
||||
msg: SyncMessage,
|
||||
id: string,
|
||||
): SyncMessage[] => {
|
||||
if (!id.includes("cloud")) {
|
||||
return [msg];
|
||||
}
|
||||
|
||||
const getSessionsObj = (msg: CoValueContent) =>
|
||||
Object.entries(msg.new).reduce<{ [sessionID: SessionID]: number }>(
|
||||
(acc, [session, content]) => {
|
||||
acc[session as SessionID] =
|
||||
content.after + content.newTransactions.length;
|
||||
return acc;
|
||||
},
|
||||
{},
|
||||
);
|
||||
|
||||
switch (msg.action) {
|
||||
case "pull":
|
||||
// load
|
||||
return [{ ...msg, action: "load" }];
|
||||
case "push":
|
||||
// load + content
|
||||
return [
|
||||
{
|
||||
action: "load",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
sessions: getSessionsObj(msg),
|
||||
},
|
||||
{ ...msg, action: "content" },
|
||||
];
|
||||
case "data":
|
||||
if (!msg.known)
|
||||
return [{ action: "known", id: msg.id, header: false, sessions: {} }];
|
||||
// known + content => no response expected
|
||||
return [
|
||||
{
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
sessions: getSessionsObj(msg),
|
||||
},
|
||||
{ ...msg, action: "content" },
|
||||
];
|
||||
case "ack":
|
||||
// known => no response expected
|
||||
return [{ ...msg, action: "known" }];
|
||||
default:
|
||||
return [msg];
|
||||
}
|
||||
};
|
||||
|
||||
export const transformIncomingMessageFromPeer = (
|
||||
msg: SyncMessage,
|
||||
id: string,
|
||||
): SyncMessage => {
|
||||
if (!id.includes("cloud")) {
|
||||
return msg;
|
||||
}
|
||||
|
||||
switch (msg.action) {
|
||||
case "load":
|
||||
return { ...msg, action: "pull" };
|
||||
case "content":
|
||||
return { ...msg, action: "push" };
|
||||
case "known":
|
||||
if (!msg.header) return unknownDataMessage(msg.id);
|
||||
|
||||
if (msg.isCorrection) return { ...msg, action: "pull" };
|
||||
return { ...msg, action: "ack" };
|
||||
default:
|
||||
return msg;
|
||||
}
|
||||
};
|
||||
@@ -1,7 +1,7 @@
|
||||
import { MAX_RECOMMENDED_TX_SIZE } from "../coValueCore.js";
|
||||
import { RawCoID, SessionID } from "../ids.js";
|
||||
import { getPriorityFromHeader } from "../priority.js";
|
||||
import { CoValueKnownState, NewContentMessage } from "../sync.js";
|
||||
import { CoValueKnownState, NewContentMessage } from "../sync/types.js";
|
||||
import { CoValueChunk } from "./index.js";
|
||||
|
||||
export function contentSinceChunk(
|
||||
|
||||
@@ -2,14 +2,10 @@ import { CoID, RawCoValue } from "../coValue.js";
|
||||
import { CoValueHeader, Transaction } from "../coValueCore.js";
|
||||
import { Signature } from "../crypto/crypto.js";
|
||||
import { RawCoID } from "../ids.js";
|
||||
import { IncomingSyncStream, OutgoingSyncQueue } from "../localNode.js";
|
||||
import { Peer } from "../peer/PeerEntry.js";
|
||||
import { connectedPeers } from "../streamUtils.js";
|
||||
import {
|
||||
CoValueKnownState,
|
||||
IncomingSyncStream,
|
||||
NewContentMessage,
|
||||
OutgoingSyncQueue,
|
||||
Peer,
|
||||
} from "../sync.js";
|
||||
import { CoValueKnownState, NewContentMessage } from "../sync/types.js";
|
||||
import {
|
||||
BlockFilename,
|
||||
FileSystem,
|
||||
@@ -73,9 +69,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
if (msg === "Disconnected" || msg === "PingTimeout") {
|
||||
throw new Error("Unexpected Disconnected message");
|
||||
}
|
||||
if (msg.action === "done") {
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg.action === "content") {
|
||||
await this.handleNewContent(msg);
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import { Channel } from "queueueue";
|
||||
import { Peer, PeerID, SyncMessage } from "./sync.js";
|
||||
import { Peer, PeerID } from "./peer/PeerEntry.js";
|
||||
|
||||
import { SyncMessage } from "./sync/types.js";
|
||||
export { Channel } from "queueueue";
|
||||
|
||||
export function connectedPeers(
|
||||
|
||||
@@ -1,669 +1,85 @@
|
||||
import { PeerState } from "./PeerState.js";
|
||||
import { SyncStateSubscriptionManager } from "./SyncStateSubscriptionManager.js";
|
||||
import { CoValueHeader, Transaction } from "./coValueCore.js";
|
||||
import { CoValueCore } from "./coValueCore.js";
|
||||
import { Signature } from "./crypto/crypto.js";
|
||||
import { RawCoID, SessionID } from "./ids.js";
|
||||
import { CoValueEntry } from "./coValueEntry.js";
|
||||
import { RawCoID } from "./ids.js";
|
||||
import { LocalNode } from "./localNode.js";
|
||||
import { CoValuePriority } from "./priority.js";
|
||||
|
||||
export type CoValueKnownState = {
|
||||
id: RawCoID;
|
||||
header: boolean;
|
||||
sessions: { [sessionID: SessionID]: number };
|
||||
};
|
||||
|
||||
export function emptyKnownState(id: RawCoID): CoValueKnownState {
|
||||
return {
|
||||
id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
};
|
||||
}
|
||||
|
||||
export type SyncMessage =
|
||||
| LoadMessage
|
||||
| KnownStateMessage
|
||||
| NewContentMessage
|
||||
| DoneMessage;
|
||||
|
||||
export type LoadMessage = {
|
||||
action: "load";
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type KnownStateMessage = {
|
||||
action: "known";
|
||||
asDependencyOf?: RawCoID;
|
||||
isCorrection?: boolean;
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type NewContentMessage = {
|
||||
action: "content";
|
||||
id: RawCoID;
|
||||
header?: CoValueHeader;
|
||||
priority: CoValuePriority;
|
||||
new: {
|
||||
[sessionID: SessionID]: SessionNewContent;
|
||||
};
|
||||
};
|
||||
|
||||
export type SessionNewContent = {
|
||||
after: number;
|
||||
newTransactions: Transaction[];
|
||||
lastSignature: Signature;
|
||||
};
|
||||
export type DoneMessage = {
|
||||
action: "done";
|
||||
id: RawCoID;
|
||||
};
|
||||
|
||||
export type PeerID = string;
|
||||
import { PeerEntry, PeerID } from "./peer/index.js";
|
||||
import { DependencyService } from "./sync/DependencyService.js";
|
||||
import {
|
||||
AckResponseHandler,
|
||||
CoValueKnownState,
|
||||
DataResponseHandler,
|
||||
LoadService,
|
||||
MessageHandlerInterface,
|
||||
PullRequestHandler,
|
||||
PushRequestHandler,
|
||||
SyncMessage,
|
||||
SyncService,
|
||||
} from "./sync/index.js";
|
||||
|
||||
export type DisconnectedError = "Disconnected";
|
||||
|
||||
export type PingTimeoutError = "PingTimeout";
|
||||
|
||||
export type IncomingSyncStream = AsyncIterable<
|
||||
SyncMessage | DisconnectedError | PingTimeoutError
|
||||
>;
|
||||
export type OutgoingSyncQueue = {
|
||||
push: (msg: SyncMessage) => Promise<unknown>;
|
||||
close: () => void;
|
||||
};
|
||||
|
||||
export interface Peer {
|
||||
id: PeerID;
|
||||
incoming: IncomingSyncStream;
|
||||
outgoing: OutgoingSyncQueue;
|
||||
role: "peer" | "server" | "client" | "storage";
|
||||
priority?: number;
|
||||
crashOnClose: boolean;
|
||||
deletePeerStateOnClose?: boolean;
|
||||
}
|
||||
|
||||
export function combinedKnownStates(
|
||||
stateA: CoValueKnownState,
|
||||
stateB: CoValueKnownState,
|
||||
): CoValueKnownState {
|
||||
const sessionStates: CoValueKnownState["sessions"] = {};
|
||||
|
||||
const allSessions = new Set([
|
||||
...Object.keys(stateA.sessions),
|
||||
...Object.keys(stateB.sessions),
|
||||
] as SessionID[]);
|
||||
|
||||
for (const sessionID of allSessions) {
|
||||
const stateAValue = stateA.sessions[sessionID];
|
||||
const stateBValue = stateB.sessions[sessionID];
|
||||
|
||||
sessionStates[sessionID] = Math.max(stateAValue || 0, stateBValue || 0);
|
||||
}
|
||||
|
||||
return {
|
||||
id: stateA.id,
|
||||
header: stateA.header || stateB.header,
|
||||
sessions: sessionStates,
|
||||
};
|
||||
}
|
||||
|
||||
export class SyncManager {
|
||||
peers: { [key: PeerID]: PeerState } = {};
|
||||
local: LocalNode;
|
||||
|
||||
requestedSyncs: {
|
||||
[id: RawCoID]:
|
||||
| { done: Promise<void>; nRequestsThisTick: number }
|
||||
| undefined;
|
||||
} = {};
|
||||
|
||||
private readonly loadService: LoadService;
|
||||
private readonly syncService: SyncService;
|
||||
private readonly pullRequestHandler: PullRequestHandler;
|
||||
private readonly pushRequestHandler: PushRequestHandler;
|
||||
private readonly ackResponseHandler: AckResponseHandler;
|
||||
private readonly dataResponseHandler: DataResponseHandler;
|
||||
private readonly dependencyService: DependencyService;
|
||||
|
||||
constructor(local: LocalNode) {
|
||||
this.local = local;
|
||||
this.syncStateSubscriptionManager = new SyncStateSubscriptionManager(this);
|
||||
}
|
||||
|
||||
syncStateSubscriptionManager: SyncStateSubscriptionManager;
|
||||
|
||||
peersInPriorityOrder(): PeerState[] {
|
||||
return Object.values(this.peers).sort((a, b) => {
|
||||
const aPriority = a.priority || 0;
|
||||
const bPriority = b.priority || 0;
|
||||
|
||||
return bPriority - aPriority;
|
||||
});
|
||||
}
|
||||
|
||||
getPeers(): PeerState[] {
|
||||
return Object.values(this.peers);
|
||||
}
|
||||
|
||||
getServerAndStoragePeers(excludePeerId?: PeerID): PeerState[] {
|
||||
return this.peersInPriorityOrder().filter(
|
||||
(peer) => peer.isServerOrStoragePeer() && peer.id !== excludePeerId,
|
||||
);
|
||||
}
|
||||
|
||||
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
|
||||
if (peer.erroredCoValues.has(msg.id)) {
|
||||
console.error(
|
||||
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
// TODO: validate
|
||||
switch (msg.action) {
|
||||
case "load":
|
||||
return await this.handleLoad(msg, peer);
|
||||
case "known":
|
||||
if (msg.isCorrection) {
|
||||
return await this.handleCorrection(msg, peer);
|
||||
} else {
|
||||
return await this.handleKnownState(msg, peer);
|
||||
}
|
||||
case "content":
|
||||
// await new Promise<void>((resolve) => setTimeout(resolve, 0));
|
||||
return await this.handleNewContent(msg, peer);
|
||||
case "done":
|
||||
return await this.handleUnsubscribe(msg);
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown message type ${(msg as { action: "string" }).action}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async subscribeToIncludingDependencies(id: RawCoID, peer: PeerState) {
|
||||
const entry = this.local.coValuesStore.get(id);
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
entry.loadFromPeers([peer]).catch((e: unknown) => {
|
||||
console.error("Error sending load", e);
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
const coValue = entry.state.coValue;
|
||||
|
||||
for (const id of coValue.getDependedOnCoValues()) {
|
||||
await this.subscribeToIncludingDependencies(id, peer);
|
||||
}
|
||||
|
||||
if (!peer.toldKnownState.has(id)) {
|
||||
peer.toldKnownState.add(id);
|
||||
this.trySendToPeer(peer, {
|
||||
action: "load",
|
||||
...coValue.knownState(),
|
||||
}).catch((e: unknown) => {
|
||||
console.error("Error sending load", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async tellUntoldKnownStateIncludingDependencies(
|
||||
id: RawCoID,
|
||||
peer: PeerState,
|
||||
asDependencyOf?: RawCoID,
|
||||
) {
|
||||
const coValue = this.local.expectCoValueLoaded(id);
|
||||
|
||||
await Promise.all(
|
||||
coValue
|
||||
.getDependedOnCoValues()
|
||||
.map((dependentCoID) =>
|
||||
this.tellUntoldKnownStateIncludingDependencies(
|
||||
dependentCoID,
|
||||
peer,
|
||||
asDependencyOf || id,
|
||||
),
|
||||
),
|
||||
);
|
||||
|
||||
if (!peer.toldKnownState.has(id)) {
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
asDependencyOf,
|
||||
...coValue.knownState(),
|
||||
}).catch((e: unknown) => {
|
||||
console.error("Error sending known state", e);
|
||||
});
|
||||
|
||||
peer.toldKnownState.add(id);
|
||||
}
|
||||
}
|
||||
|
||||
async sendNewContentIncludingDependencies(id: RawCoID, peer: PeerState) {
|
||||
const coValue = this.local.expectCoValueLoaded(id);
|
||||
|
||||
await Promise.all(
|
||||
coValue
|
||||
.getDependedOnCoValues()
|
||||
.map((id) => this.sendNewContentIncludingDependencies(id, peer)),
|
||||
);
|
||||
|
||||
const newContentPieces = coValue.newContentSince(
|
||||
peer.optimisticKnownStates.get(id),
|
||||
);
|
||||
|
||||
if (newContentPieces) {
|
||||
const optimisticKnownStateBefore =
|
||||
peer.optimisticKnownStates.get(id) || emptyKnownState(id);
|
||||
|
||||
const sendPieces = async () => {
|
||||
let lastYield = performance.now();
|
||||
for (const [_i, piece] of newContentPieces.entries()) {
|
||||
// console.log(
|
||||
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${
|
||||
// newContentPieces.length
|
||||
// } header: ${!!piece.header}`,
|
||||
// // Object.values(piece.new).map((s) => s.newTransactions)
|
||||
// );
|
||||
|
||||
this.trySendToPeer(peer, piece).catch((e: unknown) => {
|
||||
console.error("Error sending content piece", e);
|
||||
});
|
||||
|
||||
if (performance.now() - lastYield > 10) {
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, 0);
|
||||
});
|
||||
lastYield = performance.now();
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
sendPieces().catch((e) => {
|
||||
console.error("Error sending new content piece, retrying", e);
|
||||
peer.optimisticKnownStates.dispatch({
|
||||
type: "SET",
|
||||
id,
|
||||
value: optimisticKnownStateBefore ?? emptyKnownState(id),
|
||||
});
|
||||
return this.sendNewContentIncludingDependencies(id, peer);
|
||||
});
|
||||
|
||||
peer.optimisticKnownStates.dispatch({
|
||||
type: "COMBINE_WITH",
|
||||
id,
|
||||
value: coValue.knownState(),
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
addPeer(peer: Peer) {
|
||||
const prevPeer = this.peers[peer.id];
|
||||
const peerState = new PeerState(peer, prevPeer?.knownStates);
|
||||
this.peers[peer.id] = peerState;
|
||||
|
||||
if (prevPeer && !prevPeer.closed) {
|
||||
prevPeer.gracefulShutdown();
|
||||
}
|
||||
|
||||
const unsubscribeFromKnownStatesUpdates = peerState.knownStates.subscribe(
|
||||
(id) => {
|
||||
this.syncStateSubscriptionManager.triggerUpdate(peer.id, id);
|
||||
this.syncService = new SyncService(
|
||||
// onPushContent callback
|
||||
({ entry, peerId }: { entry: CoValueEntry; peerId: PeerID }) => {
|
||||
entry.uploadState.setPendingForPeer(peerId);
|
||||
},
|
||||
);
|
||||
|
||||
if (peerState.isServerOrStoragePeer()) {
|
||||
const initialSync = async () => {
|
||||
for (const entry of this.local.coValuesStore.getValues()) {
|
||||
await this.subscribeToIncludingDependencies(entry.id, peerState);
|
||||
this.loadService = new LoadService();
|
||||
this.dependencyService = new DependencyService(this, this.loadService);
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
await this.sendNewContentIncludingDependencies(entry.id, peerState);
|
||||
}
|
||||
this.pullRequestHandler = new PullRequestHandler(this.loadService);
|
||||
this.pushRequestHandler = new PushRequestHandler(
|
||||
this.syncService,
|
||||
// The reason for this ugly callback here is to avoid having the local node as a dependency in the handler,
|
||||
// This should be removed after CoValueCore is decoupled from the local node instance
|
||||
this.dependencyService,
|
||||
);
|
||||
|
||||
if (!peerState.optimisticKnownStates.has(entry.id)) {
|
||||
peerState.optimisticKnownStates.dispatch({
|
||||
type: "SET_AS_EMPTY",
|
||||
id: entry.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
};
|
||||
void initialSync();
|
||||
}
|
||||
this.ackResponseHandler = new AckResponseHandler(
|
||||
// onPushContentAcknowledged callback
|
||||
({ entry, peerId }: { entry: CoValueEntry; peerId: PeerID }) => {
|
||||
entry.uploadState.setCompletedForPeer(peerId);
|
||||
},
|
||||
);
|
||||
|
||||
const processMessages = async () => {
|
||||
for await (const msg of peerState.incoming) {
|
||||
if (msg === "Disconnected") {
|
||||
return;
|
||||
}
|
||||
if (msg === "PingTimeout") {
|
||||
console.error("Ping timeout from peer", peer.id);
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await this.handleSyncMessage(msg, peerState);
|
||||
} catch (e) {
|
||||
throw new Error(
|
||||
`Error reading from peer ${
|
||||
peer.id
|
||||
}, handling msg\n\n${JSON.stringify(msg, (k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
processMessages()
|
||||
.then(() => {
|
||||
if (peer.crashOnClose) {
|
||||
console.error("Unexepcted close from peer", peer.id);
|
||||
this.local.crashed = new Error("Unexpected close from peer");
|
||||
throw new Error("Unexpected close from peer");
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error processing messages from peer", peer.id, e);
|
||||
if (peer.crashOnClose) {
|
||||
this.local.crashed = e;
|
||||
throw new Error(e);
|
||||
}
|
||||
})
|
||||
.finally(() => {
|
||||
const state = this.peers[peer.id];
|
||||
state?.gracefulShutdown();
|
||||
unsubscribeFromKnownStatesUpdates();
|
||||
|
||||
if (peer.deletePeerStateOnClose) {
|
||||
delete this.peers[peer.id];
|
||||
}
|
||||
});
|
||||
this.dataResponseHandler = new DataResponseHandler(
|
||||
this.dependencyService,
|
||||
this.syncService,
|
||||
);
|
||||
}
|
||||
|
||||
trySendToPeer(peer: PeerState, msg: SyncMessage) {
|
||||
return peer.pushOutgoingMessage(msg);
|
||||
async initialSync(peer: PeerEntry) {
|
||||
return this.syncService.initialSync(peer, this.local.coValuesStore);
|
||||
}
|
||||
|
||||
async handleLoad(msg: LoadMessage, peer: PeerState) {
|
||||
peer.dispatchToKnownStates({
|
||||
type: "SET",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
|
||||
const eligiblePeers = this.getServerAndStoragePeers(peer.id);
|
||||
|
||||
if (eligiblePeers.length === 0) {
|
||||
// If the load request contains a header or any session data
|
||||
// and we don't have any eligible peers to load the coValue from
|
||||
// we try to load it from the sender because it is the only place
|
||||
// where we can get informations about the coValue
|
||||
if (msg.header || Object.keys(msg.sessions).length > 0) {
|
||||
entry.loadFromPeers([peer]).catch((e) => {
|
||||
console.error("Error loading coValue in handleLoad", e);
|
||||
});
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
this.local.loadCoValueCore(msg.id, peer.id).catch((e) => {
|
||||
console.error("Error loading coValue in handleLoad", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.state.type === "loading") {
|
||||
// We need to return from handleLoad immediately and wait for the CoValue to be loaded
|
||||
// in a new task, otherwise we might block further incoming content messages that would
|
||||
// resolve the CoValue as available. This can happen when we receive fresh
|
||||
// content from a client, but we are a server with our own upstream server(s)
|
||||
entry
|
||||
.getCoValue()
|
||||
.then(async (value) => {
|
||||
if (value === "unavailable") {
|
||||
peer.dispatchToKnownStates({
|
||||
type: "SET",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
peer.toldKnownState.add(msg.id);
|
||||
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
}).catch((e) => {
|
||||
console.error("Error sending known state back", e);
|
||||
});
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
|
||||
await this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error loading coValue in handleLoad loading state", e);
|
||||
});
|
||||
}
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
|
||||
await this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
}
|
||||
|
||||
async handleKnownState(msg: KnownStateMessage, peer: PeerState) {
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "COMBINE_WITH",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
|
||||
if (entry.state.type === "unknown" || entry.state.type === "unavailable") {
|
||||
if (msg.asDependencyOf) {
|
||||
const dependencyEntry = this.local.coValuesStore.get(
|
||||
msg.asDependencyOf,
|
||||
);
|
||||
|
||||
if (
|
||||
dependencyEntry.state.type === "available" ||
|
||||
dependencyEntry.state.type === "loading"
|
||||
) {
|
||||
this.local
|
||||
.loadCoValueCore(
|
||||
msg.id,
|
||||
peer.role === "storage" ? undefined : peer.id,
|
||||
)
|
||||
.catch((e) => {
|
||||
console.error(
|
||||
`Error loading coValue ${msg.id} to create loading state, as dependency of ${msg.asDependencyOf}`,
|
||||
e,
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// The header is a boolean value that tells us if the other peer do have information about the header.
|
||||
// If it's false in this point it means that the coValue is unavailable on the other peer.
|
||||
if (entry.state.type !== "available") {
|
||||
const availableOnPeer = peer.optimisticKnownStates.get(msg.id)?.header;
|
||||
|
||||
if (!availableOnPeer) {
|
||||
entry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
if (entry.state.type === "available") {
|
||||
await this.tellUntoldKnownStateIncludingDependencies(msg.id, peer);
|
||||
await this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
}
|
||||
|
||||
async handleNewContent(msg: NewContentMessage, peer: PeerState) {
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
let coValue: CoValueCore;
|
||||
|
||||
if (entry.state.type !== "available") {
|
||||
if (!msg.header) {
|
||||
console.error("Expected header to be sent in first message");
|
||||
return;
|
||||
}
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "UPDATE_HEADER",
|
||||
id: msg.id,
|
||||
header: true,
|
||||
});
|
||||
|
||||
coValue = new CoValueCore(msg.header, this.local);
|
||||
|
||||
entry.dispatch({
|
||||
type: "available",
|
||||
coValue,
|
||||
});
|
||||
} else {
|
||||
coValue = entry.state.coValue;
|
||||
}
|
||||
|
||||
let invalidStateAssumed = false;
|
||||
|
||||
for (const [sessionID, newContentForSession] of Object.entries(msg.new) as [
|
||||
SessionID,
|
||||
SessionNewContent,
|
||||
][]) {
|
||||
const ourKnownTxIdx =
|
||||
coValue.sessionLogs.get(sessionID)?.transactions.length;
|
||||
const theirFirstNewTxIdx = newContentForSession.after;
|
||||
|
||||
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
|
||||
invalidStateAssumed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const alreadyKnownOffset = ourKnownTxIdx
|
||||
? ourKnownTxIdx - theirFirstNewTxIdx
|
||||
: 0;
|
||||
|
||||
const newTransactions =
|
||||
newContentForSession.newTransactions.slice(alreadyKnownOffset);
|
||||
|
||||
if (newTransactions.length === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const before = performance.now();
|
||||
// eslint-disable-next-line neverthrow/must-use-result
|
||||
const result = coValue.tryAddTransactions(
|
||||
sessionID,
|
||||
newTransactions,
|
||||
undefined,
|
||||
newContentForSession.lastSignature,
|
||||
);
|
||||
const after = performance.now();
|
||||
if (after - before > 80) {
|
||||
const totalTxLength = newTransactions
|
||||
.map((t) =>
|
||||
t.privacy === "private"
|
||||
? t.encryptedChanges.length
|
||||
: t.changes.length,
|
||||
)
|
||||
.reduce((a, b) => a + b, 0);
|
||||
console.log(
|
||||
`Adding incoming transactions took ${(after - before).toFixed(
|
||||
2,
|
||||
)}ms for ${totalTxLength} bytes = bandwidth: ${(
|
||||
(1000 * totalTxLength) / (after - before) / (1024 * 1024)
|
||||
).toFixed(2)} MB/s`,
|
||||
);
|
||||
}
|
||||
|
||||
// const theirTotalnTxs = Object.values(
|
||||
// peer.optimisticKnownStates[msg.id]?.sessions || {},
|
||||
// ).reduce((sum, nTxs) => sum + nTxs, 0);
|
||||
// const ourTotalnTxs = [...coValue.sessionLogs.values()].reduce(
|
||||
// (sum, session) => sum + session.transactions.length,
|
||||
// 0,
|
||||
// );
|
||||
|
||||
if (result.isErr()) {
|
||||
console.error(
|
||||
"Failed to add transactions from",
|
||||
peer.id,
|
||||
result.error,
|
||||
msg.id,
|
||||
newTransactions.length + " new transactions",
|
||||
"after: " + newContentForSession.after,
|
||||
"our last known tx idx initially: " + ourKnownTxIdx,
|
||||
"our last known tx idx now: " +
|
||||
coValue.sessionLogs.get(sessionID)?.transactions.length,
|
||||
);
|
||||
peer.erroredCoValues.set(msg.id, result.error);
|
||||
continue;
|
||||
}
|
||||
|
||||
peer.dispatchToKnownStates({
|
||||
type: "UPDATE_SESSION_COUNTER",
|
||||
id: msg.id,
|
||||
sessionId: sessionID,
|
||||
value:
|
||||
newContentForSession.after +
|
||||
newContentForSession.newTransactions.length,
|
||||
});
|
||||
}
|
||||
|
||||
if (invalidStateAssumed) {
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
isCorrection: true,
|
||||
...coValue.knownState(),
|
||||
}).catch((e) => {
|
||||
console.error("Error sending known state correction", e);
|
||||
});
|
||||
} else {
|
||||
/**
|
||||
* We are sending a known state message to the peer to acknowledge the
|
||||
* receipt of the new content.
|
||||
*
|
||||
* This way the sender knows that the content has been received and applied
|
||||
* and can update their peer's knownState accordingly.
|
||||
*/
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
...coValue.knownState(),
|
||||
}).catch((e: unknown) => {
|
||||
console.error("Error sending known state", e);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* We do send a correction/ack message before syncing to give an immediate
|
||||
* response to the peers that are waiting for confirmation that a coValue is
|
||||
* fully synced
|
||||
*/
|
||||
await this.syncCoValue(coValue);
|
||||
}
|
||||
|
||||
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
|
||||
peer.dispatchToKnownStates({
|
||||
type: "SET",
|
||||
id: msg.id,
|
||||
value: knownStateIn(msg),
|
||||
});
|
||||
|
||||
return this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
|
||||
handleUnsubscribe(_msg: DoneMessage) {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
async syncCoValue(coValue: CoValueCore) {
|
||||
async syncCoValue(
|
||||
coValue: CoValueCore,
|
||||
peersKnownState: CoValueKnownState,
|
||||
peers?: PeerEntry[],
|
||||
) {
|
||||
if (this.requestedSyncs[coValue.id]) {
|
||||
this.requestedSyncs[coValue.id]!.nRequestsThisTick++;
|
||||
return this.requestedSyncs[coValue.id]!.done;
|
||||
@@ -671,83 +87,66 @@ export class SyncManager {
|
||||
const done = new Promise<void>((resolve) => {
|
||||
queueMicrotask(async () => {
|
||||
delete this.requestedSyncs[coValue.id];
|
||||
// if (entry.nRequestsThisTick >= 2) {
|
||||
// console.log("Syncing", coValue.id, "for", entry.nRequestsThisTick, "requests");
|
||||
// }
|
||||
await this.actuallySyncCoValue(coValue);
|
||||
const entry = this.local.coValuesStore.get(coValue.id);
|
||||
await this.syncService.syncCoValue(entry, peersKnownState, peers);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
const entry = {
|
||||
|
||||
this.requestedSyncs[coValue.id] = {
|
||||
done,
|
||||
nRequestsThisTick: 1,
|
||||
};
|
||||
this.requestedSyncs[coValue.id] = entry;
|
||||
return done;
|
||||
}
|
||||
}
|
||||
|
||||
async actuallySyncCoValue(coValue: CoValueCore) {
|
||||
// let blockingSince = performance.now();
|
||||
for (const peer of this.peersInPriorityOrder()) {
|
||||
if (peer.closed) continue;
|
||||
if (peer.erroredCoValues.has(coValue.id)) continue;
|
||||
// if (performance.now() - blockingSince > 5) {
|
||||
// await new Promise<void>((resolve) => {
|
||||
// setTimeout(resolve, 0);
|
||||
// });
|
||||
// blockingSince = performance.now();
|
||||
// }
|
||||
if (peer.optimisticKnownStates.has(coValue.id)) {
|
||||
await this.tellUntoldKnownStateIncludingDependencies(coValue.id, peer);
|
||||
await this.sendNewContentIncludingDependencies(coValue.id, peer);
|
||||
} else if (peer.isServerOrStoragePeer()) {
|
||||
await this.subscribeToIncludingDependencies(coValue.id, peer);
|
||||
await this.sendNewContentIncludingDependencies(coValue.id, peer);
|
||||
}
|
||||
}
|
||||
async loadCoValue(id: RawCoID): Promise<CoValueCore | "unavailable"> {
|
||||
const entry = this.local.coValuesStore.get(id);
|
||||
return this.loadService.loadCoValue(entry);
|
||||
}
|
||||
|
||||
for (const peer of this.getPeers()) {
|
||||
this.syncStateSubscriptionManager.triggerUpdate(peer.id, coValue.id);
|
||||
handleSyncMessage(msg: SyncMessage, peer: PeerEntry) {
|
||||
if (peer.erroredCoValues.has(msg.id)) {
|
||||
console.error(
|
||||
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
|
||||
);
|
||||
return;
|
||||
}
|
||||
const entry = this.local.coValuesStore.get(msg.id);
|
||||
|
||||
let handler: MessageHandlerInterface;
|
||||
switch (msg.action) {
|
||||
case "data":
|
||||
handler = this.dataResponseHandler;
|
||||
break;
|
||||
case "push":
|
||||
handler = this.pushRequestHandler;
|
||||
break;
|
||||
case "pull":
|
||||
handler = this.pullRequestHandler;
|
||||
break;
|
||||
case "ack":
|
||||
handler = this.ackResponseHandler;
|
||||
break;
|
||||
default:
|
||||
throw new Error(
|
||||
`Unknown message type ${(msg as unknown as { action: "string" }).action}`,
|
||||
);
|
||||
}
|
||||
return handler.handle({ msg, peer, entry });
|
||||
}
|
||||
|
||||
async waitForUploadIntoPeer(peerId: PeerID, id: RawCoID) {
|
||||
const isAlreadyUploaded =
|
||||
this.syncStateSubscriptionManager.getIsCoValueFullyUploadedIntoPeer(
|
||||
peerId,
|
||||
id,
|
||||
);
|
||||
const entry = this.local.coValuesStore.get(id);
|
||||
if (!entry) {
|
||||
throw new Error(`Unknown coValue ${id}`);
|
||||
}
|
||||
|
||||
if (isAlreadyUploaded) {
|
||||
if (entry.uploadState.isCoValueFullyUploadedIntoPeer(peerId)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return new Promise((resolve) => {
|
||||
const unsubscribe =
|
||||
this.syncStateSubscriptionManager.subscribeToPeerUpdates(
|
||||
peerId,
|
||||
(knownState, syncState) => {
|
||||
if (syncState.isUploaded && knownState.id === id) {
|
||||
resolve(true);
|
||||
unsubscribe?.();
|
||||
}
|
||||
},
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
gracefulShutdown() {
|
||||
for (const peer of Object.values(this.peers)) {
|
||||
peer.gracefulShutdown();
|
||||
}
|
||||
return entry.uploadState.waitForPeer(peerId);
|
||||
}
|
||||
}
|
||||
|
||||
function knownStateIn(msg: LoadMessage | KnownStateMessage) {
|
||||
return {
|
||||
id: msg.id,
|
||||
header: msg.header,
|
||||
sessions: msg.sessions,
|
||||
};
|
||||
}
|
||||
|
||||
29
packages/cojson/src/sync/AbstractMessageHandler.ts
Normal file
29
packages/cojson/src/sync/AbstractMessageHandler.ts
Normal file
@@ -0,0 +1,29 @@
|
||||
import { ParallelQueueRunner } from "../utils/parallelQueueRunner.js";
|
||||
import { MessageHandlerInput, MessageHandlerInterface } from "./types.js";
|
||||
|
||||
export abstract class AbstractMessageHandler
|
||||
implements MessageHandlerInterface
|
||||
{
|
||||
private readonly queuesRunner = new ParallelQueueRunner();
|
||||
|
||||
handle({ msg, peer, entry }: MessageHandlerInput) {
|
||||
this.queuesRunner.defferPer(msg.id, () =>
|
||||
this.routeMessage({ msg, peer, entry }),
|
||||
);
|
||||
}
|
||||
|
||||
protected routeMessage({ msg, peer, entry }: MessageHandlerInput) {
|
||||
switch (entry.state.type) {
|
||||
case "available":
|
||||
return this.handleAvailable({ msg, peer, entry });
|
||||
case "loading":
|
||||
return this.handleLoading({ msg, peer, entry });
|
||||
case "unknown":
|
||||
case "unavailable":
|
||||
return this.handleUnknown({ msg, peer, entry });
|
||||
}
|
||||
}
|
||||
abstract handleAvailable(input: MessageHandlerInput): Promise<unknown>;
|
||||
abstract handleLoading(input: MessageHandlerInput): Promise<unknown>;
|
||||
abstract handleUnknown(input: MessageHandlerInput): Promise<unknown>;
|
||||
}
|
||||
46
packages/cojson/src/sync/AckResponseHandler.ts
Normal file
46
packages/cojson/src/sync/AckResponseHandler.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import { CoValueEntry } from "../coValueEntry.js";
|
||||
import { PeerEntry, PeerID } from "../peer/PeerEntry.js";
|
||||
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
|
||||
import { AckMessage } from "./types.js";
|
||||
|
||||
export type AckMessageHandlerInput = {
|
||||
msg: AckMessage;
|
||||
peer: PeerEntry;
|
||||
entry: CoValueEntry;
|
||||
};
|
||||
|
||||
export class AckResponseHandler extends AbstractMessageHandler {
|
||||
constructor(
|
||||
private onPushContentAcknowledged?: ({
|
||||
entry,
|
||||
peerId,
|
||||
}: { entry: CoValueEntry; peerId: PeerID }) => void,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async handleAvailable(input: AckMessageHandlerInput) {
|
||||
if (this.onPushContentAcknowledged) {
|
||||
this.onPushContentAcknowledged({
|
||||
entry: input.entry,
|
||||
peerId: input.peer.id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async handleLoading(input: AckMessageHandlerInput) {
|
||||
console.error(
|
||||
"Unexpected loading state. Ack message is a response to a push request and should not be received for loading coValue.",
|
||||
input.msg.id,
|
||||
input.peer.id,
|
||||
);
|
||||
}
|
||||
|
||||
async handleUnknown(input: AckMessageHandlerInput) {
|
||||
console.error(
|
||||
"Unexpected unavailable state. Ack message is a response to a push request and should not be received for unavailable coValue.",
|
||||
input.msg.id,
|
||||
input.peer.id,
|
||||
);
|
||||
}
|
||||
}
|
||||
129
packages/cojson/src/sync/DataResponseHandler.ts
Normal file
129
packages/cojson/src/sync/DataResponseHandler.ts
Normal file
@@ -0,0 +1,129 @@
|
||||
import { isTryAddTransactionsException } from "../coValueCore.js";
|
||||
import { CoValueAvailableState } from "../coValueEntry.js";
|
||||
import { LocalNode } from "../exports.js";
|
||||
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
|
||||
import { DependencyService } from "./DependencyService.js";
|
||||
import { SyncService } from "./SyncService.js";
|
||||
import { DataMessageHandlerInput, emptyKnownState } from "./types.js";
|
||||
|
||||
/**
|
||||
* "Data" is a response to our "pull" message. It's always some data we asked for, initially.
|
||||
* It's a terminal message which must not be responded to.
|
||||
* At this stage the coValue state is considered synced between the peer and the node.
|
||||
*/
|
||||
export class DataResponseHandler extends AbstractMessageHandler {
|
||||
constructor(
|
||||
private readonly dependencyService: DependencyService,
|
||||
private readonly syncService: SyncService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async handleAvailable(input: DataMessageHandlerInput): Promise<void> {
|
||||
const { msg, entry, peer } = input;
|
||||
await this.dependencyService.loadUnknownDependencies(input);
|
||||
|
||||
if (!msg.known) {
|
||||
// Send coValue to the peer if not known by the peer but available on our side
|
||||
return this.syncService.syncCoValue(entry, emptyKnownState(msg.id), [
|
||||
peer,
|
||||
]);
|
||||
}
|
||||
|
||||
this.addData(input);
|
||||
|
||||
// Push data to peers which are not aware of the coValue,
|
||||
// they are preserved in entry.uploadState after being marked as 'not-found-in-peer'
|
||||
const unawarePeerIds = entry.uploadState.getUnawarePeerIds();
|
||||
|
||||
if (unawarePeerIds.length) {
|
||||
void this.syncService.syncCoValue(
|
||||
entry,
|
||||
emptyKnownState(msg.id),
|
||||
LocalNode.peers.getMany(unawarePeerIds),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async handleLoading(input: DataMessageHandlerInput) {
|
||||
const { peer, msg, entry } = input;
|
||||
|
||||
// not known by peer
|
||||
if (!msg.known) {
|
||||
entry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg.header) {
|
||||
console.error(
|
||||
"Unexpected empty header in message. Data message is a response to a pull request and should be received for available coValue or include the full header.",
|
||||
msg.id,
|
||||
peer.id,
|
||||
);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
await this.dependencyService.MakeAvailableWithDependencies(input);
|
||||
|
||||
return this.routeMessage(input);
|
||||
}
|
||||
|
||||
async handleUnknown(input: DataMessageHandlerInput) {
|
||||
const { peer, msg, entry } = input;
|
||||
|
||||
if (!msg.known) {
|
||||
input.entry.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
peerId: peer.id,
|
||||
});
|
||||
return;
|
||||
}
|
||||
|
||||
if (!msg.asDependencyOf) {
|
||||
console.error(
|
||||
"Unexpected coValue unavailable state in DataResponseHandler",
|
||||
peer.id,
|
||||
msg.id,
|
||||
);
|
||||
}
|
||||
|
||||
entry.moveToLoadingState([peer]);
|
||||
|
||||
return this.routeMessage(input);
|
||||
}
|
||||
|
||||
addData(input: DataMessageHandlerInput) {
|
||||
const { peer, msg, entry } = input;
|
||||
const { coValue } = entry.state as CoValueAvailableState;
|
||||
|
||||
try {
|
||||
const anyMissedTransaction = coValue.addNewContent(msg);
|
||||
|
||||
if (anyMissedTransaction) {
|
||||
console.error(
|
||||
"Unexpected missed transactions in data message",
|
||||
peer.id,
|
||||
msg,
|
||||
);
|
||||
|
||||
return false;
|
||||
}
|
||||
} catch (e) {
|
||||
if (isTryAddTransactionsException(e)) {
|
||||
const { message, error } = e;
|
||||
console.error(peer.id, message, error);
|
||||
|
||||
peer.erroredCoValues.set(msg.id, error);
|
||||
} else {
|
||||
console.error("Unknown error", peer.id, e);
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
93
packages/cojson/src/sync/DependencyService.ts
Normal file
93
packages/cojson/src/sync/DependencyService.ts
Normal file
@@ -0,0 +1,93 @@
|
||||
import {
|
||||
CoValueCore,
|
||||
CoValueHeader,
|
||||
getDependedOnFromContent,
|
||||
} from "../coValueCore.js";
|
||||
import { CoValueAvailableState, CoValueEntry } from "../coValueEntry.js";
|
||||
import { SyncManager } from "../sync.js";
|
||||
import { LoadService } from "./LoadService.js";
|
||||
import {
|
||||
CoValueContent,
|
||||
DataMessageHandlerInput,
|
||||
PushMessageHandlerInput,
|
||||
} from "./types.js";
|
||||
|
||||
export class DependencyService {
|
||||
constructor(
|
||||
private syncManager: SyncManager,
|
||||
private loadService: LoadService,
|
||||
) {}
|
||||
|
||||
private async getUnknownDependencies(
|
||||
input: DataMessageHandlerInput | PushMessageHandlerInput,
|
||||
) {
|
||||
const { msg, entry } = input;
|
||||
const isAvailable = entry.state.type === "available";
|
||||
if (!msg.header && !isAvailable) {
|
||||
throw new Error(`Cannot get dependencies without header ${msg.id}`);
|
||||
}
|
||||
|
||||
const availableCoValue = isAvailable
|
||||
? (entry.state as CoValueAvailableState).coValue
|
||||
: null;
|
||||
|
||||
const header = availableCoValue ? availableCoValue.header : msg.header;
|
||||
const dependencies = new Set([
|
||||
...getDependedOnFromContent({
|
||||
...msg,
|
||||
header,
|
||||
} as Required<CoValueContent>),
|
||||
...(availableCoValue ? availableCoValue.getDependedOnCoValues() : []),
|
||||
]);
|
||||
|
||||
const unknownDependencies: CoValueEntry[] = [];
|
||||
for (const id of dependencies) {
|
||||
const entry = this.syncManager.local.coValuesStore.get(id);
|
||||
if (entry.state.type === "loading") {
|
||||
await entry.getCoValue();
|
||||
}
|
||||
if (entry.state.type !== "available") {
|
||||
unknownDependencies.push(entry);
|
||||
}
|
||||
}
|
||||
|
||||
return unknownDependencies;
|
||||
}
|
||||
|
||||
async loadUnknownDependencies(
|
||||
input: DataMessageHandlerInput | PushMessageHandlerInput,
|
||||
) {
|
||||
const unknownDependencies = await this.getUnknownDependencies(input);
|
||||
|
||||
// load dependencies one by one as they can depend on each other
|
||||
for await (const dependency of unknownDependencies) {
|
||||
await this.loadService.loadCoValue(dependency);
|
||||
}
|
||||
}
|
||||
|
||||
private createCoValue(header: CoValueHeader) {
|
||||
return new CoValueCore(header, this.syncManager.local);
|
||||
}
|
||||
|
||||
async MakeAvailableWithDependencies(
|
||||
input: PushMessageHandlerInput | DataMessageHandlerInput,
|
||||
) {
|
||||
if (!input.msg.header) {
|
||||
throw new Error(`Empty header for ${input.msg.id}`);
|
||||
}
|
||||
|
||||
if (input.entry.state.type === "available") {
|
||||
throw new Error(
|
||||
`CoValue is already available, requested to make available for ${input.msg.id}`,
|
||||
);
|
||||
}
|
||||
|
||||
await this.loadUnknownDependencies(input);
|
||||
|
||||
const coValue = this.createCoValue(input.msg.header);
|
||||
input.entry.dispatch({
|
||||
type: "available",
|
||||
coValue,
|
||||
});
|
||||
}
|
||||
}
|
||||
66
packages/cojson/src/sync/LoadService.ts
Normal file
66
packages/cojson/src/sync/LoadService.ts
Normal file
@@ -0,0 +1,66 @@
|
||||
import { CoValueCore } from "../coValueCore.js";
|
||||
import { CO_VALUE_LOADING_TIMEOUT, CoValueEntry } from "../coValueEntry.js";
|
||||
import { LocalNode } from "../exports.js";
|
||||
import { PeerEntry, getPeersWithoutErrors } from "../peer/index.js";
|
||||
import { emptyKnownState } from "./types.js";
|
||||
|
||||
export class LoadService {
|
||||
constructor() {}
|
||||
|
||||
/**
|
||||
* Sends "pull" request to peers to load/update the coValue state and request to subscribe to peer's updates if have not
|
||||
*
|
||||
* @param entry
|
||||
* @param peerToLoadFrom - Required peer to send the request to
|
||||
*/
|
||||
async loadCoValue(
|
||||
entry: CoValueEntry,
|
||||
peerToLoadFrom?: PeerEntry,
|
||||
): Promise<CoValueCore | "unavailable"> {
|
||||
const peers = peerToLoadFrom
|
||||
? [peerToLoadFrom]
|
||||
: LocalNode.peers.getServerAndStorage();
|
||||
|
||||
try {
|
||||
await entry.loadFromPeers(
|
||||
getPeersWithoutErrors(peers, entry.id),
|
||||
loadCoValueFromPeers,
|
||||
);
|
||||
} catch (e) {
|
||||
console.error("Error loading from peers", entry.id, e);
|
||||
}
|
||||
|
||||
return entry.getCoValue();
|
||||
}
|
||||
}
|
||||
|
||||
async function loadCoValueFromPeers(
|
||||
coValueEntry: CoValueEntry,
|
||||
peers: PeerEntry[],
|
||||
) {
|
||||
for await (const peer of peers) {
|
||||
if (coValueEntry.state.type === "available") {
|
||||
await peer.send.pull({
|
||||
knownState: coValueEntry.state.coValue.knownState(),
|
||||
});
|
||||
} else {
|
||||
await peer.send.pull({ knownState: emptyKnownState(coValueEntry.id) });
|
||||
}
|
||||
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
const timeout = setTimeout(() => {
|
||||
if (coValueEntry.state.type === "loading") {
|
||||
console.error(
|
||||
"Failed to load coValue from peer",
|
||||
peer.id,
|
||||
coValueEntry.id,
|
||||
);
|
||||
coValueEntry.markAsNotFoundInPeer(peer.id);
|
||||
}
|
||||
}, CO_VALUE_LOADING_TIMEOUT);
|
||||
|
||||
await coValueEntry.state.waitForPeer(peer.id);
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
}
|
||||
57
packages/cojson/src/sync/PullRequestHandler.ts
Normal file
57
packages/cojson/src/sync/PullRequestHandler.ts
Normal file
@@ -0,0 +1,57 @@
|
||||
import { CoValueAvailableState, CoValueEntry } from "../coValueEntry.js";
|
||||
import { PeerEntry } from "../peer/index.js";
|
||||
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
|
||||
import { LoadService } from "./LoadService.js";
|
||||
import { PullMessage } from "./types.js";
|
||||
|
||||
export type PullMessageHandlerInput = {
|
||||
msg: PullMessage;
|
||||
peer: PeerEntry;
|
||||
entry: CoValueEntry;
|
||||
};
|
||||
|
||||
/**
|
||||
* "Pull" request must be followed by "data" message response according to the protocol:
|
||||
* - Sends new content if it exists.
|
||||
* - Sends an empty data message otherwise.
|
||||
* - Sends an empty data message with `{ known: false }` in the message if the `coValue` is unknown by local node.
|
||||
*
|
||||
* Handler initiates a new "pull" requests to load the coValue from peers if it is not known by the node.
|
||||
*/
|
||||
export class PullRequestHandler extends AbstractMessageHandler {
|
||||
constructor(private readonly loadService: LoadService) {
|
||||
super();
|
||||
}
|
||||
|
||||
async handleAvailable(input: PullMessageHandlerInput): Promise<unknown> {
|
||||
const { msg, peer, entry } = input;
|
||||
const { coValue } = entry.state as CoValueAvailableState;
|
||||
|
||||
return peer.send.data({
|
||||
peerKnownState: msg,
|
||||
coValue,
|
||||
});
|
||||
}
|
||||
|
||||
async handleLoading(input: PullMessageHandlerInput): Promise<unknown> {
|
||||
// We need to wait for the CoValue to be loaded that would resolve the CoValue as available.
|
||||
await input.entry.getCoValue();
|
||||
|
||||
return this.routeMessage(input);
|
||||
}
|
||||
|
||||
async handleUnknown(input: PullMessageHandlerInput): Promise<unknown> {
|
||||
const { msg, peer, entry } = input;
|
||||
|
||||
// Initiate a new PULL flow
|
||||
// If the coValue is known by peer then we try to load it from the sender as well
|
||||
if (msg.header) {
|
||||
void this.loadService.loadCoValue(entry, peer);
|
||||
}
|
||||
|
||||
return peer.send.data({
|
||||
peerKnownState: msg,
|
||||
coValue: "unknown",
|
||||
});
|
||||
}
|
||||
}
|
||||
83
packages/cojson/src/sync/PushRequestHandler.ts
Normal file
83
packages/cojson/src/sync/PushRequestHandler.ts
Normal file
@@ -0,0 +1,83 @@
|
||||
import { CoValueCore, isTryAddTransactionsException } from "../coValueCore.js";
|
||||
import { CoValueAvailableState } from "../coValueEntry.js";
|
||||
import { LocalNode } from "../exports.js";
|
||||
import { AbstractMessageHandler } from "./AbstractMessageHandler.js";
|
||||
import { DependencyService } from "./DependencyService.js";
|
||||
import { SyncService } from "./SyncService.js";
|
||||
import { PushMessageHandlerInput, emptyKnownState } from "./types.js";
|
||||
|
||||
export class PushRequestHandler extends AbstractMessageHandler {
|
||||
constructor(
|
||||
protected readonly syncService: SyncService,
|
||||
protected readonly dependencyService: DependencyService,
|
||||
) {
|
||||
super();
|
||||
}
|
||||
|
||||
async handleAvailable(input: PushMessageHandlerInput): Promise<unknown> {
|
||||
const { coValue } = input.entry.state as CoValueAvailableState;
|
||||
await this.dependencyService.loadUnknownDependencies(input);
|
||||
|
||||
return this.addData(coValue, input);
|
||||
}
|
||||
|
||||
async handleUnknown(input: PushMessageHandlerInput) {
|
||||
const { msg, entry, peer } = input;
|
||||
if (!msg.header) {
|
||||
console.error(`Unexpected unavailable state for coValue ${input.msg.id}`);
|
||||
}
|
||||
entry.moveToLoadingState([peer]);
|
||||
|
||||
return this.routeMessage(input);
|
||||
}
|
||||
|
||||
async handleLoading(input: PushMessageHandlerInput) {
|
||||
if (!input.msg.header) {
|
||||
console.error(`Unexpected loading state for coValue ${input.msg.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
await this.dependencyService.MakeAvailableWithDependencies(input);
|
||||
|
||||
return this.routeMessage(input);
|
||||
}
|
||||
|
||||
private async addData(coValue: CoValueCore, input: PushMessageHandlerInput) {
|
||||
const { msg, peer, entry } = input;
|
||||
|
||||
const knownState = coValue.knownState();
|
||||
const isEmptyKnownState =
|
||||
!knownState.header ||
|
||||
!knownState.sessions ||
|
||||
!Object.keys(knownState.sessions).length;
|
||||
|
||||
const assumedPeerKnownState = isEmptyKnownState
|
||||
? emptyKnownState(knownState.id)
|
||||
: { ...knownState };
|
||||
|
||||
try {
|
||||
const anyMissedTransaction = coValue.addNewContent(msg);
|
||||
|
||||
anyMissedTransaction
|
||||
? await peer.send.pull({ knownState: coValue.knownState() })
|
||||
: await peer.send.ack({ knownState: coValue.knownState() });
|
||||
} catch (e) {
|
||||
if (isTryAddTransactionsException(e)) {
|
||||
const { message, error } = e;
|
||||
console.error(peer.id, message, error);
|
||||
|
||||
peer.erroredCoValues.set(msg.id, error);
|
||||
} else {
|
||||
console.error("Unknown error", peer.id, e);
|
||||
}
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
const peers = LocalNode.peers.getInPriorityOrder({
|
||||
excludedId: peer.id,
|
||||
});
|
||||
|
||||
await this.syncService.syncCoValue(entry, assumedPeerKnownState, peers);
|
||||
}
|
||||
}
|
||||
68
packages/cojson/src/sync/SyncService.ts
Normal file
68
packages/cojson/src/sync/SyncService.ts
Normal file
@@ -0,0 +1,68 @@
|
||||
import { CoValuesStore } from "../CoValuesStore.js";
|
||||
import { CoValueEntry } from "../coValueEntry.js";
|
||||
import { LocalNode } from "../exports.js";
|
||||
import { PeerEntry, PeerID } from "../peer/index.js";
|
||||
import { CoValueKnownState, emptyKnownState } from "./types.js";
|
||||
|
||||
export class SyncService {
|
||||
constructor(
|
||||
private readonly onPushContent?: ({
|
||||
entry,
|
||||
peerId,
|
||||
}: { entry: CoValueEntry; peerId: PeerID }) => void,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Sends "push" request to peers to broadcast all known coValues state
|
||||
* and request to subscribe to those coValues updates (if have not)
|
||||
*/
|
||||
async initialSync(
|
||||
peer: PeerEntry,
|
||||
coValuesStore: CoValuesStore,
|
||||
): Promise<void> {
|
||||
const ids = coValuesStore.getOrderedIds();
|
||||
|
||||
for (const id of ids) {
|
||||
const coValue = coValuesStore.expectCoValueLoaded(id);
|
||||
// Previously we used to send load + content, see transformOutgoingMessageToPeer()
|
||||
await peer.send.push({
|
||||
peerKnownState: emptyKnownState(id),
|
||||
coValue,
|
||||
});
|
||||
|
||||
// TODO should be moved inside peer.send.push
|
||||
if (this.onPushContent) {
|
||||
const entry = coValuesStore.get(coValue.id);
|
||||
this.onPushContent({ entry, peerId: peer.id });
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends "push" request to peers to broadcast the new known coValue state and request to subscribe to updates if have not
|
||||
*/
|
||||
async syncCoValue(
|
||||
entry: CoValueEntry,
|
||||
peerKnownState: CoValueKnownState,
|
||||
peers?: PeerEntry[],
|
||||
) {
|
||||
if (entry.state.type !== "available") {
|
||||
throw new Error(`Can't sync unavailable coValue ${peerKnownState.id}`);
|
||||
}
|
||||
|
||||
const peersToSync = peers || LocalNode.peers.getInPriorityOrder();
|
||||
|
||||
for (const peer of peersToSync) {
|
||||
if (peer.erroredCoValues.has(entry.id)) continue;
|
||||
|
||||
await peer.send.push({
|
||||
peerKnownState,
|
||||
coValue: entry.state.coValue,
|
||||
});
|
||||
// TODO should be moved inside peer.send.push
|
||||
if (this.onPushContent) {
|
||||
this.onPushContent({ entry, peerId: peer.id });
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
7
packages/cojson/src/sync/index.ts
Normal file
7
packages/cojson/src/sync/index.ts
Normal file
@@ -0,0 +1,7 @@
|
||||
export * from "./types.js";
|
||||
export * from "./DataResponseHandler.js";
|
||||
export * from "./PushRequestHandler.js";
|
||||
export * from "./PullRequestHandler.js";
|
||||
export * from "./AckResponseHandler.js";
|
||||
export * from "./SyncService.js";
|
||||
export * from "./LoadService.js";
|
||||
94
packages/cojson/src/sync/types.ts
Normal file
94
packages/cojson/src/sync/types.ts
Normal file
@@ -0,0 +1,94 @@
|
||||
import { CoValueHeader, SessionNewContent } from "../coValueCore.js";
|
||||
import { CoValueEntry } from "../coValueEntry.js";
|
||||
import { RawCoID, SessionID } from "../ids.js";
|
||||
import { PeerEntry } from "../peer/PeerEntry.js";
|
||||
import { CoValuePriority } from "../priority.js";
|
||||
|
||||
export type CoValueKnownState = {
|
||||
id: RawCoID;
|
||||
// Is coValue known by peer
|
||||
header: boolean;
|
||||
// Number of known sessions
|
||||
sessions: { [sessionID: SessionID]: number };
|
||||
};
|
||||
|
||||
export function emptyKnownState(id: RawCoID): CoValueKnownState {
|
||||
return {
|
||||
id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
};
|
||||
}
|
||||
|
||||
export type SyncMessage =
|
||||
| LoadMessage
|
||||
| KnownStateMessage
|
||||
| NewContentMessage
|
||||
| PullMessage
|
||||
| PushMessage
|
||||
| AckMessage
|
||||
| DataMessage;
|
||||
|
||||
export type LoadMessage = {
|
||||
action: "load";
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type PullMessage = {
|
||||
action: "pull";
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type KnownStateMessage = {
|
||||
action: "known";
|
||||
asDependencyOf?: RawCoID;
|
||||
isCorrection?: boolean;
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type AckMessage = {
|
||||
action: "ack";
|
||||
} & CoValueKnownState;
|
||||
|
||||
export type CoValueContent = {
|
||||
id: RawCoID;
|
||||
header?: CoValueHeader;
|
||||
priority: CoValuePriority;
|
||||
new: {
|
||||
[sessionID: SessionID]: SessionNewContent;
|
||||
};
|
||||
};
|
||||
|
||||
export type NewContentMessage = {
|
||||
action: "content";
|
||||
} & CoValueContent;
|
||||
|
||||
export type DataMessage = {
|
||||
known: boolean;
|
||||
action: "data";
|
||||
asDependencyOf?: RawCoID;
|
||||
} & CoValueContent;
|
||||
|
||||
export type PushMessage = {
|
||||
action: "push";
|
||||
asDependencyOf?: RawCoID;
|
||||
} & CoValueContent;
|
||||
|
||||
export type MessageHandlerInput = {
|
||||
msg: SyncMessage;
|
||||
peer: PeerEntry;
|
||||
entry: CoValueEntry;
|
||||
};
|
||||
|
||||
export type PushMessageHandlerInput = {
|
||||
msg: PushMessage;
|
||||
peer: PeerEntry;
|
||||
entry: CoValueEntry;
|
||||
};
|
||||
|
||||
export type DataMessageHandlerInput = {
|
||||
msg: DataMessage;
|
||||
peer: PeerEntry;
|
||||
entry: CoValueEntry;
|
||||
};
|
||||
|
||||
export interface MessageHandlerInterface {
|
||||
handle({ msg, peer, entry }: MessageHandlerInput): void;
|
||||
}
|
||||
@@ -1,7 +1,8 @@
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { PeerKnownStates } from "../PeerKnownStates.js";
|
||||
import { RawCoID, SessionID } from "../ids.js";
|
||||
import { CoValueKnownState, emptyKnownState } from "../sync.js";
|
||||
|
||||
import { CoValueKnownState, emptyKnownState } from "../sync/types.js";
|
||||
|
||||
describe("PeerKnownStates", () => {
|
||||
test("should set and get a known state", () => {
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { PeerKnownStateActions } from "../PeerKnownStates.js";
|
||||
import { PeerState } from "../PeerState.js";
|
||||
import { Peer } from "../localNode.js";
|
||||
import { PeerEntry } from "../peer/PeerEntry.js";
|
||||
import { CO_VALUE_PRIORITY } from "../priority.js";
|
||||
import { Peer, SyncMessage } from "../sync.js";
|
||||
|
||||
import { SyncMessage } from "../sync/types.js";
|
||||
|
||||
function setup() {
|
||||
const mockPeer: Peer = {
|
||||
@@ -16,7 +18,7 @@ function setup() {
|
||||
close: vi.fn(),
|
||||
},
|
||||
};
|
||||
const peerState = new PeerState(mockPeer, undefined);
|
||||
const peerState = new PeerEntry(mockPeer, undefined);
|
||||
return { mockPeer, peerState };
|
||||
}
|
||||
|
||||
@@ -160,7 +162,7 @@ describe("PeerState", () => {
|
||||
};
|
||||
peerState.dispatchToKnownStates(action);
|
||||
|
||||
const newPeerState = new PeerState(mockPeer, peerState.knownStates);
|
||||
const newPeerState = new PeerEntry(mockPeer, peerState.knownStates);
|
||||
|
||||
expect(newPeerState.knownStates).toEqual(peerState.knownStates);
|
||||
expect(newPeerState.optimisticKnownStates).toEqual(peerState.knownStates);
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { describe, expect, test } from "vitest";
|
||||
import { PriorityBasedMessageQueue } from "../PriorityBasedMessageQueue.js";
|
||||
import { CO_VALUE_PRIORITY } from "../priority.js";
|
||||
import { SyncMessage } from "../sync.js";
|
||||
|
||||
import { SyncMessage } from "../sync/types.js";
|
||||
|
||||
function setup() {
|
||||
const queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.MEDIUM);
|
||||
@@ -67,9 +68,9 @@ describe("PriorityBasedMessageQueue", () => {
|
||||
void queue.push(mediumPriorityMsg);
|
||||
void queue.push(highPriorityMsg);
|
||||
|
||||
expect(queue.pull()?.msg).toEqual(highPriorityMsg);
|
||||
expect(queue.pull()?.msg).toEqual(mediumPriorityMsg);
|
||||
expect(queue.pull()?.msg).toEqual(lowPriorityMsg);
|
||||
expect(queue.pull()?.content).toEqual(highPriorityMsg);
|
||||
expect(queue.pull()?.content).toEqual(mediumPriorityMsg);
|
||||
expect(queue.pull()?.content).toEqual(lowPriorityMsg);
|
||||
});
|
||||
|
||||
test("should return undefined when pulling from empty queue", () => {
|
||||
|
||||
@@ -4,7 +4,7 @@ import {
|
||||
PeerSyncStateListenerCallback,
|
||||
} from "../SyncStateSubscriptionManager.js";
|
||||
import { connectedPeers } from "../streamUtils.js";
|
||||
import { emptyKnownState } from "../sync.js";
|
||||
import { emptyKnownState } from "../sync/types.js";
|
||||
import { createTestNode, waitFor } from "./testUtils.js";
|
||||
|
||||
describe("SyncStateSubscriptionManager", () => {
|
||||
|
||||
@@ -59,7 +59,7 @@ test("Can create account with one node, and then load it on another", async () =
|
||||
|
||||
console.log("After connected peers");
|
||||
|
||||
node.syncManager.addPeer(node2asPeer);
|
||||
node.addPeer(node2asPeer);
|
||||
|
||||
const node2 = await LocalNode.withLoadedAccount({
|
||||
accountID,
|
||||
|
||||
@@ -1,15 +1,16 @@
|
||||
import { describe, expect, test, vi } from "vitest";
|
||||
import { PeerState } from "../PeerState";
|
||||
import { CoValueCore } from "../coValueCore";
|
||||
import { CO_VALUE_LOADING_MAX_RETRIES, CoValueState } from "../coValueState";
|
||||
import { CO_VALUE_LOADING_MAX_RETRIES, CoValueEntry } from "../coValueEntry.js";
|
||||
import { RawCoID } from "../ids";
|
||||
import { Peer } from "../sync";
|
||||
import { PeerEntry } from "../peer/PeerEntry.js";
|
||||
|
||||
import { Peer } from "../localNode.js";
|
||||
|
||||
describe("CoValueState", () => {
|
||||
const mockCoValueId = "co_test123" as RawCoID;
|
||||
|
||||
test("should create unknown state", () => {
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
expect(state.state.type).toBe("unknown");
|
||||
@@ -17,7 +18,7 @@ describe("CoValueState", () => {
|
||||
|
||||
test("should create loading state", () => {
|
||||
const peerIds = ["peer1", "peer2"];
|
||||
const state = CoValueState.Loading(mockCoValueId, peerIds);
|
||||
const state = CoValueEntry.Loading(mockCoValueId, peerIds);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
expect(state.state.type).toBe("loading");
|
||||
@@ -25,7 +26,7 @@ describe("CoValueState", () => {
|
||||
|
||||
test("should create available state", async () => {
|
||||
const mockCoValue = createMockCoValueCore(mockCoValueId);
|
||||
const state = CoValueState.Available(mockCoValue);
|
||||
const state = CoValueEntry.Available(mockCoValue);
|
||||
|
||||
expect(state.id).toBe(mockCoValueId);
|
||||
expect(state.state.type).toBe("available");
|
||||
@@ -35,7 +36,7 @@ describe("CoValueState", () => {
|
||||
|
||||
test("should handle found action", async () => {
|
||||
const mockCoValue = createMockCoValueCore(mockCoValueId);
|
||||
const state = CoValueState.Loading(mockCoValueId, ["peer1", "peer2"]);
|
||||
const state = CoValueEntry.Loading(mockCoValueId, ["peer1", "peer2"]);
|
||||
|
||||
const stateValuePromise = state.getCoValue();
|
||||
|
||||
@@ -50,7 +51,7 @@ describe("CoValueState", () => {
|
||||
});
|
||||
|
||||
test("should ignore actions when not in loading state", () => {
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
|
||||
state.dispatch({
|
||||
type: "not-found-in-peer",
|
||||
@@ -87,9 +88,9 @@ describe("CoValueState", () => {
|
||||
});
|
||||
},
|
||||
);
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
|
||||
@@ -140,9 +141,9 @@ describe("CoValueState", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
|
||||
@@ -189,9 +190,9 @@ describe("CoValueState", () => {
|
||||
});
|
||||
},
|
||||
);
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerState[];
|
||||
const mockPeers = [peer1, peer2] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
|
||||
@@ -239,9 +240,9 @@ describe("CoValueState", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
const mockPeers = [peer1] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
|
||||
@@ -273,9 +274,9 @@ describe("CoValueState", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
const mockPeers = [peer1] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
// Should attempt CO_VALUE_LOADING_MAX_RETRIES retries
|
||||
@@ -322,9 +323,9 @@ describe("CoValueState", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const mockPeers = [peer1] as unknown as PeerState[];
|
||||
const mockPeers = [peer1] as unknown as PeerEntry[];
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers(mockPeers);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
|
||||
@@ -369,7 +370,7 @@ describe("CoValueState", () => {
|
||||
},
|
||||
);
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1, peer2]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
|
||||
@@ -418,7 +419,7 @@ describe("CoValueState", () => {
|
||||
|
||||
peer1.closed = true;
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1, peer2]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES; i++) {
|
||||
@@ -446,7 +447,7 @@ describe("CoValueState", () => {
|
||||
async () => {},
|
||||
);
|
||||
|
||||
const state = CoValueState.Unknown(mockCoValueId);
|
||||
const state = CoValueEntry.Unknown(mockCoValueId);
|
||||
const loadPromise = state.loadFromPeers([peer1]);
|
||||
|
||||
for (let i = 0; i < CO_VALUE_LOADING_MAX_RETRIES * 2; i++) {
|
||||
@@ -467,7 +468,7 @@ function createMockPeerState(
|
||||
peer: Partial<Peer>,
|
||||
pushFn = () => Promise.resolve(),
|
||||
) {
|
||||
const peerState = new PeerState(
|
||||
const peerState = new PeerEntry(
|
||||
{
|
||||
id: "peer",
|
||||
role: "server",
|
||||
|
||||
@@ -9,7 +9,7 @@ import { stableStringify } from "../jsonStringify.js";
|
||||
import { LocalNode } from "../localNode.js";
|
||||
import { getPriorityFromHeader } from "../priority.js";
|
||||
import { connectedPeers, newQueuePair } from "../streamUtils.js";
|
||||
import { SyncMessage } from "../sync.js";
|
||||
import { SyncMessage } from "../sync/types.js";
|
||||
import {
|
||||
createTestNode,
|
||||
randomAnonymousAccountAndSessionID,
|
||||
@@ -32,7 +32,7 @@ test("Node replies with initial tx and header to empty subscribe", async () => {
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -113,7 +113,7 @@ test("Node replies with only new tx to subscribe with some known state", async (
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -188,7 +188,7 @@ test("After subscribing, node sends own known state and new txs to peer", async
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -304,7 +304,7 @@ test("Client replies with known new content to tellKnownState from server", asyn
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -378,7 +378,7 @@ test("No matter the optimistic known state, node respects invalid known state me
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -478,7 +478,7 @@ test("If we add a peer, but it never subscribes to a coValue, it won't get any m
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -514,7 +514,7 @@ test.todo(
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -587,7 +587,7 @@ test.skip("If we add a server peer, newly created coValues are auto-subscribed t
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -643,7 +643,7 @@ test("When we connect a new server peer, we try to sync all existing coValues to
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -679,7 +679,7 @@ test("When receiving a subscribe with a known state that is ahead of our own, pe
|
||||
const [outRx, outTx] = newQueuePair();
|
||||
const outRxQ = outRx[Symbol.asyncIterator]();
|
||||
|
||||
node.syncManager.addPeer({
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
@@ -718,7 +718,7 @@ test.skip("When replaying creation and transactions of a coValue as new content,
|
||||
const [outRx1, outTx1] = newQueuePair();
|
||||
const outRxQ1 = outRx1[Symbol.asyncIterator]();
|
||||
|
||||
node1.syncManager.addPeer({
|
||||
node1.addPeer({
|
||||
id: "test2",
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
@@ -736,7 +736,7 @@ test.skip("When replaying creation and transactions of a coValue as new content,
|
||||
const [outRx2, outTx2] = newQueuePair();
|
||||
const outRxQ2 = outRx2[Symbol.asyncIterator]();
|
||||
|
||||
node2.syncManager.addPeer({
|
||||
node2.addPeer({
|
||||
id: "test1",
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
@@ -879,8 +879,8 @@ test("Can sync a coValue through a server to another client", async () => {
|
||||
},
|
||||
);
|
||||
|
||||
client1.syncManager.addPeer(serverAsPeerForClient1);
|
||||
server.syncManager.addPeer(client1AsPeer);
|
||||
client1.addPeer(serverAsPeerForClient1);
|
||||
server.addPeer(client1AsPeer);
|
||||
|
||||
const client2 = new LocalNode(
|
||||
admin,
|
||||
@@ -898,8 +898,8 @@ test("Can sync a coValue through a server to another client", async () => {
|
||||
},
|
||||
);
|
||||
|
||||
client2.syncManager.addPeer(serverAsPeerForClient2);
|
||||
server.syncManager.addPeer(client2AsPeer);
|
||||
client2.addPeer(serverAsPeerForClient2);
|
||||
server.addPeer(client2AsPeer);
|
||||
|
||||
const mapOnClient2 = await client2.loadCoValueCore(map.core.id);
|
||||
if (mapOnClient2 === "unavailable") {
|
||||
@@ -931,8 +931,8 @@ test("Can sync a coValue with private transactions through a server to another c
|
||||
peer2role: "client",
|
||||
});
|
||||
|
||||
client1.syncManager.addPeer(serverAsPeer);
|
||||
server.syncManager.addPeer(client1AsPeer);
|
||||
client1.addPeer(serverAsPeer);
|
||||
server.addPeer(client1AsPeer);
|
||||
|
||||
const client2 = new LocalNode(
|
||||
admin,
|
||||
@@ -950,8 +950,8 @@ test("Can sync a coValue with private transactions through a server to another c
|
||||
},
|
||||
);
|
||||
|
||||
client2.syncManager.addPeer(serverAsOtherPeer);
|
||||
server.syncManager.addPeer(client2AsPeer);
|
||||
client2.addPeer(serverAsOtherPeer);
|
||||
server.addPeer(client2AsPeer);
|
||||
|
||||
const mapOnClient2 = await client2.loadCoValueCore(map.core.id);
|
||||
if (mapOnClient2 === "unavailable") {
|
||||
@@ -1098,13 +1098,13 @@ test("If we start loading a coValue before connecting to a peer that has it, it
|
||||
// trace: true,
|
||||
});
|
||||
|
||||
node1.syncManager.addPeer(node2asPeer);
|
||||
node1.addPeer(node2asPeer);
|
||||
|
||||
const mapOnNode2Promise = node2.loadCoValueCore(map.core.id);
|
||||
|
||||
expect(node2.coValuesStore.get(map.core.id).state.type).toEqual("unknown");
|
||||
|
||||
node2.syncManager.addPeer(node1asPeer);
|
||||
node2.addPeer(node1asPeer);
|
||||
|
||||
const mapOnNode2 = await mapOnNode2Promise;
|
||||
if (mapOnNode2 === "unavailable") {
|
||||
@@ -1190,8 +1190,8 @@ describe("sync - extra tests", () => {
|
||||
peer2role: "client",
|
||||
});
|
||||
|
||||
node1.syncManager.addPeer(node2AsPeer);
|
||||
node2.syncManager.addPeer(node1AsPeer);
|
||||
node1.addPeer(node2AsPeer);
|
||||
node2.addPeer(node1AsPeer);
|
||||
|
||||
// Wait for initial sync
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
@@ -1486,12 +1486,12 @@ describe("sync - extra tests", () => {
|
||||
},
|
||||
);
|
||||
|
||||
node1.syncManager.addPeer(node2AsPeerFor1);
|
||||
node1.syncManager.addPeer(node3AsPeerFor1);
|
||||
node2.syncManager.addPeer(node1AsPeerFor2);
|
||||
node2.syncManager.addPeer(node3AsPeerFor2);
|
||||
node3.syncManager.addPeer(node1AsPeerFor3);
|
||||
node3.syncManager.addPeer(node2AsPeerFor3);
|
||||
node1.addPeer(node2AsPeerFor1);
|
||||
node1.addPeer(node3AsPeerFor1);
|
||||
node2.addPeer(node1AsPeerFor2);
|
||||
node2.addPeer(node3AsPeerFor2);
|
||||
node3.addPeer(node1AsPeerFor3);
|
||||
node3.addPeer(node2AsPeerFor3);
|
||||
|
||||
// Wait for initial sync
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
@@ -1587,10 +1587,10 @@ describe("sync - extra tests", () => {
|
||||
},
|
||||
);
|
||||
|
||||
node1.syncManager.addPeer(newNode3AsPeerFor1);
|
||||
node2.syncManager.addPeer(newNode3AsPeerFor2);
|
||||
node3.syncManager.addPeer(newNode1AsPeerFor3);
|
||||
node3.syncManager.addPeer(newNode2AsPeerFor3);
|
||||
node1.addPeer(newNode3AsPeerFor1);
|
||||
node2.addPeer(newNode3AsPeerFor2);
|
||||
node3.addPeer(newNode1AsPeerFor3);
|
||||
node3.addPeer(newNode2AsPeerFor3);
|
||||
|
||||
// Wait for re-sync
|
||||
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
import { expect } from "vitest";
|
||||
import { ControlledAgent } from "../coValues/account.js";
|
||||
import { WasmCrypto } from "../crypto/WasmCrypto.js";
|
||||
import { CoID, RawCoValue } from "../exports.js";
|
||||
import { CoID, Peer, RawCoValue } from "../exports.js";
|
||||
import { SessionID } from "../ids.js";
|
||||
import { LocalNode } from "../localNode.js";
|
||||
import { connectedPeers } from "../streamUtils.js";
|
||||
import { Peer } from "../sync.js";
|
||||
import { expectGroup } from "../typeUtils/expectGroup.js";
|
||||
|
||||
const Crypto = await WasmCrypto.create();
|
||||
@@ -98,12 +97,12 @@ export function createThreeConnectedNodes(
|
||||
},
|
||||
);
|
||||
|
||||
node1.syncManager.addPeer(node1ToNode2Peer);
|
||||
node1.syncManager.addPeer(node1ToNode3Peer);
|
||||
node2.syncManager.addPeer(node2ToNode1Peer);
|
||||
node2.syncManager.addPeer(node2ToNode3Peer);
|
||||
node3.syncManager.addPeer(node3ToNode1Peer);
|
||||
node3.syncManager.addPeer(node3ToNode2Peer);
|
||||
node1.addPeer(node1ToNode2Peer);
|
||||
node1.addPeer(node1ToNode3Peer);
|
||||
node2.addPeer(node2ToNode1Peer);
|
||||
node2.addPeer(node2ToNode3Peer);
|
||||
node3.addPeer(node3ToNode1Peer);
|
||||
node3.addPeer(node3ToNode2Peer);
|
||||
|
||||
return {
|
||||
node1,
|
||||
|
||||
59
packages/cojson/src/utils/parallelQueueRunner.ts
Normal file
59
packages/cojson/src/utils/parallelQueueRunner.ts
Normal file
@@ -0,0 +1,59 @@
|
||||
import { RawCoID } from "../ids.js";
|
||||
type DeferredFn = () => Promise<unknown>;
|
||||
|
||||
export class ParallelQueueRunner {
|
||||
private queueIds: Map<RawCoID, { queue: DeferredFn[]; locked: boolean }> =
|
||||
new Map();
|
||||
|
||||
defferPer(queueId: RawCoID, fn: () => Promise<unknown>) {
|
||||
const item = this.queueIds.get(queueId);
|
||||
if (item) {
|
||||
item.queue.push(fn);
|
||||
} else {
|
||||
this.queueIds.set(queueId, { queue: [fn], locked: false });
|
||||
}
|
||||
|
||||
void this.processQueue(queueId);
|
||||
}
|
||||
|
||||
private async processQueue(queueId: RawCoID) {
|
||||
const queueEntry = this.queueIds.get(queueId)!;
|
||||
|
||||
if (queueEntry.locked) return;
|
||||
queueEntry.locked = true;
|
||||
|
||||
while (queueEntry.queue.length) {
|
||||
try {
|
||||
await queueEntry.queue.shift()!();
|
||||
} catch (e) {
|
||||
console.error(`Error while processing queue for ${queueId} ${e}`);
|
||||
}
|
||||
}
|
||||
|
||||
queueEntry.locked = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Test it
|
||||
// function sleep(ms: number) {
|
||||
// return new Promise((resolve) => setTimeout(resolve, ms));
|
||||
// }
|
||||
//
|
||||
// function test(id: RawCoID, msg: string, ms: number) {
|
||||
// queue.deffer(id, async () => {
|
||||
// console.log(id, msg, "start");
|
||||
// await sleep(ms);
|
||||
// console.log(id, msg, "end");
|
||||
// });
|
||||
// }
|
||||
//
|
||||
// const queue = new QueueRunner();
|
||||
//
|
||||
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "1", 400);
|
||||
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "2", 300);
|
||||
// test("111", "1", 200);
|
||||
// test("111", "2", 200);
|
||||
// test("111", "3", 200);
|
||||
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "3", 200);
|
||||
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "4", 200);
|
||||
// test("co_zXkkbcca9nkdfJHBo4RHhX22Tf", "5", 200);
|
||||
@@ -71,7 +71,7 @@ export async function createJazzBrowserContext<Acc extends Account>(
|
||||
options.reconnectionTimeout,
|
||||
(peer) => {
|
||||
if (node) {
|
||||
node.syncManager.addPeer(peer);
|
||||
node.addPeer(peer);
|
||||
}
|
||||
},
|
||||
);
|
||||
|
||||
@@ -54,7 +54,7 @@ export async function startWorker<Acc extends Account>({
|
||||
});
|
||||
|
||||
setInterval(async () => {
|
||||
if (!worker._raw.core.node.syncManager.peers["upstream"]) {
|
||||
if (!worker._raw.core.node.peers.get("upstream")) {
|
||||
console.log(new Date(), "Reconnecting to upstream " + peer);
|
||||
|
||||
const wsPeer: Peer = createWebSocketPeer({
|
||||
@@ -63,7 +63,7 @@ export async function startWorker<Acc extends Account>({
|
||||
role: "server",
|
||||
});
|
||||
|
||||
worker._raw.core.node.syncManager.addPeer(wsPeer);
|
||||
worker._raw.core.node.addPeer(wsPeer);
|
||||
}
|
||||
}, 5000);
|
||||
|
||||
|
||||
@@ -102,9 +102,7 @@ export async function createJazzRNContext<Acc extends Account>(
|
||||
async function websocketReconnectLoop() {
|
||||
while (shouldTryToReconnect) {
|
||||
if (
|
||||
Object.keys(node.syncManager.peers).some((peerId) =>
|
||||
peerId.includes(options.peer),
|
||||
)
|
||||
Object.keys(node.peers).some((peerId) => peerId.includes(options.peer))
|
||||
) {
|
||||
// TODO: this might drain battery, use listeners instead
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
@@ -130,7 +128,7 @@ export async function createJazzRNContext<Acc extends Account>(
|
||||
);
|
||||
});
|
||||
|
||||
node.syncManager.addPeer(
|
||||
node.addPeer(
|
||||
createWebSocketPeer({
|
||||
websocket: new WebSocket(options.peer),
|
||||
id: options.peer + "@" + new Date().toISOString(),
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
import { CoValueCore, Profile } from "cojson";
|
||||
import { emptyKnownState } from "cojson";
|
||||
import { createWebSocketPeer } from "cojson-transport-ws";
|
||||
import {
|
||||
Account,
|
||||
CoMap,
|
||||
Peer,
|
||||
WasmCrypto,
|
||||
createJazzContext,
|
||||
isControlledAccount,
|
||||
@@ -41,8 +39,11 @@ export const createWorkerAccount = async ({
|
||||
const syncManager = account._raw.core.node.syncManager;
|
||||
|
||||
await Promise.all([
|
||||
syncManager.syncCoValue(accountCoValue),
|
||||
syncManager.syncCoValue(accountProfileCoValue),
|
||||
syncManager.syncCoValue(accountCoValue, emptyKnownState(accountCoValue.id)),
|
||||
syncManager.syncCoValue(
|
||||
accountProfileCoValue,
|
||||
emptyKnownState(accountProfileCoValue.id),
|
||||
),
|
||||
]);
|
||||
|
||||
await Promise.race([
|
||||
|
||||
@@ -4,7 +4,7 @@ import { WebSocketServer } from "ws";
|
||||
|
||||
import { mkdir } from "node:fs/promises";
|
||||
import { dirname } from "node:path";
|
||||
import { SQLiteStorage } from "cojson-storage-sqlite";
|
||||
import { SQLiteNode, SQLiteStorage } from "cojson-storage-sqlite";
|
||||
import { createWebSocketPeer } from "cojson-transport-ws";
|
||||
|
||||
export const startSyncServer = async ({
|
||||
@@ -38,9 +38,10 @@ export const startSyncServer = async ({
|
||||
if (!inMemory) {
|
||||
await mkdir(dirname(db), { recursive: true });
|
||||
|
||||
SQLiteNode.USE_PROTOCOL2 = true;
|
||||
const storage = await SQLiteStorage.asPeer({ filename: db });
|
||||
|
||||
localNode.syncManager.addPeer(storage);
|
||||
localNode.addPeer(storage);
|
||||
}
|
||||
|
||||
wss.on("connection", function connection(ws, req) {
|
||||
@@ -66,7 +67,7 @@ export const startSyncServer = async ({
|
||||
|
||||
const clientId = clientAddress + "@" + new Date().toISOString();
|
||||
|
||||
localNode.syncManager.addPeer(
|
||||
localNode.addPeer(
|
||||
createWebSocketPeer({
|
||||
id: clientId,
|
||||
role: "client",
|
||||
|
||||
@@ -204,7 +204,7 @@ export class Account extends CoValueBase implements CoValue {
|
||||
{ peer1role: "server", peer2role: "client" },
|
||||
);
|
||||
|
||||
as._raw.core.node.syncManager.addPeer(connectedPeers[1]);
|
||||
as._raw.core.node.addPeer(connectedPeers[1]);
|
||||
|
||||
return this.create<A>({
|
||||
creationProps: options.creationProps,
|
||||
|
||||
@@ -265,7 +265,7 @@ export async function createAnonymousJazzContext({
|
||||
);
|
||||
|
||||
for (const peer of peersToLoadFrom) {
|
||||
node.syncManager.addPeer(peer);
|
||||
node.addPeer(peer);
|
||||
}
|
||||
|
||||
return {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { RawCoValue } from "cojson";
|
||||
import { RawCoValue } from "cojson";
|
||||
import type {
|
||||
Account,
|
||||
AnonymousJazzAgent,
|
||||
@@ -86,7 +86,7 @@ export class SubscriptionScope<Root extends CoValue> {
|
||||
this.subscriber._type === "Account"
|
||||
? this.subscriber._raw.core.node
|
||||
: this.subscriber.node;
|
||||
void node.loadCoValueCore(accessedOrSetId).then((core) => {
|
||||
void node.load(accessedOrSetId, true).then((core) => {
|
||||
if (loadingEntry.state === "loading" && loadingEntry.immediatelyUnsub) {
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -92,7 +92,7 @@ describe("CoFeed resolution", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
@@ -174,7 +174,7 @@ describe("CoFeed resolution", async () => {
|
||||
peer2role: "client",
|
||||
});
|
||||
|
||||
me._raw.core.node.syncManager.addPeer(secondAsPeer);
|
||||
me._raw.core.node.addPeer(secondAsPeer);
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
@@ -305,7 +305,7 @@ describe("FileStream loading & Subscription", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondAsPeer);
|
||||
me._raw.core.node.addPeer(secondAsPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
@@ -333,7 +333,7 @@ describe("FileStream loading & Subscription", async () => {
|
||||
peer1role: "server",
|
||||
peer2role: "client",
|
||||
});
|
||||
me._raw.core.node.syncManager.addPeer(secondAsPeer);
|
||||
me._raw.core.node.addPeer(secondAsPeer);
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
|
||||
@@ -155,7 +155,7 @@ describe("CoList resolution", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
@@ -226,7 +226,7 @@ describe("CoList resolution", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
|
||||
@@ -382,7 +382,7 @@ describe("CoMap resolution", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
@@ -453,7 +453,7 @@ describe("CoMap resolution", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondAsPeer);
|
||||
me._raw.core.node.addPeer(secondAsPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
|
||||
@@ -48,7 +48,7 @@ describe("Deep loading with depth arg", async () => {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
@@ -260,7 +260,7 @@ test("Deep loading a record-like coMap", async () => {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
|
||||
@@ -25,7 +25,7 @@ export async function setupAccount() {
|
||||
if (!isControlledAccount(me)) {
|
||||
throw "me is not a controlled account";
|
||||
}
|
||||
me._raw.core.node.syncManager.addPeer(secondPeer);
|
||||
me._raw.core.node.addPeer(secondPeer);
|
||||
const { account: meOnSecondPeer } = await createJazzContext({
|
||||
auth: fixedCredentialsAuth({
|
||||
accountID: me.id,
|
||||
|
||||
Reference in New Issue
Block a user