Compare commits

...

5 Commits

Author SHA1 Message Date
Guido D'Orsi
1e625f3c12 chore: changeset 2025-02-18 18:14:39 +01:00
Guido D'Orsi
8b3686c7ce feat: use the uniqueSessions index to get the single coValue session 2025-02-18 18:12:47 +01:00
Guido D'Orsi
bce04ee06d chore: restore the transactions autobatching 2025-02-18 18:05:53 +01:00
Guido D'Orsi
f2e9115f4c fix: improve transactions management on IDB 2025-02-18 17:51:32 +01:00
Guido D'Orsi
ee0897d9a8 fix: improve the rollback on failure when handling new content in storage 2025-02-18 14:42:29 +01:00
14 changed files with 539 additions and 378 deletions

View File

@@ -0,0 +1,8 @@
---
"cojson-storage-indexeddb": patch
"cojson-storage-rn-sqlite": patch
"cojson-storage-sqlite": patch
"cojson-storage": patch
---
Improve rollback on error when failing to add new content

View File

@@ -0,0 +1,111 @@
export type StoreName =
| "coValues"
| "sessions"
| "transactions"
| "signatureAfter";
// A access unit for the IndexedDB Jazz database
// It's a wrapper around the IDBTransaction object that helps on batching multiple operations
// in a single transaction.
export class CoJsonIDBTransaction {
db: IDBDatabase;
tx: IDBTransaction;
pendingRequests: ((txEntry: this) => void)[] = [];
rejectHandlers: (() => void)[] = [];
id = Math.random();
running = false;
failed = false;
done = false;
constructor(db: IDBDatabase) {
this.db = db;
this.tx = this.db.transaction(
["coValues", "sessions", "transactions", "signatureAfter"],
"readwrite",
);
this.tx.oncomplete = () => {
this.done = true;
};
this.tx.onabort = () => {
this.done = true;
};
}
startedAt = performance.now();
isReusable() {
const delta = performance.now() - this.startedAt;
return !this.done && delta <= 20;
}
getObjectStore(name: StoreName) {
return this.tx.objectStore(name);
}
private pushRequest<T>(
handler: (txEntry: this, next: () => void) => Promise<T>,
) {
const next = () => {
const next = this.pendingRequests.shift();
if (next) {
next(this);
} else {
this.running = false;
this.done = true;
}
};
if (this.running) {
return new Promise<T>((resolve, reject) => {
this.rejectHandlers.push(reject);
this.pendingRequests.push(async () => {
try {
const result = await handler(this, next);
resolve(result);
} catch (error) {
reject(error);
}
});
});
}
this.running = true;
return handler(this, next);
}
handleRequest<T>(handler: (txEntry: this) => IDBRequest<T>) {
return this.pushRequest<T>((txEntry, next) => {
return new Promise<T>((resolve, reject) => {
const request = handler(txEntry);
request.onerror = () => {
this.failed = true;
this.tx.abort();
console.error(request.error);
reject(request.error);
// Don't leave any pending promise
for (const handler of this.rejectHandlers) {
handler();
}
};
request.onsuccess = () => {
resolve(request.result as T);
next();
};
});
});
}
commit() {
if (!this.done) {
this.tx.commit();
}
}
}

View File

@@ -1,4 +1,4 @@
import type { CojsonInternalTypes, RawCoID } from "cojson";
import type { CojsonInternalTypes, RawCoID, SessionID } from "cojson";
import type {
CoValueRow,
DBClientInterface,
@@ -8,119 +8,60 @@ import type {
StoredSessionRow,
TransactionRow,
} from "cojson-storage";
import { SyncPromise } from "./syncPromises.js";
import { CoJsonIDBTransaction } from "./CoJsonIDBTransaction.js";
export class IDBClient implements DBClientInterface {
private db;
currentTx:
| {
id: number;
tx: IDBTransaction;
stores: {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
};
startedAt: number;
pendingRequests: ((txEntry: {
stores: {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
};
}) => void)[];
}
| undefined;
currentTxID = 0;
activeTransaction: CoJsonIDBTransaction | undefined;
autoBatchingTransaction: CoJsonIDBTransaction | undefined;
constructor(db: IDBDatabase) {
this.db = db;
}
makeRequest<T>(
handler: (stores: {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
}) => IDBRequest,
): SyncPromise<T> {
return new SyncPromise((resolve, reject) => {
let txEntry = this.currentTx;
handler: (txEntry: CoJsonIDBTransaction) => IDBRequest<T>,
): Promise<T> {
if (this.activeTransaction) {
return this.activeTransaction.handleRequest<T>(handler);
}
const requestEntry = ({
stores,
}: {
stores: {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
};
}) => {
const request = handler(stores);
request.onerror = () => {
console.error("Error in request", request.error);
this.currentTx = undefined;
reject(request.error);
};
request.onsuccess = () => {
const value = request.result as T;
resolve(value);
if (this.autoBatchingTransaction?.isReusable()) {
return this.autoBatchingTransaction.handleRequest<T>(handler);
}
const next = txEntry?.pendingRequests.shift();
const tx = new CoJsonIDBTransaction(this.db);
if (next) {
next({ stores });
} else {
if (this.currentTx === txEntry) {
this.currentTx = undefined;
}
}
};
};
this.autoBatchingTransaction = tx;
// Transaction batching
if (!txEntry || performance.now() - txEntry.startedAt > 20) {
const tx = this.db.transaction(
["coValues", "sessions", "transactions", "signatureAfter"],
"readwrite",
);
txEntry = {
id: this.currentTxID++,
tx,
stores: {
coValues: tx.objectStore("coValues"),
sessions: tx.objectStore("sessions"),
transactions: tx.objectStore("transactions"),
signatureAfter: tx.objectStore("signatureAfter"),
},
startedAt: performance.now(),
pendingRequests: [],
};
this.currentTx = txEntry;
requestEntry(txEntry);
} else {
txEntry.pendingRequests.push(requestEntry);
}
});
return tx.handleRequest<T>(handler);
}
async getCoValue(coValueId: RawCoID): Promise<StoredCoValueRow | undefined> {
return this.makeRequest<StoredCoValueRow | undefined>(({ coValues }) =>
coValues.index("coValuesById").get(coValueId),
return this.makeRequest<StoredCoValueRow | undefined>((tx) =>
tx.getObjectStore("coValues").index("coValuesById").get(coValueId),
);
}
async getCoValueSessions(coValueRowId: number): Promise<StoredSessionRow[]> {
return this.makeRequest<StoredSessionRow[]>(({ sessions }) =>
sessions.index("sessionsByCoValue").getAll(coValueRowId),
return this.makeRequest<StoredSessionRow[]>((tx) =>
tx
.getObjectStore("sessions")
.index("sessionsByCoValue")
.getAll(coValueRowId),
);
}
async getSingleCoValueSession(
coValueRowId: number,
sessionID: SessionID,
): Promise<StoredSessionRow | undefined> {
return this.makeRequest<StoredSessionRow>((tx) =>
tx
.getObjectStore("sessions")
.index("uniqueSessions")
.get([coValueRowId, sessionID]),
);
}
@@ -128,13 +69,15 @@ export class IDBClient implements DBClientInterface {
sessionRowId: number,
firstNewTxIdx: number,
): Promise<TransactionRow[]> {
return this.makeRequest<TransactionRow[]>(({ transactions }) =>
transactions.getAll(
IDBKeyRange.bound(
[sessionRowId, firstNewTxIdx],
[sessionRowId, Number.POSITIVE_INFINITY],
return this.makeRequest<TransactionRow[]>((tx) =>
tx
.getObjectStore("transactions")
.getAll(
IDBKeyRange.bound(
[sessionRowId, firstNewTxIdx],
[sessionRowId, Number.POSITIVE_INFINITY],
),
),
),
);
}
@@ -142,9 +85,10 @@ export class IDBClient implements DBClientInterface {
sessionRowId: number,
firstNewTxIdx: number,
): Promise<SignatureAfterRow[]> {
return this.makeRequest<SignatureAfterRow[]>(
({ signatureAfter }: { signatureAfter: IDBObjectStore }) =>
signatureAfter.getAll(
return this.makeRequest<SignatureAfterRow[]>((tx) =>
tx
.getObjectStore("signatureAfter")
.getAll(
IDBKeyRange.bound(
[sessionRowId, firstNewTxIdx],
[sessionRowId, Number.POSITIVE_INFINITY],
@@ -160,8 +104,8 @@ export class IDBClient implements DBClientInterface {
throw new Error(`Header is required, coId: ${msg.id}`);
}
return (await this.makeRequest<IDBValidKey>(({ coValues }) =>
coValues.put({
return (await this.makeRequest<IDBValidKey>((tx) =>
tx.getObjectStore("coValues").put({
id: msg.id,
// biome-ignore lint/style/noNonNullAssertion: TODO(JAZZ-561): Review
header: msg.header!,
@@ -176,25 +120,26 @@ export class IDBClient implements DBClientInterface {
sessionUpdate: SessionRow;
sessionRow?: StoredSessionRow;
}): Promise<number> {
return this.makeRequest<number>(({ sessions }) =>
sessions.put(
sessionRow?.rowID
? {
rowID: sessionRow.rowID,
...sessionUpdate,
}
: sessionUpdate,
),
return this.makeRequest<number>(
(tx) =>
tx.getObjectStore("sessions").put(
sessionRow?.rowID
? {
rowID: sessionRow.rowID,
...sessionUpdate,
}
: sessionUpdate,
) as IDBRequest<number>,
);
}
addTransaction(
async addTransaction(
sessionRowID: number,
idx: number,
newTransaction: CojsonInternalTypes.Transaction,
) {
return this.makeRequest(({ transactions }) =>
transactions.add({
await this.makeRequest((tx) =>
tx.getObjectStore("transactions").add({
ses: sessionRowID,
idx,
tx: newTransaction,
@@ -211,8 +156,8 @@ export class IDBClient implements DBClientInterface {
idx: number;
signature: CojsonInternalTypes.Signature;
}) {
return this.makeRequest(({ signatureAfter }) =>
signatureAfter.put({
return this.makeRequest((tx) =>
tx.getObjectStore("signatureAfter").put({
ses: sessionRowID,
idx,
signature,
@@ -220,7 +165,24 @@ export class IDBClient implements DBClientInterface {
);
}
async unitOfWork(operationsCallback: () => unknown[]) {
return Promise.all(operationsCallback());
closeTransaction(tx: CoJsonIDBTransaction) {
tx.commit();
if (tx === this.activeTransaction) {
this.activeTransaction = undefined;
}
}
async transaction(operationsCallback: () => unknown) {
const tx = new CoJsonIDBTransaction(this.db);
this.activeTransaction = tx;
try {
await operationsCallback();
tx.commit(); // Tells the browser to not wait for another possible request and commit the transaction immediately
} finally {
this.activeTransaction = undefined;
}
}
}

View File

@@ -33,18 +33,7 @@ export class IDBNode {
}
await this.syncManager.handleSyncMessage(msg);
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? `${v.slice(0, 20)}...`
: v,
)}`,
{ cause: e },
),
);
console.error(e);
}
}
};

View File

@@ -1,224 +0,0 @@
const isFunction = (func: any) => typeof func === "function";
const isObject = (supposedObject: any) =>
typeof supposedObject === "object" &&
supposedObject !== null &&
!Array.isArray(supposedObject);
const isThenable = (obj: any) => isObject(obj) && isFunction(obj.then);
const identity = (co: any) => co;
export { identity, isFunction, isObject, isThenable };
enum States {
PENDING = "PENDING",
RESOLVED = "RESOLVED",
REJECTED = "REJECTED",
}
interface Handler<T, U> {
onSuccess: HandlerOnSuccess<T, U>;
onFail: HandlerOnFail<U>;
}
type HandlerOnSuccess<T, U = any> = (value: T) => U | Thenable<U>;
type HandlerOnFail<U = any> = (reason: any) => U | Thenable<U>;
type Finally<U> = () => U | Thenable<U>;
interface Thenable<T> {
then<U>(
onSuccess?: HandlerOnSuccess<T, U>,
onFail?: HandlerOnFail<U>,
): Thenable<U>;
then<U>(
onSuccess?: HandlerOnSuccess<T, U>,
onFail?: (reason: any) => void,
): Thenable<U>;
}
type Resolve<R> = (value?: R | Thenable<R>) => void;
type Reject = (value?: any) => void;
export class SyncPromise<T> {
private state: States = States.PENDING;
private handlers: Handler<T, any>[] = [];
private value: T | any;
public constructor(callback: (resolve: Resolve<T>, reject: Reject) => void) {
try {
callback(this.resolve as Resolve<T>, this.reject);
} catch (e) {
this.reject(e);
}
}
private resolve = (value: T) => {
return this.setResult(value, States.RESOLVED);
};
private reject = (reason: any) => {
return this.setResult(reason, States.REJECTED);
};
private setResult = (value: T | any, state: States) => {
const set = () => {
if (this.state !== States.PENDING) {
return null;
}
if (isThenable(value)) {
return (value as Thenable<T>).then(this.resolve, this.reject);
}
this.value = value;
this.state = state;
return this.executeHandlers();
};
void set();
};
private executeHandlers = () => {
if (this.state === States.PENDING) {
return null;
}
for (const handler of this.handlers) {
if (this.state === States.REJECTED) {
handler.onFail(this.value);
} else {
handler.onSuccess(this.value);
}
}
this.handlers = [];
};
private attachHandler = (handler: Handler<T, any>) => {
this.handlers = [...this.handlers, handler];
this.executeHandlers();
};
// biome-ignore lint/suspicious/noThenProperty: TODO(JAZZ-561): Review
public then<U>(onSuccess: HandlerOnSuccess<T, U>, onFail?: HandlerOnFail<U>) {
return new SyncPromise<U>((resolve, reject) => {
return this.attachHandler({
onSuccess: (result) => {
try {
return resolve(onSuccess(result));
} catch (e) {
return reject(e);
}
},
onFail: (reason) => {
if (!onFail) {
return reject(reason);
}
try {
return resolve(onFail(reason));
} catch (e) {
return reject(e);
}
},
});
});
}
public catch<U>(onFail: HandlerOnFail<U>) {
return this.then<U>(identity, onFail);
}
// methods
public toString() {
return "[object SyncPromise]";
}
public finally<U>(cb: Finally<U>) {
return new SyncPromise<U>((resolve, reject) => {
let co: U | any;
let isRejected: boolean;
return this.then(
(value) => {
isRejected = false;
co = value;
return cb();
},
(reason) => {
isRejected = true;
co = reason;
return cb();
},
).then(() => {
if (isRejected) {
return reject(co);
}
return resolve(co);
});
});
}
public spread<U>(handler: (...args: any[]) => U) {
return this.then<U>((collection) => {
if (Array.isArray(collection)) {
return handler(...collection);
}
return handler(collection);
});
}
// static
public static resolve<U = any>(value?: U | Thenable<U>) {
return new SyncPromise<U>((resolve) => {
return resolve(value);
});
}
public static reject<U>(reason?: any) {
return new SyncPromise<U>((_resolve, reject) => {
return reject(reason);
});
}
public static all<U = any>(collection: (U | Thenable<U>)[]) {
return new SyncPromise<U[]>((resolve, reject) => {
if (!Array.isArray(collection)) {
return reject(new TypeError("An array must be provided."));
}
if (collection.length === 0) {
return resolve([]);
}
let counter = collection.length;
const resolvedCollection: U[] = [];
const tryResolve = (value: U, index: number) => {
counter -= 1;
resolvedCollection[index] = value;
if (counter !== 0) {
return null;
}
return resolve(resolvedCollection);
};
return collection.forEach((item, index) => {
return SyncPromise.resolve(item)
.then((value) => {
return tryResolve(value, index);
})
.catch(reject);
});
});
}
}

View File

@@ -0,0 +1,170 @@
import { afterEach, beforeEach, describe, expect, test } from "vitest";
import { CoJsonIDBTransaction } from "../CoJsonIDBTransaction";
const TEST_DB_NAME = "test-cojson-idb-transaction";
describe("CoJsonIDBTransaction", () => {
let db: IDBDatabase;
beforeEach(async () => {
// Create test database
await new Promise<void>((resolve, reject) => {
const request = indexedDB.open(TEST_DB_NAME, 1);
request.onerror = () => reject(request.error);
request.onupgradeneeded = (event) => {
const db = request.result;
// Create test stores
db.createObjectStore("coValues", { keyPath: "id" });
const sessions = db.createObjectStore("sessions", { keyPath: "id" });
sessions.createIndex("uniqueSessions", ["coValue", "sessionID"], {
unique: true,
});
db.createObjectStore("transactions", { keyPath: "id" });
db.createObjectStore("signatureAfter", { keyPath: "id" });
};
request.onsuccess = () => {
db = request.result;
resolve();
};
});
});
afterEach(async () => {
// Close and delete test database
db.close();
await new Promise<void>((resolve, reject) => {
const request = indexedDB.deleteDatabase(TEST_DB_NAME);
request.onerror = () => reject(request.error);
request.onsuccess = () => resolve();
});
});
test("handles successful write and read operations", async () => {
const tx = new CoJsonIDBTransaction(db);
// Write test
await tx.handleRequest((tx) =>
tx.getObjectStore("coValues").put({
id: "test1",
value: "hello",
}),
);
// Read test
const readTx = new CoJsonIDBTransaction(db);
const result = await readTx.handleRequest((tx) =>
tx.getObjectStore("coValues").get("test1"),
);
expect(result).toEqual({
id: "test1",
value: "hello",
});
});
test("handles multiple operations in single transaction", async () => {
const tx = new CoJsonIDBTransaction(db);
// Multiple writes
await Promise.all([
tx.handleRequest((tx) =>
tx.getObjectStore("coValues").put({
id: "test1",
value: "hello",
}),
),
tx.handleRequest((tx) =>
tx.getObjectStore("coValues").put({
id: "test2",
value: "world",
}),
),
]);
// Read results
const readTx = new CoJsonIDBTransaction(db);
const [result1, result2] = await Promise.all([
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("test1")),
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("test2")),
]);
expect(result1).toEqual({
id: "test1",
value: "hello",
});
expect(result2).toEqual({
id: "test2",
value: "world",
});
});
test("handles transaction across multiple stores", async () => {
const tx = new CoJsonIDBTransaction(db);
await Promise.all([
tx.handleRequest((tx) =>
tx.getObjectStore("coValues").put({
id: "value1",
data: "value data",
}),
),
tx.handleRequest((tx) =>
tx.getObjectStore("sessions").put({
id: "session1",
data: "session data",
}),
),
]);
const readTx = new CoJsonIDBTransaction(db);
const [valueResult, sessionResult] = await Promise.all([
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("value1")),
readTx.handleRequest((tx) =>
tx.getObjectStore("sessions").get("session1"),
),
]);
expect(valueResult).toEqual({
id: "value1",
data: "value data",
});
expect(sessionResult).toEqual({
id: "session1",
data: "session data",
});
});
test("handles failed transactions", async () => {
const tx = new CoJsonIDBTransaction(db);
await expect(
tx.handleRequest((tx) =>
tx.getObjectStore("sessions").put({
id: 1,
coValue: "value1",
sessionID: "session1",
data: "session data",
}),
),
).resolves.toBe(1);
expect(tx.failed).toBe(false);
const badTx = new CoJsonIDBTransaction(db);
await expect(
badTx.handleRequest((tx) =>
tx.getObjectStore("sessions").put({
id: 2,
coValue: "value1",
sessionID: "session1",
data: "session data",
}),
),
).rejects.toThrow();
expect(badTx.failed).toBe(true);
});
});

View File

@@ -1,5 +1,10 @@
import { type DB as DatabaseT } from "@op-engineering/op-sqlite";
import { CojsonInternalTypes, type OutgoingSyncQueue, RawCoID } from "cojson";
import {
CojsonInternalTypes,
type OutgoingSyncQueue,
RawCoID,
SessionID,
} from "cojson";
import type {
DBClientInterface,
SessionRow,
@@ -48,6 +53,17 @@ export class SQLiteClient implements DBClientInterface {
return rows as StoredSessionRow[];
}
async getSingleCoValueSession(
coValueRowId: number,
sessionID: SessionID,
): Promise<StoredSessionRow | undefined> {
const { rows } = await this.db.execute(
"SELECT * FROM sessions WHERE coValue = ? AND sessionID = ?",
[coValueRowId, sessionID],
);
return rows[0] as StoredSessionRow | undefined;
}
async getNewTransactionInSession(
sessionRowId: number,
firstNewTxIdx: number,
@@ -142,12 +158,10 @@ export class SQLiteClient implements DBClientInterface {
);
}
async unitOfWork(
operationsCallback: () => Promise<unknown>[],
): Promise<void> {
async transaction(operationsCallback: () => unknown) {
try {
await this.db.transaction(async () => {
await Promise.all(operationsCallback());
await operationsCallback();
});
} catch (e) {
console.error("Transaction failed:", e);

View File

@@ -71,6 +71,17 @@ export class SQLiteClient implements DBClientInterface {
.all(coValueRowId) as StoredSessionRow[];
}
getSingleCoValueSession(
coValueRowId: number,
sessionID: SessionID,
): StoredSessionRow | undefined {
return this.db
.prepare<[number, string]>(
`SELECT * FROM sessions WHERE coValue = ? AND sessionID = ?`,
)
.get(coValueRowId, sessionID) as StoredSessionRow | undefined;
}
getNewTransactionInSession(
sessionRowId: number,
firstNewTxIdx: number,
@@ -159,7 +170,7 @@ export class SQLiteClient implements DBClientInterface {
.run(sessionRowID, idx, signature);
}
unitOfWork(operationsCallback: () => any[]) {
transaction(operationsCallback: () => unknown) {
this.db.transaction(operationsCallback)();
}
}

View File

@@ -200,15 +200,6 @@ export class SyncManager {
? coValueRow.rowID
: await this.dbClient.addCoValue(msg);
const allOurSessionsEntries =
await this.dbClient.getCoValueSessions(storedCoValueRowID);
const allOurSessions: {
[sessionID: SessionID]: StoredSessionRow;
} = Object.fromEntries(
allOurSessionsEntries.map((row) => [row.sessionID, row]),
);
const ourKnown: CojsonInternalTypes.CoValueKnownState = {
id: msg.id,
header: true,
@@ -217,9 +208,13 @@ export class SyncManager {
let invalidAssumptions = false;
await this.dbClient.unitOfWork(() =>
(Object.keys(msg.new) as SessionID[]).map((sessionID) => {
const sessionRow = allOurSessions[sessionID];
for (const sessionID of Object.keys(msg.new) as SessionID[]) {
await this.dbClient.transaction(async () => {
const sessionRow = await this.dbClient.getSingleCoValueSession(
storedCoValueRowID,
sessionID,
);
if (sessionRow) {
ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx;
}
@@ -229,8 +224,8 @@ export class SyncManager {
} else {
return this.putNewTxs(msg, sessionID, sessionRow, storedCoValueRowID);
}
}),
);
});
}
if (invalidAssumptions) {
this.sendStateMessage({

View File

@@ -45,9 +45,11 @@ describe("DB sync manager", () => {
const DBClient = vi.fn();
DBClient.prototype.getCoValue = vi.fn();
DBClient.prototype.getCoValueSessions = vi.fn();
DBClient.prototype.getSingleCoValueSession = vi.fn();
DBClient.prototype.getNewTransactionInSession = vi.fn();
DBClient.prototype.addSessionUpdate = vi.fn();
DBClient.prototype.addTransaction = vi.fn();
DBClient.prototype.unitOfWork = vi.fn((callback) => Promise.all(callback()));
DBClient.prototype.transaction = vi.fn((callback) => callback());
beforeEach(async () => {
const idbClient = new DBClient() as unknown as Mocked<DBClientInterface>;

View File

@@ -41,6 +41,11 @@ export interface DBClientInterface {
coValueRowId: number,
): Promise<StoredSessionRow[]> | StoredSessionRow[];
getSingleCoValueSession(
coValueRowId: number,
sessionID: SessionID,
): Promise<StoredSessionRow | undefined> | StoredSessionRow | undefined;
getNewTransactionInSession(
sessionRowId: number,
firstNewTxIdx: number,
@@ -79,5 +84,5 @@ export interface DBClientInterface {
signature: Signature;
}): Promise<number> | void | unknown;
unitOfWork(operationsCallback: () => unknown[]): Promise<unknown> | void;
transaction(callback: () => unknown): Promise<unknown> | void;
}

View File

@@ -2,6 +2,7 @@ import React from "react";
import ReactDOM from "react-dom/client";
import { Link, RouterProvider, createBrowserRouter } from "react-router-dom";
import { AuthAndJazz } from "./jazz";
import { ConcurrentChanges } from "./pages/ConcurrentChanges";
import { FileStreamTest } from "./pages/FileStream";
import { InboxPage } from "./pages/Inbox";
import { ResumeSyncState } from "./pages/ResumeSyncState";
@@ -31,6 +32,9 @@ function Index() {
<li>
<Link to="/write-only">Write Only</Link>
</li>
<li>
<Link to="/concurrent-changes">Concurrent Changes</Link>
</li>
<li>
<Link to="/inbox">Inbox</Link>
</li>
@@ -67,6 +71,10 @@ const router = createBrowserRouter([
path: "/inbox",
element: <InboxPage />,
},
{
path: "/concurrent-changes",
element: <ConcurrentChanges />,
},
{
path: "/",
element: <Index />,

View File

@@ -0,0 +1,75 @@
import { useAccount, useCoState } from "jazz-react";
import { CoFeed, Group, ID, co } from "jazz-tools";
import { useEffect, useState } from "react";
export class Counter extends CoFeed.Of(co.json<{ value: number }>()) {}
function getIdParam() {
const url = new URL(window.location.href);
return (url.searchParams.get("id") as ID<Counter>) ?? undefined;
}
export function ConcurrentChanges() {
const [id, setId] = useState(getIdParam);
const counter = useCoState(Counter, id, []);
const { me } = useAccount();
useEffect(() => {
if (id) {
const url = new URL(window.location.href);
url.searchParams.set("id", id);
history.pushState({}, "", url.toString());
}
}, [id]);
useEffect(() => {
if (counter?.byMe) {
count(counter);
}
}, [counter?.byMe?.value !== undefined]);
const createCounter = () => {
if (!me) return;
const group = Group.create();
group.addMember("everyone", "writer");
const id = Counter.create([{ value: 0 }], group).id;
setId(id);
window.open(`?id=${id}`, "_blank");
};
const done = Object.entries(counter?.perSession ?? {}).every(
([_, entry]) => entry.value.value === 300,
);
return (
<div>
<h1>Concurrent Changes</h1>
<p>
{Object.entries(counter?.perSession ?? {}).map(([sessionId, entry]) => (
<div key={sessionId}>
<p>{sessionId}</p>
<p data-testid="value">{entry.value.value}</p>
</div>
))}
</p>
<button onClick={createCounter}>Create a new value!</button>
{done && <p data-testid="done">Done!</p>}
</div>
);
}
async function count(counter: Counter) {
if (!counter.byMe) return;
let value = counter.byMe.value?.value ?? 0;
while (value < 300) {
await new Promise((resolve) => setTimeout(resolve, 10));
counter.push({ value: ++value });
}
}

View File

@@ -0,0 +1,35 @@
import { expect, test } from "@playwright/test";
test.describe("Concurrent Changes", () => {
test("should complete the task without incurring on InvalidSignature errors", async ({
page,
context,
}) => {
await page.goto("/concurrent-changes");
const newPage = await context.newPage();
await page.getByRole("button", { name: "Create a new value!" }).click();
await newPage.goto(page.url());
await page.getByTestId("done").waitFor();
await newPage.close();
const errorLogs: string[] = [];
page.on("console", (message) => {
if (message.type() === "error") {
errorLogs.push(message.text());
}
});
await page.reload();
await expect(page.getByTestId("done")).toBeVisible();
expect(
errorLogs.find((log) => log.includes("InvalidSignature")),
).toBeUndefined();
});
});