Compare commits
5 Commits
llms-copy-
...
fix/idb-tr
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1e625f3c12 | ||
|
|
8b3686c7ce | ||
|
|
bce04ee06d | ||
|
|
f2e9115f4c | ||
|
|
ee0897d9a8 |
8
.changeset/selfish-llamas-pump.md
Normal file
8
.changeset/selfish-llamas-pump.md
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
"cojson-storage-indexeddb": patch
|
||||
"cojson-storage-rn-sqlite": patch
|
||||
"cojson-storage-sqlite": patch
|
||||
"cojson-storage": patch
|
||||
---
|
||||
|
||||
Improve rollback on error when failing to add new content
|
||||
111
packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts
Normal file
111
packages/cojson-storage-indexeddb/src/CoJsonIDBTransaction.ts
Normal file
@@ -0,0 +1,111 @@
|
||||
export type StoreName =
|
||||
| "coValues"
|
||||
| "sessions"
|
||||
| "transactions"
|
||||
| "signatureAfter";
|
||||
|
||||
// A access unit for the IndexedDB Jazz database
|
||||
// It's a wrapper around the IDBTransaction object that helps on batching multiple operations
|
||||
// in a single transaction.
|
||||
export class CoJsonIDBTransaction {
|
||||
db: IDBDatabase;
|
||||
tx: IDBTransaction;
|
||||
|
||||
pendingRequests: ((txEntry: this) => void)[] = [];
|
||||
rejectHandlers: (() => void)[] = [];
|
||||
|
||||
id = Math.random();
|
||||
|
||||
running = false;
|
||||
failed = false;
|
||||
done = false;
|
||||
|
||||
constructor(db: IDBDatabase) {
|
||||
this.db = db;
|
||||
|
||||
this.tx = this.db.transaction(
|
||||
["coValues", "sessions", "transactions", "signatureAfter"],
|
||||
"readwrite",
|
||||
);
|
||||
|
||||
this.tx.oncomplete = () => {
|
||||
this.done = true;
|
||||
};
|
||||
this.tx.onabort = () => {
|
||||
this.done = true;
|
||||
};
|
||||
}
|
||||
|
||||
startedAt = performance.now();
|
||||
isReusable() {
|
||||
const delta = performance.now() - this.startedAt;
|
||||
return !this.done && delta <= 20;
|
||||
}
|
||||
|
||||
getObjectStore(name: StoreName) {
|
||||
return this.tx.objectStore(name);
|
||||
}
|
||||
|
||||
private pushRequest<T>(
|
||||
handler: (txEntry: this, next: () => void) => Promise<T>,
|
||||
) {
|
||||
const next = () => {
|
||||
const next = this.pendingRequests.shift();
|
||||
|
||||
if (next) {
|
||||
next(this);
|
||||
} else {
|
||||
this.running = false;
|
||||
this.done = true;
|
||||
}
|
||||
};
|
||||
|
||||
if (this.running) {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
this.rejectHandlers.push(reject);
|
||||
this.pendingRequests.push(async () => {
|
||||
try {
|
||||
const result = await handler(this, next);
|
||||
resolve(result);
|
||||
} catch (error) {
|
||||
reject(error);
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
this.running = true;
|
||||
return handler(this, next);
|
||||
}
|
||||
|
||||
handleRequest<T>(handler: (txEntry: this) => IDBRequest<T>) {
|
||||
return this.pushRequest<T>((txEntry, next) => {
|
||||
return new Promise<T>((resolve, reject) => {
|
||||
const request = handler(txEntry);
|
||||
|
||||
request.onerror = () => {
|
||||
this.failed = true;
|
||||
this.tx.abort();
|
||||
console.error(request.error);
|
||||
reject(request.error);
|
||||
|
||||
// Don't leave any pending promise
|
||||
for (const handler of this.rejectHandlers) {
|
||||
handler();
|
||||
}
|
||||
};
|
||||
|
||||
request.onsuccess = () => {
|
||||
resolve(request.result as T);
|
||||
next();
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
commit() {
|
||||
if (!this.done) {
|
||||
this.tx.commit();
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
import type { CojsonInternalTypes, RawCoID } from "cojson";
|
||||
import type { CojsonInternalTypes, RawCoID, SessionID } from "cojson";
|
||||
import type {
|
||||
CoValueRow,
|
||||
DBClientInterface,
|
||||
@@ -8,119 +8,60 @@ import type {
|
||||
StoredSessionRow,
|
||||
TransactionRow,
|
||||
} from "cojson-storage";
|
||||
import { SyncPromise } from "./syncPromises.js";
|
||||
import { CoJsonIDBTransaction } from "./CoJsonIDBTransaction.js";
|
||||
|
||||
export class IDBClient implements DBClientInterface {
|
||||
private db;
|
||||
|
||||
currentTx:
|
||||
| {
|
||||
id: number;
|
||||
tx: IDBTransaction;
|
||||
stores: {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
};
|
||||
startedAt: number;
|
||||
pendingRequests: ((txEntry: {
|
||||
stores: {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
};
|
||||
}) => void)[];
|
||||
}
|
||||
| undefined;
|
||||
|
||||
currentTxID = 0;
|
||||
activeTransaction: CoJsonIDBTransaction | undefined;
|
||||
autoBatchingTransaction: CoJsonIDBTransaction | undefined;
|
||||
|
||||
constructor(db: IDBDatabase) {
|
||||
this.db = db;
|
||||
}
|
||||
|
||||
makeRequest<T>(
|
||||
handler: (stores: {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
}) => IDBRequest,
|
||||
): SyncPromise<T> {
|
||||
return new SyncPromise((resolve, reject) => {
|
||||
let txEntry = this.currentTx;
|
||||
handler: (txEntry: CoJsonIDBTransaction) => IDBRequest<T>,
|
||||
): Promise<T> {
|
||||
if (this.activeTransaction) {
|
||||
return this.activeTransaction.handleRequest<T>(handler);
|
||||
}
|
||||
|
||||
const requestEntry = ({
|
||||
stores,
|
||||
}: {
|
||||
stores: {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
};
|
||||
}) => {
|
||||
const request = handler(stores);
|
||||
request.onerror = () => {
|
||||
console.error("Error in request", request.error);
|
||||
this.currentTx = undefined;
|
||||
reject(request.error);
|
||||
};
|
||||
request.onsuccess = () => {
|
||||
const value = request.result as T;
|
||||
resolve(value);
|
||||
if (this.autoBatchingTransaction?.isReusable()) {
|
||||
return this.autoBatchingTransaction.handleRequest<T>(handler);
|
||||
}
|
||||
|
||||
const next = txEntry?.pendingRequests.shift();
|
||||
const tx = new CoJsonIDBTransaction(this.db);
|
||||
|
||||
if (next) {
|
||||
next({ stores });
|
||||
} else {
|
||||
if (this.currentTx === txEntry) {
|
||||
this.currentTx = undefined;
|
||||
}
|
||||
}
|
||||
};
|
||||
};
|
||||
this.autoBatchingTransaction = tx;
|
||||
|
||||
// Transaction batching
|
||||
if (!txEntry || performance.now() - txEntry.startedAt > 20) {
|
||||
const tx = this.db.transaction(
|
||||
["coValues", "sessions", "transactions", "signatureAfter"],
|
||||
"readwrite",
|
||||
);
|
||||
txEntry = {
|
||||
id: this.currentTxID++,
|
||||
tx,
|
||||
stores: {
|
||||
coValues: tx.objectStore("coValues"),
|
||||
sessions: tx.objectStore("sessions"),
|
||||
transactions: tx.objectStore("transactions"),
|
||||
signatureAfter: tx.objectStore("signatureAfter"),
|
||||
},
|
||||
startedAt: performance.now(),
|
||||
pendingRequests: [],
|
||||
};
|
||||
|
||||
this.currentTx = txEntry;
|
||||
|
||||
requestEntry(txEntry);
|
||||
} else {
|
||||
txEntry.pendingRequests.push(requestEntry);
|
||||
}
|
||||
});
|
||||
return tx.handleRequest<T>(handler);
|
||||
}
|
||||
|
||||
async getCoValue(coValueId: RawCoID): Promise<StoredCoValueRow | undefined> {
|
||||
return this.makeRequest<StoredCoValueRow | undefined>(({ coValues }) =>
|
||||
coValues.index("coValuesById").get(coValueId),
|
||||
return this.makeRequest<StoredCoValueRow | undefined>((tx) =>
|
||||
tx.getObjectStore("coValues").index("coValuesById").get(coValueId),
|
||||
);
|
||||
}
|
||||
|
||||
async getCoValueSessions(coValueRowId: number): Promise<StoredSessionRow[]> {
|
||||
return this.makeRequest<StoredSessionRow[]>(({ sessions }) =>
|
||||
sessions.index("sessionsByCoValue").getAll(coValueRowId),
|
||||
return this.makeRequest<StoredSessionRow[]>((tx) =>
|
||||
tx
|
||||
.getObjectStore("sessions")
|
||||
.index("sessionsByCoValue")
|
||||
.getAll(coValueRowId),
|
||||
);
|
||||
}
|
||||
|
||||
async getSingleCoValueSession(
|
||||
coValueRowId: number,
|
||||
sessionID: SessionID,
|
||||
): Promise<StoredSessionRow | undefined> {
|
||||
return this.makeRequest<StoredSessionRow>((tx) =>
|
||||
tx
|
||||
.getObjectStore("sessions")
|
||||
.index("uniqueSessions")
|
||||
.get([coValueRowId, sessionID]),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -128,13 +69,15 @@ export class IDBClient implements DBClientInterface {
|
||||
sessionRowId: number,
|
||||
firstNewTxIdx: number,
|
||||
): Promise<TransactionRow[]> {
|
||||
return this.makeRequest<TransactionRow[]>(({ transactions }) =>
|
||||
transactions.getAll(
|
||||
IDBKeyRange.bound(
|
||||
[sessionRowId, firstNewTxIdx],
|
||||
[sessionRowId, Number.POSITIVE_INFINITY],
|
||||
return this.makeRequest<TransactionRow[]>((tx) =>
|
||||
tx
|
||||
.getObjectStore("transactions")
|
||||
.getAll(
|
||||
IDBKeyRange.bound(
|
||||
[sessionRowId, firstNewTxIdx],
|
||||
[sessionRowId, Number.POSITIVE_INFINITY],
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
@@ -142,9 +85,10 @@ export class IDBClient implements DBClientInterface {
|
||||
sessionRowId: number,
|
||||
firstNewTxIdx: number,
|
||||
): Promise<SignatureAfterRow[]> {
|
||||
return this.makeRequest<SignatureAfterRow[]>(
|
||||
({ signatureAfter }: { signatureAfter: IDBObjectStore }) =>
|
||||
signatureAfter.getAll(
|
||||
return this.makeRequest<SignatureAfterRow[]>((tx) =>
|
||||
tx
|
||||
.getObjectStore("signatureAfter")
|
||||
.getAll(
|
||||
IDBKeyRange.bound(
|
||||
[sessionRowId, firstNewTxIdx],
|
||||
[sessionRowId, Number.POSITIVE_INFINITY],
|
||||
@@ -160,8 +104,8 @@ export class IDBClient implements DBClientInterface {
|
||||
throw new Error(`Header is required, coId: ${msg.id}`);
|
||||
}
|
||||
|
||||
return (await this.makeRequest<IDBValidKey>(({ coValues }) =>
|
||||
coValues.put({
|
||||
return (await this.makeRequest<IDBValidKey>((tx) =>
|
||||
tx.getObjectStore("coValues").put({
|
||||
id: msg.id,
|
||||
// biome-ignore lint/style/noNonNullAssertion: TODO(JAZZ-561): Review
|
||||
header: msg.header!,
|
||||
@@ -176,25 +120,26 @@ export class IDBClient implements DBClientInterface {
|
||||
sessionUpdate: SessionRow;
|
||||
sessionRow?: StoredSessionRow;
|
||||
}): Promise<number> {
|
||||
return this.makeRequest<number>(({ sessions }) =>
|
||||
sessions.put(
|
||||
sessionRow?.rowID
|
||||
? {
|
||||
rowID: sessionRow.rowID,
|
||||
...sessionUpdate,
|
||||
}
|
||||
: sessionUpdate,
|
||||
),
|
||||
return this.makeRequest<number>(
|
||||
(tx) =>
|
||||
tx.getObjectStore("sessions").put(
|
||||
sessionRow?.rowID
|
||||
? {
|
||||
rowID: sessionRow.rowID,
|
||||
...sessionUpdate,
|
||||
}
|
||||
: sessionUpdate,
|
||||
) as IDBRequest<number>,
|
||||
);
|
||||
}
|
||||
|
||||
addTransaction(
|
||||
async addTransaction(
|
||||
sessionRowID: number,
|
||||
idx: number,
|
||||
newTransaction: CojsonInternalTypes.Transaction,
|
||||
) {
|
||||
return this.makeRequest(({ transactions }) =>
|
||||
transactions.add({
|
||||
await this.makeRequest((tx) =>
|
||||
tx.getObjectStore("transactions").add({
|
||||
ses: sessionRowID,
|
||||
idx,
|
||||
tx: newTransaction,
|
||||
@@ -211,8 +156,8 @@ export class IDBClient implements DBClientInterface {
|
||||
idx: number;
|
||||
signature: CojsonInternalTypes.Signature;
|
||||
}) {
|
||||
return this.makeRequest(({ signatureAfter }) =>
|
||||
signatureAfter.put({
|
||||
return this.makeRequest((tx) =>
|
||||
tx.getObjectStore("signatureAfter").put({
|
||||
ses: sessionRowID,
|
||||
idx,
|
||||
signature,
|
||||
@@ -220,7 +165,24 @@ export class IDBClient implements DBClientInterface {
|
||||
);
|
||||
}
|
||||
|
||||
async unitOfWork(operationsCallback: () => unknown[]) {
|
||||
return Promise.all(operationsCallback());
|
||||
closeTransaction(tx: CoJsonIDBTransaction) {
|
||||
tx.commit();
|
||||
|
||||
if (tx === this.activeTransaction) {
|
||||
this.activeTransaction = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
async transaction(operationsCallback: () => unknown) {
|
||||
const tx = new CoJsonIDBTransaction(this.db);
|
||||
|
||||
this.activeTransaction = tx;
|
||||
|
||||
try {
|
||||
await operationsCallback();
|
||||
tx.commit(); // Tells the browser to not wait for another possible request and commit the transaction immediately
|
||||
} finally {
|
||||
this.activeTransaction = undefined;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,18 +33,7 @@ export class IDBNode {
|
||||
}
|
||||
await this.syncManager.handleSyncMessage(msg);
|
||||
} catch (e) {
|
||||
console.error(
|
||||
new Error(
|
||||
`Error reading from localNode, handling msg\n\n${JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? `${v.slice(0, 20)}...`
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
),
|
||||
);
|
||||
console.error(e);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@@ -1,224 +0,0 @@
|
||||
const isFunction = (func: any) => typeof func === "function";
|
||||
|
||||
const isObject = (supposedObject: any) =>
|
||||
typeof supposedObject === "object" &&
|
||||
supposedObject !== null &&
|
||||
!Array.isArray(supposedObject);
|
||||
|
||||
const isThenable = (obj: any) => isObject(obj) && isFunction(obj.then);
|
||||
|
||||
const identity = (co: any) => co;
|
||||
|
||||
export { identity, isFunction, isObject, isThenable };
|
||||
|
||||
enum States {
|
||||
PENDING = "PENDING",
|
||||
RESOLVED = "RESOLVED",
|
||||
REJECTED = "REJECTED",
|
||||
}
|
||||
|
||||
interface Handler<T, U> {
|
||||
onSuccess: HandlerOnSuccess<T, U>;
|
||||
onFail: HandlerOnFail<U>;
|
||||
}
|
||||
|
||||
type HandlerOnSuccess<T, U = any> = (value: T) => U | Thenable<U>;
|
||||
type HandlerOnFail<U = any> = (reason: any) => U | Thenable<U>;
|
||||
type Finally<U> = () => U | Thenable<U>;
|
||||
|
||||
interface Thenable<T> {
|
||||
then<U>(
|
||||
onSuccess?: HandlerOnSuccess<T, U>,
|
||||
onFail?: HandlerOnFail<U>,
|
||||
): Thenable<U>;
|
||||
then<U>(
|
||||
onSuccess?: HandlerOnSuccess<T, U>,
|
||||
onFail?: (reason: any) => void,
|
||||
): Thenable<U>;
|
||||
}
|
||||
|
||||
type Resolve<R> = (value?: R | Thenable<R>) => void;
|
||||
type Reject = (value?: any) => void;
|
||||
|
||||
export class SyncPromise<T> {
|
||||
private state: States = States.PENDING;
|
||||
private handlers: Handler<T, any>[] = [];
|
||||
private value: T | any;
|
||||
|
||||
public constructor(callback: (resolve: Resolve<T>, reject: Reject) => void) {
|
||||
try {
|
||||
callback(this.resolve as Resolve<T>, this.reject);
|
||||
} catch (e) {
|
||||
this.reject(e);
|
||||
}
|
||||
}
|
||||
|
||||
private resolve = (value: T) => {
|
||||
return this.setResult(value, States.RESOLVED);
|
||||
};
|
||||
|
||||
private reject = (reason: any) => {
|
||||
return this.setResult(reason, States.REJECTED);
|
||||
};
|
||||
|
||||
private setResult = (value: T | any, state: States) => {
|
||||
const set = () => {
|
||||
if (this.state !== States.PENDING) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (isThenable(value)) {
|
||||
return (value as Thenable<T>).then(this.resolve, this.reject);
|
||||
}
|
||||
|
||||
this.value = value;
|
||||
this.state = state;
|
||||
|
||||
return this.executeHandlers();
|
||||
};
|
||||
|
||||
void set();
|
||||
};
|
||||
|
||||
private executeHandlers = () => {
|
||||
if (this.state === States.PENDING) {
|
||||
return null;
|
||||
}
|
||||
|
||||
for (const handler of this.handlers) {
|
||||
if (this.state === States.REJECTED) {
|
||||
handler.onFail(this.value);
|
||||
} else {
|
||||
handler.onSuccess(this.value);
|
||||
}
|
||||
}
|
||||
|
||||
this.handlers = [];
|
||||
};
|
||||
|
||||
private attachHandler = (handler: Handler<T, any>) => {
|
||||
this.handlers = [...this.handlers, handler];
|
||||
|
||||
this.executeHandlers();
|
||||
};
|
||||
|
||||
// biome-ignore lint/suspicious/noThenProperty: TODO(JAZZ-561): Review
|
||||
public then<U>(onSuccess: HandlerOnSuccess<T, U>, onFail?: HandlerOnFail<U>) {
|
||||
return new SyncPromise<U>((resolve, reject) => {
|
||||
return this.attachHandler({
|
||||
onSuccess: (result) => {
|
||||
try {
|
||||
return resolve(onSuccess(result));
|
||||
} catch (e) {
|
||||
return reject(e);
|
||||
}
|
||||
},
|
||||
onFail: (reason) => {
|
||||
if (!onFail) {
|
||||
return reject(reason);
|
||||
}
|
||||
|
||||
try {
|
||||
return resolve(onFail(reason));
|
||||
} catch (e) {
|
||||
return reject(e);
|
||||
}
|
||||
},
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public catch<U>(onFail: HandlerOnFail<U>) {
|
||||
return this.then<U>(identity, onFail);
|
||||
}
|
||||
|
||||
// methods
|
||||
|
||||
public toString() {
|
||||
return "[object SyncPromise]";
|
||||
}
|
||||
|
||||
public finally<U>(cb: Finally<U>) {
|
||||
return new SyncPromise<U>((resolve, reject) => {
|
||||
let co: U | any;
|
||||
let isRejected: boolean;
|
||||
|
||||
return this.then(
|
||||
(value) => {
|
||||
isRejected = false;
|
||||
co = value;
|
||||
return cb();
|
||||
},
|
||||
(reason) => {
|
||||
isRejected = true;
|
||||
co = reason;
|
||||
return cb();
|
||||
},
|
||||
).then(() => {
|
||||
if (isRejected) {
|
||||
return reject(co);
|
||||
}
|
||||
|
||||
return resolve(co);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
public spread<U>(handler: (...args: any[]) => U) {
|
||||
return this.then<U>((collection) => {
|
||||
if (Array.isArray(collection)) {
|
||||
return handler(...collection);
|
||||
}
|
||||
|
||||
return handler(collection);
|
||||
});
|
||||
}
|
||||
|
||||
// static
|
||||
|
||||
public static resolve<U = any>(value?: U | Thenable<U>) {
|
||||
return new SyncPromise<U>((resolve) => {
|
||||
return resolve(value);
|
||||
});
|
||||
}
|
||||
|
||||
public static reject<U>(reason?: any) {
|
||||
return new SyncPromise<U>((_resolve, reject) => {
|
||||
return reject(reason);
|
||||
});
|
||||
}
|
||||
|
||||
public static all<U = any>(collection: (U | Thenable<U>)[]) {
|
||||
return new SyncPromise<U[]>((resolve, reject) => {
|
||||
if (!Array.isArray(collection)) {
|
||||
return reject(new TypeError("An array must be provided."));
|
||||
}
|
||||
|
||||
if (collection.length === 0) {
|
||||
return resolve([]);
|
||||
}
|
||||
|
||||
let counter = collection.length;
|
||||
const resolvedCollection: U[] = [];
|
||||
|
||||
const tryResolve = (value: U, index: number) => {
|
||||
counter -= 1;
|
||||
resolvedCollection[index] = value;
|
||||
|
||||
if (counter !== 0) {
|
||||
return null;
|
||||
}
|
||||
|
||||
return resolve(resolvedCollection);
|
||||
};
|
||||
|
||||
return collection.forEach((item, index) => {
|
||||
return SyncPromise.resolve(item)
|
||||
.then((value) => {
|
||||
return tryResolve(value, index);
|
||||
})
|
||||
.catch(reject);
|
||||
});
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,170 @@
|
||||
import { afterEach, beforeEach, describe, expect, test } from "vitest";
|
||||
import { CoJsonIDBTransaction } from "../CoJsonIDBTransaction";
|
||||
|
||||
const TEST_DB_NAME = "test-cojson-idb-transaction";
|
||||
|
||||
describe("CoJsonIDBTransaction", () => {
|
||||
let db: IDBDatabase;
|
||||
|
||||
beforeEach(async () => {
|
||||
// Create test database
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const request = indexedDB.open(TEST_DB_NAME, 1);
|
||||
|
||||
request.onerror = () => reject(request.error);
|
||||
|
||||
request.onupgradeneeded = (event) => {
|
||||
const db = request.result;
|
||||
// Create test stores
|
||||
db.createObjectStore("coValues", { keyPath: "id" });
|
||||
const sessions = db.createObjectStore("sessions", { keyPath: "id" });
|
||||
sessions.createIndex("uniqueSessions", ["coValue", "sessionID"], {
|
||||
unique: true,
|
||||
});
|
||||
db.createObjectStore("transactions", { keyPath: "id" });
|
||||
db.createObjectStore("signatureAfter", { keyPath: "id" });
|
||||
};
|
||||
|
||||
request.onsuccess = () => {
|
||||
db = request.result;
|
||||
resolve();
|
||||
};
|
||||
});
|
||||
});
|
||||
|
||||
afterEach(async () => {
|
||||
// Close and delete test database
|
||||
db.close();
|
||||
await new Promise<void>((resolve, reject) => {
|
||||
const request = indexedDB.deleteDatabase(TEST_DB_NAME);
|
||||
request.onerror = () => reject(request.error);
|
||||
request.onsuccess = () => resolve();
|
||||
});
|
||||
});
|
||||
|
||||
test("handles successful write and read operations", async () => {
|
||||
const tx = new CoJsonIDBTransaction(db);
|
||||
|
||||
// Write test
|
||||
await tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("coValues").put({
|
||||
id: "test1",
|
||||
value: "hello",
|
||||
}),
|
||||
);
|
||||
|
||||
// Read test
|
||||
const readTx = new CoJsonIDBTransaction(db);
|
||||
const result = await readTx.handleRequest((tx) =>
|
||||
tx.getObjectStore("coValues").get("test1"),
|
||||
);
|
||||
|
||||
expect(result).toEqual({
|
||||
id: "test1",
|
||||
value: "hello",
|
||||
});
|
||||
});
|
||||
|
||||
test("handles multiple operations in single transaction", async () => {
|
||||
const tx = new CoJsonIDBTransaction(db);
|
||||
|
||||
// Multiple writes
|
||||
await Promise.all([
|
||||
tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("coValues").put({
|
||||
id: "test1",
|
||||
value: "hello",
|
||||
}),
|
||||
),
|
||||
tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("coValues").put({
|
||||
id: "test2",
|
||||
value: "world",
|
||||
}),
|
||||
),
|
||||
]);
|
||||
|
||||
// Read results
|
||||
const readTx = new CoJsonIDBTransaction(db);
|
||||
const [result1, result2] = await Promise.all([
|
||||
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("test1")),
|
||||
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("test2")),
|
||||
]);
|
||||
|
||||
expect(result1).toEqual({
|
||||
id: "test1",
|
||||
value: "hello",
|
||||
});
|
||||
expect(result2).toEqual({
|
||||
id: "test2",
|
||||
value: "world",
|
||||
});
|
||||
});
|
||||
|
||||
test("handles transaction across multiple stores", async () => {
|
||||
const tx = new CoJsonIDBTransaction(db);
|
||||
|
||||
await Promise.all([
|
||||
tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("coValues").put({
|
||||
id: "value1",
|
||||
data: "value data",
|
||||
}),
|
||||
),
|
||||
tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("sessions").put({
|
||||
id: "session1",
|
||||
data: "session data",
|
||||
}),
|
||||
),
|
||||
]);
|
||||
|
||||
const readTx = new CoJsonIDBTransaction(db);
|
||||
const [valueResult, sessionResult] = await Promise.all([
|
||||
readTx.handleRequest((tx) => tx.getObjectStore("coValues").get("value1")),
|
||||
readTx.handleRequest((tx) =>
|
||||
tx.getObjectStore("sessions").get("session1"),
|
||||
),
|
||||
]);
|
||||
|
||||
expect(valueResult).toEqual({
|
||||
id: "value1",
|
||||
data: "value data",
|
||||
});
|
||||
expect(sessionResult).toEqual({
|
||||
id: "session1",
|
||||
data: "session data",
|
||||
});
|
||||
});
|
||||
|
||||
test("handles failed transactions", async () => {
|
||||
const tx = new CoJsonIDBTransaction(db);
|
||||
|
||||
await expect(
|
||||
tx.handleRequest((tx) =>
|
||||
tx.getObjectStore("sessions").put({
|
||||
id: 1,
|
||||
coValue: "value1",
|
||||
sessionID: "session1",
|
||||
data: "session data",
|
||||
}),
|
||||
),
|
||||
).resolves.toBe(1);
|
||||
|
||||
expect(tx.failed).toBe(false);
|
||||
|
||||
const badTx = new CoJsonIDBTransaction(db);
|
||||
await expect(
|
||||
badTx.handleRequest((tx) =>
|
||||
tx.getObjectStore("sessions").put({
|
||||
id: 2,
|
||||
coValue: "value1",
|
||||
sessionID: "session1",
|
||||
data: "session data",
|
||||
}),
|
||||
),
|
||||
).rejects.toThrow();
|
||||
|
||||
expect(badTx.failed).toBe(true);
|
||||
});
|
||||
});
|
||||
@@ -1,5 +1,10 @@
|
||||
import { type DB as DatabaseT } from "@op-engineering/op-sqlite";
|
||||
import { CojsonInternalTypes, type OutgoingSyncQueue, RawCoID } from "cojson";
|
||||
import {
|
||||
CojsonInternalTypes,
|
||||
type OutgoingSyncQueue,
|
||||
RawCoID,
|
||||
SessionID,
|
||||
} from "cojson";
|
||||
import type {
|
||||
DBClientInterface,
|
||||
SessionRow,
|
||||
@@ -48,6 +53,17 @@ export class SQLiteClient implements DBClientInterface {
|
||||
return rows as StoredSessionRow[];
|
||||
}
|
||||
|
||||
async getSingleCoValueSession(
|
||||
coValueRowId: number,
|
||||
sessionID: SessionID,
|
||||
): Promise<StoredSessionRow | undefined> {
|
||||
const { rows } = await this.db.execute(
|
||||
"SELECT * FROM sessions WHERE coValue = ? AND sessionID = ?",
|
||||
[coValueRowId, sessionID],
|
||||
);
|
||||
return rows[0] as StoredSessionRow | undefined;
|
||||
}
|
||||
|
||||
async getNewTransactionInSession(
|
||||
sessionRowId: number,
|
||||
firstNewTxIdx: number,
|
||||
@@ -142,12 +158,10 @@ export class SQLiteClient implements DBClientInterface {
|
||||
);
|
||||
}
|
||||
|
||||
async unitOfWork(
|
||||
operationsCallback: () => Promise<unknown>[],
|
||||
): Promise<void> {
|
||||
async transaction(operationsCallback: () => unknown) {
|
||||
try {
|
||||
await this.db.transaction(async () => {
|
||||
await Promise.all(operationsCallback());
|
||||
await operationsCallback();
|
||||
});
|
||||
} catch (e) {
|
||||
console.error("Transaction failed:", e);
|
||||
|
||||
@@ -71,6 +71,17 @@ export class SQLiteClient implements DBClientInterface {
|
||||
.all(coValueRowId) as StoredSessionRow[];
|
||||
}
|
||||
|
||||
getSingleCoValueSession(
|
||||
coValueRowId: number,
|
||||
sessionID: SessionID,
|
||||
): StoredSessionRow | undefined {
|
||||
return this.db
|
||||
.prepare<[number, string]>(
|
||||
`SELECT * FROM sessions WHERE coValue = ? AND sessionID = ?`,
|
||||
)
|
||||
.get(coValueRowId, sessionID) as StoredSessionRow | undefined;
|
||||
}
|
||||
|
||||
getNewTransactionInSession(
|
||||
sessionRowId: number,
|
||||
firstNewTxIdx: number,
|
||||
@@ -159,7 +170,7 @@ export class SQLiteClient implements DBClientInterface {
|
||||
.run(sessionRowID, idx, signature);
|
||||
}
|
||||
|
||||
unitOfWork(operationsCallback: () => any[]) {
|
||||
transaction(operationsCallback: () => unknown) {
|
||||
this.db.transaction(operationsCallback)();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -200,15 +200,6 @@ export class SyncManager {
|
||||
? coValueRow.rowID
|
||||
: await this.dbClient.addCoValue(msg);
|
||||
|
||||
const allOurSessionsEntries =
|
||||
await this.dbClient.getCoValueSessions(storedCoValueRowID);
|
||||
|
||||
const allOurSessions: {
|
||||
[sessionID: SessionID]: StoredSessionRow;
|
||||
} = Object.fromEntries(
|
||||
allOurSessionsEntries.map((row) => [row.sessionID, row]),
|
||||
);
|
||||
|
||||
const ourKnown: CojsonInternalTypes.CoValueKnownState = {
|
||||
id: msg.id,
|
||||
header: true,
|
||||
@@ -217,9 +208,13 @@ export class SyncManager {
|
||||
|
||||
let invalidAssumptions = false;
|
||||
|
||||
await this.dbClient.unitOfWork(() =>
|
||||
(Object.keys(msg.new) as SessionID[]).map((sessionID) => {
|
||||
const sessionRow = allOurSessions[sessionID];
|
||||
for (const sessionID of Object.keys(msg.new) as SessionID[]) {
|
||||
await this.dbClient.transaction(async () => {
|
||||
const sessionRow = await this.dbClient.getSingleCoValueSession(
|
||||
storedCoValueRowID,
|
||||
sessionID,
|
||||
);
|
||||
|
||||
if (sessionRow) {
|
||||
ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx;
|
||||
}
|
||||
@@ -229,8 +224,8 @@ export class SyncManager {
|
||||
} else {
|
||||
return this.putNewTxs(msg, sessionID, sessionRow, storedCoValueRowID);
|
||||
}
|
||||
}),
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
if (invalidAssumptions) {
|
||||
this.sendStateMessage({
|
||||
|
||||
@@ -45,9 +45,11 @@ describe("DB sync manager", () => {
|
||||
const DBClient = vi.fn();
|
||||
DBClient.prototype.getCoValue = vi.fn();
|
||||
DBClient.prototype.getCoValueSessions = vi.fn();
|
||||
DBClient.prototype.getSingleCoValueSession = vi.fn();
|
||||
DBClient.prototype.getNewTransactionInSession = vi.fn();
|
||||
DBClient.prototype.addSessionUpdate = vi.fn();
|
||||
DBClient.prototype.addTransaction = vi.fn();
|
||||
DBClient.prototype.unitOfWork = vi.fn((callback) => Promise.all(callback()));
|
||||
DBClient.prototype.transaction = vi.fn((callback) => callback());
|
||||
|
||||
beforeEach(async () => {
|
||||
const idbClient = new DBClient() as unknown as Mocked<DBClientInterface>;
|
||||
|
||||
@@ -41,6 +41,11 @@ export interface DBClientInterface {
|
||||
coValueRowId: number,
|
||||
): Promise<StoredSessionRow[]> | StoredSessionRow[];
|
||||
|
||||
getSingleCoValueSession(
|
||||
coValueRowId: number,
|
||||
sessionID: SessionID,
|
||||
): Promise<StoredSessionRow | undefined> | StoredSessionRow | undefined;
|
||||
|
||||
getNewTransactionInSession(
|
||||
sessionRowId: number,
|
||||
firstNewTxIdx: number,
|
||||
@@ -79,5 +84,5 @@ export interface DBClientInterface {
|
||||
signature: Signature;
|
||||
}): Promise<number> | void | unknown;
|
||||
|
||||
unitOfWork(operationsCallback: () => unknown[]): Promise<unknown> | void;
|
||||
transaction(callback: () => unknown): Promise<unknown> | void;
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ import React from "react";
|
||||
import ReactDOM from "react-dom/client";
|
||||
import { Link, RouterProvider, createBrowserRouter } from "react-router-dom";
|
||||
import { AuthAndJazz } from "./jazz";
|
||||
import { ConcurrentChanges } from "./pages/ConcurrentChanges";
|
||||
import { FileStreamTest } from "./pages/FileStream";
|
||||
import { InboxPage } from "./pages/Inbox";
|
||||
import { ResumeSyncState } from "./pages/ResumeSyncState";
|
||||
@@ -31,6 +32,9 @@ function Index() {
|
||||
<li>
|
||||
<Link to="/write-only">Write Only</Link>
|
||||
</li>
|
||||
<li>
|
||||
<Link to="/concurrent-changes">Concurrent Changes</Link>
|
||||
</li>
|
||||
<li>
|
||||
<Link to="/inbox">Inbox</Link>
|
||||
</li>
|
||||
@@ -67,6 +71,10 @@ const router = createBrowserRouter([
|
||||
path: "/inbox",
|
||||
element: <InboxPage />,
|
||||
},
|
||||
{
|
||||
path: "/concurrent-changes",
|
||||
element: <ConcurrentChanges />,
|
||||
},
|
||||
{
|
||||
path: "/",
|
||||
element: <Index />,
|
||||
|
||||
75
tests/e2e/src/pages/ConcurrentChanges.tsx
Normal file
75
tests/e2e/src/pages/ConcurrentChanges.tsx
Normal file
@@ -0,0 +1,75 @@
|
||||
import { useAccount, useCoState } from "jazz-react";
|
||||
import { CoFeed, Group, ID, co } from "jazz-tools";
|
||||
import { useEffect, useState } from "react";
|
||||
|
||||
export class Counter extends CoFeed.Of(co.json<{ value: number }>()) {}
|
||||
|
||||
function getIdParam() {
|
||||
const url = new URL(window.location.href);
|
||||
return (url.searchParams.get("id") as ID<Counter>) ?? undefined;
|
||||
}
|
||||
|
||||
export function ConcurrentChanges() {
|
||||
const [id, setId] = useState(getIdParam);
|
||||
const counter = useCoState(Counter, id, []);
|
||||
const { me } = useAccount();
|
||||
|
||||
useEffect(() => {
|
||||
if (id) {
|
||||
const url = new URL(window.location.href);
|
||||
url.searchParams.set("id", id);
|
||||
history.pushState({}, "", url.toString());
|
||||
}
|
||||
}, [id]);
|
||||
|
||||
useEffect(() => {
|
||||
if (counter?.byMe) {
|
||||
count(counter);
|
||||
}
|
||||
}, [counter?.byMe?.value !== undefined]);
|
||||
|
||||
const createCounter = () => {
|
||||
if (!me) return;
|
||||
|
||||
const group = Group.create();
|
||||
|
||||
group.addMember("everyone", "writer");
|
||||
|
||||
const id = Counter.create([{ value: 0 }], group).id;
|
||||
|
||||
setId(id);
|
||||
|
||||
window.open(`?id=${id}`, "_blank");
|
||||
};
|
||||
|
||||
const done = Object.entries(counter?.perSession ?? {}).every(
|
||||
([_, entry]) => entry.value.value === 300,
|
||||
);
|
||||
|
||||
return (
|
||||
<div>
|
||||
<h1>Concurrent Changes</h1>
|
||||
<p>
|
||||
{Object.entries(counter?.perSession ?? {}).map(([sessionId, entry]) => (
|
||||
<div key={sessionId}>
|
||||
<p>{sessionId}</p>
|
||||
<p data-testid="value">{entry.value.value}</p>
|
||||
</div>
|
||||
))}
|
||||
</p>
|
||||
<button onClick={createCounter}>Create a new value!</button>
|
||||
{done && <p data-testid="done">Done!</p>}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
async function count(counter: Counter) {
|
||||
if (!counter.byMe) return;
|
||||
|
||||
let value = counter.byMe.value?.value ?? 0;
|
||||
|
||||
while (value < 300) {
|
||||
await new Promise((resolve) => setTimeout(resolve, 10));
|
||||
counter.push({ value: ++value });
|
||||
}
|
||||
}
|
||||
35
tests/e2e/tests/ConcurrentChanges.test.ts
Normal file
35
tests/e2e/tests/ConcurrentChanges.test.ts
Normal file
@@ -0,0 +1,35 @@
|
||||
import { expect, test } from "@playwright/test";
|
||||
|
||||
test.describe("Concurrent Changes", () => {
|
||||
test("should complete the task without incurring on InvalidSignature errors", async ({
|
||||
page,
|
||||
context,
|
||||
}) => {
|
||||
await page.goto("/concurrent-changes");
|
||||
const newPage = await context.newPage();
|
||||
|
||||
await page.getByRole("button", { name: "Create a new value!" }).click();
|
||||
|
||||
await newPage.goto(page.url());
|
||||
|
||||
await page.getByTestId("done").waitFor();
|
||||
|
||||
await newPage.close();
|
||||
|
||||
const errorLogs: string[] = [];
|
||||
|
||||
page.on("console", (message) => {
|
||||
if (message.type() === "error") {
|
||||
errorLogs.push(message.text());
|
||||
}
|
||||
});
|
||||
|
||||
await page.reload();
|
||||
|
||||
await expect(page.getByTestId("done")).toBeVisible();
|
||||
|
||||
expect(
|
||||
errorLogs.find((log) => log.includes("InvalidSignature")),
|
||||
).toBeUndefined();
|
||||
});
|
||||
});
|
||||
Reference in New Issue
Block a user