Compare commits
14 Commits
jazz-react
...
cojson-sto
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
17e53f9998 | ||
|
|
cfb1f39efe | ||
|
|
2234276dcf | ||
|
|
bb0a6a0600 | ||
|
|
0a6eb0c10a | ||
|
|
88b67d89e0 | ||
|
|
1a65d826b2 | ||
|
|
6c65ec2b46 | ||
|
|
5b578a832d | ||
|
|
042afc52d7 | ||
|
|
1b83493964 | ||
|
|
3b50da1a74 | ||
|
|
8e0fc74d9f | ||
|
|
e28326f32c |
63
.github/workflows/build-and-deploy.yaml
vendored
63
.github/workflows/build-and-deploy.yaml
vendored
@@ -7,8 +7,11 @@ on:
|
||||
branches: [ "main" ]
|
||||
|
||||
jobs:
|
||||
build-and-deploy:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
example: ["todo", "pets"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
@@ -17,40 +20,50 @@ jobs:
|
||||
|
||||
- uses: actions/setup-node@v3
|
||||
with:
|
||||
node-version: 18
|
||||
node-version: 16
|
||||
cache: 'yarn'
|
||||
cache-dependency-path: yarn.lock
|
||||
- name: Nuke Workspace
|
||||
run: |
|
||||
rm package.json yarn.lock;
|
||||
- name: Yarn Build
|
||||
run: |
|
||||
yarn install --frozen-lockfile;
|
||||
yarn build;
|
||||
working-directory: ./examples/todo
|
||||
|
||||
- uses: satackey/action-docker-layer-caching@v0.0.11
|
||||
continue-on-error: true
|
||||
with:
|
||||
key: docker-layer-caching-${{ github.workflow }}-{hash}
|
||||
restore-keys: |
|
||||
docker-layer-caching-${{ github.workflow }}-
|
||||
- name: Set up Docker Buildx
|
||||
uses: docker/setup-buildx-action@v2
|
||||
|
||||
- name: Login to GitHub Container Registry
|
||||
uses: docker/login-action@v1
|
||||
uses: docker/login-action@v2
|
||||
with:
|
||||
registry: ghcr.io
|
||||
username: gardencmp
|
||||
password: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Docker Build & Push
|
||||
- name: Nuke Workspace
|
||||
run: |
|
||||
export DOCKER_TAG=ghcr.io/gardencmp/jazz-example-todo:${{github.head_ref || github.ref_name}}-${{github.sha}}-$(date +%s) ;
|
||||
docker build . --file Dockerfile --tag $DOCKER_TAG;
|
||||
docker push $DOCKER_TAG;
|
||||
echo "DOCKER_TAG=$DOCKER_TAG" >> $GITHUB_ENV
|
||||
working-directory: ./examples/todo
|
||||
rm package.json yarn.lock;
|
||||
|
||||
- name: Yarn Build
|
||||
run: |
|
||||
yarn install --frozen-lockfile;
|
||||
yarn build;
|
||||
working-directory: ./examples/${{ matrix.example }}
|
||||
|
||||
- name: Docker Build & Push
|
||||
uses: docker/build-push-action@v4
|
||||
with:
|
||||
context: ./examples/${{ matrix.example }}
|
||||
push: true
|
||||
tags: ghcr.io/gardencmp/${{github.event.repository.name}}-example-${{ matrix.example }}:${{github.head_ref || github.ref_name}}-${{github.sha}}-${{github.run_number}}-${{github.run_attempt}}
|
||||
cache-from: type=gha
|
||||
cache-to: type=gha,mode=max
|
||||
|
||||
deploy:
|
||||
runs-on: ubuntu-latest
|
||||
needs: build
|
||||
strategy:
|
||||
matrix:
|
||||
example: ["todo", "pets"]
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
with:
|
||||
submodules: true
|
||||
- uses: gacts/install-nomad@v1
|
||||
- name: Tailscale
|
||||
uses: tailscale/github-action@v1
|
||||
@@ -69,9 +82,9 @@ jobs:
|
||||
|
||||
export DOCKER_USER=gardencmp;
|
||||
export DOCKER_PASSWORD=${{ secrets.DOCKER_PULL_PAT }};
|
||||
export DOCKER_TAG=${{ env.DOCKER_TAG }};
|
||||
export DOCKER_TAG=ghcr.io/gardencmp/${{github.event.repository.name}}-example-${{ matrix.example }}:${{github.head_ref || github.ref_name}}-${{github.sha}}-${{github.run_number}}-${{github.run_attempt}};
|
||||
|
||||
envsubst '${DOCKER_USER} ${DOCKER_PASSWORD} ${DOCKER_TAG} ${BRANCH_SUFFIX} ${BRANCH_SUBDOMAIN}' < job-template.nomad > job-instance.nomad;
|
||||
cat job-instance.nomad;
|
||||
NOMAD_ADDR='http://control1v2-london:4646' nomad job run job-instance.nomad;
|
||||
working-directory: ./examples/todo
|
||||
working-directory: ./examples/${{ matrix.example }}
|
||||
43
DOCS.md
43
DOCS.md
@@ -2061,10 +2061,12 @@ The `Group` this `CoValue` belongs to (determining permissions)
|
||||
### Methods
|
||||
|
||||
<details>
|
||||
<summary><code>binaryCoStream.getBinaryChunks()</code> (undocumented)</summary>
|
||||
<summary><code>binaryCoStream.getBinaryChunks(allowUnfinished)</code> (undocumented)</summary>
|
||||
|
||||
```typescript
|
||||
binaryCoStream.getBinaryChunks(): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
|
||||
binaryCoStream.getBinaryChunks(
|
||||
allowUnfinished: boolean
|
||||
): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
|
||||
```
|
||||
TODO: document
|
||||
|
||||
@@ -2340,10 +2342,12 @@ TODO: document
|
||||
|
||||
|
||||
<details>
|
||||
<summary><code>writeableBinaryCoStream.getBinaryChunks()</code> (from <code>BinaryCoStream</code>) (undocumented)</summary>
|
||||
<summary><code>writeableBinaryCoStream.getBinaryChunks(allowUnfinished)</code> (from <code>BinaryCoStream</code>) (undocumented)</summary>
|
||||
|
||||
```typescript
|
||||
writeableBinaryCoStream.getBinaryChunks(): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
|
||||
writeableBinaryCoStream.getBinaryChunks(
|
||||
allowUnfinished: boolean
|
||||
): undefined | BinaryChunkInfo & {chunks: Uint8Array[], finished: boolean}
|
||||
```
|
||||
TODO: document
|
||||
|
||||
@@ -2653,6 +2657,24 @@ TODO: document
|
||||
|
||||
|
||||
|
||||
<details>
|
||||
<summary><code>coValueCore.doAddTransactions(sessionID, newTransactions, newSignature, expectedNewHash, newStreamingHash)</code> (undocumented)</summary>
|
||||
|
||||
```typescript
|
||||
coValueCore.doAddTransactions(
|
||||
sessionID: SessionID,
|
||||
newTransactions: Transaction[],
|
||||
newSignature: TEMPLATE_LITERAL,
|
||||
expectedNewHash: TEMPLATE_LITERAL,
|
||||
newStreamingHash: StreamingHash
|
||||
): void
|
||||
```
|
||||
TODO: document
|
||||
|
||||
</details>
|
||||
|
||||
|
||||
|
||||
<details>
|
||||
<summary><code>coValueCore.subscribe(listener)</code> (undocumented)</summary>
|
||||
|
||||
@@ -2794,7 +2816,7 @@ TODO: document
|
||||
```typescript
|
||||
coValueCore.newContentSince(
|
||||
knownState: undefined | CoValueKnownState
|
||||
): undefined | NewContentMessage
|
||||
): undefined | NewContentMessage[]
|
||||
```
|
||||
TODO: document
|
||||
|
||||
@@ -3251,6 +3273,17 @@ TODO: document
|
||||
|
||||
TODO: doc generator not implemented yet
|
||||
|
||||
----
|
||||
|
||||
## `MAX_RECOMMENDED_TX_SIZE` (variabl in `cojson`)
|
||||
|
||||
```typescript
|
||||
export MAX_RECOMMENDED_TX_SIZE
|
||||
```
|
||||
TODO: document
|
||||
|
||||
TODO: doc generator not implemented yet
|
||||
|
||||
|
||||
# jazz-react
|
||||
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
# Jazz Todo List Example
|
||||
# Jazz Rate-My-Pet List Example
|
||||
|
||||
Live version: https://example-todo.jazz.tools
|
||||
Live version: https://example-pets.jazz.tools
|
||||
|
||||
## Installing & running the example locally
|
||||
|
||||
Start by checking out just the example app to a folder:
|
||||
|
||||
```bash
|
||||
npx degit gardencmp/jazz/examples/todo jazz-example-todo
|
||||
cd jazz-example-todo
|
||||
npx degit gardencmp/jazz/examples/pets jazz-example-pets
|
||||
cd jazz-example-pets
|
||||
```
|
||||
|
||||
(This ensures that you have the example app without git history or our multi-package monorepo)
|
||||
@@ -27,31 +27,17 @@ npm run dev
|
||||
|
||||
## Structure
|
||||
|
||||
- [`src/basicComponents`](./src/basicComponents) contains simple components to build the UI, unrelated to Jazz (powered by [shadcn/ui](https://ui.shadcn.com))
|
||||
- [`src/components`](./src/components/) contains helper components that do contain Jazz-specific logic, but are not super relevant to understand the basics of Jazz and CoJSON
|
||||
- [`src/0_main.tsx`](./src/0_main.tsx), [`src/1_types.ts`](./src/1_types.ts), [`src/2_App.tsx`](./src/2_App.tsx), [`src/3_TodoTable.tsx`](./src/3_TodoTable.tsx), [`src/router.ts`](./src/router.ts) - the main files for this example, see the walkthrough below
|
||||
TODO
|
||||
|
||||
## Walkthrough
|
||||
|
||||
### Main parts
|
||||
|
||||
- The top-level provider `<WithJazz/>`: [`src/0_main.tsx`](./src/0_main.tsx)
|
||||
|
||||
- Defining the data model with CoJSON: [`src/1_types.ts`](./src/1_types.ts)
|
||||
|
||||
- Creating todo projects & routing in `<App/>`: [`src/2_App.tsx`](./src/2_App.tsx)
|
||||
|
||||
- Reactively rendering a todo project as a table, adding and editing tasks: [`src/3_TodoTable.tsx`](./src/3_TodoTable.tsx)
|
||||
TODO
|
||||
|
||||
### Helpers
|
||||
|
||||
- Getting user profiles in `<NameBadge/>`: [`src/components/NameBadge.tsx`](./src/components/NameBadge.tsx)
|
||||
|
||||
- (not yet commented) Creating invite links/QR codes with `<InviteButton/>`: [`src/components/InviteButton.tsx`](./src/components/InviteButton.tsx)
|
||||
|
||||
- (not yet commented) `location.hash`-based routing and accepting invite links with `useSimpleHashRouterThatAcceptsInvites()` in [`src/router.ts`](./src/router.ts)
|
||||
|
||||
This is the whole Todo List app!
|
||||
TODO
|
||||
|
||||
## Questions / problems / feedback
|
||||
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
job "example-todo$BRANCH_SUFFIX" {
|
||||
job "example-pets$BRANCH_SUFFIX" {
|
||||
region = "global"
|
||||
datacenters = ["*"]
|
||||
|
||||
@@ -41,7 +41,7 @@ job "example-todo$BRANCH_SUFFIX" {
|
||||
|
||||
service {
|
||||
tags = ["public"]
|
||||
name = "example-todo$BRANCH_SUFFIX"
|
||||
name = "example-pets$BRANCH_SUFFIX"
|
||||
port = "http"
|
||||
provider = "consul"
|
||||
}
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "jazz-example-pets",
|
||||
"private": true,
|
||||
"version": "0.0.4",
|
||||
"version": "0.0.5",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "vite",
|
||||
@@ -16,9 +16,9 @@
|
||||
"@types/qrcode": "^1.5.1",
|
||||
"class-variance-authority": "^0.7.0",
|
||||
"clsx": "^2.0.0",
|
||||
"jazz-react": "^0.2.0",
|
||||
"jazz-react-auth-local": "^0.2.0",
|
||||
"jazz-react-media-images": "^0.2.0",
|
||||
"jazz-react": "^0.2.1",
|
||||
"jazz-react-auth-local": "^0.2.1",
|
||||
"jazz-react-media-images": "^0.2.1",
|
||||
"lucide-react": "^0.274.0",
|
||||
"qrcode": "^1.5.3",
|
||||
"react": "^18.2.0",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "jazz-example-todo",
|
||||
"private": true,
|
||||
"version": "0.0.29",
|
||||
"version": "0.0.30",
|
||||
"type": "module",
|
||||
"scripts": {
|
||||
"dev": "vite",
|
||||
@@ -16,8 +16,8 @@
|
||||
"@types/qrcode": "^1.5.1",
|
||||
"class-variance-authority": "^0.7.0",
|
||||
"clsx": "^2.0.0",
|
||||
"jazz-react": "^0.2.0",
|
||||
"jazz-react-auth-local": "^0.2.0",
|
||||
"jazz-react": "^0.2.1",
|
||||
"jazz-react-auth-local": "^0.2.1",
|
||||
"lucide-react": "^0.274.0",
|
||||
"qrcode": "^1.5.3",
|
||||
"react": "^18.2.0",
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
"types": "src/index.ts",
|
||||
"type": "module",
|
||||
"license": "MIT",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"devDependencies": {
|
||||
"@types/jest": "^29.5.3",
|
||||
"@types/ws": "^8.5.5",
|
||||
@@ -16,8 +16,8 @@
|
||||
"typescript": "5.0.2"
|
||||
},
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"cojson-storage-sqlite": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"cojson-storage-sqlite": "^0.2.1",
|
||||
"ws": "^8.13.0"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
{
|
||||
"name": "cojson-storage-sqlite",
|
||||
"type": "module",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"better-sqlite3": "^8.5.2",
|
||||
"cojson": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
@@ -60,7 +60,7 @@ export class SQLiteStorage {
|
||||
done = result.done;
|
||||
|
||||
if (result.value) {
|
||||
this.handleSyncMessage(result.value);
|
||||
await this.handleSyncMessage(result.value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
"types": "dist/index.d.ts",
|
||||
"type": "module",
|
||||
"license": "MIT",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"devDependencies": {
|
||||
"@types/jest": "^29.5.3",
|
||||
"@typescript-eslint/eslint-plugin": "^6.2.1",
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { accountOrAgentIDfromSessionID } from "./coValueCore.js";
|
||||
import { BinaryCoStream } from "./coValues/coStream.js";
|
||||
import { createdNowUnique } from "./crypto.js";
|
||||
import { cojsonReady } from "./index.js";
|
||||
import { MAX_RECOMMENDED_TX_SIZE, cojsonReady } from "./index.js";
|
||||
import { LocalNode } from "./node.js";
|
||||
import { randomAnonymousAccountAndSessionID } from "./testUtils.js";
|
||||
|
||||
@@ -382,14 +382,14 @@ test("Can push into BinaryCoStream", () => {
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
|
||||
expect(editable.getBinaryChunks()).toEqual({
|
||||
expect(editable.getBinaryChunks(true)).toEqual({
|
||||
mimeType: "text/plain",
|
||||
fileName: "test.txt",
|
||||
chunks: [],
|
||||
finished: false,
|
||||
});
|
||||
editable.pushBinaryStreamChunk(new Uint8Array([1, 2, 3]), "trusting");
|
||||
expect(editable.getBinaryChunks()).toEqual({
|
||||
expect(editable.getBinaryChunks(true)).toEqual({
|
||||
mimeType: "text/plain",
|
||||
fileName: "test.txt",
|
||||
chunks: [new Uint8Array([1, 2, 3])],
|
||||
@@ -397,7 +397,7 @@ test("Can push into BinaryCoStream", () => {
|
||||
});
|
||||
editable.pushBinaryStreamChunk(new Uint8Array([4, 5, 6]), "trusting");
|
||||
|
||||
expect(editable.getBinaryChunks()).toEqual({
|
||||
expect(editable.getBinaryChunks(true)).toEqual({
|
||||
mimeType: "text/plain",
|
||||
fileName: "test.txt",
|
||||
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
|
||||
@@ -413,3 +413,112 @@ test("Can push into BinaryCoStream", () => {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
test("When adding large transactions (small fraction of MAX_RECOMMENDED_TX_SIZE), we store an inbetween signature every time we reach MAX_RECOMMENDED_TX_SIZE and split up newContentSince accordingly", () => {
|
||||
const node = new LocalNode(...randomAnonymousAccountAndSessionID());
|
||||
|
||||
const coValue = node.createCoValue({
|
||||
type: "costream",
|
||||
ruleset: { type: "unsafeAllowAll" },
|
||||
meta: { type: "binary" },
|
||||
...createdNowUnique(),
|
||||
});
|
||||
|
||||
const content = coValue.getCurrentContent();
|
||||
|
||||
if (content.type !== "costream" || content.meta?.type !== "binary" || !(content instanceof BinaryCoStream)) {
|
||||
throw new Error("Expected binary stream");
|
||||
}
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
|
||||
});
|
||||
|
||||
for (let i = 0; i < 10; i++) {
|
||||
const chunk = new Uint8Array(MAX_RECOMMENDED_TX_SIZE/3 + 100);
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.pushBinaryStreamChunk(chunk, "trusting");
|
||||
});
|
||||
}
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.endBinaryStream("trusting");
|
||||
});
|
||||
|
||||
const sessionEntry = coValue._sessions[node.currentSessionID]!;
|
||||
expect(sessionEntry.transactions.length).toEqual(12);
|
||||
expect(sessionEntry.signatureAfter[0]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[1]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[2]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[3]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[4]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[5]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[6]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[7]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[8]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[9]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[10]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[11]).not.toBeDefined();
|
||||
|
||||
const newContent = coValue.newContentSince({id: coValue.id, header: false, sessions: {}})!;
|
||||
|
||||
expect(newContent.length).toEqual(5)
|
||||
expect(newContent[0]!.header).toBeDefined();
|
||||
expect(newContent[1]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[3]);
|
||||
expect(newContent[2]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[6]);
|
||||
expect(newContent[3]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[9]);
|
||||
expect(newContent[4]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.lastSignature);
|
||||
});
|
||||
|
||||
test("When adding large transactions (bigger than MAX_RECOMMENDED_TX_SIZE), we store an inbetween signature after every large transaction and split up newContentSince accordingly", () => {
|
||||
const node = new LocalNode(...randomAnonymousAccountAndSessionID());
|
||||
|
||||
const coValue = node.createCoValue({
|
||||
type: "costream",
|
||||
ruleset: { type: "unsafeAllowAll" },
|
||||
meta: { type: "binary" },
|
||||
...createdNowUnique(),
|
||||
});
|
||||
|
||||
const content = coValue.getCurrentContent();
|
||||
|
||||
if (content.type !== "costream" || content.meta?.type !== "binary" || !(content instanceof BinaryCoStream)) {
|
||||
throw new Error("Expected binary stream");
|
||||
}
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.startBinaryStream({mimeType: "text/plain", fileName: "test.txt"}, "trusting");
|
||||
});
|
||||
|
||||
const chunk = new Uint8Array(MAX_RECOMMENDED_TX_SIZE + 100);
|
||||
|
||||
for (let i = 0; i < 3; i++) {
|
||||
content.edit((editable) => {
|
||||
editable.pushBinaryStreamChunk(chunk, "trusting");
|
||||
});
|
||||
}
|
||||
|
||||
content.edit((editable) => {
|
||||
editable.endBinaryStream("trusting");
|
||||
});
|
||||
|
||||
const sessionEntry = coValue._sessions[node.currentSessionID]!;
|
||||
expect(sessionEntry.transactions.length).toEqual(5);
|
||||
expect(sessionEntry.signatureAfter[0]).not.toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[1]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[2]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[3]).toBeDefined();
|
||||
expect(sessionEntry.signatureAfter[4]).not.toBeDefined();
|
||||
|
||||
const newContent = coValue.newContentSince({id: coValue.id, header: false, sessions: {}})!;
|
||||
|
||||
expect(newContent.length).toEqual(5)
|
||||
expect(newContent[0]!.header).toBeDefined();
|
||||
expect(newContent[1]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[1]);
|
||||
expect(newContent[2]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[2]);
|
||||
expect(newContent[3]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.signatureAfter[3]);
|
||||
expect(newContent[4]!.new[node.currentSessionID]!.lastSignature).toEqual(sessionEntry.lastSignature);
|
||||
});
|
||||
|
||||
|
||||
|
||||
@@ -14,7 +14,6 @@ import {
|
||||
sign,
|
||||
verify,
|
||||
encryptForTransaction,
|
||||
decryptForTransaction,
|
||||
KeyID,
|
||||
decryptKeySecret,
|
||||
getAgentSignerID,
|
||||
@@ -33,11 +32,10 @@ import { LocalNode } from "./node.js";
|
||||
import { CoValueKnownState, NewContentMessage } from "./sync.js";
|
||||
import { AgentID, RawCoID, SessionID, TransactionID } from "./ids.js";
|
||||
import { CoList } from "./coValues/coList.js";
|
||||
import {
|
||||
AccountID,
|
||||
GeneralizedControlledAccount,
|
||||
} from "./account.js";
|
||||
import { Stringified, parseJSON, stableStringify } from "./jsonStringify.js";
|
||||
import { AccountID, GeneralizedControlledAccount } from "./account.js";
|
||||
import { Stringified, stableStringify } from "./jsonStringify.js";
|
||||
|
||||
export const MAX_RECOMMENDED_TX_SIZE = 100 * 1024;
|
||||
|
||||
export type CoValueHeader = {
|
||||
type: CoValueImpl["type"];
|
||||
@@ -66,6 +64,7 @@ type SessionLog = {
|
||||
transactions: Transaction[];
|
||||
lastHash?: Hash;
|
||||
streamingHash: StreamingHash;
|
||||
signatureAfter: { [txIdx: number]: Signature | undefined };
|
||||
lastSignature: Signature;
|
||||
};
|
||||
|
||||
@@ -79,7 +78,6 @@ export type PrivateTransaction = {
|
||||
>;
|
||||
};
|
||||
|
||||
|
||||
export type TrustingTransaction = {
|
||||
privacy: "trusting";
|
||||
madeAt: number;
|
||||
@@ -103,7 +101,11 @@ export class CoValueCore {
|
||||
_sessions: { [key: SessionID]: SessionLog };
|
||||
_cachedContent?: CoValueImpl;
|
||||
listeners: Set<(content?: CoValueImpl) => void> = new Set();
|
||||
_decryptionCache: {[key: Encrypted<JsonValue[], JsonValue>]: Stringified<JsonValue[]> | undefined} = {}
|
||||
_decryptionCache: {
|
||||
[key: Encrypted<JsonValue[], JsonValue>]:
|
||||
| Stringified<JsonValue[]>
|
||||
| undefined;
|
||||
} = {};
|
||||
|
||||
constructor(
|
||||
header: CoValueHeader,
|
||||
@@ -212,7 +214,8 @@ export class CoValueCore {
|
||||
// const beforeVerify = performance.now();
|
||||
if (!verify(newSignature, expectedNewHash, signerID)) {
|
||||
console.warn(
|
||||
"Invalid signature",
|
||||
"Invalid signature in",
|
||||
this.id,
|
||||
newSignature,
|
||||
expectedNewHash,
|
||||
signerID
|
||||
@@ -225,25 +228,13 @@ export class CoValueCore {
|
||||
// afterVerify - beforeVerify
|
||||
// );
|
||||
|
||||
const transactions = this.sessions[sessionID]?.transactions ?? [];
|
||||
|
||||
transactions.push(...newTransactions);
|
||||
|
||||
this._sessions[sessionID] = {
|
||||
transactions,
|
||||
lastHash: expectedNewHash,
|
||||
streamingHash: newStreamingHash,
|
||||
lastSignature: newSignature,
|
||||
};
|
||||
|
||||
this._cachedContent = undefined;
|
||||
|
||||
if (this.listeners.size > 0) {
|
||||
const content = this.getCurrentContent();
|
||||
for (const listener of this.listeners) {
|
||||
listener(content);
|
||||
}
|
||||
}
|
||||
this.doAddTransactions(
|
||||
sessionID,
|
||||
newTransactions,
|
||||
newSignature,
|
||||
expectedNewHash,
|
||||
newStreamingHash
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
@@ -272,10 +263,8 @@ export class CoValueCore {
|
||||
const nTxBefore = this.sessions[sessionID]?.transactions.length ?? 0;
|
||||
|
||||
// const beforeHash = performance.now();
|
||||
const { expectedNewHash, newStreamingHash } = await this.expectedNewHashAfterAsync(
|
||||
sessionID,
|
||||
newTransactions
|
||||
);
|
||||
const { expectedNewHash, newStreamingHash } =
|
||||
await this.expectedNewHashAfterAsync(sessionID, newTransactions);
|
||||
// const afterHash = performance.now();
|
||||
// console.log(
|
||||
// "Hashing took",
|
||||
@@ -286,7 +275,7 @@ export class CoValueCore {
|
||||
|
||||
if (nTxAfter !== nTxBefore) {
|
||||
const newTransactionLengthBefore = newTransactions.length;
|
||||
newTransactions = newTransactions.slice((nTxAfter - nTxBefore));
|
||||
newTransactions = newTransactions.slice(nTxAfter - nTxBefore);
|
||||
console.warn("Transactions changed while async hashing", {
|
||||
nTxBefore,
|
||||
nTxAfter,
|
||||
@@ -306,7 +295,8 @@ export class CoValueCore {
|
||||
// const beforeVerify = performance.now();
|
||||
if (!verify(newSignature, expectedNewHash, signerID)) {
|
||||
console.warn(
|
||||
"Invalid signature",
|
||||
"Invalid signature in",
|
||||
this.id,
|
||||
newSignature,
|
||||
expectedNewHash,
|
||||
signerID
|
||||
@@ -319,15 +309,61 @@ export class CoValueCore {
|
||||
// afterVerify - beforeVerify
|
||||
// );
|
||||
|
||||
const transactions = this.sessions[sessionID]?.transactions ?? [];
|
||||
this.doAddTransactions(
|
||||
sessionID,
|
||||
newTransactions,
|
||||
newSignature,
|
||||
expectedNewHash,
|
||||
newStreamingHash
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private doAddTransactions(
|
||||
sessionID: SessionID,
|
||||
newTransactions: Transaction[],
|
||||
newSignature: Signature,
|
||||
expectedNewHash: Hash,
|
||||
newStreamingHash: StreamingHash
|
||||
) {
|
||||
const transactions = this.sessions[sessionID]?.transactions ?? [];
|
||||
transactions.push(...newTransactions);
|
||||
|
||||
const signatureAfter = this.sessions[sessionID]?.signatureAfter ?? {};
|
||||
|
||||
const lastInbetweenSignatureIdx = Object.keys(signatureAfter).reduce(
|
||||
(max, idx) => (parseInt(idx) > max ? parseInt(idx) : max),
|
||||
-1
|
||||
);
|
||||
|
||||
const sizeOfTxsSinceLastInbetweenSignature = transactions
|
||||
.slice(lastInbetweenSignatureIdx + 1)
|
||||
.reduce(
|
||||
(sum, tx) =>
|
||||
sum +
|
||||
(tx.privacy === "private"
|
||||
? tx.encryptedChanges.length
|
||||
: tx.changes.length),
|
||||
0
|
||||
);
|
||||
|
||||
if (sizeOfTxsSinceLastInbetweenSignature > 100 * 1024) {
|
||||
// console.log(
|
||||
// "Saving inbetween signature for tx ",
|
||||
// sessionID,
|
||||
// transactions.length - 1,
|
||||
// sizeOfTxsSinceLastInbetweenSignature
|
||||
// );
|
||||
signatureAfter[transactions.length - 1] = newSignature;
|
||||
}
|
||||
|
||||
this._sessions[sessionID] = {
|
||||
transactions,
|
||||
lastHash: expectedNewHash,
|
||||
streamingHash: newStreamingHash,
|
||||
lastSignature: newSignature,
|
||||
signatureAfter: signatureAfter,
|
||||
};
|
||||
|
||||
this._cachedContent = undefined;
|
||||
@@ -338,8 +374,6 @@ export class CoValueCore {
|
||||
listener(content);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
subscribe(listener: (content?: CoValueImpl) => void): () => void {
|
||||
@@ -379,10 +413,10 @@ export class CoValueCore {
|
||||
new StreamingHash();
|
||||
let before = performance.now();
|
||||
for (const transaction of newTransactions) {
|
||||
streamingHash.update(transaction)
|
||||
streamingHash.update(transaction);
|
||||
const after = performance.now();
|
||||
if (after - before > 1) {
|
||||
console.log("Hashing blocked for", after - before);
|
||||
// console.log("Hashing blocked for", after - before);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
before = performance.now();
|
||||
}
|
||||
@@ -500,7 +534,8 @@ export class CoValueCore {
|
||||
if (!readKey) {
|
||||
return undefined;
|
||||
} else {
|
||||
let decrytedChanges = this._decryptionCache[tx.encryptedChanges];
|
||||
let decrytedChanges =
|
||||
this._decryptionCache[tx.encryptedChanges];
|
||||
|
||||
if (!decrytedChanges) {
|
||||
decrytedChanges = decryptRawForTransaction(
|
||||
@@ -511,7 +546,8 @@ export class CoValueCore {
|
||||
tx: txID,
|
||||
}
|
||||
);
|
||||
this._decryptionCache[tx.encryptedChanges] = decrytedChanges;
|
||||
this._decryptionCache[tx.encryptedChanges] =
|
||||
decrytedChanges;
|
||||
}
|
||||
|
||||
if (!decrytedChanges) {
|
||||
@@ -683,47 +719,95 @@ export class CoValueCore {
|
||||
|
||||
newContentSince(
|
||||
knownState: CoValueKnownState | undefined
|
||||
): NewContentMessage | undefined {
|
||||
const newContent: NewContentMessage = {
|
||||
): NewContentMessage[] | undefined {
|
||||
let currentPiece: NewContentMessage = {
|
||||
action: "content",
|
||||
id: this.id,
|
||||
header: knownState?.header ? undefined : this.header,
|
||||
new: Object.fromEntries(
|
||||
Object.entries(this.sessions)
|
||||
.map(([sessionID, log]) => {
|
||||
const newTransactions = log.transactions.slice(
|
||||
knownState?.sessions[sessionID as SessionID] || 0
|
||||
);
|
||||
|
||||
if (
|
||||
newTransactions.length === 0 ||
|
||||
!log.lastHash ||
|
||||
!log.lastSignature
|
||||
) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return [
|
||||
sessionID,
|
||||
{
|
||||
after:
|
||||
knownState?.sessions[
|
||||
sessionID as SessionID
|
||||
] || 0,
|
||||
newTransactions,
|
||||
lastSignature: log.lastSignature,
|
||||
},
|
||||
];
|
||||
})
|
||||
.filter((x): x is Exclude<typeof x, undefined> => !!x)
|
||||
),
|
||||
new: {},
|
||||
};
|
||||
|
||||
if (!newContent.header && Object.keys(newContent.new).length === 0) {
|
||||
const pieces = [currentPiece];
|
||||
|
||||
const sentState: CoValueKnownState["sessions"] = {
|
||||
...knownState?.sessions,
|
||||
};
|
||||
|
||||
let newTxsWereAdded = true;
|
||||
let pieceSize = 0;
|
||||
while (newTxsWereAdded) {
|
||||
newTxsWereAdded = false;
|
||||
|
||||
for (const [sessionID, log] of Object.entries(this.sessions) as [
|
||||
SessionID,
|
||||
SessionLog
|
||||
][]) {
|
||||
const nextKnownSignatureIdx = Object.keys(log.signatureAfter)
|
||||
.map(Number)
|
||||
.sort((a, b) => a - b)
|
||||
.find((idx) => idx >= (sentState[sessionID] ?? -1));
|
||||
|
||||
const txsToAdd = log.transactions.slice(
|
||||
sentState[sessionID] ?? 0,
|
||||
nextKnownSignatureIdx === undefined
|
||||
? undefined
|
||||
: nextKnownSignatureIdx + 1
|
||||
);
|
||||
|
||||
if (txsToAdd.length === 0) continue;
|
||||
|
||||
newTxsWereAdded = true;
|
||||
|
||||
const oldPieceSize = pieceSize;
|
||||
pieceSize += txsToAdd.reduce(
|
||||
(sum, tx) =>
|
||||
sum +
|
||||
(tx.privacy === "private"
|
||||
? tx.encryptedChanges.length
|
||||
: tx.changes.length),
|
||||
0
|
||||
);
|
||||
|
||||
if (pieceSize >= MAX_RECOMMENDED_TX_SIZE) {
|
||||
currentPiece = {
|
||||
action: "content",
|
||||
id: this.id,
|
||||
header: undefined,
|
||||
new: {},
|
||||
};
|
||||
pieces.push(currentPiece);
|
||||
pieceSize = pieceSize - oldPieceSize;
|
||||
}
|
||||
|
||||
let sessionEntry = currentPiece.new[sessionID];
|
||||
if (!sessionEntry) {
|
||||
sessionEntry = {
|
||||
after: sentState[sessionID] ?? 0,
|
||||
newTransactions: [],
|
||||
lastSignature: "WILL_BE_REPLACED" as Signature
|
||||
};
|
||||
currentPiece.new[sessionID] = sessionEntry;
|
||||
}
|
||||
|
||||
sessionEntry.newTransactions.push(...txsToAdd);
|
||||
sessionEntry.lastSignature = nextKnownSignatureIdx === undefined
|
||||
? log.lastSignature!
|
||||
: log.signatureAfter[nextKnownSignatureIdx]!
|
||||
|
||||
sentState[sessionID] =
|
||||
(sentState[sessionID] || 0) + txsToAdd.length;
|
||||
}
|
||||
}
|
||||
|
||||
const piecesWithContent = pieces.filter(
|
||||
(piece) => Object.keys(piece.new).length > 0 || piece.header
|
||||
);
|
||||
|
||||
if (piecesWithContent.length === 0) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return newContent;
|
||||
return piecesWithContent;
|
||||
}
|
||||
|
||||
getDependedOnCoValues(): RawCoID[] {
|
||||
|
||||
@@ -169,10 +169,10 @@ export class BinaryCoStream<
|
||||
{
|
||||
id!: CoID<BinaryCoStream<Meta>>;
|
||||
|
||||
getBinaryChunks():
|
||||
getBinaryChunks(allowUnfinished?: boolean):
|
||||
| (BinaryChunkInfo & { chunks: Uint8Array[]; finished: boolean })
|
||||
| undefined {
|
||||
const before = performance.now();
|
||||
// const before = performance.now();
|
||||
const items = this.getSingleStream();
|
||||
|
||||
if (!items) return;
|
||||
@@ -184,10 +184,14 @@ export class BinaryCoStream<
|
||||
return;
|
||||
}
|
||||
|
||||
const end = items[items.length - 1];
|
||||
|
||||
if (end?.type !== "end" && !allowUnfinished) return;
|
||||
|
||||
const chunks: Uint8Array[] = [];
|
||||
|
||||
let finished = false;
|
||||
let totalLength = 0;
|
||||
// let totalLength = 0;
|
||||
|
||||
for (const item of items.slice(1)) {
|
||||
if (item.type === "end") {
|
||||
@@ -203,15 +207,15 @@ export class BinaryCoStream<
|
||||
const chunk = base64URLtoBytes(
|
||||
item.chunk.slice(binary_U_prefixLength)
|
||||
);
|
||||
totalLength += chunk.length;
|
||||
// totalLength += chunk.length;
|
||||
chunks.push(chunk);
|
||||
}
|
||||
|
||||
const after = performance.now();
|
||||
console.log(
|
||||
"getBinaryChunks bandwidth in MB/s",
|
||||
(1000 * totalLength) / (after - before) / (1024 * 1024)
|
||||
);
|
||||
// const after = performance.now();
|
||||
// console.log(
|
||||
// "getBinaryChunks bandwidth in MB/s",
|
||||
// (1000 * totalLength) / (after - before) / (1024 * 1024)
|
||||
// );
|
||||
|
||||
return {
|
||||
mimeType: start.mimeType,
|
||||
@@ -286,7 +290,7 @@ export class WriteableBinaryCoStream<
|
||||
chunk: Uint8Array,
|
||||
privacy: "private" | "trusting" = "private"
|
||||
) {
|
||||
const before = performance.now();
|
||||
// const before = performance.now();
|
||||
this.push(
|
||||
{
|
||||
type: "chunk",
|
||||
@@ -294,11 +298,11 @@ export class WriteableBinaryCoStream<
|
||||
} satisfies BinaryStreamChunk,
|
||||
privacy
|
||||
);
|
||||
const after = performance.now();
|
||||
console.log(
|
||||
"pushBinaryStreamChunk bandwidth in MB/s",
|
||||
(1000 * chunk.length) / (after - before) / (1024 * 1024)
|
||||
);
|
||||
// const after = performance.now();
|
||||
// console.log(
|
||||
// "pushBinaryStreamChunk bandwidth in MB/s",
|
||||
// (1000 * chunk.length) / (after - before) / (1024 * 1024)
|
||||
// );
|
||||
}
|
||||
|
||||
endBinaryStream(privacy: "private" | "trusting" = "private") {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
import { CoValueCore, newRandomSessionID } from "./coValueCore.js";
|
||||
import { CoValueCore, newRandomSessionID, MAX_RECOMMENDED_TX_SIZE } from "./coValueCore.js";
|
||||
import { LocalNode } from "./node.js";
|
||||
import type { CoValue, ReadableCoValue } from "./coValue.js";
|
||||
import { CoMap, WriteableCoMap } from "./coValues/coMap.js";
|
||||
@@ -74,6 +74,7 @@ export {
|
||||
AnonymousControlledAccount,
|
||||
ControlledAccount,
|
||||
cryptoReady as cojsonReady,
|
||||
MAX_RECOMMENDED_TX_SIZE
|
||||
};
|
||||
|
||||
export type {
|
||||
|
||||
@@ -34,7 +34,14 @@ export function connectedPeers(
|
||||
trace &&
|
||||
console.debug(
|
||||
`${peer2id} -> ${peer1id}`,
|
||||
JSON.stringify(chunk, null, 2)
|
||||
JSON.stringify(
|
||||
chunk,
|
||||
(k, v) =>
|
||||
(k === "changes" || k === "encryptedChanges")
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
2
|
||||
)
|
||||
);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
@@ -52,7 +59,14 @@ export function connectedPeers(
|
||||
trace &&
|
||||
console.debug(
|
||||
`${peer1id} -> ${peer2id}`,
|
||||
JSON.stringify(chunk, null, 2)
|
||||
JSON.stringify(
|
||||
chunk,
|
||||
(k, v) =>
|
||||
(k === "changes" || k === "encryptedChanges")
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
2
|
||||
)
|
||||
);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
@@ -102,16 +116,22 @@ export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
|
||||
},
|
||||
});
|
||||
|
||||
let lastWritePromise = Promise.resolve();
|
||||
|
||||
const writable = new WritableStream<T>({
|
||||
async write(chunk) {
|
||||
const enqueue = await enqueuePromise;
|
||||
if (readerClosed) {
|
||||
throw new Error("Reader closed");
|
||||
} else {
|
||||
// make sure write resolves before corresponding read
|
||||
setTimeout(() => {
|
||||
enqueue(chunk);
|
||||
})
|
||||
// make sure write resolves before corresponding read, but make sure writes are still in order
|
||||
await lastWritePromise;
|
||||
lastWritePromise = new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
enqueue(chunk);
|
||||
resolve();
|
||||
});
|
||||
});
|
||||
}
|
||||
},
|
||||
async abort(reason) {
|
||||
|
||||
@@ -436,8 +436,9 @@ test("No matter the optimistic known state, node respects invalid known state me
|
||||
editable.set("goodbye", "world", "trusting");
|
||||
});
|
||||
|
||||
const _mapEditMsg1 = await reader.read();
|
||||
const _mapEditMsg2 = await reader.read();
|
||||
const _mapEditMsgs = await reader.read();
|
||||
|
||||
console.log("Sending correction");
|
||||
|
||||
await writer.write({
|
||||
action: "known",
|
||||
|
||||
@@ -215,14 +215,32 @@ export class SyncManager {
|
||||
await this.sendNewContentIncludingDependencies(id, peer);
|
||||
}
|
||||
|
||||
const newContent = coValue.newContentSince(
|
||||
const newContentPieces = coValue.newContentSince(
|
||||
peer.optimisticKnownStates[id]
|
||||
);
|
||||
|
||||
if (newContent) {
|
||||
await this.trySendToPeer(peer, newContent);
|
||||
if (newContentPieces) {
|
||||
const optimisticKnownStateBefore =
|
||||
peer.optimisticKnownStates[id] || emptyKnownState(id);
|
||||
|
||||
const sendPieces = async () => {
|
||||
for (const [i, piece] of newContentPieces.entries()) {
|
||||
// console.log(
|
||||
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${newContentPieces.length} header: ${!!piece.header}`,
|
||||
// // Object.values(piece.new).map((s) => s.newTransactions)
|
||||
// );
|
||||
await this.trySendToPeer(peer, piece);
|
||||
}
|
||||
};
|
||||
|
||||
sendPieces().catch((e) => {
|
||||
console.error("Error sending new content piece, retrying", e);
|
||||
peer.optimisticKnownStates[id] = optimisticKnownStateBefore;
|
||||
return this.sendNewContentIncludingDependencies(id, peer);
|
||||
});
|
||||
|
||||
peer.optimisticKnownStates[id] = combinedKnownStates(
|
||||
peer.optimisticKnownStates[id] || emptyKnownState(id),
|
||||
optimisticKnownStateBefore,
|
||||
coValue.knownState()
|
||||
);
|
||||
}
|
||||
@@ -261,6 +279,9 @@ export class SyncManager {
|
||||
for await (const msg of peerState.incoming) {
|
||||
try {
|
||||
await this.handleSyncMessage(msg, peerState);
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, 0);
|
||||
});
|
||||
} catch (e) {
|
||||
console.error(
|
||||
`Error reading from peer ${peer.id}, handling msg`,
|
||||
@@ -445,6 +466,10 @@ export class SyncManager {
|
||||
const newTransactions =
|
||||
newContentForSession.newTransactions.slice(alreadyKnownOffset);
|
||||
|
||||
if (newTransactions.length === 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const before = performance.now();
|
||||
const success = await coValue.tryAddTransactionsAsync(
|
||||
sessionID,
|
||||
@@ -454,20 +479,26 @@ export class SyncManager {
|
||||
);
|
||||
const after = performance.now();
|
||||
if (after - before > 10) {
|
||||
const totalTxLength = newTransactions.map(t => stableStringify(t)!.length).reduce((a, b) => a + b, 0);
|
||||
const totalTxLength = newTransactions
|
||||
.map((t) =>
|
||||
t.privacy === "private"
|
||||
? t.encryptedChanges.length
|
||||
: t.changes.length
|
||||
)
|
||||
.reduce((a, b) => a + b, 0);
|
||||
console.log(
|
||||
"Adding incoming transactions took",
|
||||
after - before,
|
||||
"ms",
|
||||
totalTxLength,
|
||||
"bytes = ",
|
||||
"bandwidth: MB/s",
|
||||
(1000 * totalTxLength / (after - before)) / (1024 * 1024)
|
||||
`Adding incoming transactions took ${(
|
||||
after - before
|
||||
).toFixed(2)}ms for ${totalTxLength} bytes = bandwidth: ${(
|
||||
(1000 * totalTxLength) /
|
||||
(after - before) /
|
||||
(1024 * 1024)
|
||||
).toFixed(2)} MB/s`
|
||||
);
|
||||
}
|
||||
|
||||
if (!success) {
|
||||
console.error("Failed to add transactions", newTransactions);
|
||||
console.error("Failed to add transactions", msg.id, newTransactions);
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -492,18 +523,9 @@ export class SyncManager {
|
||||
}
|
||||
|
||||
async handleCorrection(msg: KnownStateMessage, peer: PeerState) {
|
||||
const coValue = this.local.expectCoValueLoaded(msg.id);
|
||||
peer.optimisticKnownStates[msg.id] = msg;
|
||||
|
||||
peer.optimisticKnownStates[msg.id] = combinedKnownStates(
|
||||
msg,
|
||||
coValue.knownState()
|
||||
);
|
||||
|
||||
const newContent = coValue.newContentSince(msg);
|
||||
|
||||
if (newContent) {
|
||||
await this.trySendToPeer(peer, newContent);
|
||||
}
|
||||
return this.sendNewContentIncludingDependencies(msg.id, peer);
|
||||
}
|
||||
|
||||
handleUnsubscribe(_msg: DoneMessage) {
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"name": "jazz-browser-auth-local",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"jazz-browser": "^0.2.0",
|
||||
"jazz-browser": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
@@ -1,13 +1,13 @@
|
||||
{
|
||||
"name": "jazz-browser-media-images",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"image-blob-reduce": "^4.1.0",
|
||||
"jazz-browser": "^0.2.0",
|
||||
"jazz-browser": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
@@ -141,7 +141,8 @@ export function loadImage(
|
||||
const resState: {
|
||||
[res: `${number}x${number}`]:
|
||||
| { state: "queued" }
|
||||
| { state: "loading" }
|
||||
| { state: "waiting" }
|
||||
| { state: "loading"; doneOrFailed: Promise<void> }
|
||||
| { state: "loaded"; blobURL: string }
|
||||
| { state: "revoked" }
|
||||
| { state: "failed" }
|
||||
@@ -170,7 +171,8 @@ export function loadImage(
|
||||
const placeholderDataURL =
|
||||
imageDefinition.get("placeholderDataURL");
|
||||
|
||||
const resolutions = imageDefinition.keys()
|
||||
const resolutions = imageDefinition
|
||||
.keys()
|
||||
.filter(
|
||||
(key): key is `${number}x${number}` =>
|
||||
!!key.match(/\d+x\d+/)
|
||||
@@ -182,48 +184,126 @@ export function loadImage(
|
||||
});
|
||||
|
||||
const startLoading = async () => {
|
||||
|
||||
const notYetQueuedOrLoading = resolutions.filter(
|
||||
(res) => !resState[res]
|
||||
);
|
||||
);
|
||||
|
||||
console.log("Loading iteration", resolutions, resState, notYetQueuedOrLoading);
|
||||
// console.log(
|
||||
// "Loading iteration",
|
||||
// resolutions,
|
||||
// resState,
|
||||
// notYetQueuedOrLoading
|
||||
// );
|
||||
|
||||
for (const res of notYetQueuedOrLoading) {
|
||||
resState[res] = { state: "queued" };
|
||||
resState[res] = { state: "queued" };
|
||||
}
|
||||
|
||||
for (const res of notYetQueuedOrLoading) {
|
||||
if (stopped) return;
|
||||
resState[res] = { state: "loading" };
|
||||
resState[res] = { state: "waiting" };
|
||||
|
||||
const binaryStreamId = imageDefinition.get(res)!;
|
||||
console.log("Loading image res", imageID, res, binaryStreamId);
|
||||
// console.log(
|
||||
// "Loading image res",
|
||||
// imageID,
|
||||
// res,
|
||||
// binaryStreamId
|
||||
// );
|
||||
|
||||
const blob = await readBlobFromBinaryStream(
|
||||
binaryStreamId,
|
||||
localNode
|
||||
const binaryStream = await localNode.load(
|
||||
binaryStreamId
|
||||
);
|
||||
|
||||
if (stopped) return;
|
||||
if (!blob) {
|
||||
if (!binaryStream) {
|
||||
resState[res] = { state: "failed" };
|
||||
console.log("Loading image res failed", imageID, res, binaryStreamId);
|
||||
continue;
|
||||
console.error(
|
||||
"Loading image res failed",
|
||||
imageID,
|
||||
res,
|
||||
binaryStreamId
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
const blobURL = URL.createObjectURL(blob);
|
||||
resState[res] = { state: "loaded", blobURL };
|
||||
await new Promise<void>((resolveFullyLoaded) => {
|
||||
const unsubFromStream = binaryStream.subscribe(
|
||||
async (_) => {
|
||||
if (stopped) return;
|
||||
const currentState = resState[res];
|
||||
if (currentState?.state === "loading") {
|
||||
await currentState.doneOrFailed;
|
||||
// console.log(
|
||||
// "Retrying image res after previous attempt",
|
||||
// imageID,
|
||||
// res,
|
||||
// binaryStreamId
|
||||
// );
|
||||
}
|
||||
if (resState[res]?.state === "loaded") {
|
||||
return;
|
||||
}
|
||||
|
||||
console.log("Loaded image res", imageID, res, binaryStreamId);
|
||||
const doneOrFailed = new Promise<void>(
|
||||
// eslint-disable-next-line no-async-promise-executor
|
||||
async (resolveDoneOrFailed) => {
|
||||
const blob =
|
||||
await readBlobFromBinaryStream(
|
||||
binaryStreamId,
|
||||
localNode
|
||||
);
|
||||
|
||||
progressiveCallback({
|
||||
originalSize,
|
||||
placeholderDataURL,
|
||||
highestResSrc: blobURL,
|
||||
if (stopped) return;
|
||||
if (!blob) {
|
||||
// console.log(
|
||||
// "Image res not available yet",
|
||||
// imageID,
|
||||
// res,
|
||||
// binaryStreamId
|
||||
// );
|
||||
resolveDoneOrFailed();
|
||||
return;
|
||||
}
|
||||
|
||||
const blobURL =
|
||||
URL.createObjectURL(blob);
|
||||
resState[res] = {
|
||||
state: "loaded",
|
||||
blobURL,
|
||||
};
|
||||
|
||||
// console.log(
|
||||
// "Loaded image res",
|
||||
// imageID,
|
||||
// res,
|
||||
// binaryStreamId
|
||||
// );
|
||||
|
||||
progressiveCallback({
|
||||
originalSize,
|
||||
placeholderDataURL,
|
||||
highestResSrc: blobURL,
|
||||
});
|
||||
|
||||
unsubFromStream();
|
||||
resolveDoneOrFailed();
|
||||
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, 0)
|
||||
);
|
||||
|
||||
resolveFullyLoaded();
|
||||
}
|
||||
);
|
||||
|
||||
resState[res] = {
|
||||
state: "loading",
|
||||
doneOrFailed,
|
||||
};
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "jazz-browser",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"jazz-storage-indexeddb": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"jazz-storage-indexeddb": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"scripts": {
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { BinaryCoStream, InviteSecret } from "cojson";
|
||||
import { BinaryCoStreamMeta } from "cojson";
|
||||
import { MAX_RECOMMENDED_TX_SIZE } from "cojson";
|
||||
import { cojsonReady } from "cojson";
|
||||
import {
|
||||
LocalNode,
|
||||
@@ -72,6 +73,10 @@ export async function createBrowserNode({
|
||||
node,
|
||||
done: () => {
|
||||
shouldTryToReconnect = false;
|
||||
console.log("Cleaning up node")
|
||||
for (const peer of Object.values(node.sync.peers)) {
|
||||
peer.outgoing.close().catch(e => console.error("Error while closing peer", e));
|
||||
}
|
||||
sessionDone?.();
|
||||
},
|
||||
};
|
||||
@@ -383,7 +388,7 @@ export async function createBinaryStreamFromBlob<
|
||||
fileName: blob instanceof File ? blob.name : undefined,
|
||||
});
|
||||
}) as C;// TODO: fix this
|
||||
const chunkSize = 256 * 1024;
|
||||
const chunkSize = MAX_RECOMMENDED_TX_SIZE;
|
||||
|
||||
for (let idx = 0; idx < data.length; idx += chunkSize) {
|
||||
stream = stream.edit((stream) => {
|
||||
@@ -419,15 +424,11 @@ export async function readBlobFromBinaryStream<
|
||||
return undefined;
|
||||
}
|
||||
|
||||
const chunks = stream.getBinaryChunks();
|
||||
const chunks = stream.getBinaryChunks(allowUnfinished);
|
||||
|
||||
if (!chunks) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
if (!allowUnfinished && !chunks.finished) {
|
||||
return undefined;
|
||||
}
|
||||
|
||||
return new Blob(chunks.chunks, { type: chunks.mimeType });
|
||||
}
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "jazz-react-auth-local",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"jazz-browser-auth-local": "^0.2.0",
|
||||
"jazz-react": "^0.2.0",
|
||||
"jazz-browser-auth-local": "^0.2.1",
|
||||
"jazz-react": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
{
|
||||
"name": "jazz-react-media-images",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"jazz-browser": "^0.2.0",
|
||||
"jazz-browser-media-images": "^0.2.0",
|
||||
"jazz-react": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"jazz-browser": "^0.2.1",
|
||||
"jazz-browser-media-images": "^0.2.1",
|
||||
"jazz-react": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "jazz-react",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"jazz-browser": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"jazz-browser": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -47,6 +47,7 @@ export function WithJazz({
|
||||
|
||||
useEffect(() => {
|
||||
let done: (() => void) | undefined = undefined;
|
||||
let stop = false;
|
||||
|
||||
(async () => {
|
||||
const nodeHandle = await createBrowserNode({
|
||||
@@ -57,6 +58,11 @@ export function WithJazz({
|
||||
undefined,
|
||||
});
|
||||
|
||||
if (stop) {
|
||||
nodeHandle.done();
|
||||
return;
|
||||
}
|
||||
|
||||
setNode(nodeHandle.node);
|
||||
|
||||
done = nodeHandle.done;
|
||||
@@ -65,6 +71,7 @@ export function WithJazz({
|
||||
});
|
||||
|
||||
return () => {
|
||||
stop = true;
|
||||
done && done();
|
||||
};
|
||||
}, [auth, syncAddress]);
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
{
|
||||
"name": "jazz-storage-indexeddb",
|
||||
"version": "0.2.0",
|
||||
"version": "0.2.1",
|
||||
"main": "dist/index.js",
|
||||
"types": "dist/index.d.ts",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"cojson": "^0.2.0",
|
||||
"cojson": "^0.2.1",
|
||||
"typescript": "^5.1.6"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
||||
@@ -4,7 +4,9 @@ import {
|
||||
SyncMessage,
|
||||
Peer,
|
||||
CojsonInternalTypes,
|
||||
MAX_RECOMMENDED_TX_SIZE,
|
||||
} from "cojson";
|
||||
import { Signature } from "cojson/dist/crypto";
|
||||
import {
|
||||
ReadableStream,
|
||||
WritableStream,
|
||||
@@ -24,6 +26,7 @@ type SessionRow = {
|
||||
sessionID: SessionID;
|
||||
lastIdx: number;
|
||||
lastSignature: CojsonInternalTypes.Signature;
|
||||
bytesSinceLastSignature?: number;
|
||||
};
|
||||
|
||||
type StoredSessionRow = SessionRow & { rowID: number };
|
||||
@@ -34,6 +37,12 @@ type TransactionRow = {
|
||||
tx: CojsonInternalTypes.Transaction;
|
||||
};
|
||||
|
||||
type SignatureAfterRow = {
|
||||
ses: number;
|
||||
idx: number;
|
||||
signature: CojsonInternalTypes.Signature;
|
||||
};
|
||||
|
||||
export class IDBStorage {
|
||||
db: IDBDatabase;
|
||||
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
|
||||
@@ -55,7 +64,7 @@ export class IDBStorage {
|
||||
done = result.done;
|
||||
|
||||
if (result.value) {
|
||||
this.handleSyncMessage(result.value);
|
||||
await this.handleSyncMessage(result.value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
@@ -88,42 +97,63 @@ export class IDBStorage {
|
||||
toLocalNode: WritableStream<SyncMessage>
|
||||
) {
|
||||
const dbPromise = new Promise<IDBDatabase>((resolve, reject) => {
|
||||
const request = indexedDB.open("jazz-storage", 1);
|
||||
const request = indexedDB.open("jazz-storage", 3);
|
||||
request.onerror = () => {
|
||||
reject(request.error);
|
||||
};
|
||||
request.onsuccess = () => {
|
||||
resolve(request.result);
|
||||
};
|
||||
request.onupgradeneeded = () => {
|
||||
request.onupgradeneeded = async (ev) => {
|
||||
const db = request.result;
|
||||
if (ev.oldVersion === 0) {
|
||||
const coValues = db.createObjectStore("coValues", {
|
||||
autoIncrement: true,
|
||||
keyPath: "rowID",
|
||||
});
|
||||
|
||||
const coValues = db.createObjectStore("coValues", {
|
||||
autoIncrement: true,
|
||||
keyPath: "rowID",
|
||||
});
|
||||
|
||||
coValues.createIndex("coValuesById", "id", {
|
||||
unique: true,
|
||||
});
|
||||
|
||||
const sessions = db.createObjectStore("sessions", {
|
||||
autoIncrement: true,
|
||||
keyPath: "rowID",
|
||||
});
|
||||
|
||||
sessions.createIndex("sessionsByCoValue", "coValue");
|
||||
sessions.createIndex(
|
||||
"uniqueSessions",
|
||||
["coValue", "sessionID"],
|
||||
{
|
||||
coValues.createIndex("coValuesById", "id", {
|
||||
unique: true,
|
||||
}
|
||||
);
|
||||
});
|
||||
|
||||
db.createObjectStore("transactions", {
|
||||
keyPath: ["ses", "idx"],
|
||||
});
|
||||
const sessions = db.createObjectStore("sessions", {
|
||||
autoIncrement: true,
|
||||
keyPath: "rowID",
|
||||
});
|
||||
|
||||
sessions.createIndex("sessionsByCoValue", "coValue");
|
||||
sessions.createIndex(
|
||||
"uniqueSessions",
|
||||
["coValue", "sessionID"],
|
||||
{
|
||||
unique: true,
|
||||
}
|
||||
);
|
||||
|
||||
db.createObjectStore("transactions", {
|
||||
keyPath: ["ses", "idx"],
|
||||
});
|
||||
}
|
||||
if (ev.oldVersion <= 1) {
|
||||
db.createObjectStore("signatureAfter", {
|
||||
keyPath: ["ses", "idx"],
|
||||
});
|
||||
}
|
||||
if (ev.oldVersion !== 0 && ev.oldVersion === 2) {
|
||||
// fix embarrassing off-by-one error for transaction indices
|
||||
console.log("Migration: fixing off-by-one error");
|
||||
const transaction = (ev.target as unknown as {transaction: IDBTransaction}).transaction;
|
||||
|
||||
const txsStore = transaction.objectStore("transactions");
|
||||
const txs = await promised(txsStore.getAll());
|
||||
|
||||
for (const tx of txs) {
|
||||
await promised(txsStore.delete([tx.ses, tx.idx]));
|
||||
tx.idx -= 1;
|
||||
await promised(txsStore.add(tx));
|
||||
}
|
||||
console.log("Migration: fixing off-by-one error - done");
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
@@ -153,10 +183,12 @@ export class IDBStorage {
|
||||
coValues,
|
||||
sessions,
|
||||
transactions,
|
||||
signatureAfter,
|
||||
}: {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
},
|
||||
asDependencyOf?: CojsonInternalTypes.RawCoID
|
||||
) {
|
||||
@@ -176,12 +208,14 @@ export class IDBStorage {
|
||||
sessions: {},
|
||||
};
|
||||
|
||||
const newContent: CojsonInternalTypes.NewContentMessage = {
|
||||
action: "content",
|
||||
id: theirKnown.id,
|
||||
header: theirKnown.header ? undefined : coValueRow?.header,
|
||||
new: {},
|
||||
};
|
||||
const newContentPieces: CojsonInternalTypes.NewContentMessage[] = [
|
||||
{
|
||||
action: "content",
|
||||
id: theirKnown.id,
|
||||
header: theirKnown.header ? undefined : coValueRow?.header,
|
||||
new: {},
|
||||
},
|
||||
];
|
||||
|
||||
for (const sessionRow of allOurSessions) {
|
||||
ourKnown.sessions[sessionRow.sessionID] = sessionRow.lastIdx;
|
||||
@@ -193,6 +227,21 @@ export class IDBStorage {
|
||||
const firstNewTxIdx =
|
||||
theirKnown.sessions[sessionRow.sessionID] || 0;
|
||||
|
||||
const signaturesAndIdxs = await promised<SignatureAfterRow[]>(
|
||||
signatureAfter.getAll(
|
||||
IDBKeyRange.bound(
|
||||
[sessionRow.rowID, firstNewTxIdx],
|
||||
[sessionRow.rowID, Infinity]
|
||||
)
|
||||
)
|
||||
);
|
||||
|
||||
console.log(
|
||||
theirKnown.id,
|
||||
"signaturesAndIdxs",
|
||||
JSON.stringify(signaturesAndIdxs)
|
||||
);
|
||||
|
||||
const newTxInSession = await promised<TransactionRow[]>(
|
||||
transactions.getAll(
|
||||
IDBKeyRange.bound(
|
||||
@@ -202,38 +251,83 @@ export class IDBStorage {
|
||||
)
|
||||
);
|
||||
|
||||
newContent.new[sessionRow.sessionID] = {
|
||||
after: firstNewTxIdx,
|
||||
lastSignature: sessionRow.lastSignature,
|
||||
newTransactions: newTxInSession.map((row) => row.tx),
|
||||
};
|
||||
let idx = firstNewTxIdx;
|
||||
|
||||
console.log(
|
||||
theirKnown.id,
|
||||
"newTxInSession",
|
||||
newTxInSession.length
|
||||
);
|
||||
|
||||
for (const tx of newTxInSession) {
|
||||
let sessionEntry =
|
||||
newContentPieces[newContentPieces.length - 1]!.new[
|
||||
sessionRow.sessionID
|
||||
];
|
||||
if (!sessionEntry) {
|
||||
sessionEntry = {
|
||||
after: idx,
|
||||
lastSignature: "WILL_BE_REPLACED" as Signature,
|
||||
newTransactions: [],
|
||||
};
|
||||
newContentPieces[newContentPieces.length - 1]!.new[
|
||||
sessionRow.sessionID
|
||||
] = sessionEntry;
|
||||
}
|
||||
|
||||
sessionEntry.newTransactions.push(tx.tx);
|
||||
|
||||
if (
|
||||
signaturesAndIdxs[0] &&
|
||||
idx === signaturesAndIdxs[0].idx
|
||||
) {
|
||||
sessionEntry.lastSignature =
|
||||
signaturesAndIdxs[0].signature;
|
||||
signaturesAndIdxs.shift();
|
||||
newContentPieces.push({
|
||||
action: "content",
|
||||
id: theirKnown.id,
|
||||
new: {},
|
||||
});
|
||||
} else if (
|
||||
idx ===
|
||||
firstNewTxIdx + newTxInSession.length - 1
|
||||
) {
|
||||
sessionEntry.lastSignature = sessionRow.lastSignature;
|
||||
}
|
||||
idx += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const dependedOnCoValues =
|
||||
coValueRow?.header.ruleset.type === "group"
|
||||
? Object.values(newContent.new).flatMap((sessionEntry) =>
|
||||
sessionEntry.newTransactions.flatMap((tx) => {
|
||||
if (tx.privacy !== "trusting") return [];
|
||||
// TODO: avoid parse here?
|
||||
return cojsonInternals
|
||||
.parseJSON(tx.changes)
|
||||
.map(
|
||||
(change) =>
|
||||
change &&
|
||||
typeof change === "object" &&
|
||||
"op" in change &&
|
||||
change.op === "set" &&
|
||||
"key" in change &&
|
||||
change.key
|
||||
)
|
||||
.filter(
|
||||
(key): key is CojsonInternalTypes.RawCoID =>
|
||||
typeof key === "string" &&
|
||||
key.startsWith("co_")
|
||||
);
|
||||
})
|
||||
)
|
||||
? newContentPieces
|
||||
.flatMap((piece) => Object.values(piece.new))
|
||||
.flatMap((sessionEntry) =>
|
||||
sessionEntry.newTransactions.flatMap((tx) => {
|
||||
if (tx.privacy !== "trusting") return [];
|
||||
// TODO: avoid parse here?
|
||||
return cojsonInternals
|
||||
.parseJSON(tx.changes)
|
||||
.map(
|
||||
(change) =>
|
||||
change &&
|
||||
typeof change === "object" &&
|
||||
"op" in change &&
|
||||
change.op === "set" &&
|
||||
"key" in change &&
|
||||
change.key
|
||||
)
|
||||
.filter(
|
||||
(
|
||||
key
|
||||
): key is CojsonInternalTypes.RawCoID =>
|
||||
typeof key === "string" &&
|
||||
key.startsWith("co_")
|
||||
);
|
||||
})
|
||||
)
|
||||
: coValueRow?.header.ruleset.type === "ownedByGroup"
|
||||
? [coValueRow?.header.ruleset.group]
|
||||
: [];
|
||||
@@ -241,7 +335,7 @@ export class IDBStorage {
|
||||
for (const dependedOnCoValue of dependedOnCoValues) {
|
||||
await this.sendNewContentAfter(
|
||||
{ id: dependedOnCoValue, header: false, sessions: {} },
|
||||
{ coValues, sessions, transactions },
|
||||
{ coValues, sessions, transactions, signatureAfter },
|
||||
asDependencyOf || theirKnown.id
|
||||
);
|
||||
}
|
||||
@@ -252,8 +346,15 @@ export class IDBStorage {
|
||||
asDependencyOf,
|
||||
});
|
||||
|
||||
if (newContent.header || Object.keys(newContent.new).length > 0) {
|
||||
await this.toLocalNode.write(newContent);
|
||||
const nonEmptyNewContentPieces = newContentPieces.filter(
|
||||
(piece) => piece.header || Object.keys(piece.new).length > 0
|
||||
);
|
||||
|
||||
console.log(theirKnown.id, nonEmptyNewContentPieces);
|
||||
|
||||
for (const piece of nonEmptyNewContentPieces) {
|
||||
await this.toLocalNode.write(piece);
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -262,7 +363,7 @@ export class IDBStorage {
|
||||
}
|
||||
|
||||
async handleContent(msg: CojsonInternalTypes.NewContentMessage) {
|
||||
const { coValues, sessions, transactions } =
|
||||
const { coValues, sessions, transactions, signatureAfter } =
|
||||
this.inTransaction("readwrite");
|
||||
|
||||
let storedCoValueRowID = (
|
||||
@@ -333,18 +434,39 @@ export class IDBStorage {
|
||||
const actuallyNewOffset =
|
||||
(sessionRow?.lastIdx || 0) -
|
||||
(msg.new[sessionID]?.after || 0);
|
||||
|
||||
const actuallyNewTransactions =
|
||||
newTransactions.slice(actuallyNewOffset);
|
||||
|
||||
let newBytesSinceLastSignature =
|
||||
(sessionRow?.bytesSinceLastSignature || 0) +
|
||||
actuallyNewTransactions.reduce(
|
||||
(sum, tx) =>
|
||||
sum +
|
||||
(tx.privacy === "private"
|
||||
? tx.encryptedChanges.length
|
||||
: tx.changes.length),
|
||||
0
|
||||
);
|
||||
|
||||
const newLastIdx =
|
||||
(sessionRow?.lastIdx || 0) + actuallyNewTransactions.length;
|
||||
|
||||
let shouldWriteSignature = false;
|
||||
|
||||
if (newBytesSinceLastSignature > MAX_RECOMMENDED_TX_SIZE) {
|
||||
shouldWriteSignature = true;
|
||||
newBytesSinceLastSignature = 0;
|
||||
}
|
||||
|
||||
let nextIdx = sessionRow?.lastIdx || 0;
|
||||
|
||||
const sessionUpdate = {
|
||||
coValue: storedCoValueRowID,
|
||||
sessionID: sessionID,
|
||||
lastIdx:
|
||||
(sessionRow?.lastIdx || 0) +
|
||||
actuallyNewTransactions.length,
|
||||
lastIdx: newLastIdx,
|
||||
lastSignature: msg.new[sessionID]!.lastSignature,
|
||||
bytesSinceLastSignature: newBytesSinceLastSignature,
|
||||
};
|
||||
|
||||
const sessionRowID = (await promised(
|
||||
@@ -358,8 +480,18 @@ export class IDBStorage {
|
||||
)
|
||||
)) as number;
|
||||
|
||||
if (shouldWriteSignature) {
|
||||
await promised(
|
||||
signatureAfter.put({
|
||||
ses: sessionRowID,
|
||||
// TODO: newLastIdx is a misnomer, it's actually more like nextIdx or length
|
||||
idx: newLastIdx - 1,
|
||||
signature: msg.new[sessionID]!.lastSignature,
|
||||
} satisfies SignatureAfterRow)
|
||||
);
|
||||
}
|
||||
|
||||
for (const newTransaction of actuallyNewTransactions) {
|
||||
nextIdx++;
|
||||
await promised(
|
||||
transactions.add({
|
||||
ses: sessionRowID,
|
||||
@@ -367,6 +499,7 @@ export class IDBStorage {
|
||||
tx: newTransaction,
|
||||
} satisfies TransactionRow)
|
||||
);
|
||||
nextIdx++;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -390,9 +523,10 @@ export class IDBStorage {
|
||||
coValues: IDBObjectStore;
|
||||
sessions: IDBObjectStore;
|
||||
transactions: IDBObjectStore;
|
||||
signatureAfter: IDBObjectStore;
|
||||
} {
|
||||
const tx = this.db.transaction(
|
||||
["coValues", "sessions", "transactions"],
|
||||
["coValues", "sessions", "transactions", "signatureAfter"],
|
||||
mode
|
||||
);
|
||||
|
||||
@@ -409,8 +543,9 @@ export class IDBStorage {
|
||||
const coValues = tx.objectStore("coValues");
|
||||
const sessions = tx.objectStore("sessions");
|
||||
const transactions = tx.objectStore("transactions");
|
||||
const signatureAfter = tx.objectStore("signatureAfter");
|
||||
|
||||
return { coValues, sessions, transactions };
|
||||
return { coValues, sessions, transactions, signatureAfter };
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user