Improve tests and ensure non-private sync through server works

This commit is contained in:
Anselm
2023-08-07 16:59:49 +01:00
parent 7c8b732111
commit f07ec976c9
16 changed files with 3687 additions and 602 deletions

18
.eslintrc.cjs Normal file
View File

@@ -0,0 +1,18 @@
module.exports = {
extends: [
'eslint:recommended',
'plugin:@typescript-eslint/recommended',
],
parser: '@typescript-eslint/parser',
plugins: ['@typescript-eslint'],
parserOptions: {
project: './tsconfig.json',
},
root: true,
rules: {
"no-unused-vars": "off",
"@typescript-eslint/no-unused-vars": ["error", { "argsIgnorePattern": "^_", "varsIgnorePattern": "^_" }],
"@typescript-eslint/no-floating-promises": "error",
},
};

BIN
bun.lockb

Binary file not shown.

View File

@@ -1,19 +1,30 @@
{
"name": "cojson",
"module": "src/index.ts",
"type": "module",
"license": "MIT",
"devDependencies": {
"bun-types": "latest"
},
"peerDependencies": {
"typescript": "^5.0.0"
},
"dependencies": {
"@noble/ciphers": "^0.1.3",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"@scure/base": "^1.1.1",
"fast-json-stable-stringify": "^2.1.0"
}
}
"name": "cojson",
"module": "src/index.ts",
"type": "module",
"license": "MIT",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",
"@typescript-eslint/parser": "^6.2.1",
"eslint": "^8.46.0",
"jest": "^29.6.2",
"ts-jest": "^29.1.1",
"typescript": "5.0.2"
},
"dependencies": {
"@noble/ciphers": "^0.1.3",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"@scure/base": "^1.1.1",
"fast-json-stable-stringify": "^2.1.0",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
},
"scripts": {
"test": "jest"
},
"jest": {
"preset": "ts-jest",
"testEnvironment": "node"
}
}

View File

@@ -1,4 +1,3 @@
import { expect, test } from "bun:test";
import {
CoValue,
Transaction,

View File

@@ -187,8 +187,6 @@ export class CoValue {
this.content = undefined;
this.node.syncCoValue(this);
const _ = this.getCurrentContent();
return true;
@@ -252,12 +250,18 @@ export class CoValue {
expectedNewHash
);
return this.tryAddTransactions(
const success = this.tryAddTransactions(
sessionID,
[transaction],
expectedNewHash,
signature
);
if (success) {
void this.node.sync.syncCoValue(this);
}
return success;
}
getCurrentContent(): ContentType {
@@ -376,7 +380,8 @@ export class CoValue {
// Try to find indirect revelation through previousKeys
for (const entry of readKeyHistory) {
if (entry.value?.previousKeys?.[keyID]) {
const encryptedPreviousKey = entry.value?.previousKeys?.[keyID];
if (entry.value && encryptedPreviousKey) {
const sealingKeyID = entry.value.keyID;
const sealingKeySecret = this.getReadKey(sealingKeyID);
@@ -388,7 +393,7 @@ export class CoValue {
{
sealed: keyID,
sealing: sealingKeyID,
encrypted: entry.value.previousKeys[keyID],
encrypted: encryptedPreviousKey,
},
sealingKeySecret
);
@@ -424,7 +429,9 @@ export class CoValue {
return this.sessions[txID.sessionID]?.transactions[txID.txIndex];
}
newContentSince(knownState: CoValueKnownState | undefined): NewContentMessage | undefined {
newContentSince(
knownState: CoValueKnownState | undefined
): NewContentMessage | undefined {
const newContent: NewContentMessage = {
action: "newContent",
coValueID: this.id,
@@ -459,14 +466,28 @@ export class CoValue {
})
.filter((x): x is Exclude<typeof x, undefined> => !!x)
),
}
};
if (!newContent.header && Object.keys(newContent.newContent).length === 0) {
if (
!newContent.header &&
Object.keys(newContent.newContent).length === 0
) {
return undefined;
}
return newContent;
}
getDependedOnCoValues(): RawCoValueID[] {
return this.header.ruleset.type === "team"
? expectTeamContent(this.getCurrentContent())
.keys()
.filter((k): k is AgentID => k.startsWith("agent_"))
.map((agent) => agentIDAsCoValueID(agent))
: this.header.ruleset.type === "ownedByTeam"
? [this.header.ruleset.team]
: [];
}
}
export type AgentID = `agent_${string}`;

View File

@@ -1,4 +1,3 @@
import { test, expect } from "bun:test";
import {
agentIDfromSessionID,
getAgent,
@@ -87,13 +86,13 @@ test("Can get map entry values at different points in time", () => {
content.edit((editable) => {
const beforeA = Date.now();
Bun.sleepSync(1);
while(Date.now() < beforeA + 10){}
editable.set("hello", "A", "trusting");
const beforeB = Date.now();
Bun.sleepSync(1);
while(Date.now() < beforeB + 10){}
editable.set("hello", "B", "trusting");
const beforeC = Date.now();
Bun.sleepSync(1);
while(Date.now() < beforeC + 10){}
editable.set("hello", "C", "trusting");
expect(editable.get("hello")).toEqual("C");
expect(editable.getAtTime("hello", Date.now())).toEqual("C");

View File

@@ -82,7 +82,7 @@ export class CoMap<
return undefined;
}
let lastEntry = ops[ops.length - 1];
const lastEntry = ops[ops.length - 1]!;
if (lastEntry.op === "delete") {
return undefined;
@@ -116,7 +116,7 @@ export class CoMap<
return undefined;
}
const lastEntry = ops[ops.length - 1];
const lastEntry = ops[ops.length - 1]!;
return lastEntry.txID;
}

View File

@@ -1,4 +1,3 @@
import { expect, test } from "bun:test";
import {
getRecipientID,
getSignatoryID,
@@ -18,7 +17,7 @@ import {
} from "./crypto";
import { base58, base64url } from "@scure/base";
import { x25519 } from "@noble/curves/ed25519";
import { xsalsa20_poly1305 } from "@noble/ciphers/_slow";
import { xsalsa20_poly1305 } from "@noble/ciphers/salsa";
import { blake3 } from "@noble/hashes/blake3";
import stableStringify from "fast-json-stable-stringify";
@@ -84,7 +83,7 @@ test("Sealing round-trips, but invalid receiver can't unseal", () => {
getRecipientID(sender).substring("recipient_z".length)
);
const sealedBytes = base64url.decode(
sealed[getRecipientID(recipient1)].substring("sealed_U".length)
sealed[getRecipientID(recipient1)]!.substring("sealed_U".length)
);
const sharedSecret = x25519.getSharedSecret(recipient3priv, senderPub);

View File

@@ -2,7 +2,7 @@ import { ed25519, x25519 } from "@noble/curves/ed25519";
import { xsalsa20_poly1305, xsalsa20 } from "@noble/ciphers/salsa";
import { JsonValue } from "./jsonValue";
import { base58, base64url } from "@scure/base";
import stableStringify from "fast-json-stable-stringify";
import { default as stableStringify } from "fast-json-stable-stringify";
import { blake3 } from "@noble/hashes/blake3";
import { randomBytes } from "@noble/ciphers/webcrypto/utils";
import { RawCoValueID, SessionID, TransactionID } from "./coValue";
@@ -91,10 +91,10 @@ export function seal<T extends JsonValue>(
const sealedSet: SealedSet<T> = {};
for (let i = 0; i < recipientsSorted.length; i++) {
const recipient = recipientsSorted[i];
const recipient = recipientsSorted[i]!;
const sharedSecret = x25519.getSharedSecret(
senderPriv,
recipientPubs[i]
recipientPubs[i]!
);
const sealedBytes = xsalsa20_poly1305(sharedSecret, nOnce).encrypt(

View File

@@ -1,4 +1,3 @@
import { CoMap } from "./contentType";
import { newRandomKeySecret, seal } from "./crypto";
import {
RawCoValueID,
@@ -12,31 +11,17 @@ import {
getAgentCoValueHeader,
CoValueHeader,
agentIDfromSessionID,
agentIDAsCoValueID,
} from "./coValue";
import { Team, expectTeamContent } from "./permissions";
import {
NewContentMessage,
Peer,
PeerID,
PeerState,
SessionNewContent,
SubscribeMessage,
SubscribeResponseMessage,
SyncMessage,
UnsubscribeMessage,
WrongAssumedKnownStateMessage,
combinedKnownStates,
weAreStrictlyAhead,
} from "./sync";
import { SyncManager } from "./sync";
export class LocalNode {
coValues: { [key: RawCoValueID]: CoValueState } = {};
peers: { [key: PeerID]: PeerState } = {};
agentCredential: AgentCredential;
agentID: AgentID;
ownSessionID: SessionID;
knownAgents: { [key: AgentID]: Agent } = {};
sync = new SyncManager(this);
constructor(agentCredential: AgentCredential, ownSessionID: SessionID) {
this.agentCredential = agentCredential;
@@ -57,7 +42,7 @@ export class LocalNode {
const coValue = new CoValue(header, this);
this.coValues[coValue.id] = { state: "loaded", coValue: coValue };
this.syncCoValue(coValue);
void this.sync.syncCoValue(coValue);
return coValue;
}
@@ -69,20 +54,7 @@ export class LocalNode {
this.coValues[id] = entry;
for (const peer of Object.values(this.peers)) {
peer.outgoing
.write({
action: "subscribe",
knownState: {
coValueID: id,
header: false,
sessions: {},
},
})
.catch((e) => {
console.error("Error writing to peer", e);
});
}
this.sync.loadFromPeers(id);
}
if (entry.state === "loaded") {
return Promise.resolve(entry.coValue);
@@ -145,299 +117,6 @@ export class LocalNode {
return new Team(teamContent, this);
}
addPeer(peer: Peer) {
const peerState: PeerState = {
id: peer.id,
optimisticKnownStates: {},
incoming: peer.incoming,
outgoing: peer.outgoing.getWriter(),
role: peer.role,
};
this.peers[peer.id] = peerState;
if (peer.role === "server") {
for (const entry of Object.values(this.coValues)) {
if (entry.state === "loading") {
continue;
}
peerState.outgoing
.write({
action: "subscribe",
knownState: entry.coValue.knownState(),
})
.catch((e) => {
// TODO: handle error
console.error("Error writing to peer", e);
});
peerState.optimisticKnownStates[entry.coValue.id] = {
coValueID: entry.coValue.id,
header: false,
sessions: {},
};
}
}
const readIncoming = async () => {
for await (const msg of peerState.incoming) {
for (const responseMsg of this.handleSyncMessage(
msg,
peerState
)) {
await peerState.outgoing.write(responseMsg);
}
}
};
readIncoming().catch((e) => {
// TODO: handle error
console.error("Error reading from peer", e);
});
}
handleSyncMessage(msg: SyncMessage, peer: PeerState): SyncMessage[] {
// TODO: validate
switch (msg.action) {
case "subscribe":
return this.handleSubscribe(msg, peer);
case "subscribeResponse":
return this.handleSubscribeResponse(msg, peer);
case "newContent":
return this.handleNewContent(msg);
case "wrongAssumedKnownState":
return this.handleWrongAssumedKnownState(msg, peer);
case "unsubscribe":
return this.handleUnsubscribe(msg);
default:
throw new Error(`Unknown message type ${(msg as any).action}`);
}
}
handleSubscribe(
msg: SubscribeMessage,
peer: PeerState,
asDependencyOf?: RawCoValueID
): SyncMessage[] {
const entry = this.coValues[msg.knownState.coValueID];
if (!entry || entry.state === "loading") {
if (!entry) {
this.coValues[msg.knownState.coValueID] = newLoadingState();
}
return [
{
action: "subscribeResponse",
knownState: {
coValueID: msg.knownState.coValueID,
header: false,
sessions: {},
},
},
];
}
peer.optimisticKnownStates[entry.coValue.id] =
entry.coValue.knownState();
const newContent = entry.coValue.newContentSince(msg.knownState);
const dependedOnCoValues =
entry.coValue.header.ruleset.type === "team"
? expectTeamContent(entry.coValue.getCurrentContent())
.keys()
.filter((k): k is AgentID => k.startsWith("agent_"))
.map((agent) => agentIDAsCoValueID(agent))
: entry.coValue.header.ruleset.type === "ownedByTeam"
? [entry.coValue.header.ruleset.team]
: [];
return [
...dependedOnCoValues.flatMap((coValueID) =>
this.handleSubscribe(
{
action: "subscribe",
knownState: {
coValueID,
header: false,
sessions: {},
},
},
peer,
asDependencyOf || msg.knownState.coValueID
)
),
{
action: "subscribeResponse",
knownState: entry.coValue.knownState(),
asDependencyOf,
},
...(newContent ? [newContent] : []),
];
}
handleSubscribeResponse(
msg: SubscribeResponseMessage,
peer: PeerState
): SyncMessage[] {
let entry = this.coValues[msg.knownState.coValueID];
if (!entry) {
if (msg.asDependencyOf) {
if (this.coValues[msg.asDependencyOf]) {
entry = newLoadingState();
this.coValues[msg.knownState.coValueID] = entry;
}
} else {
throw new Error(
"Expected coValue entry to be created, missing subscribe?"
);
}
}
if (entry.state === "loading") {
peer.optimisticKnownStates[msg.knownState.coValueID] =
msg.knownState;
return [];
}
const newContent = entry.coValue.newContentSince(msg.knownState);
peer.optimisticKnownStates[msg.knownState.coValueID] =
combinedKnownStates(msg.knownState, entry.coValue.knownState());
return newContent ? [newContent] : [];
}
handleNewContent(msg: NewContentMessage): SyncMessage[] {
let entry = this.coValues[msg.coValueID];
if (!entry) {
throw new Error(
"Expected coValue entry to be created, missing subscribe?"
);
}
let resolveAfterDone: ((coValue: CoValue) => void) | undefined;
if (entry.state === "loading") {
if (!msg.header) {
throw new Error("Expected header to be sent in first message");
}
const coValue = new CoValue(msg.header, this);
resolveAfterDone = entry.resolve;
entry = {
state: "loaded",
coValue: coValue,
};
this.coValues[msg.coValueID] = entry;
}
const coValue = entry.coValue;
let invalidStateAssumed = false;
for (const sessionID of Object.keys(msg.newContent) as SessionID[]) {
const ourKnownTxIdx =
coValue.sessions[sessionID]?.transactions.length;
const theirFirstNewTxIdx = msg.newContent[sessionID].after;
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
invalidStateAssumed = true;
continue;
}
const alreadyKnownOffset = ourKnownTxIdx
? ourKnownTxIdx - theirFirstNewTxIdx
: 0;
const newTransactions =
msg.newContent[sessionID].newTransactions.slice(
alreadyKnownOffset
);
const success = coValue.tryAddTransactions(
sessionID,
newTransactions,
msg.newContent[sessionID].lastHash,
msg.newContent[sessionID].lastSignature
);
if (!success) {
console.error("Failed to add transactions", newTransactions);
continue;
}
}
if (resolveAfterDone) {
resolveAfterDone(coValue);
}
return invalidStateAssumed
? [
{
action: "wrongAssumedKnownState",
knownState: coValue.knownState(),
},
]
: [];
}
handleWrongAssumedKnownState(
msg: WrongAssumedKnownStateMessage,
peer: PeerState
): SyncMessage[] {
const coValue = this.expectCoValueLoaded(msg.knownState.coValueID);
peer.optimisticKnownStates[msg.knownState.coValueID] =
combinedKnownStates(msg.knownState, coValue.knownState());
const newContent = coValue.newContentSince(msg.knownState);
return newContent ? [newContent] : [];
}
handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage[] {
throw new Error("Method not implemented.");
}
async syncCoValue(coValue: CoValue) {
for (const peer of Object.values(this.peers)) {
const optimisticKnownState =
peer.optimisticKnownStates[coValue.id];
if (optimisticKnownState || peer.role === "server") {
const newContent =
coValue.newContentSince(optimisticKnownState);
peer.optimisticKnownStates[coValue.id] = peer
.optimisticKnownStates[coValue.id]
? combinedKnownStates(
peer.optimisticKnownStates[coValue.id],
coValue.knownState()
)
: coValue.knownState();
if (!optimisticKnownState && peer.role === "server") {
// auto-subscribe
await peer.outgoing.write({
action: "subscribe",
knownState: coValue.knownState(),
});
}
if (newContent) {
await peer.outgoing.write(newContent);
}
}
}
}
testWithDifferentCredentials(
agentCredential: AgentCredential,
ownSessionID: SessionID
@@ -480,7 +159,7 @@ type CoValueState =
}
| { state: "loaded"; coValue: CoValue };
function newLoadingState(): CoValueState {
export function newLoadingState(): CoValueState {
let resolve: (coValue: CoValue) => void;
const promise = new Promise<CoValue>((r) => {

View File

@@ -1,4 +1,3 @@
import { test, expect } from "bun:test";
import {
getAgent,
getAgentID,

View File

@@ -290,7 +290,13 @@ export class Team {
this.teamMap.coValue.node.agentCredential.recipientSecret,
new Set(
currentlyPermittedReaders.map(
(reader) => this.node.knownAgents[reader].recipientID
(reader) => {
const readerAgent = this.node.knownAgents[reader];
if (!readerAgent) {
throw new Error("Unknown agent " + reader);
}
return readerAgent.recipientID
}
)
),
{

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,9 @@
import { Hash, Signature } from "./crypto";
import { CoValueHeader, RawCoValueID, SessionID, Transaction } from "./coValue";
import { CoValue } from "./coValue";
import { LocalNode } from "./node";
import { newLoadingState } from "./node";
import { ReadableStream, WritableStream, WritableStreamDefaultWriter } from "isomorphic-streams";
export type CoValueKnownState = {
coValueID: RawCoValueID;
@@ -7,23 +11,29 @@ export type CoValueKnownState = {
sessions: { [sessionID: SessionID]: number };
};
export function emptyKnownState(coValueID: RawCoValueID): CoValueKnownState {
return {
coValueID,
header: false,
sessions: {},
};
}
export type SyncMessage =
| SubscribeMessage
| SubscribeResponseMessage
| TellKnownStateMessage
| NewContentMessage
| WrongAssumedKnownStateMessage
| UnsubscribeMessage;
export type SubscribeMessage = {
action: "subscribe";
knownState: CoValueKnownState;
};
} & CoValueKnownState;
export type SubscribeResponseMessage = {
action: "subscribeResponse";
knownState: CoValueKnownState;
export type TellKnownStateMessage = {
action: "tellKnownState";
asDependencyOf?: RawCoValueID;
};
} & CoValueKnownState;
export type NewContentMessage = {
action: "newContent";
@@ -44,8 +54,7 @@ export type SessionNewContent = {
export type WrongAssumedKnownStateMessage = {
action: "wrongAssumedKnownState";
knownState: CoValueKnownState;
};
} & CoValueKnownState;
export type UnsubscribeMessage = {
action: "unsubscribe";
@@ -64,6 +73,7 @@ export interface Peer {
export interface PeerState {
id: PeerID;
optimisticKnownStates: { [coValueID: RawCoValueID]: CoValueKnownState };
toldKnownState: Set<RawCoValueID>;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStreamDefaultWriter<SyncMessage>;
role: "peer" | "server" | "client";
@@ -94,10 +104,16 @@ export function weAreStrictlyAhead(
return true;
}
export function combinedKnownStates(stateA: CoValueKnownState, stateB: CoValueKnownState): CoValueKnownState {
export function combinedKnownStates(
stateA: CoValueKnownState,
stateB: CoValueKnownState
): CoValueKnownState {
const sessionStates: CoValueKnownState["sessions"] = {};
const allSessions = new Set([...Object.keys(stateA.sessions), ...Object.keys(stateB.sessions)] as SessionID[]);
const allSessions = new Set([
...Object.keys(stateA.sessions),
...Object.keys(stateB.sessions),
] as SessionID[]);
for (const sessionID of allSessions) {
const stateAValue = stateA.sessions[sessionID];
@@ -111,4 +127,386 @@ export function combinedKnownStates(stateA: CoValueKnownState, stateB: CoValueKn
header: stateA.header || stateB.header,
sessions: sessionStates,
};
}
}
export class SyncManager {
peers: { [key: PeerID]: PeerState } = {};
local: LocalNode;
constructor(local: LocalNode) {
this.local = local;
}
loadFromPeers(id: RawCoValueID) {
for (const peer of Object.values(this.peers)) {
peer.outgoing
.write({
action: "subscribe",
coValueID: id,
header: false,
sessions: {},
})
.catch((e) => {
console.error("Error writing to peer", e);
});
}
}
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
// TODO: validate
switch (msg.action) {
case "subscribe":
return await this.handleSubscribe(msg, peer);
case "tellKnownState":
return await this.handleTellKnownState(msg, peer);
case "newContent":
return await this.handleNewContent(msg, peer);
case "wrongAssumedKnownState":
return await this.handleWrongAssumedKnownState(msg, peer);
case "unsubscribe":
return await this.handleUnsubscribe(msg);
default:
throw new Error(
`Unknown message type ${
(msg as { action: "string" }).action
}`
);
}
}
async subscribeToIncludingDependencies(
coValueID: RawCoValueID,
peer: PeerState
) {
const coValue = this.local.expectCoValueLoaded(coValueID);
for (const coValueID of coValue.getDependedOnCoValues()) {
await this.subscribeToIncludingDependencies(coValueID, peer);
}
if (!peer.toldKnownState.has(coValueID)) {
peer.toldKnownState.add(coValueID);
await peer.outgoing.write({
action: "subscribe",
...coValue.knownState(),
});
}
}
async tellUntoldKnownStateIncludingDependencies(
coValueID: RawCoValueID,
peer: PeerState,
asDependencyOf?: RawCoValueID
) {
const coValue = this.local.expectCoValueLoaded(coValueID);
for (const dependentCoValueID of coValue.getDependedOnCoValues()) {
await this.tellUntoldKnownStateIncludingDependencies(
dependentCoValueID,
peer,
asDependencyOf || coValueID
);
}
if (!peer.toldKnownState.has(coValueID)) {
await peer.outgoing.write({
action: "tellKnownState",
asDependencyOf,
...coValue.knownState(),
});
peer.toldKnownState.add(coValueID);
}
}
async sendNewContentIncludingDependencies(
coValueID: RawCoValueID,
peer: PeerState
) {
const coValue = this.local.expectCoValueLoaded(coValueID);
for (const coValueID of coValue.getDependedOnCoValues()) {
await this.sendNewContentIncludingDependencies(coValueID, peer);
}
const newContent = coValue.newContentSince(
peer.optimisticKnownStates[coValueID]
);
if (newContent) {
await peer.outgoing.write(newContent);
peer.optimisticKnownStates[coValueID] = combinedKnownStates(
peer.optimisticKnownStates[coValueID] ||
emptyKnownState(coValueID),
coValue.knownState()
);
}
}
addPeer(peer: Peer) {
const peerState: PeerState = {
id: peer.id,
optimisticKnownStates: {},
incoming: peer.incoming,
outgoing: peer.outgoing.getWriter(),
toldKnownState: new Set(),
role: peer.role,
};
this.peers[peer.id] = peerState;
if (peer.role === "server") {
const initialSync = async () => {
for (const entry of Object.values(this.local.coValues)) {
if (entry.state === "loading") {
continue;
}
await this.subscribeToIncludingDependencies(
entry.coValue.id,
peerState
);
peerState.optimisticKnownStates[entry.coValue.id] = {
coValueID: entry.coValue.id,
header: false,
sessions: {},
};
}
};
void initialSync();
}
const readIncoming = async () => {
for await (const msg of peerState.incoming) {
try {
await this.handleSyncMessage(msg, peerState);
} catch (e) {
console.error(
`Error reading from peer ${peer.id}`,
JSON.stringify(msg),
e
);
}
}
};
void readIncoming();
}
async handleSubscribe(msg: SubscribeMessage, peer: PeerState) {
const entry = this.local.coValues[msg.coValueID];
if (!entry || entry.state === "loading") {
if (!entry) {
this.local.coValues[msg.coValueID] = newLoadingState();
}
peer.optimisticKnownStates[msg.coValueID] = knownStateIn(msg);
peer.toldKnownState.add(msg.coValueID);
await peer.outgoing.write({
action: "tellKnownState",
coValueID: msg.coValueID,
header: false,
sessions: {},
});
return;
}
peer.optimisticKnownStates[msg.coValueID] = knownStateIn(msg);
await this.tellUntoldKnownStateIncludingDependencies(
msg.coValueID,
peer
);
await this.sendNewContentIncludingDependencies(msg.coValueID, peer);
}
async handleTellKnownState(msg: TellKnownStateMessage, peer: PeerState) {
let entry = this.local.coValues[msg.coValueID];
peer.optimisticKnownStates[msg.coValueID] = combinedKnownStates(
peer.optimisticKnownStates[msg.coValueID] || emptyKnownState(msg.coValueID),
knownStateIn(msg)
);
if (!entry) {
if (msg.asDependencyOf) {
if (this.local.coValues[msg.asDependencyOf]) {
entry = newLoadingState();
this.local.coValues[msg.coValueID] = entry;
} else {
throw new Error(
"Expected coValue dependency entry to be created, missing subscribe?"
);
}
} else {
throw new Error(
"Expected coValue entry to be created, missing subscribe?"
);
}
}
if (entry.state === "loading") {
return [];
}
await this.tellUntoldKnownStateIncludingDependencies(
msg.coValueID,
peer
);
await this.sendNewContentIncludingDependencies(msg.coValueID, peer);
}
async handleNewContent(msg: NewContentMessage, peer: PeerState) {
let entry = this.local.coValues[msg.coValueID];
if (!entry) {
throw new Error(
"Expected coValue entry to be created, missing subscribe?"
);
}
let resolveAfterDone: ((coValue: CoValue) => void) | undefined;
const peerOptimisticKnownState =
peer.optimisticKnownStates[msg.coValueID];
if (!peerOptimisticKnownState) {
throw new Error(
"Expected optimisticKnownState to be set for coValue we receive new content for"
);
}
if (entry.state === "loading") {
if (!msg.header) {
throw new Error("Expected header to be sent in first message");
}
peerOptimisticKnownState.header = true;
const coValue = new CoValue(msg.header, this.local);
resolveAfterDone = entry.resolve;
entry = {
state: "loaded",
coValue: coValue,
};
this.local.coValues[msg.coValueID] = entry;
}
const coValue = entry.coValue;
let invalidStateAssumed = false;
for (const [sessionID, newContentForSession] of Object.entries(
msg.newContent
) as [SessionID, SessionNewContent][]) {
const ourKnownTxIdx =
coValue.sessions[sessionID]?.transactions.length;
const theirFirstNewTxIdx = newContentForSession.after;
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
invalidStateAssumed = true;
continue;
}
const alreadyKnownOffset = ourKnownTxIdx
? ourKnownTxIdx - theirFirstNewTxIdx
: 0;
const newTransactions =
newContentForSession.newTransactions.slice(alreadyKnownOffset);
const success = coValue.tryAddTransactions(
sessionID,
newTransactions,
newContentForSession.lastHash,
newContentForSession.lastSignature
);
if (!success) {
console.error("Failed to add transactions", newTransactions);
continue;
}
peerOptimisticKnownState.sessions[sessionID] =
newContentForSession.after +
newContentForSession.newTransactions.length;
}
if (resolveAfterDone) {
resolveAfterDone(coValue);
}
await this.syncCoValue(coValue);
if (invalidStateAssumed) {
await peer.outgoing.write({
action: "wrongAssumedKnownState",
...coValue.knownState(),
});
}
}
async handleWrongAssumedKnownState(
msg: WrongAssumedKnownStateMessage,
peer: PeerState
) {
const coValue = this.local.expectCoValueLoaded(msg.coValueID);
peer.optimisticKnownStates[msg.coValueID] = combinedKnownStates(
msg,
coValue.knownState()
);
const newContent = coValue.newContentSince(msg);
if (newContent) {
await peer.outgoing.write(newContent);
}
}
handleUnsubscribe(_msg: UnsubscribeMessage) {
throw new Error("Method not implemented.");
}
async syncCoValue(coValue: CoValue) {
for (const peer of Object.values(this.peers)) {
const optimisticKnownState = peer.optimisticKnownStates[coValue.id];
const shouldSync =
optimisticKnownState ||
peer.role === "server";
if (shouldSync) {
await this.tellUntoldKnownStateIncludingDependencies(
coValue.id,
peer
);
await this.sendNewContentIncludingDependencies(
coValue.id,
peer
);
}
}
}
}
function knownStateIn(
msg:
| SubscribeMessage
| TellKnownStateMessage
| WrongAssumedKnownStateMessage
) {
return {
coValueID: msg.coValueID,
header: msg.header,
sessions: msg.sessions,
};
}

View File

@@ -14,10 +14,8 @@
"forceConsistentCasingInFileNames": true,
"allowJs": true,
"noEmit": true,
"types": [
"bun-types" // add Bun global
],
// "noUncheckedIndexedAccess": true
}
"noUncheckedIndexedAccess": true,
"esModuleInterop": true,
},
"include": ["./src/**/*"],
}

2781
yarn.lock Normal file

File diff suppressed because it is too large Load Diff