Compare commits

..

7 Commits

Author SHA1 Message Date
Guido D'Orsi
9181e74fc8 chore: add ts-ignore on an error flagged by some Typescript versions 2025-01-18 00:14:54 +01:00
Guido D'Orsi
5e83864f41 fix: improve error management on initial auth, fixed an infinite loop when migration fails 2025-01-18 00:12:46 +01:00
Guido D'Orsi
de5f2d6d5b Merge pull request #1213 from garden-co/fix/handle-unknown-covaule-types
perf: skip coValue content creation on access
2025-01-17 14:44:03 +01:00
Guido D'Orsi
4aa377dea7 chore: changeset 2025-01-17 14:42:05 +01:00
Guido D'Orsi
31ae73fe0e perf: skip coValue content creation on access 2025-01-17 12:49:24 +01:00
Guido D'Orsi
7a5adfc4dc Merge pull request #1212 from garden-co/fix/handle-unknown-covaule-types
feat: handle unknown coValue content instead of triggering an error
2025-01-17 12:23:50 +01:00
Guido D'Orsi
850e264912 feat: handle unknown coValue content instead of triggering an error 2025-01-17 12:02:32 +01:00
15 changed files with 247 additions and 363 deletions

View File

@@ -0,0 +1,5 @@
---
"cojson": patch
---
Handle unkown coValue content type and optimize content access

View File

@@ -1,5 +0,0 @@
---
"cojson": patch
---
Optimize queue management

View File

@@ -0,0 +1,7 @@
---
"jazz-react-native": patch
"jazz-react": patch
"jazz-tools": patch
---
Improve error management on initial auth, fixed an infinite loop when migration fails

View File

@@ -92,7 +92,7 @@ export class PeerState {
this.processing = true;
let entry: QueueEntry<SyncMessage> | undefined;
let entry: QueueEntry | undefined;
while ((entry = this.queue.pull())) {
// Awaiting the push to send one message at a time
// This way when the peer is "under pressure" we can enqueue all
@@ -129,7 +129,7 @@ export class PeerState {
}
private closeQueue() {
let entry: QueueEntry<SyncMessage> | undefined;
let entry: QueueEntry | undefined;
while ((entry = this.queue.pull())) {
// Using resolve here to avoid unnecessary noise in the logs
entry.resolve();

View File

@@ -18,12 +18,11 @@ function promiseWithResolvers<R>() {
};
}
export type QueueEntry<V> = {
msg: V;
export type QueueEntry = {
msg: SyncMessage;
promise: Promise<void>;
resolve: () => void;
reject: (_: unknown) => void;
next: QueueEntry<V> | undefined;
};
/**
@@ -34,68 +33,10 @@ type Tuple<T, N extends number, A extends unknown[] = []> = A extends {
}
? A
: Tuple<T, N, [...A, T]>;
type QueueTuple = Tuple<Queue<SyncMessage>, 8>;
class Queue<V> {
head: QueueEntry<V> | undefined = undefined;
tail: QueueEntry<V> | undefined = undefined;
push(msg: V) {
const { promise, resolve, reject } = promiseWithResolvers<void>();
const entry: QueueEntry<V> = {
msg,
promise,
resolve,
reject,
next: undefined,
};
if (this.head === undefined) {
this.head = entry;
} else {
if (this.tail === undefined) {
throw new Error("Tail is null but head is not");
}
this.tail.next = entry;
}
this.tail = entry;
return entry;
}
pull() {
const entry = this.head;
if (entry) {
this.head = entry.next;
}
if (this.head === undefined) {
this.tail = undefined;
}
return entry;
}
isNonEmpty() {
return this.head !== undefined;
}
}
type QueueTuple = Tuple<QueueEntry[], 8>;
export class PriorityBasedMessageQueue {
private queues: QueueTuple = [
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
];
private queues: QueueTuple = [[], [], [], [], [], [], [], []];
queueSizeCounter = metrics
.getMeter("cojson")
.createUpDownCounter("jazz.messagequeue.size", {
@@ -111,19 +52,22 @@ export class PriorityBasedMessageQueue {
constructor(private defaultPriority: CoValuePriority) {}
public push(msg: SyncMessage) {
const { promise, resolve, reject } = promiseWithResolvers<void>();
const entry: QueueEntry = { msg, promise, resolve, reject };
const priority = "priority" in msg ? msg.priority : this.defaultPriority;
const entry = this.getQueue(priority).push(msg);
this.getQueue(priority).push(entry);
this.queueSizeCounter.add(1, {
priority,
});
return entry.promise;
return promise;
}
public pull() {
const priority = this.queues.findIndex((queue) => queue.isNonEmpty());
const priority = this.queues.findIndex((queue) => queue.length > 0);
if (priority === -1) {
return;
@@ -133,6 +77,6 @@ export class PriorityBasedMessageQueue {
priority,
});
return this.queues[priority]?.pull();
return this.queues[priority]?.shift();
}
}

View File

@@ -35,6 +35,43 @@ export interface RawCoValue {
subscribe(listener: (coValue: this) => void): () => void;
}
export class RawUnknownCoValue implements RawCoValue {
id: CoID<this>;
core: CoValueCore;
constructor(core: CoValueCore) {
this.id = core.id as CoID<this>;
this.core = core;
}
get type() {
return this.core.header.type;
}
get headerMeta() {
return this.core.header.meta as JsonObject;
}
/** @category 6. Meta */
get group(): RawGroup {
return this.core.getGroup();
}
toJSON() {
return {};
}
atTime() {
return this;
}
subscribe(listener: (value: this) => void): () => void {
return this.core.subscribe((content) => {
listener(content as this);
});
}
}
export type AnyRawCoValue =
| RawCoMap
| RawGroup

View File

@@ -126,10 +126,7 @@ export class CoValueCore {
.expectCoValueLoaded(header.ruleset.group)
.subscribe((_groupUpdate) => {
this._cachedContent = undefined;
const newContent = this.getCurrentContent();
for (const listener of this.listeners) {
listener(newContent);
}
this.notifyUpdate("immediate");
});
}
}
@@ -244,11 +241,6 @@ export class CoValueCore {
signerID,
} satisfies InvalidSignatureError);
}
// const afterVerify = performance.now();
// console.log(
// "Verify took",
// afterVerify - beforeVerify
// );
this.doAddTransactions(
sessionID,
@@ -263,138 +255,6 @@ export class CoValueCore {
});
}
/*tryAddTransactionsAsync(
sessionID: SessionID,
newTransactions: Transaction[],
givenExpectedNewHash: Hash | undefined,
newSignature: Signature,
): ResultAsync<true, TryAddTransactionsError> {
const currentAsyncAddTransaction = this._currentAsyncAddTransaction;
let maybeAwaitPrevious:
| ResultAsync<void, TryAddTransactionsError>
| undefined;
let thisDone = () => {};
if (currentAsyncAddTransaction) {
// eslint-disable-next-line neverthrow/must-use-result
maybeAwaitPrevious = ResultAsync.fromSafePromise(
currentAsyncAddTransaction,
);
} else {
// eslint-disable-next-line neverthrow/must-use-result
maybeAwaitPrevious = ResultAsync.fromSafePromise(Promise.resolve());
this._currentAsyncAddTransaction = new Promise((resolve) => {
thisDone = resolve;
});
}
return maybeAwaitPrevious
.andThen((_previousDone) =>
this.node
.resolveAccountAgentAsync(
accountOrAgentIDfromSessionID(sessionID),
"Expected to know signer of transaction",
)
.andThen((agent) => {
const signerID = this.crypto.getAgentSignerID(agent);
const nTxBefore =
this.sessionLogs.get(sessionID)?.transactions
.length ?? 0;
// const beforeHash = performance.now();
return ResultAsync.fromSafePromise(
this.expectedNewHashAfterAsync(
sessionID,
newTransactions,
),
).andThen(({ expectedNewHash, newStreamingHash }) => {
// const afterHash = performance.now();
// console.log(
// "Hashing took",
// afterHash - beforeHash
// );
const nTxAfter =
this.sessionLogs.get(sessionID)?.transactions
.length ?? 0;
if (nTxAfter !== nTxBefore) {
const newTransactionLengthBefore =
newTransactions.length;
newTransactions = newTransactions.slice(
nTxAfter - nTxBefore,
);
console.warn(
"Transactions changed while async hashing",
{
nTxBefore,
nTxAfter,
newTransactionLengthBefore,
remainingNewTransactions:
newTransactions.length,
},
);
}
if (
givenExpectedNewHash &&
givenExpectedNewHash !== expectedNewHash
) {
return err({
type: "InvalidHash",
id: this.id,
expectedNewHash,
givenExpectedNewHash,
} satisfies InvalidHashError);
}
performance.mark("verifyStart" + this.id);
if (
!this.crypto.verify(
newSignature,
expectedNewHash,
signerID,
)
) {
return err({
type: "InvalidSignature",
id: this.id,
newSignature,
sessionID,
signerID,
} satisfies InvalidSignatureError);
}
performance.mark("verifyEnd" + this.id);
performance.measure(
"verify" + this.id,
"verifyStart" + this.id,
"verifyEnd" + this.id,
);
this.doAddTransactions(
sessionID,
newTransactions,
newSignature,
expectedNewHash,
newStreamingHash,
"deferred",
);
return ok(true as const);
});
}),
)
.map((trueResult) => {
thisDone();
return trueResult;
})
.mapErr((err) => {
thisDone();
return err;
});
}*/
private doAddTransactions(
sessionID: SessionID,
newTransactions: Transaction[],
@@ -432,12 +292,6 @@ export class CoValueCore {
);
if (sizeOfTxsSinceLastInbetweenSignature > MAX_RECOMMENDED_TX_SIZE) {
// console.log(
// "Saving inbetween signature for tx ",
// sessionID,
// transactions.length - 1,
// sizeOfTxsSinceLastInbetweenSignature
// );
signatureAfter[transactions.length - 1] = newSignature;
}
@@ -463,34 +317,40 @@ export class CoValueCore {
this._cachedDependentOn = undefined;
this._cachedNewContentSinceEmpty = undefined;
if (this.listeners.size > 0) {
if (notifyMode === "immediate") {
const content = this.getCurrentContent();
for (const listener of this.listeners) {
listener(content);
}
} else {
if (!this.nextDeferredNotify) {
this.nextDeferredNotify = new Promise((resolve) => {
setTimeout(() => {
this.nextDeferredNotify = undefined;
this.deferredUpdates = 0;
const content = this.getCurrentContent();
for (const listener of this.listeners) {
listener(content);
}
resolve();
}, 0);
});
}
this.deferredUpdates++;
}
}
this.notifyUpdate(notifyMode);
}
deferredUpdates = 0;
nextDeferredNotify: Promise<void> | undefined;
notifyUpdate(notifyMode: "immediate" | "deferred") {
if (this.listeners.size === 0) {
return;
}
if (notifyMode === "immediate") {
const content = this.getCurrentContent();
for (const listener of this.listeners) {
listener(content);
}
} else {
if (!this.nextDeferredNotify) {
this.nextDeferredNotify = new Promise((resolve) => {
setTimeout(() => {
this.nextDeferredNotify = undefined;
this.deferredUpdates = 0;
const content = this.getCurrentContent();
for (const listener of this.listeners) {
listener(content);
}
resolve();
}, 0);
});
}
this.deferredUpdates++;
}
}
subscribe(listener: (content?: RawCoValue) => void): () => void {
this.listeners.add(listener);
listener(this.getCurrentContent());

View File

@@ -1,3 +1,4 @@
import { RawUnknownCoValue } from "./coValue.js";
import type { CoValueCore } from "./coValueCore.js";
import { RawAccount, RawControlledAccount } from "./coValues/account.js";
import { RawCoList } from "./coValues/coList.js";
@@ -38,6 +39,6 @@ export function coreToCoValue(
return new RawCoStream(core);
}
} else {
throw new Error(`Unknown coValue type ${core.header.type}`);
return new RawUnknownCoValue(core);
}
}

View File

@@ -22,7 +22,7 @@ describe("PriorityBasedMessageQueue", () => {
const { queue } = setup();
expect(queue["defaultPriority"]).toBe(CO_VALUE_PRIORITY.MEDIUM);
expect(queue["queues"].length).toBe(8);
expect(queue["queues"].every((q) => !q.isNonEmpty())).toBe(true);
expect(queue["queues"].every((q) => q.length === 0)).toBe(true);
});
test("should push message with default priority", async () => {

View File

@@ -1,11 +1,14 @@
import { expect, test, vi } from "vitest";
import { Transaction } from "../coValueCore.js";
import { CoValueCore, Transaction } from "../coValueCore.js";
import { MapOpPayload } from "../coValues/coMap.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { stableStringify } from "../jsonStringify.js";
import { LocalNode } from "../localNode.js";
import { Role } from "../permissions.js";
import { randomAnonymousAccountAndSessionID } from "./testUtils.js";
import {
createTestNode,
randomAnonymousAccountAndSessionID,
} from "./testUtils.js";
const Crypto = await WasmCrypto.create();
@@ -191,3 +194,30 @@ test("New transactions in a group correctly update owned values, including subsc
expect(map.core.getValidSortedTransactions().length).toBe(0);
});
test("creating a coValue with a group should't trigger automatically a content creation (performance)", () => {
const node = createTestNode();
const group = node.createGroup();
const getCurrentContentSpy = vi.spyOn(
CoValueCore.prototype,
"getCurrentContent",
);
const groupSpy = vi.spyOn(group.core, "getCurrentContent");
getCurrentContentSpy.mockClear();
node.createCoValue({
type: "comap",
ruleset: { type: "ownedByGroup", group: group.id },
meta: null,
...Crypto.createdNowUnique(),
});
// It's called once for the group and never for the coValue
expect(getCurrentContentSpy).toHaveBeenCalledTimes(1);
expect(groupSpy).toHaveBeenCalledTimes(1);
getCurrentContentSpy.mockRestore();
});

View File

@@ -1985,6 +1985,25 @@ describe("waitForSyncWithPeer", () => {
});
});
test("Should not crash when syncing an unknown coValue type", async () => {
const { client, jazzCloud } = createTwoConnectedNodes();
const coValue = client.createCoValue({
type: "ooops" as any,
ruleset: { type: "unsafeAllowAll" },
meta: null,
...Crypto.createdNowUnique(),
});
await coValue.waitForSync();
const coValueOnTheOtherNode = await loadCoValueOrFail(
jazzCloud,
coValue.getCurrentContent().id,
);
expect(coValueOnTheOtherNode.id).toBe(coValue.id);
});
describe("metrics", () => {
afterEach(() => {
tearDownTestMetricReader();

View File

@@ -96,6 +96,10 @@ export function JazzProvider<Acc extends Account = RegisteredAccount>({
const promise = createContext();
promise.catch((e) => {
console.error("Error creating Jazz context", e);
});
// In development mode we don't return a cleanup function because otherwise
// the double effect execution would mark the context as done immediately.
if (process.env.NODE_ENV === "development") {

View File

@@ -94,6 +94,10 @@ export function JazzProvider<Acc extends Account = RegisteredAccount>({
const promise = createContext();
promise.catch((e) => {
console.error("Error creating Jazz context", e);
});
// In development mode we don't return a cleanup function because otherwise
// the double effect execution would mark the context as done immediately.
if (process.env.NODE_ENV === "development") {

View File

@@ -372,6 +372,7 @@ export class CoRichText extends CoMap {
end + 1,
mark.endBefore,
RangeClass,
// @ts-ignore Some Typescript versions flag this as an error
{},
{
markOwner: mark.sourceMark._owner || this._owner,

View File

@@ -128,129 +128,106 @@ export async function createJazzContext<Acc extends Account>(
): Promise<JazzContext<Acc>>;
export async function createJazzContext<Acc extends Account>(
options: ContextParamsWithAuth<Acc> | BaseContextParams,
): Promise<JazzContext<Acc>> {
// eslint-disable-next-line no-constant-condition
while (true) {
if (!("auth" in options)) {
return createAnonymousJazzContext({
peersToLoadFrom: options.peersToLoadFrom,
crypto: options.crypto,
): Promise<JazzContext<Acc> | JazzContextWithAgent> {
if (!("auth" in options)) {
return createAnonymousJazzContext({
peersToLoadFrom: options.peersToLoadFrom,
crypto: options.crypto,
});
}
const { auth, sessionProvider, peersToLoadFrom, crypto } = options;
const AccountSchema =
options.AccountSchema ??
(RegisteredSchemas["Account"] as unknown as AccountClass<Acc>);
const authResult = await auth.start(crypto);
if (authResult.type === "existing") {
const { sessionID, sessionDone } = await sessionProvider(
authResult.credentials.accountID,
crypto,
);
const node = await LocalNode.withLoadedAccount({
accountID: authResult.credentials
.accountID as unknown as CoID<RawAccount>,
accountSecret: authResult.credentials.secret,
sessionID: sessionID,
peersToLoadFrom: peersToLoadFrom,
crypto: crypto,
migration: async (rawAccount, _node, creationProps) => {
const account = new AccountSchema({
fromRaw: rawAccount,
}) as Acc;
activeAccountContext.set(account);
await account.applyMigration(creationProps);
},
});
const account = AccountSchema.fromNode(node);
activeAccountContext.set(account);
if (authResult.saveCredentials) {
await authResult.saveCredentials({
accountID: node.account.id as unknown as ID<Account>,
secret: node.account.agentSecret,
});
}
const { auth, sessionProvider, peersToLoadFrom, crypto } = options;
const AccountSchema =
options.AccountSchema ??
(RegisteredSchemas["Account"] as unknown as AccountClass<Acc>);
let authResult: AuthResult;
try {
authResult = await auth.start(crypto);
} catch (e) {
console.error("error", e);
throw e;
}
authResult.onSuccess();
if (authResult.type === "existing") {
try {
const { sessionID, sessionDone } = await sessionProvider(
authResult.credentials.accountID,
crypto,
);
try {
const node = await LocalNode.withLoadedAccount({
accountID: authResult.credentials
.accountID as unknown as CoID<RawAccount>,
accountSecret: authResult.credentials.secret,
sessionID: sessionID,
peersToLoadFrom: peersToLoadFrom,
crypto: crypto,
migration: async (rawAccount, _node, creationProps) => {
const account = new AccountSchema({
fromRaw: rawAccount,
}) as Acc;
activeAccountContext.set(account);
await account.applyMigration(creationProps);
},
});
const account = AccountSchema.fromNode(node);
activeAccountContext.set(account);
if (authResult.saveCredentials) {
await authResult.saveCredentials({
accountID: node.account.id as unknown as ID<Account>,
secret: node.account.agentSecret,
});
}
authResult.onSuccess();
return {
account,
done: () => {
node.gracefulShutdown();
sessionDone();
},
logOut: () => {
node.gracefulShutdown();
sessionDone();
authResult.logOut();
},
};
} catch (e) {
authResult.onError(new Error("Error loading account", { cause: e }));
sessionDone();
}
} catch (e) {
authResult.onError(
new Error("Error acquiring sessionID", { cause: e }),
);
}
} else if (authResult.type === "new") {
try {
// TODO: figure out a way to not "waste" the first SessionID
const { node } = await LocalNode.withNewlyCreatedAccount({
creationProps: authResult.creationProps,
peersToLoadFrom: peersToLoadFrom,
crypto: crypto,
initialAgentSecret: authResult.initialSecret,
migration: async (rawAccount, _node, creationProps) => {
const account = new AccountSchema({
fromRaw: rawAccount,
}) as Acc;
activeAccountContext.set(account);
await account.applyMigration(creationProps);
},
});
const account = AccountSchema.fromNode(node);
return {
account,
done: () => {
node.gracefulShutdown();
sessionDone();
},
logOut: () => {
node.gracefulShutdown();
sessionDone();
authResult.logOut();
},
};
} else if (authResult.type === "new") {
const { node } = await LocalNode.withNewlyCreatedAccount({
creationProps: authResult.creationProps,
peersToLoadFrom: peersToLoadFrom,
crypto: crypto,
initialAgentSecret: authResult.initialSecret,
migration: async (rawAccount, _node, creationProps) => {
const account = new AccountSchema({
fromRaw: rawAccount,
}) as Acc;
activeAccountContext.set(account);
await authResult.saveCredentials({
accountID: node.account.id as unknown as ID<Account>,
secret: node.account.agentSecret,
});
await account.applyMigration(creationProps);
},
});
authResult.onSuccess();
return {
account,
done: () => {
node.gracefulShutdown();
},
logOut: () => {
node.gracefulShutdown();
authResult.logOut();
},
};
} catch (e) {
authResult.onError(new Error("Error creating account", { cause: e }));
}
}
const account = AccountSchema.fromNode(node);
activeAccountContext.set(account);
await authResult.saveCredentials({
accountID: node.account.id as unknown as ID<Account>,
secret: node.account.agentSecret,
});
authResult.onSuccess();
return {
account,
done: () => {
node.gracefulShutdown();
},
logOut: () => {
node.gracefulShutdown();
authResult.logOut();
},
};
}
throw new Error("Invalid auth result");
}
export async function createAnonymousJazzContext({