Merge pull request #3 from gardencmp/anselm/gar-66-simple-sync
Simple sync
This commit is contained in:
@@ -198,6 +198,10 @@ export class CoList<T extends JsonValue, Meta extends JsonValue> {
|
||||
constructor(multilog: MultiLog) {
|
||||
this.id = multilog.id as CoValueID<CoList<T, Meta>>;
|
||||
}
|
||||
|
||||
toJSON(): JsonObject {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
}
|
||||
|
||||
export class MultiStream<T extends JsonValue, Meta extends JsonValue> {
|
||||
@@ -207,6 +211,10 @@ export class MultiStream<T extends JsonValue, Meta extends JsonValue> {
|
||||
constructor(multilog: MultiLog) {
|
||||
this.id = multilog.id as CoValueID<MultiStream<T, Meta>>;
|
||||
}
|
||||
|
||||
toJSON(): JsonObject {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
}
|
||||
|
||||
export class Static<T extends JsonValue> {
|
||||
@@ -216,6 +224,10 @@ export class Static<T extends JsonValue> {
|
||||
constructor(multilog: MultiLog) {
|
||||
this.id = multilog.id as CoValueID<Static<T>>;
|
||||
}
|
||||
|
||||
toJSON(): JsonObject {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
}
|
||||
|
||||
export function expectMap(content: CoValue): CoMap<{ [key: string]: string }, {}> {
|
||||
|
||||
138
src/multilog.ts
138
src/multilog.ts
@@ -30,6 +30,8 @@ import {
|
||||
determineValidTransactions,
|
||||
expectTeamContent,
|
||||
} from "./permissions";
|
||||
import { LocalNode } from "./node";
|
||||
import { MultiLogKnownState, NewContentMessage } from "./sync";
|
||||
|
||||
export type MultiLogID = `coval_${string}`;
|
||||
|
||||
@@ -58,14 +60,17 @@ type SessionLog = {
|
||||
transactions: Transaction[];
|
||||
lastHash?: Hash;
|
||||
streamingHash: StreamingHash;
|
||||
lastSignature: string;
|
||||
lastSignature: Signature;
|
||||
};
|
||||
|
||||
export type PrivateTransaction = {
|
||||
privacy: "private";
|
||||
madeAt: number;
|
||||
keyUsed: KeyID;
|
||||
encryptedChanges: Encrypted<JsonValue[], {in: MultiLogID, tx: TransactionID}>;
|
||||
encryptedChanges: Encrypted<
|
||||
JsonValue[],
|
||||
{ in: MultiLogID; tx: TransactionID }
|
||||
>;
|
||||
};
|
||||
|
||||
export type TrustingTransaction = {
|
||||
@@ -86,61 +91,33 @@ export type TransactionID = { sessionID: SessionID; txIndex: number };
|
||||
|
||||
export class MultiLog {
|
||||
id: MultiLogID;
|
||||
node: LocalNode;
|
||||
header: MultiLogHeader;
|
||||
sessions: { [key: SessionID]: SessionLog };
|
||||
agentCredential: AgentCredential;
|
||||
ownSessionID: SessionID;
|
||||
knownAgents: { [key: AgentID]: Agent };
|
||||
requiredMultiLogs: { [key: MultiLogID]: MultiLog };
|
||||
content?: CoValue;
|
||||
|
||||
constructor(
|
||||
header: MultiLogHeader,
|
||||
agentCredential: AgentCredential,
|
||||
ownSessionID: SessionID,
|
||||
knownAgents: { [key: AgentID]: Agent },
|
||||
requiredMultiLogs: { [key: MultiLogID]: MultiLog }
|
||||
) {
|
||||
constructor(header: MultiLogHeader, node: LocalNode) {
|
||||
this.id = multilogIDforHeader(header);
|
||||
this.header = header;
|
||||
this.sessions = {};
|
||||
this.agentCredential = agentCredential;
|
||||
this.ownSessionID = ownSessionID;
|
||||
this.knownAgents = knownAgents;
|
||||
this.requiredMultiLogs = requiredMultiLogs;
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
testWithDifferentCredentials(
|
||||
agentCredential: AgentCredential,
|
||||
ownSessionID: SessionID
|
||||
): MultiLog {
|
||||
const knownAgents = {
|
||||
...this.knownAgents,
|
||||
[agentIDfromSessionID(ownSessionID)]: getAgent(agentCredential),
|
||||
};
|
||||
const cloned = new MultiLog(
|
||||
this.header,
|
||||
const newNode = this.node.testWithDifferentCredentials(
|
||||
agentCredential,
|
||||
ownSessionID,
|
||||
knownAgents,
|
||||
Object.fromEntries(
|
||||
Object.entries(this.requiredMultiLogs).map(([id, multilog]) => [
|
||||
id,
|
||||
multilog.testWithDifferentCredentials(
|
||||
agentCredential,
|
||||
ownSessionID
|
||||
),
|
||||
])
|
||||
)
|
||||
ownSessionID
|
||||
);
|
||||
|
||||
cloned.sessions = JSON.parse(JSON.stringify(this.sessions));
|
||||
|
||||
return cloned;
|
||||
return newNode.expectMultiLogLoaded(this.id);
|
||||
}
|
||||
|
||||
knownState(): MultilogKnownState {
|
||||
knownState(): MultiLogKnownState {
|
||||
return {
|
||||
multilogID: this.id,
|
||||
header: true,
|
||||
sessions: Object.fromEntries(
|
||||
Object.entries(this.sessions).map(([k, v]) => [
|
||||
@@ -156,7 +133,7 @@ export class MultiLog {
|
||||
}
|
||||
|
||||
nextTransactionID(): TransactionID {
|
||||
const sessionID = this.ownSessionID;
|
||||
const sessionID = this.node.ownSessionID;
|
||||
return {
|
||||
sessionID,
|
||||
txIndex: this.sessions[sessionID]?.transactions.length || 0,
|
||||
@@ -170,7 +147,7 @@ export class MultiLog {
|
||||
newSignature: Signature
|
||||
): boolean {
|
||||
const signatoryID =
|
||||
this.knownAgents[agentIDfromSessionID(sessionID)]?.signatoryID;
|
||||
this.node.knownAgents[agentIDfromSessionID(sessionID)]?.signatoryID;
|
||||
|
||||
if (!signatoryID) {
|
||||
console.warn("Unknown agent", agentIDfromSessionID(sessionID));
|
||||
@@ -210,6 +187,8 @@ export class MultiLog {
|
||||
|
||||
this.content = undefined;
|
||||
|
||||
this.node.syncMultiLog(this);
|
||||
|
||||
const _ = this.getCurrentContent();
|
||||
|
||||
return true;
|
||||
@@ -262,14 +241,14 @@ export class MultiLog {
|
||||
};
|
||||
}
|
||||
|
||||
const sessionID = this.ownSessionID;
|
||||
const sessionID = this.node.ownSessionID;
|
||||
|
||||
const { expectedNewHash } = this.expectedNewHashAfter(sessionID, [
|
||||
transaction,
|
||||
]);
|
||||
|
||||
const signature = sign(
|
||||
this.agentCredential.signatorySecret,
|
||||
this.node.agentCredential.signatorySecret,
|
||||
expectedNewHash
|
||||
);
|
||||
|
||||
@@ -353,9 +332,9 @@ export class MultiLog {
|
||||
id: currentKeyId,
|
||||
};
|
||||
} else if (this.header.ruleset.type === "ownedByTeam") {
|
||||
return this.requiredMultiLogs[
|
||||
this.header.ruleset.team
|
||||
].getCurrentReadKey();
|
||||
return this.node
|
||||
.expectMultiLogLoaded(this.header.ruleset.team)
|
||||
.getCurrentReadKey();
|
||||
} else {
|
||||
throw new Error(
|
||||
"Only teams or values owned by teams have read secrets"
|
||||
@@ -374,7 +353,7 @@ export class MultiLog {
|
||||
for (const entry of readKeyHistory) {
|
||||
if (entry.value?.keyID === keyID) {
|
||||
const revealer = agentIDfromSessionID(entry.txID.sessionID);
|
||||
const revealerAgent = this.knownAgents[revealer];
|
||||
const revealerAgent = this.node.knownAgents[revealer];
|
||||
|
||||
if (!revealerAgent) {
|
||||
throw new Error("Unknown revealer");
|
||||
@@ -382,7 +361,7 @@ export class MultiLog {
|
||||
|
||||
const secret = openAs(
|
||||
entry.value.revelation,
|
||||
this.agentCredential.recipientSecret,
|
||||
this.node.agentCredential.recipientSecret,
|
||||
revealerAgent.recipientID,
|
||||
{
|
||||
in: this.id,
|
||||
@@ -417,7 +396,9 @@ export class MultiLog {
|
||||
if (secret) {
|
||||
return secret;
|
||||
} else {
|
||||
console.error(`Sealing ${sealingKeyID} key didn't unseal ${keyID}`);
|
||||
console.error(
|
||||
`Sealing ${sealingKeyID} key didn't unseal ${keyID}`
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -426,12 +407,12 @@ export class MultiLog {
|
||||
"readKey " +
|
||||
keyID +
|
||||
" not revealed for " +
|
||||
getAgentID(getAgent(this.agentCredential))
|
||||
getAgentID(getAgent(this.node.agentCredential))
|
||||
);
|
||||
} else if (this.header.ruleset.type === "ownedByTeam") {
|
||||
return this.requiredMultiLogs[this.header.ruleset.team].getReadKey(
|
||||
keyID
|
||||
);
|
||||
return this.node
|
||||
.expectMultiLogLoaded(this.header.ruleset.team)
|
||||
.getReadKey(keyID);
|
||||
} else {
|
||||
throw new Error(
|
||||
"Only teams or values owned by teams have read secrets"
|
||||
@@ -442,12 +423,51 @@ export class MultiLog {
|
||||
getTx(txID: TransactionID): Transaction | undefined {
|
||||
return this.sessions[txID.sessionID]?.transactions[txID.txIndex];
|
||||
}
|
||||
}
|
||||
|
||||
type MultilogKnownState = {
|
||||
header: boolean;
|
||||
sessions: { [key: SessionID]: number };
|
||||
};
|
||||
newContentSince(knownState: MultiLogKnownState | undefined): NewContentMessage | undefined {
|
||||
const newContent: NewContentMessage = {
|
||||
action: "newContent",
|
||||
multilogID: this.id,
|
||||
header: knownState?.header ? undefined : this.header,
|
||||
newContent: Object.fromEntries(
|
||||
Object.entries(this.sessions)
|
||||
.map(([sessionID, log]) => {
|
||||
const newTransactions = log.transactions.slice(
|
||||
knownState?.sessions[sessionID as SessionID] || 0
|
||||
);
|
||||
|
||||
if (
|
||||
newTransactions.length === 0 ||
|
||||
!log.lastHash ||
|
||||
!log.lastSignature
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return [
|
||||
sessionID,
|
||||
{
|
||||
after:
|
||||
knownState?.sessions[
|
||||
sessionID as SessionID
|
||||
] || 0,
|
||||
newTransactions,
|
||||
lastHash: log.lastHash,
|
||||
lastSignature: log.lastSignature,
|
||||
},
|
||||
];
|
||||
})
|
||||
.filter((x): x is Exclude<typeof x, undefined> => !!x)
|
||||
),
|
||||
}
|
||||
|
||||
if (!newContent.header && Object.keys(newContent.newContent).length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return newContent;
|
||||
}
|
||||
}
|
||||
|
||||
export type AgentID = `agent_${string}`;
|
||||
|
||||
@@ -481,6 +501,10 @@ export function getAgentID(agent: Agent): AgentID {
|
||||
)}`;
|
||||
}
|
||||
|
||||
export function agentIDasMultiLogID(agentID: AgentID): MultiLogID {
|
||||
return `coval_${agentID.substring("agent_".length)}`;
|
||||
}
|
||||
|
||||
export type AgentCredential = {
|
||||
signatorySecret: SignatorySecret;
|
||||
recipientSecret: RecipientSecret;
|
||||
|
||||
454
src/node.ts
454
src/node.ts
@@ -11,12 +11,28 @@ import {
|
||||
getAgentID,
|
||||
getAgentMultilogHeader,
|
||||
MultiLogHeader,
|
||||
agentIDfromSessionID,
|
||||
agentIDasMultiLogID,
|
||||
} from "./multilog";
|
||||
import { Team, expectTeamContent } from "./permissions";
|
||||
import {
|
||||
NewContentMessage,
|
||||
Peer,
|
||||
PeerID,
|
||||
PeerState,
|
||||
SessionNewContent,
|
||||
SubscribeMessage,
|
||||
SubscribeResponseMessage,
|
||||
SyncMessage,
|
||||
UnsubscribeMessage,
|
||||
WrongAssumedKnownStateMessage,
|
||||
combinedKnownStates,
|
||||
weAreStrictlyAhead,
|
||||
} from "./sync";
|
||||
|
||||
export class LocalNode {
|
||||
multilogs: { [key: MultiLogID]: Promise<MultiLog> | MultiLog } = {};
|
||||
// peers: {[key: Hostname]: Peer} = {};
|
||||
multilogs: { [key: MultiLogID]: MultilogState } = {};
|
||||
peers: { [key: PeerID]: PeerState } = {};
|
||||
agentCredential: AgentCredential;
|
||||
agentID: AgentID;
|
||||
ownSessionID: SessionID;
|
||||
@@ -30,46 +46,65 @@ export class LocalNode {
|
||||
this.knownAgents[agentID] = agent;
|
||||
this.ownSessionID = ownSessionID;
|
||||
|
||||
const agentMultilog = new MultiLog(
|
||||
getAgentMultilogHeader(agent),
|
||||
agentCredential,
|
||||
ownSessionID,
|
||||
this.knownAgents,
|
||||
{}
|
||||
);
|
||||
this.multilogs[agentMultilog.id] = Promise.resolve(agentMultilog);
|
||||
const agentMultilog = new MultiLog(getAgentMultilogHeader(agent), this);
|
||||
this.multilogs[agentMultilog.id] = {
|
||||
state: "loaded",
|
||||
multilog: agentMultilog,
|
||||
};
|
||||
}
|
||||
|
||||
createMultiLog(header: MultiLogHeader): MultiLog {
|
||||
const requiredMultiLogs =
|
||||
header.ruleset.type === "ownedByTeam"
|
||||
? {
|
||||
[header.ruleset.team]: this.expectMultiLogLoaded(
|
||||
header.ruleset.team
|
||||
),
|
||||
}
|
||||
: {};
|
||||
const multilog = new MultiLog(header, this);
|
||||
this.multilogs[multilog.id] = { state: "loaded", multilog };
|
||||
|
||||
this.syncMultiLog(multilog);
|
||||
|
||||
const multilog = new MultiLog(
|
||||
header,
|
||||
this.agentCredential,
|
||||
this.ownSessionID,
|
||||
this.knownAgents,
|
||||
requiredMultiLogs
|
||||
);
|
||||
this.multilogs[multilog.id] = multilog;
|
||||
return multilog;
|
||||
}
|
||||
|
||||
expectMultiLogLoaded(id: MultiLogID): MultiLog {
|
||||
const multilog = this.multilogs[id];
|
||||
if (!multilog) {
|
||||
throw new Error(`Unknown multilog ${id}`);
|
||||
loadMultiLog(id: MultiLogID): Promise<MultiLog> {
|
||||
let entry = this.multilogs[id];
|
||||
if (!entry) {
|
||||
entry = newLoadingState();
|
||||
|
||||
this.multilogs[id] = entry;
|
||||
|
||||
for (const peer of Object.values(this.peers)) {
|
||||
peer.outgoing
|
||||
.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error writing to peer", e);
|
||||
});
|
||||
}
|
||||
}
|
||||
if (multilog instanceof Promise) {
|
||||
throw new Error(`Multilog ${id} not yet loaded`);
|
||||
if (entry.state === "loaded") {
|
||||
return Promise.resolve(entry.multilog);
|
||||
}
|
||||
return multilog;
|
||||
return entry.done;
|
||||
}
|
||||
|
||||
expectMultiLogLoaded(id: MultiLogID, expectation?: string): MultiLog {
|
||||
const entry = this.multilogs[id];
|
||||
if (!entry) {
|
||||
throw new Error(
|
||||
`${expectation ? expectation + ": " : ""}Unknown multilog ${id}`
|
||||
);
|
||||
}
|
||||
if (entry.state === "loading") {
|
||||
throw new Error(
|
||||
`${
|
||||
expectation ? expectation + ": " : ""
|
||||
}Multilog ${id} not yet loaded`
|
||||
);
|
||||
}
|
||||
return entry.multilog;
|
||||
}
|
||||
|
||||
addKnownAgent(agent: Agent) {
|
||||
@@ -109,13 +144,352 @@ export class LocalNode {
|
||||
|
||||
return new Team(teamContent, this);
|
||||
}
|
||||
|
||||
addPeer(peer: Peer) {
|
||||
const peerState: PeerState = {
|
||||
id: peer.id,
|
||||
optimisticKnownStates: {},
|
||||
incoming: peer.incoming,
|
||||
outgoing: peer.outgoing.getWriter(),
|
||||
role: peer.role,
|
||||
};
|
||||
this.peers[peer.id] = peerState;
|
||||
|
||||
if (peer.role === "server") {
|
||||
for (const entry of Object.values(this.multilogs)) {
|
||||
if (entry.state === "loading") {
|
||||
continue;
|
||||
}
|
||||
|
||||
peerState.outgoing
|
||||
.write({
|
||||
action: "subscribe",
|
||||
knownState: entry.multilog.knownState(),
|
||||
})
|
||||
.catch((e) => {
|
||||
// TODO: handle error
|
||||
console.error("Error writing to peer", e);
|
||||
});
|
||||
|
||||
peerState.optimisticKnownStates[entry.multilog.id] = {
|
||||
multilogID: entry.multilog.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
const readIncoming = async () => {
|
||||
for await (const msg of peerState.incoming) {
|
||||
for (const responseMsg of this.handleSyncMessage(
|
||||
msg,
|
||||
peerState
|
||||
)) {
|
||||
await peerState.outgoing.write(responseMsg);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
readIncoming().catch((e) => {
|
||||
// TODO: handle error
|
||||
console.error("Error reading from peer", e);
|
||||
});
|
||||
}
|
||||
|
||||
handleSyncMessage(msg: SyncMessage, peer: PeerState): SyncMessage[] {
|
||||
// TODO: validate
|
||||
switch (msg.action) {
|
||||
case "subscribe":
|
||||
return this.handleSubscribe(msg, peer);
|
||||
case "subscribeResponse":
|
||||
return this.handleSubscribeResponse(msg, peer);
|
||||
case "newContent":
|
||||
return this.handleNewContent(msg);
|
||||
case "wrongAssumedKnownState":
|
||||
return this.handleWrongAssumedKnownState(msg, peer);
|
||||
case "unsubscribe":
|
||||
return this.handleUnsubscribe(msg);
|
||||
default:
|
||||
throw new Error(`Unknown message type ${(msg as any).action}`);
|
||||
}
|
||||
}
|
||||
|
||||
handleSubscribe(
|
||||
msg: SubscribeMessage,
|
||||
peer: PeerState,
|
||||
asDependencyOf?: MultiLogID
|
||||
): SyncMessage[] {
|
||||
const entry = this.multilogs[msg.knownState.multilogID];
|
||||
|
||||
if (!entry || entry.state === "loading") {
|
||||
if (!entry) {
|
||||
this.multilogs[msg.knownState.multilogID] = newLoadingState();
|
||||
}
|
||||
|
||||
return [
|
||||
{
|
||||
action: "subscribeResponse",
|
||||
knownState: {
|
||||
multilogID: msg.knownState.multilogID,
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
peer.optimisticKnownStates[entry.multilog.id] =
|
||||
entry.multilog.knownState();
|
||||
|
||||
const newContent = entry.multilog.newContentSince(msg.knownState);
|
||||
|
||||
const dependedOnMultilogs =
|
||||
entry.multilog.header.ruleset.type === "team"
|
||||
? expectTeamContent(entry.multilog.getCurrentContent())
|
||||
.keys()
|
||||
.filter((k): k is AgentID => k.startsWith("agent_"))
|
||||
.map((agent) => agentIDasMultiLogID(agent))
|
||||
: entry.multilog.header.ruleset.type === "ownedByTeam"
|
||||
? [entry.multilog.header.ruleset.team]
|
||||
: [];
|
||||
|
||||
return [
|
||||
...dependedOnMultilogs.flatMap((multilogID) =>
|
||||
this.handleSubscribe(
|
||||
{
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID,
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
},
|
||||
peer,
|
||||
asDependencyOf || msg.knownState.multilogID
|
||||
)
|
||||
),
|
||||
{
|
||||
action: "subscribeResponse",
|
||||
knownState: entry.multilog.knownState(),
|
||||
asDependencyOf,
|
||||
},
|
||||
...(newContent ? [newContent] : []),
|
||||
];
|
||||
}
|
||||
|
||||
handleSubscribeResponse(
|
||||
msg: SubscribeResponseMessage,
|
||||
peer: PeerState
|
||||
): SyncMessage[] {
|
||||
let entry = this.multilogs[msg.knownState.multilogID];
|
||||
|
||||
if (!entry) {
|
||||
if (msg.asDependencyOf) {
|
||||
if (this.multilogs[msg.asDependencyOf]) {
|
||||
entry = newLoadingState();
|
||||
|
||||
this.multilogs[msg.knownState.multilogID] = entry;
|
||||
}
|
||||
} else {
|
||||
throw new Error(
|
||||
"Expected multilog entry to be created, missing subscribe?"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
if (entry.state === "loading") {
|
||||
peer.optimisticKnownStates[msg.knownState.multilogID] =
|
||||
msg.knownState;
|
||||
return [];
|
||||
}
|
||||
|
||||
const newContent = entry.multilog.newContentSince(msg.knownState);
|
||||
peer.optimisticKnownStates[msg.knownState.multilogID] =
|
||||
combinedKnownStates(msg.knownState, entry.multilog.knownState());
|
||||
|
||||
return newContent ? [newContent] : [];
|
||||
}
|
||||
|
||||
handleNewContent(msg: NewContentMessage): SyncMessage[] {
|
||||
let entry = this.multilogs[msg.multilogID];
|
||||
|
||||
if (!entry) {
|
||||
throw new Error(
|
||||
"Expected multilog entry to be created, missing subscribe?"
|
||||
);
|
||||
}
|
||||
|
||||
let resolveAfterDone: ((multilog: MultiLog) => void) | undefined;
|
||||
|
||||
if (entry.state === "loading") {
|
||||
if (!msg.header) {
|
||||
throw new Error("Expected header to be sent in first message");
|
||||
}
|
||||
|
||||
const multilog = new MultiLog(msg.header, this);
|
||||
|
||||
resolveAfterDone = entry.resolve;
|
||||
|
||||
entry = {
|
||||
state: "loaded",
|
||||
multilog,
|
||||
};
|
||||
|
||||
this.multilogs[msg.multilogID] = entry;
|
||||
}
|
||||
|
||||
const multilog = entry.multilog;
|
||||
|
||||
let invalidStateAssumed = false;
|
||||
|
||||
for (const sessionID of Object.keys(msg.newContent) as SessionID[]) {
|
||||
const ourKnownTxIdx =
|
||||
multilog.sessions[sessionID]?.transactions.length;
|
||||
const theirFirstNewTxIdx = msg.newContent[sessionID].after;
|
||||
|
||||
if ((ourKnownTxIdx || 0) < theirFirstNewTxIdx) {
|
||||
invalidStateAssumed = true;
|
||||
continue;
|
||||
}
|
||||
|
||||
const alreadyKnownOffset = ourKnownTxIdx
|
||||
? ourKnownTxIdx - theirFirstNewTxIdx
|
||||
: 0;
|
||||
|
||||
const newTransactions =
|
||||
msg.newContent[sessionID].newTransactions.slice(
|
||||
alreadyKnownOffset
|
||||
);
|
||||
|
||||
const success = multilog.tryAddTransactions(
|
||||
sessionID,
|
||||
newTransactions,
|
||||
msg.newContent[sessionID].lastHash,
|
||||
msg.newContent[sessionID].lastSignature
|
||||
);
|
||||
|
||||
if (!success) {
|
||||
console.error("Failed to add transactions", newTransactions);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (resolveAfterDone) {
|
||||
resolveAfterDone(multilog);
|
||||
}
|
||||
|
||||
return invalidStateAssumed
|
||||
? [
|
||||
{
|
||||
action: "wrongAssumedKnownState",
|
||||
knownState: multilog.knownState(),
|
||||
},
|
||||
]
|
||||
: [];
|
||||
}
|
||||
|
||||
handleWrongAssumedKnownState(
|
||||
msg: WrongAssumedKnownStateMessage,
|
||||
peer: PeerState
|
||||
): SyncMessage[] {
|
||||
const multilog = this.expectMultiLogLoaded(msg.knownState.multilogID);
|
||||
|
||||
peer.optimisticKnownStates[msg.knownState.multilogID] =
|
||||
combinedKnownStates(msg.knownState, multilog.knownState());
|
||||
|
||||
const newContent = multilog.newContentSince(msg.knownState);
|
||||
|
||||
return newContent ? [newContent] : [];
|
||||
}
|
||||
|
||||
handleUnsubscribe(msg: UnsubscribeMessage): SyncMessage[] {
|
||||
throw new Error("Method not implemented.");
|
||||
}
|
||||
|
||||
async syncMultiLog(multilog: MultiLog) {
|
||||
for (const peer of Object.values(this.peers)) {
|
||||
const optimisticKnownState =
|
||||
peer.optimisticKnownStates[multilog.id];
|
||||
|
||||
if (optimisticKnownState || peer.role === "server") {
|
||||
const newContent =
|
||||
multilog.newContentSince(optimisticKnownState);
|
||||
|
||||
peer.optimisticKnownStates[multilog.id] = peer
|
||||
.optimisticKnownStates[multilog.id]
|
||||
? combinedKnownStates(
|
||||
peer.optimisticKnownStates[multilog.id],
|
||||
multilog.knownState()
|
||||
)
|
||||
: multilog.knownState();
|
||||
|
||||
if (!optimisticKnownState && peer.role === "server") {
|
||||
// auto-subscribe
|
||||
await peer.outgoing.write({
|
||||
action: "subscribe",
|
||||
knownState: multilog.knownState(),
|
||||
});
|
||||
}
|
||||
|
||||
if (newContent) {
|
||||
await peer.outgoing.write(newContent);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
testWithDifferentCredentials(
|
||||
agentCredential: AgentCredential,
|
||||
ownSessionID: SessionID
|
||||
): LocalNode {
|
||||
const newNode = new LocalNode(agentCredential, ownSessionID);
|
||||
|
||||
newNode.multilogs = Object.fromEntries(
|
||||
Object.entries(this.multilogs)
|
||||
.map(([id, entry]) => {
|
||||
if (entry.state === "loading") {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const newMultilog = new MultiLog(
|
||||
entry.multilog.header,
|
||||
newNode
|
||||
);
|
||||
|
||||
newMultilog.sessions = entry.multilog.sessions;
|
||||
|
||||
return [id, { state: "loaded", multilog: newMultilog }];
|
||||
})
|
||||
.filter((x): x is Exclude<typeof x, undefined> => !!x)
|
||||
);
|
||||
|
||||
newNode.knownAgents = {
|
||||
...this.knownAgents,
|
||||
[agentIDfromSessionID(ownSessionID)]: getAgent(agentCredential),
|
||||
};
|
||||
|
||||
return newNode;
|
||||
}
|
||||
}
|
||||
|
||||
// type Hostname = string;
|
||||
type MultilogState =
|
||||
| {
|
||||
state: "loading";
|
||||
done: Promise<MultiLog>;
|
||||
resolve: (multilog: MultiLog) => void;
|
||||
}
|
||||
| { state: "loaded"; multilog: MultiLog };
|
||||
|
||||
// interface Peer {
|
||||
// hostname: Hostname;
|
||||
// incoming: ReadableStream<SyncMessage>;
|
||||
// outgoing: WritableStream<SyncMessage>;
|
||||
// optimisticKnownStates: {[multilogID: MultiLogID]: MultilogKnownState};
|
||||
// }
|
||||
function newLoadingState(): MultilogState {
|
||||
let resolve: (multilog: MultiLog) => void;
|
||||
|
||||
const promise = new Promise<MultiLog>((r) => {
|
||||
resolve = r;
|
||||
});
|
||||
|
||||
return {
|
||||
state: "loading",
|
||||
done: promise,
|
||||
resolve: resolve!,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -155,9 +155,10 @@ export function determineValidTransactions(
|
||||
return validTransactions;
|
||||
} else if (multilog.header.ruleset.type === "ownedByTeam") {
|
||||
const teamContent =
|
||||
multilog.requiredMultiLogs[
|
||||
multilog.header.ruleset.team
|
||||
].getCurrentContent();
|
||||
multilog.node.expectMultiLogLoaded(
|
||||
multilog.header.ruleset.team,
|
||||
"Determining valid transaction in owned object but its team wasn't loaded"
|
||||
).getCurrentContent();
|
||||
|
||||
if (teamContent.type !== "comap") {
|
||||
throw new Error("Team must be a map");
|
||||
@@ -193,8 +194,11 @@ export function determineValidTransactions(
|
||||
}));
|
||||
}
|
||||
);
|
||||
} else if (multilog.header.ruleset.type === "agent") {
|
||||
// TODO
|
||||
return [];
|
||||
} else {
|
||||
throw new Error("Unknown ruleset type " + multilog.header.ruleset.type);
|
||||
throw new Error("Unknown ruleset type " + (multilog.header.ruleset as any).type);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -228,6 +232,10 @@ export class Team {
|
||||
this.node = node;
|
||||
}
|
||||
|
||||
get id(): MultiLogID {
|
||||
return this.teamMap.id;
|
||||
}
|
||||
|
||||
addMember(agentID: AgentID, role: Role) {
|
||||
this.teamMap = this.teamMap.edit((map) => {
|
||||
const agent = this.node.knownAgents[agentID];
|
||||
@@ -245,7 +253,7 @@ export class Team {
|
||||
|
||||
const revelation = seal(
|
||||
currentReadKey.secret,
|
||||
this.teamMap.multiLog.agentCredential.recipientSecret,
|
||||
this.teamMap.multiLog.node.agentCredential.recipientSecret,
|
||||
new Set([agent.recipientID]),
|
||||
{
|
||||
in: this.teamMap.multiLog.id,
|
||||
@@ -279,7 +287,7 @@ export class Team {
|
||||
|
||||
const newReadKeyRevelation = seal(
|
||||
newReadKey.secret,
|
||||
this.teamMap.multiLog.agentCredential.recipientSecret,
|
||||
this.teamMap.multiLog.node.agentCredential.recipientSecret,
|
||||
new Set(
|
||||
currentlyPermittedReaders.map(
|
||||
(reader) => this.node.knownAgents[reader].recipientID
|
||||
|
||||
905
src/sync.test.ts
Normal file
905
src/sync.test.ts
Normal file
@@ -0,0 +1,905 @@
|
||||
import { test, expect } from "bun:test";
|
||||
import {
|
||||
getAgent,
|
||||
getAgentID,
|
||||
newRandomAgentCredential,
|
||||
newRandomSessionID,
|
||||
} from "./multilog";
|
||||
import { LocalNode } from "./node";
|
||||
import { Peer, SyncMessage } from "./sync";
|
||||
import { MapOpPayload, expectMap } from "./coValue";
|
||||
|
||||
test(
|
||||
"Node replies with initial tx and header to empty subscribe",
|
||||
async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeResponseMsg = await reader.read();
|
||||
const _adminNewContentMsg = await reader.read();
|
||||
const _teamSubscribeResponseMsg = await reader.read();
|
||||
const _teamNewContentMsg = await reader.read();
|
||||
|
||||
const subscribeResponseMsg = await reader.read();
|
||||
|
||||
expect(subscribeResponseMsg.value).toEqual({
|
||||
action: "subscribeResponse",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const newContentMsg = await reader.read();
|
||||
|
||||
expect(newContentMsg.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: {
|
||||
type: "comap",
|
||||
ruleset: { type: "ownedByTeam", team: team.id },
|
||||
meta: null,
|
||||
},
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 0,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[0].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "hello",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash:
|
||||
map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
},
|
||||
{ timeout: 100 }
|
||||
);
|
||||
|
||||
test("Node replies with only new tx to subscribe with some known state", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
editable.set("goodbye", "world", "trusting");
|
||||
});
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: true,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeResponseMsg = await reader.read();
|
||||
const _adminNewContentMsg = await reader.read();
|
||||
const _teamSubscribeResponseMsg = await reader.read();
|
||||
const _teamNewContentMsg = await reader.read();
|
||||
|
||||
const mapSubscribeResponseMsg = await reader.read();
|
||||
|
||||
expect(mapSubscribeResponseMsg.value).toEqual({
|
||||
action: "subscribeResponse",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const mapNewContentMsg = await reader.read();
|
||||
|
||||
expect(mapNewContentMsg.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: undefined,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 1,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[1].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "goodbye",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test.skip("TODO: node only replies with new tx to subscribe with some known state, even in the depended on multilogs", () => {});
|
||||
|
||||
test("After subscribing, node sends own known state and new txs to peer", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: false,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeResponseMsg = await reader.read();
|
||||
const _adminNewContentMsg = await reader.read();
|
||||
const _teamSubscribeResponseMsg = await reader.read();
|
||||
const _teamNewContentMsg = await reader.read();
|
||||
|
||||
const mapSubscribeResponseMsg = await reader.read();
|
||||
|
||||
expect(mapSubscribeResponseMsg.value).toEqual({
|
||||
action: "subscribeResponse",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const mapNewContentHeaderOnlyMsg = await reader.read();
|
||||
|
||||
expect(mapNewContentHeaderOnlyMsg.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: map.multiLog.header,
|
||||
newContent: {},
|
||||
} satisfies SyncMessage);
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const mapEditMsg1 = await reader.read();
|
||||
|
||||
expect(mapEditMsg1.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 0,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[0].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "hello",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("goodbye", "world", "trusting");
|
||||
});
|
||||
|
||||
const mapEditMsg2 = await reader.read();
|
||||
|
||||
expect(mapEditMsg2.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 1,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[1].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "goodbye",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("Client replies with known new content to subscribeResponse from server", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribeResponse",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: false,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const msg1 = await reader.read();
|
||||
|
||||
expect(msg1.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: map.multiLog.header,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 0,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[0].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "hello",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("No matter the optimistic known state, node respects invalid known state messages and resyncs", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: false,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 0,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeResponseMsg = await reader.read();
|
||||
const _adminNewContentMsg = await reader.read();
|
||||
const _teamSubscribeResponseMsg = await reader.read();
|
||||
const _teamNewContentMsg = await reader.read();
|
||||
const _mapSubscribeResponseMsg = await reader.read();
|
||||
const _mapNewContentHeaderOnlyMsg = await reader.read();
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("goodbye", "world", "trusting");
|
||||
});
|
||||
|
||||
const _mapEditMsg1 = await reader.read();
|
||||
const _mapEditMsg2 = await reader.read();
|
||||
|
||||
await writer.write({
|
||||
action: "wrongAssumedKnownState",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: true,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 1,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const newContentAfterWrongAssumedState = await reader.read();
|
||||
|
||||
expect(newContentAfterWrongAssumedState.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: undefined,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 1,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[1].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "goodbye",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("If we add a peer, but it never subscribes to a multilog, it won't get any messages", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
await shouldNotResolve(reader.read(), { timeout: 50 });
|
||||
});
|
||||
|
||||
test("If we add a server peer, all updates to all multilogs are sent to it, even if it doesn't subscribe", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "server",
|
||||
});
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
const _adminSubscribeMsg = await reader.read();
|
||||
const _teamSubscribeMsg = await reader.read();
|
||||
|
||||
const subscribeMsg = await reader.read();
|
||||
|
||||
expect(subscribeMsg.value).toEqual({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: true,
|
||||
sessions: {},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const newContentMsg = await reader.read();
|
||||
|
||||
expect(newContentMsg.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: map.multiLog.header,
|
||||
newContent: {
|
||||
[node.ownSessionID]: {
|
||||
after: 0,
|
||||
newTransactions: [
|
||||
{
|
||||
privacy: "trusting",
|
||||
madeAt: map.multiLog.sessions[node.ownSessionID]
|
||||
.transactions[0].madeAt,
|
||||
changes: [
|
||||
{
|
||||
op: "insert",
|
||||
key: "hello",
|
||||
value: "world",
|
||||
} satisfies MapOpPayload<string, string>,
|
||||
],
|
||||
},
|
||||
],
|
||||
lastHash: map.multiLog.sessions[node.ownSessionID].lastHash!,
|
||||
lastSignature:
|
||||
map.multiLog.sessions[node.ownSessionID].lastSignature!,
|
||||
},
|
||||
},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("If we add a server peer, newly created multilogs are auto-subscribed to", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "server",
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
const _initialMsg1 = await reader.read();
|
||||
const _initialMsg2 = await reader.read();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const msg1 = await reader.read();
|
||||
|
||||
expect(msg1.value).toEqual({
|
||||
action: "subscribe",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const msg2 = await reader.read();
|
||||
|
||||
expect(msg2.value).toEqual({
|
||||
action: "newContent",
|
||||
multilogID: map.multiLog.id,
|
||||
header: map.multiLog.header,
|
||||
newContent: {},
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test.skip("TODO: when receiving a subscribe response that is behind our optimistic state (due to already sent content), we ignore it", () => {});
|
||||
|
||||
test("When we connect a new server peer, we try to sync all existing multilogs to it", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "server",
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeMessage = await reader.read();
|
||||
const teamSubscribeMessage = await reader.read();
|
||||
|
||||
expect(teamSubscribeMessage.value).toEqual({
|
||||
action: "subscribe",
|
||||
knownState: team.teamMap.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
|
||||
const secondMessage = await reader.read();
|
||||
|
||||
expect(secondMessage.value).toEqual({
|
||||
action: "subscribe",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("When receiving a subscribe with a known state that is ahead of our own, peers should respond with a corresponding subscribe response message", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const [inRx, inTx] = newStreamPair<SyncMessage>();
|
||||
const [outRx, outTx] = newStreamPair<SyncMessage>();
|
||||
|
||||
node.addPeer({
|
||||
id: "test",
|
||||
incoming: inRx,
|
||||
outgoing: outTx,
|
||||
role: "peer",
|
||||
});
|
||||
|
||||
const writer = inTx.getWriter();
|
||||
|
||||
await writer.write({
|
||||
action: "subscribe",
|
||||
knownState: {
|
||||
multilogID: map.multiLog.id,
|
||||
header: true,
|
||||
sessions: {
|
||||
[node.ownSessionID]: 1,
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
const reader = outRx.getReader();
|
||||
|
||||
const _adminSubscribeResponseMsg = await reader.read();
|
||||
const _adminNewContentMsg = await reader.read();
|
||||
const _teamSubscribeResponseMsg = await reader.read();
|
||||
const _teamNewContentMsg = await reader.read();
|
||||
const mapSubscribeResponse = await reader.read();
|
||||
|
||||
expect(mapSubscribeResponse.value).toEqual({
|
||||
action: "subscribeResponse",
|
||||
knownState: map.multiLog.knownState(),
|
||||
} satisfies SyncMessage);
|
||||
});
|
||||
|
||||
test("When replaying creation and transactions of a multilog as new content, the receiving peer integrates this information", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node1 = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node1.createTeam();
|
||||
|
||||
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
|
||||
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
|
||||
|
||||
node1.addPeer({
|
||||
id: "test2",
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
role: "server",
|
||||
});
|
||||
|
||||
const reader1 = outRx1.getReader();
|
||||
|
||||
const _adminSubscriptionMsg = await reader1.read();
|
||||
const teamSubscribeMsg = await reader1.read();
|
||||
|
||||
const map = team.createMap();
|
||||
|
||||
const mapSubscriptionMsg = await reader1.read();
|
||||
const mapNewContentMsg = await reader1.read();
|
||||
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const mapEditMsg = await reader1.read();
|
||||
|
||||
const node2 = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
|
||||
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
|
||||
|
||||
node2.addPeer({
|
||||
id: "test1",
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
role: "client",
|
||||
});
|
||||
|
||||
const writer2 = inTx2.getWriter();
|
||||
const reader2 = outRx2.getReader();
|
||||
|
||||
await writer2.write(teamSubscribeMsg.value);
|
||||
const teamSubscribeResponseMsg = await reader2.read();
|
||||
|
||||
expect(node2.multilogs[team.teamMap.multiLog.id]?.state).toEqual("loading");
|
||||
|
||||
const writer1 = inTx1.getWriter();
|
||||
|
||||
await writer1.write(teamSubscribeResponseMsg.value);
|
||||
const teamContentMsg = await reader1.read();
|
||||
|
||||
await writer2.write(teamContentMsg.value);
|
||||
|
||||
await writer2.write(mapSubscriptionMsg.value);
|
||||
const _mapSubscribeResponseMsg = await reader2.read();
|
||||
await writer2.write(mapNewContentMsg.value);
|
||||
|
||||
expect(node2.multilogs[map.multiLog.id]?.state).toEqual("loading");
|
||||
|
||||
await writer2.write(mapEditMsg.value);
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||
|
||||
expect(
|
||||
expectMap(
|
||||
node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent()
|
||||
).get("hello")
|
||||
).toEqual("world");
|
||||
});
|
||||
|
||||
test("When loading a multilog on one node, the server node it is requested from replies with all the necessary depended on multilogs to make it work", async () => {
|
||||
const admin = newRandomAgentCredential();
|
||||
const adminID = getAgentID(getAgent(admin));
|
||||
|
||||
const node1 = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const team = node1.createTeam();
|
||||
|
||||
const map = team.createMap();
|
||||
map.edit((editable) => {
|
||||
editable.set("hello", "world", "trusting");
|
||||
});
|
||||
|
||||
const node2 = new LocalNode(admin, newRandomSessionID(adminID));
|
||||
|
||||
const [node2asPeer, node1asPeer] = connectedPeers();
|
||||
|
||||
node1.addPeer(node2asPeer);
|
||||
node2.addPeer(node1asPeer);
|
||||
|
||||
await node2.loadMultiLog(map.multiLog.id);
|
||||
|
||||
expect(
|
||||
expectMap(
|
||||
node2.expectMultiLogLoaded(map.multiLog.id).getCurrentContent()
|
||||
).get("hello")
|
||||
).toEqual("world");
|
||||
});
|
||||
|
||||
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
|
||||
const queue: T[] = [];
|
||||
let resolveNextItemReady: () => void = () => {};
|
||||
let nextItemReady: Promise<void> = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
|
||||
const readable = new ReadableStream<T>({
|
||||
async pull(controller) {
|
||||
while (true) {
|
||||
if (queue.length > 0) {
|
||||
controller.enqueue(queue.shift());
|
||||
if (queue.length === 0) {
|
||||
nextItemReady = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
await nextItemReady;
|
||||
}
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
const writable = new WritableStream<T>({
|
||||
write(chunk) {
|
||||
queue.push(chunk);
|
||||
if (queue.length === 1) {
|
||||
resolveNextItemReady();
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
return [readable, writable];
|
||||
}
|
||||
|
||||
function shouldNotResolve(promise: Promise<any>, ops: { timeout: number }) {
|
||||
return new Promise((resolve, reject) => {
|
||||
promise.then((v) =>
|
||||
reject(
|
||||
new Error(
|
||||
"Should not have resolved, but resolved to " +
|
||||
JSON.stringify(v)
|
||||
)
|
||||
)
|
||||
);
|
||||
setTimeout(resolve, ops.timeout);
|
||||
});
|
||||
}
|
||||
|
||||
function connectedPeers(trace?: boolean): [Peer, Peer] {
|
||||
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
|
||||
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
|
||||
|
||||
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
|
||||
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
|
||||
|
||||
outRx2
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(chunk, controller) {
|
||||
trace && console.log("peer 2 -> peer 1", chunk);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx1);
|
||||
|
||||
outRx1
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(chunk, controller) {
|
||||
trace && console.log("peer 1 -> peer 2", chunk);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx2);
|
||||
|
||||
const peer2AsPeer: Peer = {
|
||||
id: "test2",
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
role: "peer",
|
||||
};
|
||||
|
||||
const peer1AsPeer: Peer = {
|
||||
id: "test1",
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
role: "peer",
|
||||
};
|
||||
|
||||
return [peer2AsPeer, peer1AsPeer];
|
||||
}
|
||||
114
src/sync.ts
Normal file
114
src/sync.ts
Normal file
@@ -0,0 +1,114 @@
|
||||
import { Hash, Signature } from "./crypto";
|
||||
import { MultiLogHeader, MultiLogID, SessionID, Transaction } from "./multilog";
|
||||
|
||||
export type MultiLogKnownState = {
|
||||
multilogID: MultiLogID;
|
||||
header: boolean;
|
||||
sessions: { [sessionID: SessionID]: number };
|
||||
};
|
||||
|
||||
export type SyncMessage =
|
||||
| SubscribeMessage
|
||||
| SubscribeResponseMessage
|
||||
| NewContentMessage
|
||||
| WrongAssumedKnownStateMessage
|
||||
| UnsubscribeMessage;
|
||||
|
||||
export type SubscribeMessage = {
|
||||
action: "subscribe";
|
||||
knownState: MultiLogKnownState;
|
||||
};
|
||||
|
||||
export type SubscribeResponseMessage = {
|
||||
action: "subscribeResponse";
|
||||
knownState: MultiLogKnownState;
|
||||
asDependencyOf?: MultiLogID;
|
||||
};
|
||||
|
||||
export type NewContentMessage = {
|
||||
action: "newContent";
|
||||
multilogID: MultiLogID;
|
||||
header?: MultiLogHeader;
|
||||
newContent: {
|
||||
[sessionID: SessionID]: SessionNewContent;
|
||||
};
|
||||
};
|
||||
|
||||
export type SessionNewContent = {
|
||||
after: number;
|
||||
newTransactions: Transaction[];
|
||||
// TODO: is lastHash needed here?
|
||||
lastHash: Hash;
|
||||
lastSignature: Signature;
|
||||
};
|
||||
|
||||
export type WrongAssumedKnownStateMessage = {
|
||||
action: "wrongAssumedKnownState";
|
||||
knownState: MultiLogKnownState;
|
||||
};
|
||||
|
||||
export type UnsubscribeMessage = {
|
||||
action: "unsubscribe";
|
||||
multilogID: MultiLogID;
|
||||
};
|
||||
|
||||
export type PeerID = string;
|
||||
|
||||
export interface Peer {
|
||||
id: PeerID;
|
||||
incoming: ReadableStream<SyncMessage>;
|
||||
outgoing: WritableStream<SyncMessage>;
|
||||
role: "peer" | "server" | "client";
|
||||
}
|
||||
|
||||
export interface PeerState {
|
||||
id: PeerID;
|
||||
optimisticKnownStates: { [multilogID: MultiLogID]: MultiLogKnownState };
|
||||
incoming: ReadableStream<SyncMessage>;
|
||||
outgoing: WritableStreamDefaultWriter<SyncMessage>;
|
||||
role: "peer" | "server" | "client";
|
||||
}
|
||||
|
||||
export function weAreStrictlyAhead(
|
||||
ourKnownState: MultiLogKnownState,
|
||||
theirKnownState: MultiLogKnownState
|
||||
): boolean {
|
||||
if (theirKnownState.header && !ourKnownState.header) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const allSessions = new Set([
|
||||
...(Object.keys(ourKnownState.sessions) as SessionID[]),
|
||||
...(Object.keys(theirKnownState.sessions) as SessionID[]),
|
||||
]);
|
||||
|
||||
for (const sessionID of allSessions) {
|
||||
const ourSession = ourKnownState.sessions[sessionID];
|
||||
const theirSession = theirKnownState.sessions[sessionID];
|
||||
|
||||
if ((ourSession || 0) < (theirSession || 0)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
export function combinedKnownStates(stateA: MultiLogKnownState, stateB: MultiLogKnownState): MultiLogKnownState {
|
||||
const sessionStates: MultiLogKnownState["sessions"] = {};
|
||||
|
||||
const allSessions = new Set([...Object.keys(stateA.sessions), ...Object.keys(stateB.sessions)] as SessionID[]);
|
||||
|
||||
for (const sessionID of allSessions) {
|
||||
const stateAValue = stateA.sessions[sessionID];
|
||||
const stateBValue = stateB.sessions[sessionID];
|
||||
|
||||
sessionStates[sessionID] = Math.max(stateAValue || 0, stateBValue || 0);
|
||||
}
|
||||
|
||||
return {
|
||||
multilogID: stateA.multilogID,
|
||||
header: stateA.header || stateB.header,
|
||||
sessions: sessionStates,
|
||||
};
|
||||
}
|
||||
@@ -16,6 +16,8 @@
|
||||
"noEmit": true,
|
||||
"types": [
|
||||
"bun-types" // add Bun global
|
||||
]
|
||||
],
|
||||
|
||||
// "noUncheckedIndexedAccess": true
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user