Compare commits

...

14 Commits

Author SHA1 Message Date
Anselm
17e53f9998 Publish
- jazz-example-pets@0.0.5
 - jazz-example-todo@0.0.30
 - cojson@0.2.1
 - cojson-simple-sync@0.2.1
 - cojson-storage-sqlite@0.2.1
 - jazz-browser@0.2.1
 - jazz-browser-auth-local@0.2.1
 - jazz-browser-media-images@0.2.1
 - jazz-react@0.2.1
 - jazz-react-auth-local@0.2.1
 - jazz-react-media-images@0.2.1
 - jazz-storage-indexeddb@0.2.1
2023-09-12 14:47:50 +01:00
Anselm
cfb1f39efe update docs 2023-09-12 14:47:17 +01:00
Anselm
2234276dcf Implement extra signatures & fix #90 for IndexedDB 2023-09-12 14:42:47 +01:00
Anselm
bb0a6a0600 yield microtask between incoming messages 2023-09-12 11:22:44 +01:00
Anselm
0a6eb0c10a Lots of fixes around streaming 2023-09-12 11:13:19 +01:00
Anselm
88b67d89e0 First implementation of streaming transactions, also fixes #80 2023-09-11 19:29:52 +01:00
Anselm Eickhoff
1a65d826b2 Update pets README.md 2023-09-11 17:24:01 +01:00
Anselm Eickhoff
6c65ec2b46 Merge pull request #81 from gardencmp/publish-pet-example
Publish pet example
2023-09-11 17:21:16 +01:00
Anselm
5b578a832d Fix job name and missing amtrix 2023-09-11 17:13:16 +01:00
Anselm
042afc52d7 Fix interpolation 2023-09-11 17:10:12 +01:00
Anselm
1b83493964 Use matrix and add pets example 2023-09-11 17:09:14 +01:00
Anselm
3b50da1a74 Remove redundant yarn build step 2023-09-11 17:04:42 +01:00
Anselm
8e0fc74d9f Switch to buildx 2023-09-11 17:03:18 +01:00
Anselm Eickhoff
e28326f32c Merge pull request #79 from gardencmp/anselm-gar-155
Make payload of trusting transactions JSON string instead of immediately-parsed JSON
2023-09-11 16:32:30 +01:00
28 changed files with 809 additions and 313 deletions

View File

@@ -7,8 +7,11 @@ on:
branches: [ "main" ]
jobs:
build-and-deploy:
build:
runs-on: ubuntu-latest
strategy:
matrix:
example: ["todo", "pets"]
steps:
- uses: actions/checkout@v3
@@ -17,40 +20,50 @@ jobs:
- uses: actions/setup-node@v3
with:
node-version: 18
node-version: 16
cache: 'yarn'
cache-dependency-path: yarn.lock
- name: Nuke Workspace
run: |
rm package.json yarn.lock;
- name: Yarn Build
run: |
yarn install --frozen-lockfile;
yarn build;
working-directory: ./examples/todo
- uses: satackey/action-docker-layer-caching@v0.0.11
continue-on-error: true
with:
key: docker-layer-caching-${{ github.workflow }}-{hash}
restore-keys: |
docker-layer-caching-${{ github.workflow }}-
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v2
- name: Login to GitHub Container Registry
uses: docker/login-action@v1
uses: docker/login-action@v2
with:
registry: ghcr.io
username: gardencmp
password: ${{ secrets.GITHUB_TOKEN }}
- name: Docker Build & Push
- name: Nuke Workspace
run: |
export DOCKER_TAG=ghcr.io/gardencmp/jazz-example-todo:${{github.head_ref || github.ref_name}}-${{github.sha}}-$(date +%s) ;
docker build . --file Dockerfile --tag $DOCKER_TAG;
docker push $DOCKER_TAG;
echo "DOCKER_TAG=$DOCKER_TAG" >> $GITHUB_ENV
working-directory: ./examples/todo
rm package.json yarn.lock;
- name: Yarn Build
run: |
yarn install --frozen-lockfile;
yarn build;
working-directory: ./examples/${{ matrix.example }}
- name: Docker Build & Push
uses: docker/build-push-action@v4
with:
context: ./examples/${{ matrix.example }}
push: true
tags: ghcr.io/gardencmp/${{github.event.repository.name}}-example-${{ matrix.example }}:${{github.head_ref || github.ref_name}}-${{github.sha}}-${{github.run_number}}-${{github.run_attempt}}
cache-from: type=gha
cache-to: type=gha,mode=max
deploy:
runs-on: ubuntu-latest
needs: build
strategy:
matrix:
example: ["todo", "pets"]
steps:
- uses: actions/checkout@v3
with:
submodules: true
- uses: gacts/install-nomad@v1
- name: Tailscale
uses: tailscale/github-action@v1
@@ -69,9 +82,9 @@ jobs:
export DOCKER_USER=gardencmp;
export DOCKER_PASSWORD=${{ secrets.DOCKER_PULL_PAT }};
export DOCKER_TAG=${{ env.DOCKER_TAG }};
export DOCKER_TAG=ghcr.io/gardencmp/${{github.event.repository.name}}-example-${{ matrix.example }}:${{github.head_ref || github.ref_name}}-${{github.sha}}-${{github.run_number}}-${{github.run_attempt}};
envsubst '${DOCKER_USER} ${DOCKER_PASSWORD} ${DOCKER_TAG} ${BRANCH_SUFFIX} ${BRANCH_SUBDOMAIN}' < job-template.nomad > job-instance.nomad;
cat job-instance.nomad;
NOMAD_ADDR='http://control1v2-london:4646' nomad job run job-instance.nomad;
working-directory: ./examples/todo
working-directory: ./examples/${{ matrix.example }}

43
DOCS.md
View File

@@ -2061,10 +2061,12 @@ The `Group` this `CoValue` belongs to (determining permissions)
### Methods
<details>
<summary><code>binaryCoStream.getBinaryChunks()</code> (undocumented)</summary>
<summary><code>binaryCoStream.getBinaryChunks(allowUnfinished)</code> (undocumented)</summary>
```typescript
binaryCoStream.getBinaryChunks(): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
binaryCoStream.getBinaryChunks(
allowUnfinished: boolean
): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
```
TODO: document
@@ -2340,10 +2342,12 @@ TODO: document
<details>
<summary><code>writeableBinaryCoStream.getBinaryChunks()</code> (from <code>BinaryCoStream</code>) (undocumented)</summary>
<summary><code>writeableBinaryCoStream.getBinaryChunks(allowUnfinished)</code> (from <code>BinaryCoStream</code>) (undocumented)</summary>
```typescript
writeableBinaryCoStream.getBinaryChunks(): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
writeableBinaryCoStream.getBinaryChunks(
allowUnfinished: boolean
): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
```
TODO: document
@@ -2653,6 +2657,24 @@ TODO: document
<details>
<summary><code>coValueCore.doAddTransactions(sessionID, newTransactions, newSignature, expectedNewHash, newStreamingHash)</code> (undocumented)</summary>
```typescript
coValueCore.doAddTransactions(
sessionID: SessionID,
newTransactions: Transaction[],
newSignature: TEMPLATE_LITERAL,
expectedNewHash: TEMPLATE_LITERAL,
newStreamingHash: StreamingHash
): void
```
TODO: document
</details>
<details>
<summary><code>coValueCore.subscribe(listener)</code> (undocumented)</summary>
@@ -2794,7 +2816,7 @@ TODO: document
```typescript
coValueCore.newContentSince(
knownState: undefined | CoValueKnownState
): undefined | NewContentMessage
): undefined | NewContentMessage[]
```
TODO: document
@@ -3251,6 +3273,17 @@ TODO: document
TODO: doc generator not implemented yet
----
## `MAX_RECOMMENDED_TX_SIZE` (variabl in `cojson`)
```typescript
export MAX_RECOMMENDED_TX_SIZE
```
TODO: document
TODO: doc generator not implemented yet
# jazz-react

View File

@@ -1,14 +1,14 @@
# Jazz Todo List Example
# Jazz Rate-My-Pet List Example
Live version: https://example-todo.jazz.tools
Live version: https://example-pets.jazz.tools
## Installing & running the example locally
Start by checking out just the example app to a folder:
```bash
npx degit gardencmp/jazz/examples/todo jazz-example-todo
cd jazz-example-todo
npx degit gardencmp/jazz/examples/pets jazz-example-pets
cd jazz-example-pets
```
(This ensures that you have the example app without git history or our multi-package monorepo)
@@ -27,31 +27,17 @@ npm run dev
## Structure
- [`src/basicComponents`](./src/basicComponents) contains simple components to build the UI, unrelated to Jazz (powered by [shadcn/ui](https://ui.shadcn.com))
- [`src/components`](./src/components/) contains helper components that do contain Jazz-specific logic, but are not super relevant to understand the basics of Jazz and CoJSON
- [`src/0_main.tsx`](./src/0_main.tsx), [`src/1_types.ts`](./src/1_types.ts), [`src/2_App.tsx`](./src/2_App.tsx), [`src/3_TodoTable.tsx`](./src/3_TodoTable.tsx), [`src/router.ts`](./src/router.ts) - the main files for this example, see the walkthrough below
TODO
## Walkthrough
### Main parts
- The top-level provider `<WithJazz/>`: [`src/0_main.tsx`](./src/0_main.tsx)
- Defining the data model with CoJSON: [`src/1_types.ts`](./src/1_types.ts)
- Creating todo projects & routing in `<App/>`: [`src/2_App.tsx`](./src/2_App.tsx)
- Reactively rendering a todo project as a table, adding and editing tasks: [`src/3_TodoTable.tsx`](./src/3_TodoTable.tsx)
TODO
### Helpers
- Getting user profiles in `<NameBadge/>`: [`src/components/NameBadge.tsx`](./src/components/NameBadge.tsx)
- (not yet commented) Creating invite links/QR codes with `<InviteButton/>`: [`src/components/InviteButton.tsx`](./src/components/InviteButton.tsx)
- (not yet commented) `location.hash`-based routing and accepting invite links with `useSimpleHashRouterThatAcceptsInvites()` in [`src/router.ts`](./src/router.ts)
This is the whole Todo List app!
TODO
## Questions / problems / feedback

View File

@@ -1,4 +1,4 @@
job "example-todo$BRANCH_SUFFIX" {
job "example-pets$BRANCH_SUFFIX" {
region = "global"
datacenters = ["*"]
@@ -41,7 +41,7 @@ job "example-todo$BRANCH_SUFFIX" {
service {
tags = ["public"]
name = "example-todo$BRANCH_SUFFIX"
name = "example-pets$BRANCH_SUFFIX"
port = "http"
provider = "consul"
}

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.4",
"version": "0.0.5",
"type": "module",
"scripts": {
"dev": "vite",
@@ -16,9 +16,9 @@
"@types/qrcode": "^1.5.1",
"class-variance-authority": "^0.7.0",
"clsx": "^2.0.0",
"jazz-react": "^0.2.0",
"jazz-react-auth-local": "^0.2.0",
"jazz-react-media-images": "^0.2.0",
"jazz-react": "^0.2.1",
"jazz-react-auth-local": "^0.2.1",
"jazz-react-media-images": "^0.2.1",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.29",
"version": "0.0.30",
"type": "module",
"scripts": {
"dev": "vite",
@@ -16,8 +16,8 @@
"@types/qrcode": "^1.5.1",
"class-variance-authority": "^0.7.0",
"clsx": "^2.0.0",
"jazz-react": "^0.2.0",
"jazz-react-auth-local": "^0.2.0",
"jazz-react": "^0.2.1",
"jazz-react-auth-local": "^0.2.1",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -4,7 +4,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.2.0",
"version": "0.2.1",
"devDependencies": {
"@types/jest": "^29.5.3",
"@types/ws": "^8.5.5",
@@ -16,8 +16,8 @@
"typescript": "5.0.2"
},
"dependencies": {
"cojson": "^0.2.0",
"cojson-storage-sqlite": "^0.2.0",
"cojson": "^0.2.1",
"cojson-storage-sqlite": "^0.2.1",
"ws": "^8.13.0"
},
"scripts": {

View File

@@ -1,13 +1,13 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "^0.2.0",
"cojson": "^0.2.1",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -60,7 +60,7 @@ export class SQLiteStorage {
done = result.done;
if (result.value) {
this.handleSyncMessage(result.value);
await this.handleSyncMessage(result.value);
}
}
})();

View File

@@ -5,7 +5,7 @@
"types": "dist/index.d.ts",
"type": "module",
"license": "MIT",
"version": "0.2.0",
"version": "0.2.1",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",

View File

@@ -1,7 +1,7 @@
import { accountOrAgentIDfromSessionID } from "./coValueCore.js";
import { BinaryCoStream } from "./coValues/coStream.js";
import { createdNowUnique } from "./crypto.js";
import { cojsonReady } from "./index.js";
import { MAX_RECOMMENDED_TX_SIZE, cojsonReady } from "./index.js";
import { LocalNode } from "./node.js";
import { randomAnonymousAccountAndSessionID } from "./testUtils.js";
@@ -382,14 +382,14 @@ test("Can push into BinaryCoStream", () => {
content.edit((editable) => {
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
expect(editable.getBinaryChunks()).toEqual({
expect(editable.getBinaryChunks(true)).toEqual({
mimeType: "text/plain",
fileName: "test.txt",
chunks: [],
finished: false,
});
editable.pushBinaryStreamChunk(new Uint8Array([1, 2, 3]), "trusting");
expect(editable.getBinaryChunks()).toEqual({
expect(editable.getBinaryChunks(true)).toEqual({
mimeType: "text/plain",
fileName: "test.txt",
chunks: [new Uint8Array([1, 2, 3])],
@@ -397,7 +397,7 @@ test("Can push into BinaryCoStream", () => {
});
editable.pushBinaryStreamChunk(new Uint8Array([4, 5, 6]), "trusting");
expect(editable.getBinaryChunks()).toEqual({
expect(editable.getBinaryChunks(true)).toEqual({
mimeType: "text/plain",
fileName: "test.txt",
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
@@ -413,3 +413,112 @@ test("Can push into BinaryCoStream", () => {
});
});
});
test("When adding large transactions (small fraction of MAX_RECOMMENDED_TX_SIZE), we store an inbetween signature every time we reach MAX_RECOMMENDED_TX_SIZE and split up newContentSince accordingly", () => {
const node = new LocalNode(...randomAnonymousAccountAndSessionID());
const coValue = node.createCoValue({
type: "costream",
ruleset: { type: "unsafeAllowAll" },
meta: { type: "binary" },
...createdNowUnique(),
});
const content = coValue.getCurrentContent();
if (content.type !== "costream" || content.meta?.type !== "binary" || !(content instanceof BinaryCoStream)) {
throw new Error("Expected binary stream");
}
content.edit((editable) => {
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
});
for (let i = 0; i < 10; i++) {
const chunk = new Uint8Array(MAX_RECOMMENDED_TX_SIZE/3 + 100);
content.edit((editable) => {
editable.pushBinaryStreamChunk(chunk, "trusting");
});
}
content.edit((editable) => {
editable.endBinaryStream("trusting");
});
const sessionEntry = coValue._sessions[node.currentSessionID]!;
expect(sessionEntry.transactions.length).toEqual(12);
expect(sessionEntry.signatureAfter[0]).not.toBeDefined();
expect(sessionEntry.signatureAfter[1]).not.toBeDefined();
expect(sessionEntry.signatureAfter[2]).not.toBeDefined();
expect(sessionEntry.signatureAfter[3]).toBeDefined();
expect(sessionEntry.signatureAfter[4]).not.toBeDefined();
expect(sessionEntry.signatureAfter[5]).not.toBeDefined();
expect(sessionEntry.signatureAfter[6]).toBeDefined();
expect(sessionEntry.signatureAfter[7]).not.toBeDefined();
expect(sessionEntry.signatureAfter[8]).not.toBeDefined();
expect(sessionEntry.signatureAfter[9]).toBeDefined();
expect(sessionEntry.signatureAfter[10]).not.toBeDefined();
expect(sessionEntry.signatureAfter[11]).not.toBeDefined();
const newContent = coValue.newContentSince({id: coValue.id, header: false, sessions: {}})!;
expect(newContent.length).toEqual(5)
expect(newContent[0]!.header).toBeDefined();
expect(newContent[1]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[3]);
expect(newContent[2]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[6]);
expect(newContent[3]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[9]);
expect(newContent[4]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.lastSignature);
});
test("When adding large transactions (bigger than MAX_RECOMMENDED_TX_SIZE), we store an inbetween signature after every large transaction and split up newContentSince accordingly", () => {
const node = new LocalNode(...randomAnonymousAccountAndSessionID());
const coValue = node.createCoValue({
type: "costream",
ruleset: { type: "unsafeAllowAll" },
meta: { type: "binary" },
...createdNowUnique(),
});
const content = coValue.getCurrentContent();
if (content.type !== "costream" || content.meta?.type !== "binary" || !(content instanceof BinaryCoStream)) {
throw new Error("Expected binary stream");
}
content.edit((editable) => {
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
});
const chunk = new Uint8Array(MAX_RECOMMENDED_TX_SIZE + 100);
for (let i = 0; i < 3; i++) {
content.edit((editable) => {
editable.pushBinaryStreamChunk(chunk, "trusting");
});
}
content.edit((editable) => {
editable.endBinaryStream("trusting");
});
const sessionEntry = coValue._sessions[node.currentSessionID]!;
expect(sessionEntry.transactions.length).toEqual(5);
expect(sessionEntry.signatureAfter[0]).not.toBeDefined();
expect(sessionEntry.signatureAfter[1]).toBeDefined();
expect(sessionEntry.signatureAfter[2]).toBeDefined();
expect(sessionEntry.signatureAfter[3]).toBeDefined();
expect(sessionEntry.signatureAfter[4]).not.toBeDefined();
const newContent = coValue.newContentSince({id: coValue.id, header: false, sessions: {}})!;
expect(newContent.length).toEqual(5)
expect(newContent[0]!.header).toBeDefined();
expect(newContent[1]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[1]);
expect(newContent[2]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[2]);
expect(newContent[3]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[3]);
expect(newContent[4]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.lastSignature);
});

View File

@@ -14,7 +14,6 @@ import {
sign,
verify,
encryptForTransaction,
decryptForTransaction,
KeyID,
decryptKeySecret,
getAgentSignerID,
@@ -33,11 +32,10 @@ import { LocalNode } from "./node.js";
import { CoValueKnownState, NewContentMessage } from "./sync.js";
import { AgentID, RawCoID, SessionID, TransactionID } from "./ids.js";
import { CoList } from "./coValues/coList.js";
import {
AccountID,
GeneralizedControlledAccount,
} from "./account.js";
import { Stringified, parseJSON, stableStringify } from "./jsonStringify.js";
import { AccountID, GeneralizedControlledAccount } from "./account.js";
import { Stringified, stableStringify } from "./jsonStringify.js";
export const MAX_RECOMMENDED_TX_SIZE = 100 * 1024;
export type CoValueHeader = {
type: CoValueImpl["type"];
@@ -66,6 +64,7 @@ type SessionLog = {
transactions: Transaction[];
lastHash?: Hash;
streamingHash: StreamingHash;
signatureAfter: { [txIdx: number]: Signature | undefined };
lastSignature: Signature;
};
@@ -79,7 +78,6 @@ export type PrivateTransaction = {
>;
};
export type TrustingTransaction = {
privacy: "trusting";
madeAt: number;
@@ -103,7 +101,11 @@ export class CoValueCore {
_sessions: { [key: SessionID]: SessionLog };
_cachedContent?: CoValueImpl;
listeners: Set<(content?: CoValueImpl) => void> = new Set();
_decryptionCache: {[key: Encrypted<JsonValue[], JsonValue>]: Stringified<JsonValue[]> | undefined} = {}
_decryptionCache: {
[key: Encrypted<JsonValue[], JsonValue>]:
| Stringified<JsonValue[]>
| undefined;
} = {};
constructor(
header: CoValueHeader,
@@ -212,7 +214,8 @@ export class CoValueCore {
// const beforeVerify = performance.now();
if (!verify(newSignature, expectedNewHash, signerID)) {
console.warn(
"Invalid signature",
"Invalid signature in",
this.id,
newSignature,
expectedNewHash,
signerID
@@ -225,25 +228,13 @@ export class CoValueCore {
// afterVerify - beforeVerify
// );
const transactions = this.sessions[sessionID]?.transactions ?? [];
transactions.push(...newTransactions);
this._sessions[sessionID] = {
transactions,
lastHash: expectedNewHash,
streamingHash: newStreamingHash,
lastSignature: newSignature,
};
this._cachedContent = undefined;
if (this.listeners.size > 0) {
const content = this.getCurrentContent();
for (const listener of this.listeners) {
listener(content);
}
}
this.doAddTransactions(
sessionID,
newTransactions,
newSignature,
expectedNewHash,
newStreamingHash
);
return true;
}
@@ -272,10 +263,8 @@ export class CoValueCore {
const nTxBefore = this.sessions[sessionID]?.transactions.length ?? 0;
// const beforeHash = performance.now();
const { expectedNewHash, newStreamingHash } = await this.expectedNewHashAfterAsync(
sessionID,
newTransactions
);
const { expectedNewHash, newStreamingHash } =
await this.expectedNewHashAfterAsync(sessionID, newTransactions);
// const afterHash = performance.now();
// console.log(
// "Hashing took",
@@ -286,7 +275,7 @@ export class CoValueCore {
if (nTxAfter !== nTxBefore) {
const newTransactionLengthBefore = newTransactions.length;
newTransactions = newTransactions.slice((nTxAfter - nTxBefore));
newTransactions = newTransactions.slice(nTxAfter - nTxBefore);
console.warn("Transactions changed while async hashing", {
nTxBefore,
nTxAfter,
@@ -306,7 +295,8 @@ export class CoValueCore {
// const beforeVerify = performance.now();
if (!verify(newSignature, expectedNewHash, signerID)) {
console.warn(
"Invalid signature",
"Invalid signature in",
this.id,
newSignature,
expectedNewHash,
signerID
@@ -319,15 +309,61 @@ export class CoValueCore {
// afterVerify - beforeVerify
// );
const transactions = this.sessions[sessionID]?.transactions ?? [];
this.doAddTransactions(
sessionID,
newTransactions,
newSignature,
expectedNewHash,
newStreamingHash
);
return true;
}
private doAddTransactions(
sessionID: SessionID,
newTransactions: Transaction[],
newSignature: Signature,
expectedNewHash: Hash,
newStreamingHash: StreamingHash
) {
const transactions = this.sessions[sessionID]?.transactions ?? [];
transactions.push(...newTransactions);
const signatureAfter = this.sessions[sessionID]?.signatureAfter ?? {};
const lastInbetweenSignatureIdx = Object.keys(signatureAfter).reduce(
(max, idx) => (parseInt(idx) > max ? parseInt(idx) : max),
-1
);
const sizeOfTxsSinceLastInbetweenSignature = transactions
.slice(lastInbetweenSignatureIdx + 1)
.reduce(
(sum, tx) =>
sum +
(tx.privacy === "private"
? tx.encryptedChanges.length
: tx.changes.length),
0
);
if (sizeOfTxsSinceLastInbetweenSignature > 100 * 1024) {
// console.log(
// "Saving inbetween signature for tx ",
// sessionID,
// transactions.length - 1,
// sizeOfTxsSinceLastInbetweenSignature
// );
signatureAfter[transactions.length - 1] = newSignature;
}
this._sessions[sessionID] = {
transactions,
lastHash: expectedNewHash,
streamingHash: newStreamingHash,
lastSignature: newSignature,
signatureAfter: signatureAfter,
};
this._cachedContent = undefined;
@@ -338,8 +374,6 @@ export class CoValueCore {
listener(content);
}
}
return true;
}
subscribe(listener: (content?: CoValueImpl) => void): () => void {
@@ -379,10 +413,10 @@ export class CoValueCore {
new StreamingHash();
let before = performance.now();
for (const transaction of newTransactions) {
streamingHash.update(transaction)
streamingHash.update(transaction);
const after = performance.now();
if (after - before > 1) {
console.log("Hashing blocked for", after - before);
// console.log("Hashing blocked for", after - before);
await new Promise((resolve) => setTimeout(resolve, 0));
before = performance.now();
}
@@ -500,7 +534,8 @@ export class CoValueCore {
if (!readKey) {
return undefined;
} else {
let decrytedChanges = this._decryptionCache[tx.encryptedChanges];
let decrytedChanges =
this._decryptionCache[tx.encryptedChanges];
if (!decrytedChanges) {
decrytedChanges = decryptRawForTransaction(
@@ -511,7 +546,8 @@ export class CoValueCore {
tx: txID,
}
);
this._decryptionCache[tx.encryptedChanges] = decrytedChanges;
this._decryptionCache[tx.encryptedChanges] =
decrytedChanges;
}
if (!decrytedChanges) {
@@ -683,47 +719,95 @@ export class CoValueCore {
newContentSince(
knownState: CoValueKnownState | undefined
): NewContentMessage | undefined {
const newContent: NewContentMessage = {
): NewContentMessage[] | undefined {
let currentPiece: NewContentMessage = {
action: "content",
id: this.id,
header: knownState?.header ? undefined : this.header,
new: Object.fromEntries(
Object.entries(this.sessions)
.map(([sessionID, log]) => {
const newTransactions = log.transactions.slice(
knownState?.sessions[sessionID as SessionID] || 0
);
if (
newTransactions.length === 0 ||
!log.lastHash ||
!log.lastSignature
) {
return undefined;
}
return [
sessionID,
{
after:
knownState?.sessions[
sessionID as SessionID
] || 0,
newTransactions,
lastSignature: log.lastSignature,
},
];
})
.filter((x): x is Exclude<typeof x, undefined> => !!x)
),
new: {},
};
if (!newContent.header && Object.keys(newContent.new).length === 0) {
const pieces = [currentPiece];
const sentState: CoValueKnownState["sessions"] = {
...knownState?.sessions,
};
let newTxsWereAdded = true;
let pieceSize = 0;
while (newTxsWereAdded) {
newTxsWereAdded = false;
for (const [sessionID, log] of Object.entries(this.sessions) as [
SessionID,
SessionLog
][]) {
const nextKnownSignatureIdx = Object.keys(log.signatureAfter)
.map(Number)
.sort((a, b) => a - b)
.find((idx) => idx >= (sentState[sessionID] ?? -1));
const txsToAdd = log.transactions.slice(
sentState[sessionID] ?? 0,
nextKnownSignatureIdx === undefined
? undefined
: nextKnownSignatureIdx + 1
);
if (txsToAdd.length === 0) continue;
newTxsWereAdded = true;
const oldPieceSize = pieceSize;
pieceSize += txsToAdd.reduce(
(sum, tx) =>
sum +
(tx.privacy === "private"
? tx.encryptedChanges.length
: tx.changes.length),
0
);
if (pieceSize >= MAX_RECOMMENDED_TX_SIZE) {
currentPiece = {
action: "content",
id: this.id,
header: undefined,
new: {},
};
pieces.push(currentPiece);
pieceSize = pieceSize - oldPieceSize;
}
let sessionEntry = currentPiece.new[sessionID];
if (!sessionEntry) {
sessionEntry = {
after: sentState[sessionID] ?? 0,
newTransactions: [],
lastSignature: "WILL_BE_REPLACED" as Signature
};
currentPiece.new[sessionID] = sessionEntry;
}
sessionEntry.newTransactions.push(...txsToAdd);
sessionEntry.lastSignature = nextKnownSignatureIdx === undefined
? log.lastSignature!
: log.signatureAfter[nextKnownSignatureIdx]!
sentState[sessionID] =
(sentState[sessionID] || 0) + txsToAdd.length;
}
}
const piecesWithContent = pieces.filter(
(piece) => Object.keys(piece.new).length > 0 || piece.header
);
if (piecesWithContent.length === 0) {
return undefined;
}
return newContent;
return piecesWithContent;
}
getDependedOnCoValues(): RawCoID[] {

View File

@@ -169,10 +169,10 @@ export class BinaryCoStream<
{
id!: CoID<BinaryCoStream<Meta>>;
getBinaryChunks():
getBinaryChunks(allowUnfinished?: boolean):
| (BinaryChunkInfo & { chunks: Uint8Array[]; finished: boolean })
| undefined {
const before = performance.now();
// const before = performance.now();
const items = this.getSingleStream();
if (!items) return;
@@ -184,10 +184,14 @@ export class BinaryCoStream<
return;
}
const end = items[items.length - 1];
if (end?.type !== "end" && !allowUnfinished) return;
const chunks: Uint8Array[] = [];
let finished = false;
let totalLength = 0;
// let totalLength = 0;
for (const item of items.slice(1)) {
if (item.type === "end") {
@@ -203,15 +207,15 @@ export class BinaryCoStream<
const chunk = base64URLtoBytes(
item.chunk.slice(binary_U_prefixLength)
);
totalLength += chunk.length;
// totalLength += chunk.length;
chunks.push(chunk);
}
const after = performance.now();
console.log(
"getBinaryChunks bandwidth in MB/s",
(1000 * totalLength) / (after - before) / (1024 * 1024)
);
// const after = performance.now();
// console.log(
// "getBinaryChunks bandwidth in MB/s",
// (1000 * totalLength) / (after - before) / (1024 * 1024)
// );
return {
mimeType: start.mimeType,
@@ -286,7 +290,7 @@ export class WriteableBinaryCoStream<
chunk: Uint8Array,
privacy: "private" | "trusting" = "private"
) {
const before = performance.now();
// const before = performance.now();
this.push(
{
type: "chunk",
@@ -294,11 +298,11 @@ export class WriteableBinaryCoStream<
} satisfies BinaryStreamChunk,
privacy
);
const after = performance.now();
console.log(
"pushBinaryStreamChunk bandwidth in MB/s",
(1000 * chunk.length) / (after - before) / (1024 * 1024)
);
// const after = performance.now();
// console.log(
// "pushBinaryStreamChunk bandwidth in MB/s",
// (1000 * chunk.length) / (after - before) / (1024 * 1024)
// );
}
endBinaryStream(privacy: "private" | "trusting" = "private") {

View File

@@ -1,4 +1,4 @@
import { CoValueCore, newRandomSessionID } from "./coValueCore.js";
import { CoValueCore, newRandomSessionID, MAX_RECOMMENDED_TX_SIZE } from "./coValueCore.js";
import { LocalNode } from "./node.js";
import type { CoValue, ReadableCoValue } from "./coValue.js";
import { CoMap, WriteableCoMap } from "./coValues/coMap.js";
@@ -74,6 +74,7 @@ export {
AnonymousControlledAccount,
ControlledAccount,
cryptoReady as cojsonReady,
MAX_RECOMMENDED_TX_SIZE
};
export type {

View File

@@ -34,7 +34,14 @@ export function connectedPeers(
trace &&
console.debug(
`${peer2id} -> ${peer1id}`,
JSON.stringify(chunk, null, 2)
JSON.stringify(
chunk,
(k, v) =>
(k === "changes" || k === "encryptedChanges")
? v.slice(0, 20) + "..."
: v,
2
)
);
controller.enqueue(chunk);
},
@@ -52,7 +59,14 @@ export function connectedPeers(
trace &&
console.debug(
`${peer1id} -> ${peer2id}`,
JSON.stringify(chunk, null, 2)
JSON.stringify(
chunk,
(k, v) =>
(k === "changes" || k === "encryptedChanges")
? v.slice(0, 20) + "..."
: v,
2
)
);
controller.enqueue(chunk);
},
@@ -102,16 +116,22 @@ export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
},
});
let lastWritePromise = Promise.resolve();
const writable = new WritableStream<T>({
async write(chunk) {
const enqueue = await enqueuePromise;
if (readerClosed) {
throw new Error("Reader closed");
} else {
// make sure write resolves before corresponding read
setTimeout(() => {
enqueue(chunk);
})
// make sure write resolves before corresponding read, but make sure writes are still in order
await lastWritePromise;
lastWritePromise = new Promise((resolve) => {
setTimeout(() => {
enqueue(chunk);
resolve();
});
});
}
},
async abort(reason) {

View File

@@ -436,8 +436,9 @@ test("No matter the optimistic known state, node respects invalid known state me
editable.set("goodbye", "world", "trusting");
});
const _mapEditMsg1 = await reader.read();
const _mapEditMsg2 = await reader.read();
const _mapEditMsgs = await reader.read();
console.log("Sending correction");
await writer.write({
action: "known",

View File

@@ -215,14 +215,32 @@ export class SyncManager {
await this.sendNewContentIncludingDependencies(id, peer);
}
const newContent = coValue.newContentSince(
const newContentPieces = coValue.newContentSince(
peer.optimisticKnownStates[id]
);
if (newContent) {
await this.trySendToPeer(peer, newContent);
if (newContentPieces) {
const optimisticKnownStateBefore =
peer.optimisticKnownStates[id] || emptyKnownState(id);
const sendPieces = async () => {
for (const [i, piece] of newContentPieces.entries()) {
// console.log(
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${newContentPieces.length} header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
await this.trySendToPeer(peer, piece);
}
};
sendPieces().catch((e) => {
console.error("Error sending new content piece, retrying", e);
peer.optimisticKnownStates[id] = optimisticKnownStateBefore;
return this.sendNewContentIncludingDependencies(id, peer);
});
peer.optimisticKnownStates[id] = combinedKnownStates(
peer.optimisticKnownStates[id] || emptyKnownState(id),
optimisticKnownStateBefore,
coValue.knownState()
);
}
@@ -261,6 +279,9 @@ export class SyncManager {
for await (const msg of peerState.incoming) {
try {
await this.handleSyncMessage(msg, peerState);
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
});
} catch (e) {
console.error(
`Error reading from peer ${peer.id}, handling msg`,
@@ -445,6 +466,10 @@ export class SyncManager {
const newTransactions =
newContentForSession.newTransactions.slice(alreadyKnownOffset);
if (newTransactions.length === 0) {
continue;
}
const before = performance.now();
const success = await coValue.tryAddTransactionsAsync(
sessionID,
@@ -454,20 +479,26 @@ export class SyncManager {
);
const after = performance.now();
if (after - before > 10) {
const totalTxLength = newTransactions.map(t => stableStringify(t)!.length).reduce((a, b) => a + b, 0);
const totalTxLength = newTransactions
.map((t) =>
t.privacy === "private"
? t.encryptedChanges.length
: t.changes.length
)
.reduce((a, b) => a + b, 0);
console.log(
"Adding incoming transactions took",
after - before,
"ms",
totalTxLength,
"bytes = ",
"bandwidth: MB/s",
(1000 * totalTxLength / (after - before)) / (1024 * 1024)
`Adding incoming transactions took ${(
after - before
).toFixed(2)}ms for ${totalTxLength} bytes = bandwidth: ${(
(1000 * totalTxLength) /
(after - before) /
(1024 * 1024)
).toFixed(2)} MB/s`
);
}
if (!success) {
console.error("Failed to add transactions", newTransactions);
console.error("Failed to add transactions", msg.id, newTransactions);
continue;
}
@@ -492,18 +523,9 @@ export class SyncManager {
}
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
const coValue = this.local.expectCoValueLoaded(msg.id);
peer.optimisticKnownStates[msg.id] = msg;
peer.optimisticKnownStates[msg.id] = combinedKnownStates(
msg,
coValue.knownState()
);
const newContent = coValue.newContentSince(msg);
if (newContent) {
await this.trySendToPeer(peer, newContent);
}
return this.sendNewContentIncludingDependencies(msg.id, peer);
}
handleUnsubscribe(_msg: DoneMessage) {

View File

@@ -1,11 +1,11 @@
{
"name": "jazz-browser-auth-local",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"jazz-browser": "^0.2.0",
"jazz-browser": "^0.2.1",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,13 +1,13 @@
{
"name": "jazz-browser-media-images",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.0",
"cojson": "^0.2.1",
"image-blob-reduce": "^4.1.0",
"jazz-browser": "^0.2.0",
"jazz-browser": "^0.2.1",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -141,7 +141,8 @@ export function loadImage(
const resState: {
[res: `${number}x${number}`]:
| { state: "queued" }
| { state: "loading" }
| { state: "waiting" }
| { state: "loading"; doneOrFailed: Promise<void> }
| { state: "loaded"; blobURL: string }
| { state: "revoked" }
| { state: "failed" }
@@ -170,7 +171,8 @@ export function loadImage(
const placeholderDataURL =
imageDefinition.get("placeholderDataURL");
const resolutions = imageDefinition.keys()
const resolutions = imageDefinition
.keys()
.filter(
(key): key is `${number}x${number}` =>
!!key.match(/\d+x\d+/)
@@ -182,48 +184,126 @@ export function loadImage(
});
const startLoading = async () => {
const notYetQueuedOrLoading = resolutions.filter(
(res) => !resState[res]
);
);
console.log("Loading iteration", resolutions, resState, notYetQueuedOrLoading);
// console.log(
// "Loading iteration",
// resolutions,
// resState,
// notYetQueuedOrLoading
// );
for (const res of notYetQueuedOrLoading) {
resState[res] = { state: "queued" };
resState[res] = { state: "queued" };
}
for (const res of notYetQueuedOrLoading) {
if (stopped) return;
resState[res] = { state: "loading" };
resState[res] = { state: "waiting" };
const binaryStreamId = imageDefinition.get(res)!;
console.log("Loading image res", imageID, res, binaryStreamId);
// console.log(
// "Loading image res",
// imageID,
// res,
// binaryStreamId
// );
const blob = await readBlobFromBinaryStream(
binaryStreamId,
localNode
const binaryStream = await localNode.load(
binaryStreamId
);
if (stopped) return;
if (!blob) {
if (!binaryStream) {
resState[res] = { state: "failed" };
console.log("Loading image res failed", imageID, res, binaryStreamId);
continue;
console.error(
"Loading image res failed",
imageID,
res,
binaryStreamId
);
return;
}
const blobURL = URL.createObjectURL(blob);
resState[res] = { state: "loaded", blobURL };
await new Promise<void>((resolveFullyLoaded) => {
const unsubFromStream = binaryStream.subscribe(
async (_) => {
if (stopped) return;
const currentState = resState[res];
if (currentState?.state === "loading") {
await currentState.doneOrFailed;
// console.log(
// "Retrying image res after previous attempt",
// imageID,
// res,
// binaryStreamId
// );
}
if (resState[res]?.state === "loaded") {
return;
}
console.log("Loaded image res", imageID, res, binaryStreamId);
const doneOrFailed = new Promise<void>(
// eslint-disable-next-line no-async-promise-executor
async (resolveDoneOrFailed) => {
const blob =
await readBlobFromBinaryStream(
binaryStreamId,
localNode
);
progressiveCallback({
originalSize,
placeholderDataURL,
highestResSrc: blobURL,
if (stopped) return;
if (!blob) {
// console.log(
// "Image res not available yet",
// imageID,
// res,
// binaryStreamId
// );
resolveDoneOrFailed();
return;
}
const blobURL =
URL.createObjectURL(blob);
resState[res] = {
state: "loaded",
blobURL,
};
// console.log(
// "Loaded image res",
// imageID,
// res,
// binaryStreamId
// );
progressiveCallback({
originalSize,
placeholderDataURL,
highestResSrc: blobURL,
});
unsubFromStream();
resolveDoneOrFailed();
await new Promise((resolve) =>
setTimeout(resolve, 0)
);
resolveFullyLoaded();
}
);
resState[res] = {
state: "loading",
doneOrFailed,
};
}
);
});
await new Promise((resolve) => setTimeout(resolve, 0));
}
};

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-browser",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.0",
"jazz-storage-indexeddb": "^0.2.0",
"cojson": "^0.2.1",
"jazz-storage-indexeddb": "^0.2.1",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,5 +1,6 @@
import { BinaryCoStream, InviteSecret } from "cojson";
import { BinaryCoStreamMeta } from "cojson";
import { MAX_RECOMMENDED_TX_SIZE } from "cojson";
import { cojsonReady } from "cojson";
import {
LocalNode,
@@ -72,6 +73,10 @@ export async function createBrowserNode({
node,
done: () => {
shouldTryToReconnect = false;
console.log("Cleaning up node")
for (const peer of Object.values(node.sync.peers)) {
peer.outgoing.close().catch(e => console.error("Error while closing peer", e));
}
sessionDone?.();
},
};
@@ -383,7 +388,7 @@ export async function createBinaryStreamFromBlob<
fileName: blob instanceof File ? blob.name : undefined,
});
}) as C;// TODO: fix this
const chunkSize = 256 * 1024;
const chunkSize = MAX_RECOMMENDED_TX_SIZE;
for (let idx = 0; idx < data.length; idx += chunkSize) {
stream = stream.edit((stream) => {
@@ -419,15 +424,11 @@ export async function readBlobFromBinaryStream<
return undefined;
}
const chunks = stream.getBinaryChunks();
const chunks = stream.getBinaryChunks(allowUnfinished);
if (!chunks) {
return undefined;
}
if (!allowUnfinished && !chunks.finished) {
return undefined;
}
return new Blob(chunks.chunks, { type: chunks.mimeType });
}

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-react-auth-local",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"jazz-browser-auth-local": "^0.2.0",
"jazz-react": "^0.2.0",
"jazz-browser-auth-local": "^0.2.1",
"jazz-react": "^0.2.1",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -1,14 +1,14 @@
{
"name": "jazz-react-media-images",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.0",
"jazz-browser": "^0.2.0",
"jazz-browser-media-images": "^0.2.0",
"jazz-react": "^0.2.0",
"cojson": "^0.2.1",
"jazz-browser": "^0.2.1",
"jazz-browser-media-images": "^0.2.1",
"jazz-react": "^0.2.1",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -1,12 +1,12 @@
{
"name": "jazz-react",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.0",
"jazz-browser": "^0.2.0",
"cojson": "^0.2.1",
"jazz-browser": "^0.2.1",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -47,6 +47,7 @@ export function WithJazz({
useEffect(() => {
let done: (() => void) | undefined = undefined;
let stop = false;
(async () => {
const nodeHandle = await createBrowserNode({
@@ -57,6 +58,11 @@ export function WithJazz({
undefined,
});
if (stop) {
nodeHandle.done();
return;
}
setNode(nodeHandle.node);
done = nodeHandle.done;
@@ -65,6 +71,7 @@ export function WithJazz({
});
return () => {
stop = true;
done && done();
};
}, [auth, syncAddress]);

View File

@@ -1,11 +1,11 @@
{
"name": "jazz-storage-indexeddb",
"version": "0.2.0",
"version": "0.2.1",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"license": "MIT",
"dependencies": {
"cojson": "^0.2.0",
"cojson": "^0.2.1",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -4,7 +4,9 @@ import {
SyncMessage,
Peer,
CojsonInternalTypes,
MAX_RECOMMENDED_TX_SIZE,
} from "cojson";
import { Signature } from "cojson/dist/crypto";
import {
ReadableStream,
WritableStream,
@@ -24,6 +26,7 @@ type SessionRow = {
sessionID: SessionID;
lastIdx: number;
lastSignature: CojsonInternalTypes.Signature;
bytesSinceLastSignature?: number;
};
type StoredSessionRow = SessionRow & { rowID: number };
@@ -34,6 +37,12 @@ type TransactionRow = {
tx: CojsonInternalTypes.Transaction;
};
type SignatureAfterRow = {
ses: number;
idx: number;
signature: CojsonInternalTypes.Signature;
};
export class IDBStorage {
db: IDBDatabase;
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
@@ -55,7 +64,7 @@ export class IDBStorage {
done = result.done;
if (result.value) {
this.handleSyncMessage(result.value);
await this.handleSyncMessage(result.value);
}
}
})();
@@ -88,42 +97,63 @@ export class IDBStorage {
toLocalNode: WritableStream<SyncMessage>
) {
const dbPromise = new Promise<IDBDatabase>((resolve, reject) => {
const request = indexedDB.open("jazz-storage", 1);
const request = indexedDB.open("jazz-storage", 3);
request.onerror = () => {
reject(request.error);
};
request.onsuccess = () => {
resolve(request.result);
};
request.onupgradeneeded = () => {
request.onupgradeneeded = async (ev) => {
const db = request.result;
if (ev.oldVersion === 0) {
const coValues = db.createObjectStore("coValues", {
autoIncrement: true,
keyPath: "rowID",
});
const coValues = db.createObjectStore("coValues", {
autoIncrement: true,
keyPath: "rowID",
});
coValues.createIndex("coValuesById", "id", {
unique: true,
});
const sessions = db.createObjectStore("sessions", {
autoIncrement: true,
keyPath: "rowID",
});
sessions.createIndex("sessionsByCoValue", "coValue");
sessions.createIndex(
"uniqueSessions",
["coValue", "sessionID"],
{
coValues.createIndex("coValuesById", "id", {
unique: true,
}
);
});
db.createObjectStore("transactions", {
keyPath: ["ses", "idx"],
});
const sessions = db.createObjectStore("sessions", {
autoIncrement: true,
keyPath: "rowID",
});
sessions.createIndex("sessionsByCoValue", "coValue");
sessions.createIndex(
"uniqueSessions",
["coValue", "sessionID"],
{
unique: true,
}
);
db.createObjectStore("transactions", {
keyPath: ["ses", "idx"],
});
}
if (ev.oldVersion <= 1) {
db.createObjectStore("signatureAfter", {
keyPath: ["ses", "idx"],
});
}
if (ev.oldVersion !== 0 && ev.oldVersion === 2) {
// fix embarrassing off-by-one error for transaction indices
console.log("Migration: fixing off-by-one error");
const transaction = (ev.target as unknown as {transaction: IDBTransaction}).transaction;
const txsStore = transaction.objectStore("transactions");
const txs = await promised(txsStore.getAll());
for (const tx of txs) {
await promised(txsStore.delete([tx.ses, tx.idx]));
tx.idx -= 1;
await promised(txsStore.add(tx));
}
console.log("Migration: fixing off-by-one error - done");
}
};
});
@@ -153,10 +183,12 @@ export class IDBStorage {
coValues,
sessions,
transactions,
signatureAfter,
}: {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
},
asDependencyOf?: CojsonInternalTypes.RawCoID
) {
@@ -176,12 +208,14 @@ export class IDBStorage {
sessions: {},
};
const newContent: CojsonInternalTypes.NewContentMessage = {
action: "content",
id: theirKnown.id,
header: theirKnown.header ? undefined : coValueRow?.header,
new: {},
};
const newContentPieces: CojsonInternalTypes.NewContentMessage[] = [
{
action: "content",
id: theirKnown.id,
header: theirKnown.header ? undefined : coValueRow?.header,
new: {},
},
];
for (const sessionRow of allOurSessions) {
ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx;
@@ -193,6 +227,21 @@ export class IDBStorage {
const firstNewTxIdx =
theirKnown.sessions[sessionRow.sessionID] || 0;
const signaturesAndIdxs = await promised<SignatureAfterRow[]>(
signatureAfter.getAll(
IDBKeyRange.bound(
[sessionRow.rowID, firstNewTxIdx],
[sessionRow.rowID, Infinity]
)
)
);
console.log(
theirKnown.id,
"signaturesAndIdxs",
JSON.stringify(signaturesAndIdxs)
);
const newTxInSession = await promised<TransactionRow[]>(
transactions.getAll(
IDBKeyRange.bound(
@@ -202,38 +251,83 @@ export class IDBStorage {
)
);
newContent.new[sessionRow.sessionID] = {
after: firstNewTxIdx,
lastSignature: sessionRow.lastSignature,
newTransactions: newTxInSession.map((row) => row.tx),
};
let idx = firstNewTxIdx;
console.log(
theirKnown.id,
"newTxInSession",
newTxInSession.length
);
for (const tx of newTxInSession) {
let sessionEntry =
newContentPieces[newContentPieces.length - 1]!.new[
sessionRow.sessionID
];
if (!sessionEntry) {
sessionEntry = {
after: idx,
lastSignature: "WILL_BE_REPLACED" as Signature,
newTransactions: [],
};
newContentPieces[newContentPieces.length - 1]!.new[
sessionRow.sessionID
] = sessionEntry;
}
sessionEntry.newTransactions.push(tx.tx);
if (
signaturesAndIdxs[0] &&
idx === signaturesAndIdxs[0].idx
) {
sessionEntry.lastSignature =
signaturesAndIdxs[0].signature;
signaturesAndIdxs.shift();
newContentPieces.push({
action: "content",
id: theirKnown.id,
new: {},
});
} else if (
idx ===
firstNewTxIdx + newTxInSession.length - 1
) {
sessionEntry.lastSignature = sessionRow.lastSignature;
}
idx += 1;
}
}
}
const dependedOnCoValues =
coValueRow?.header.ruleset.type === "group"
? Object.values(newContent.new).flatMap((sessionEntry) =>
sessionEntry.newTransactions.flatMap((tx) => {
if (tx.privacy !== "trusting") return [];
// TODO: avoid parse here?
return cojsonInternals
.parseJSON(tx.changes)
.map(
(change) =>
change &&
typeof change === "object" &&
"op" in change &&
change.op === "set" &&
"key" in change &&
change.key
)
.filter(
(key): key is CojsonInternalTypes.RawCoID =>
typeof key === "string" &&
key.startsWith("co_")
);
})
)
? newContentPieces
.flatMap((piece) => Object.values(piece.new))
.flatMap((sessionEntry) =>
sessionEntry.newTransactions.flatMap((tx) => {
if (tx.privacy !== "trusting") return [];
// TODO: avoid parse here?
return cojsonInternals
.parseJSON(tx.changes)
.map(
(change) =>
change &&
typeof change === "object" &&
"op" in change &&
change.op === "set" &&
"key" in change &&
change.key
)
.filter(
(
key
): key is CojsonInternalTypes.RawCoID =>
typeof key === "string" &&
key.startsWith("co_")
);
})
)
: coValueRow?.header.ruleset.type === "ownedByGroup"
? [coValueRow?.header.ruleset.group]
: [];
@@ -241,7 +335,7 @@ export class IDBStorage {
for (const dependedOnCoValue of dependedOnCoValues) {
await this.sendNewContentAfter(
{ id: dependedOnCoValue, header: false, sessions: {} },
{ coValues, sessions, transactions },
{ coValues, sessions, transactions, signatureAfter },
asDependencyOf || theirKnown.id
);
}
@@ -252,8 +346,15 @@ export class IDBStorage {
asDependencyOf,
});
if (newContent.header || Object.keys(newContent.new).length > 0) {
await this.toLocalNode.write(newContent);
const nonEmptyNewContentPieces = newContentPieces.filter(
(piece) => piece.header || Object.keys(piece.new).length > 0
);
console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await this.toLocalNode.write(piece);
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
@@ -262,7 +363,7 @@ export class IDBStorage {
}
async handleContent(msg: CojsonInternalTypes.NewContentMessage) {
const { coValues, sessions, transactions } =
const { coValues, sessions, transactions, signatureAfter } =
this.inTransaction("readwrite");
let storedCoValueRowID = (
@@ -333,18 +434,39 @@ export class IDBStorage {
const actuallyNewOffset =
(sessionRow?.lastIdx || 0) -
(msg.new[sessionID]?.after || 0);
const actuallyNewTransactions =
newTransactions.slice(actuallyNewOffset);
let newBytesSinceLastSignature =
(sessionRow?.bytesSinceLastSignature || 0) +
actuallyNewTransactions.reduce(
(sum, tx) =>
sum +
(tx.privacy === "private"
? tx.encryptedChanges.length
: tx.changes.length),
0
);
const newLastIdx =
(sessionRow?.lastIdx || 0) + actuallyNewTransactions.length;
let shouldWriteSignature = false;
if (newBytesSinceLastSignature > MAX_RECOMMENDED_TX_SIZE) {
shouldWriteSignature = true;
newBytesSinceLastSignature = 0;
}
let nextIdx = sessionRow?.lastIdx || 0;
const sessionUpdate = {
coValue: storedCoValueRowID,
sessionID: sessionID,
lastIdx:
(sessionRow?.lastIdx || 0) +
actuallyNewTransactions.length,
lastIdx: newLastIdx,
lastSignature: msg.new[sessionID]!.lastSignature,
bytesSinceLastSignature: newBytesSinceLastSignature,
};
const sessionRowID = (await promised(
@@ -358,8 +480,18 @@ export class IDBStorage {
)
)) as number;
if (shouldWriteSignature) {
await promised(
signatureAfter.put({
ses: sessionRowID,
// TODO: newLastIdx is a misnomer, it's actually more like nextIdx or length
idx: newLastIdx - 1,
signature: msg.new[sessionID]!.lastSignature,
} satisfies SignatureAfterRow)
);
}
for (const newTransaction of actuallyNewTransactions) {
nextIdx++;
await promised(
transactions.add({
ses: sessionRowID,
@@ -367,6 +499,7 @@ export class IDBStorage {
tx: newTransaction,
} satisfies TransactionRow)
);
nextIdx++;
}
}
}
@@ -390,9 +523,10 @@ export class IDBStorage {
coValues: IDBObjectStore;
sessions: IDBObjectStore;
transactions: IDBObjectStore;
signatureAfter: IDBObjectStore;
} {
const tx = this.db.transaction(
["coValues", "sessions", "transactions"],
["coValues", "sessions", "transactions", "signatureAfter"],
mode
);
@@ -409,8 +543,9 @@ export class IDBStorage {
const coValues = tx.objectStore("coValues");
const sessions = tx.objectStore("sessions");
const transactions = tx.objectStore("transactions");
const signatureAfter = tx.objectStore("signatureAfter");
return { coValues, sessions, transactions };
return { coValues, sessions, transactions, signatureAfter };
}
}