Compare commits

..

8 Commits

Author SHA1 Message Date
Anselm
6720c19233 Publish
- jazz-example-pets@0.0.8
 - jazz-example-todo@0.0.33
 - cojson-simple-sync@0.2.5
 - cojson-storage-sqlite@0.2.5
 - jazz-browser@0.2.4
 - jazz-browser-auth-local@0.2.4
 - jazz-browser-media-images@0.2.4
 - jazz-react@0.2.4
 - jazz-react-auth-local@0.2.4
 - jazz-react-media-images@0.2.4
 - jazz-storage-indexeddb@0.2.4
2023-09-12 16:17:09 +01:00
Anselm
ef732b4700 Implement saving signatures and streaming txs for SQLite 2023-09-12 16:16:40 +01:00
Anselm
ee7e3ee5a7 Publish
- jazz-example-pets@0.0.7
 - jazz-example-todo@0.0.32
 - cojson@0.2.2
 - cojson-simple-sync@0.2.4
 - cojson-storage-sqlite@0.2.4
 - jazz-browser@0.2.3
 - jazz-browser-auth-local@0.2.3
 - jazz-browser-media-images@0.2.3
 - jazz-react@0.2.3
 - jazz-react-auth-local@0.2.3
 - jazz-react-media-images@0.2.3
 - jazz-storage-indexeddb@0.2.3
2023-09-12 15:26:43 +01:00
Anselm
ceeed88fa5 Less verbose error output 2023-09-12 15:26:22 +01:00
Anselm
79353a1d97 Publish
- cojson-simple-sync@0.2.3
 - cojson-storage-sqlite@0.2.3
2023-09-12 15:22:01 +01:00
Anselm
7fdc42c62f Fix migration 2023-09-12 15:21:45 +01:00
Anselm
3a2e854a88 Publish
- cojson-simple-sync@0.2.2
 - cojson-storage-sqlite@0.2.2
2023-09-12 15:19:12 +01:00
Anselm
661a2d023a Fixes #90 for SQLite 2023-09-12 15:18:53 +01:00
15 changed files with 279 additions and 110 deletions

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.6",
"version": "0.0.8",
"type": "module",
"scripts": {
"dev": "vite",
@@ -16,9 +16,9 @@
"@types/qrcode": "^1.5.1",
"class-variance-authority": "^0.7.0",
"clsx": "^2.0.0",
"jazz-react": "^0.2.2",
"jazz-react-auth-local": "^0.2.2",
"jazz-react-media-images": "^0.2.2",
"jazz-react": "^0.2.4",
"jazz-react-auth-local": "^0.2.4",
"jazz-react-media-images": "^0.2.4",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.31",
"version": "0.0.33",
"type": "module",
"scripts": {
"dev": "vite",
@@ -16,8 +16,8 @@
"@types/qrcode": "^1.5.1",
"class-variance-authority": "^0.7.0",
"clsx": "^2.0.0",
"jazz-react": "^0.2.2",
"jazz-react-auth-local": "^0.2.2",
"jazz-react": "^0.2.4",
"jazz-react-auth-local": "^0.2.4",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -4,7 +4,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.2.1",
"version": "0.2.5",
"devDependencies": {
"@types/jest": "^29.5.3",
"@types/ws": "^8.5.5",
@@ -16,8 +16,8 @@
"typescript": "5.0.2"
},
"dependencies": {
"cojson": "^0.2.1",
"cojson-storage-sqlite": "^0.2.1",
"cojson": "^0.2.2",
"cojson-storage-sqlite": "^0.2.5",
"ws": "^8.13.0"
},
"scripts": {

View File

@@ -1,13 +1,13 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.2.1",
"version": "0.2.5",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "^0.2.1",
"cojson": "^0.2.2",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -4,8 +4,7 @@ import {
Peer,
CojsonInternalTypes,
SessionID,
// CojsonInternalTypes,
// SessionID,
MAX_RECOMMENDED_TX_SIZE,
} from "cojson";
import {
ReadableStream,
@@ -15,7 +14,6 @@ import {
} from "isomorphic-streams";
import Database, { Database as DatabaseT } from "better-sqlite3";
import { RawCoID } from "cojson/dist/ids";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -29,6 +27,7 @@ type SessionRow = {
sessionID: SessionID;
lastIdx: number;
lastSignature: CojsonInternalTypes.Signature;
bytesSinceLastSignature?: number;
};
type StoredSessionRow = SessionRow & { rowID: number };
@@ -39,6 +38,12 @@ type TransactionRow = {
tx: string;
};
type SignatureAfterRow = {
ses: number;
idx: number;
signature: CojsonInternalTypes.Signature;
};
export class SQLiteStorage {
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
@@ -98,41 +103,99 @@ export class SQLiteStorage {
const db = Database(filename);
db.pragma("journal_mode = WAL");
db.prepare(
`CREATE TABLE IF NOT EXISTS transactions (
ses INTEGER,
idx INTEGER,
tx TEXT NOT NULL ,
PRIMARY KEY (ses, idx)
) WITHOUT ROWID;`
).run();
const oldVersion = (
db.pragma("user_version") as [{ user_version: number }]
)[0].user_version as number;
db.prepare(
`CREATE TABLE IF NOT EXISTS sessions (
rowID INTEGER PRIMARY KEY,
coValue INTEGER NOT NULL,
sessionID TEXT NOT NULL,
lastIdx INTEGER,
lastSignature TEXT,
UNIQUE (sessionID, coValue)
);`
).run();
console.log("DB version", oldVersion);
db.prepare(
`CREATE INDEX IF NOT EXISTS sessionsByCoValue ON sessions (coValue);`
).run();
if (oldVersion === 0) {
console.log("Migration 0 -> 1: Basic schema");
db.prepare(
`CREATE TABLE IF NOT EXISTS transactions (
ses INTEGER,
idx INTEGER,
tx TEXT NOT NULL,
PRIMARY KEY (ses, idx)
) WITHOUT ROWID;`
).run();
db.prepare(
`CREATE TABLE IF NOT EXISTS coValues (
rowID INTEGER PRIMARY KEY,
id TEXT NOT NULL UNIQUE,
header TEXT NOT NULL UNIQUE
);`
).run();
db.prepare(
`CREATE TABLE IF NOT EXISTS sessions (
rowID INTEGER PRIMARY KEY,
coValue INTEGER NOT NULL,
sessionID TEXT NOT NULL,
lastIdx INTEGER,
lastSignature TEXT,
UNIQUE (sessionID, coValue)
);`
).run();
db.prepare(
`CREATE INDEX IF NOT EXISTS coValuesByID ON coValues (id);`
).run();
db.prepare(
`CREATE INDEX IF NOT EXISTS sessionsByCoValue ON sessions (coValue);`
).run();
db.prepare(
`CREATE TABLE IF NOT EXISTS coValues (
rowID INTEGER PRIMARY KEY,
id TEXT NOT NULL UNIQUE,
header TEXT NOT NULL UNIQUE
);`
).run();
db.prepare(
`CREATE INDEX IF NOT EXISTS coValuesByID ON coValues (id);`
).run();
db.pragma("user_version = 1");
console.log("Migration 0 -> 1: Basic schema - done");
}
if (oldVersion <= 1) {
// fix embarrassing off-by-one error for transaction indices
console.log(
"Migration 1 -> 2: Fix off-by-one error for transaction indices"
);
const txs = db
.prepare(`SELECT * FROM transactions`)
.all() as TransactionRow[];
for (const tx of txs) {
db.prepare(
`DELETE FROM transactions WHERE ses = ? AND idx = ?`
).run(tx.ses, tx.idx);
tx.idx -= 1;
db.prepare(
`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)`
).run(tx.ses, tx.idx, tx.tx);
}
db.pragma("user_version = 2");
console.log(
"Migration 1 -> 2: Fix off-by-one error for transaction indices - done"
);
}
if (oldVersion <= 2) {
console.log("Migration 2 -> 3: Add signatureAfter");
db.prepare(
`CREATE TABLE IF NOT EXISTS signatureAfter (
ses INTEGER,
idx INTEGER,
signature TEXT NOT NULL,
PRIMARY KEY (ses, idx)
) WITHOUT ROWID;`
).run();
db.prepare(
`ALTER TABLE sessions ADD COLUMN bytesSinceLastSignature INTEGER;`
).run();
db.pragma("user_version = 3");
console.log("Migration 2 -> 3: Add signatureAfter - done");
}
return new SQLiteStorage(db, fromLocalNode, toLocalNode);
}
@@ -179,12 +242,14 @@ export class SQLiteStorage {
| CojsonInternalTypes.CoValueHeader
| undefined;
const newContent: CojsonInternalTypes.NewContentMessage = {
action: "content",
id: theirKnown.id,
header: theirKnown.header ? undefined : parsedHeader,
new: {},
};
const newContentPieces: CojsonInternalTypes.NewContentMessage[] = [
{
action: "content",
id: theirKnown.id,
header: theirKnown.header ? undefined : parsedHeader,
new: {},
},
];
for (const sessionRow of allOurSessions) {
ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx;
@@ -196,25 +261,77 @@ export class SQLiteStorage {
const firstNewTxIdx =
theirKnown.sessions[sessionRow.sessionID] || 0;
const signaturesAndIdxs = this.db
.prepare<[number, number]>(
`SELECT * FROM signatureAfter WHERE ses = ? AND idx >= ?`
)
.all(sessionRow.rowID, firstNewTxIdx) as SignatureAfterRow[];
// console.log(
// theirKnown.id,
// "signaturesAndIdxs",
// JSON.stringify(signaturesAndIdxs)
// );
const newTxInSession = this.db
.prepare<[number, number]>(
`SELECT * FROM transactions WHERE ses = ? AND idx > ?`
`SELECT * FROM transactions WHERE ses = ? AND idx >= ?`
)
.all(sessionRow.rowID, firstNewTxIdx) as TransactionRow[];
newContent.new[sessionRow.sessionID] = {
after: firstNewTxIdx,
lastSignature: sessionRow.lastSignature,
newTransactions: newTxInSession.map((row) =>
JSON.parse(row.tx)
),
};
let idx = firstNewTxIdx;
// console.log(
// theirKnown.id,
// "newTxInSession",
// newTxInSession.length
// );
for (const tx of newTxInSession) {
let sessionEntry =
newContentPieces[newContentPieces.length - 1]!.new[
sessionRow.sessionID
];
if (!sessionEntry) {
sessionEntry = {
after: idx,
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature,
newTransactions: [],
};
newContentPieces[newContentPieces.length - 1]!.new[
sessionRow.sessionID
] = sessionEntry;
}
sessionEntry.newTransactions.push(JSON.parse(tx.tx));
if (
signaturesAndIdxs[0] &&
idx === signaturesAndIdxs[0].idx
) {
sessionEntry.lastSignature =
signaturesAndIdxs[0].signature;
signaturesAndIdxs.shift();
newContentPieces.push({
action: "content",
id: theirKnown.id,
new: {},
});
} else if (
idx ===
firstNewTxIdx + newTxInSession.length - 1
) {
sessionEntry.lastSignature = sessionRow.lastSignature;
}
idx += 1;
}
}
}
const dependedOnCoValues =
parsedHeader?.ruleset.type === "group"
? Object.values(newContent.new).flatMap((sessionEntry) =>
? newContentPieces
.flatMap((piece) => Object.values(piece.new)).flatMap((sessionEntry) =>
sessionEntry.newTransactions.flatMap((tx) => {
if (tx.privacy !== "trusting") return [];
// TODO: avoid parsing here?
@@ -253,8 +370,15 @@ export class SQLiteStorage {
asDependencyOf,
});
if (newContent.header || Object.keys(newContent.new).length > 0) {
await this.toLocalNode.write(newContent);
const nonEmptyNewContentPieces = newContentPieces.filter(
(piece) => piece.header || Object.keys(piece.new).length > 0
);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await this.toLocalNode.write(piece);
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
@@ -265,7 +389,9 @@ export class SQLiteStorage {
async handleContent(msg: CojsonInternalTypes.NewContentMessage) {
let storedCoValueRowID = (
this.db
.prepare<RawCoID>(`SELECT rowID FROM coValues WHERE id = ?`)
.prepare<CojsonInternalTypes.RawCoID>(
`SELECT rowID FROM coValues WHERE id = ?`
)
.get(msg.id) as StoredCoValueRow | undefined
)?.rowID;
@@ -284,7 +410,7 @@ export class SQLiteStorage {
}
storedCoValueRowID = this.db
.prepare<[RawCoID, string]>(
.prepare<[CojsonInternalTypes.RawCoID, string]>(
`INSERT INTO coValues (id, header) VALUES (?, ?)`
)
.run(msg.id, JSON.stringify(header)).lastInsertRowid as number;
@@ -326,37 +452,72 @@ export class SQLiteStorage {
const actuallyNewOffset =
(sessionRow?.lastIdx || 0) -
(msg.new[sessionID]?.after || 0);
const actuallyNewTransactions =
newTransactions.slice(actuallyNewOffset);
let newBytesSinceLastSignature =
(sessionRow?.bytesSinceLastSignature || 0) +
actuallyNewTransactions.reduce(
(sum, tx) =>
sum +
(tx.privacy === "private"
? tx.encryptedChanges.length
: tx.changes.length),
0
);
const newLastIdx =
(sessionRow?.lastIdx || 0) +
actuallyNewTransactions.length;
let shouldWriteSignature = false;
if (newBytesSinceLastSignature > MAX_RECOMMENDED_TX_SIZE) {
shouldWriteSignature = true;
newBytesSinceLastSignature = 0;
}
let nextIdx = sessionRow?.lastIdx || 0;
const sessionUpdate = {
coValue: storedCoValueRowID!,
sessionID: sessionID,
lastIdx:
(sessionRow?.lastIdx || 0) +
actuallyNewTransactions.length,
lastIdx: newLastIdx,
lastSignature: msg.new[sessionID]!.lastSignature,
bytesSinceLastSignature: newBytesSinceLastSignature,
};
const upsertedSession = this.db
.prepare<[number, string, number, string]>(
`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature) VALUES (?, ?, ?, ?)
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature
.prepare<[number, string, number, string, number]>(
`INSERT INTO sessions (coValue, sessionID, lastIdx, lastSignature, bytesSinceLastSignature) VALUES (?, ?, ?, ?, ?)
ON CONFLICT(coValue, sessionID) DO UPDATE SET lastIdx=excluded.lastIdx, lastSignature=excluded.lastSignature, bytesSinceLastSignature=excluded.bytesSinceLastSignature
RETURNING rowID`
)
.get(
sessionUpdate.coValue,
sessionUpdate.sessionID,
sessionUpdate.lastIdx,
sessionUpdate.lastSignature
sessionUpdate.lastSignature,
sessionUpdate.bytesSinceLastSignature,
) as { rowID: number };
const sessionRowID = upsertedSession.rowID;
if (shouldWriteSignature) {
this.db
.prepare<[number, number, string]>(
`INSERT INTO signatureAfter (ses, idx, signature) VALUES (?, ?, ?)`
)
.run(
sessionRowID,
// TODO: newLastIdx is a misnomer, it's actually more like nextIdx or length
newLastIdx - 1,
msg.new[sessionID]!.lastSignature
);
}
for (const newTransaction of actuallyNewTransactions) {
nextIdx++;
this.db
.prepare<[number, number, string]>(
`INSERT INTO transactions (ses, idx, tx) VALUES (?, ?, ?)`
@@ -366,6 +527,7 @@ export class SQLiteStorage {
nextIdx,
JSON.stringify(newTransaction)
);
nextIdx++;
}
}
}

View File

@@ -5,7 +5,7 @@
"types": "dist/index.d.ts",
"type": "module",
"license": "MIT",
"version": "0.2.1",
"version": "0.2.2",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",

View File

@@ -285,7 +285,11 @@ export class SyncManager {
} catch (e) {
console.error(
`Error reading from peer ${peer.id}, handling msg`,
JSON.stringify(msg),
JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v
),
e
);
}
@@ -498,7 +502,11 @@ export class SyncManager {
}
if (!success) {
console.error("Failed to add transactions", msg.id, newTransactions);
console.error(
"Failed to add transactions",
msg.id,
newTransactions
);
continue;
}

View File

@@ -1,11 +1,11 @@
{
"name": "jazz-browser-auth-local",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"jazz-browser": "^0.2.2",
"jazz-browser": "^0.2.4",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,13 +1,13 @@
{
"name": "jazz-browser-media-images",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.1",
"cojson": "^0.2.2",
"image-blob-reduce": "^4.1.0",
"jazz-browser": "^0.2.2",
"jazz-browser": "^0.2.4",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-browser",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.1",
"jazz-storage-indexeddb": "^0.2.2",
"cojson": "^0.2.2",
"jazz-storage-indexeddb": "^0.2.4",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-react-auth-local",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"jazz-browser-auth-local": "^0.2.2",
"jazz-react": "^0.2.2",
"jazz-browser-auth-local": "^0.2.4",
"jazz-react": "^0.2.4",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -1,14 +1,14 @@
{
"name": "jazz-react-media-images",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.1",
"jazz-browser": "^0.2.2",
"jazz-browser-media-images": "^0.2.2",
"jazz-react": "^0.2.2",
"cojson": "^0.2.2",
"jazz-browser": "^0.2.4",
"jazz-browser-media-images": "^0.2.4",
"jazz-react": "^0.2.4",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-react",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.1",
"jazz-browser": "^0.2.2",
"cojson": "^0.2.2",
"jazz-browser": "^0.2.4",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -1,11 +1,11 @@
{
"name": "jazz-storage-indexeddb",
"version": "0.2.2",
"version": "0.2.4",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.1",
"cojson": "^0.2.2",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -6,7 +6,6 @@ import {
CojsonInternalTypes,
MAX_RECOMMENDED_TX_SIZE,
} from "cojson";
import { Signature } from "cojson/dist/crypto";
import {
ReadableStream,
WritableStream,
@@ -236,11 +235,11 @@ export class IDBStorage {
)
);
console.log(
theirKnown.id,
"signaturesAndIdxs",
JSON.stringify(signaturesAndIdxs)
);
// console.log(
// theirKnown.id,
// "signaturesAndIdxs",
// JSON.stringify(signaturesAndIdxs)
// );
const newTxInSession = await promised<TransactionRow[]>(
transactions.getAll(
@@ -253,11 +252,11 @@ export class IDBStorage {
let idx = firstNewTxIdx;
console.log(
theirKnown.id,
"newTxInSession",
newTxInSession.length
);
// console.log(
// theirKnown.id,
// "newTxInSession",
// newTxInSession.length
// );
for (const tx of newTxInSession) {
let sessionEntry =
@@ -267,7 +266,7 @@ export class IDBStorage {
if (!sessionEntry) {
sessionEntry = {
after: idx,
lastSignature: "WILL_BE_REPLACED" as Signature,
lastSignature: "WILL_BE_REPLACED" as CojsonInternalTypes.Signature,
newTransactions: [],
};
newContentPieces[newContentPieces.length - 1]!.new[
@@ -350,7 +349,7 @@ export class IDBStorage {
(piece) => piece.header || Object.keys(piece.new).length > 0
);
console.log(theirKnown.id, nonEmptyNewContentPieces);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await this.toLocalNode.write(piece);