Compare commits

..

8 Commits

Author SHA1 Message Date
Anselm
a862cb8819 Release 2024-06-25 15:13:55 +01:00
Anselm Eickhoff
4246aed7db Merge pull request #214 from gardencmp/effect-streams-for-peers
Effect streams for peers
2024-06-25 15:12:56 +01:00
Anselm
41554e0e0b Release 2024-06-25 13:57:24 +01:00
Anselm
93c4d8155e Fix CoList.toJSON() 2024-06-25 13:56:20 +01:00
Anselm
33db0fd654 Keep old hompage lockfile (unrelated) 2024-06-13 22:17:00 +01:00
Anselm
478ded93de Merge branch 'main' into effect-streams-for-peers 2024-06-13 22:12:33 +01:00
Anselm
89ad1fb79d Fix remaining tests by introducing even more special cases 2024-06-13 22:11:25 +01:00
Anselm
a35353c987 Use Effect streams 2024-06-13 15:58:21 +01:00
54 changed files with 7051 additions and 5890 deletions

View File

@@ -12,7 +12,7 @@
"jazz-react",
"jazz-nodejs",
"jazz-run",
"cojson-transport-nodejs-ws",
"cojson-transport-ws",
"cojson-storage-indexeddb",
"cojson-storage-sqlite"
]

View File

@@ -1,5 +1,22 @@
# jazz-example-chat
## 0.0.61
### Patch Changes
- Updated dependencies
- cojson@0.7.14
- jazz-tools@0.7.14
- jazz-react@0.7.14
## 0.0.60
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
- jazz-react@0.7.13
## 0.0.59
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-chat",
"private": true,
"version": "0.0.59",
"version": "0.0.61",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,5 +1,23 @@
# jazz-example-pets
## 0.0.79
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.14
- jazz-react@0.7.14
- jazz-browser-media-images@0.7.14
## 0.0.78
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
- jazz-browser-media-images@0.7.13
- jazz-react@0.7.13
## 0.0.77
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.77",
"version": "0.0.79",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,5 +1,21 @@
# jazz-example-todo
## 0.0.78
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.14
- jazz-react@0.7.14
## 0.0.77
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
- jazz-react@0.7.13
## 0.0.76
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.76",
"version": "0.0.78",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,5 +1,12 @@
# cojson-storage-indexeddb
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
## 0.7.11
### Patch Changes

View File

@@ -1,14 +1,14 @@
{
"name": "cojson-storage-indexeddb",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"type": "module",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"typescript": "^5.1.6",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"devDependencies": {
"@vitest/browser": "^0.34.1",

View File

@@ -6,14 +6,11 @@ import {
CojsonInternalTypes,
MAX_RECOMMENDED_TX_SIZE,
AccountID,
IncomingSyncStream,
OutgoingSyncQueue,
} from "cojson";
import {
ReadableStream,
WritableStream,
ReadableStreamDefaultReader,
WritableStreamDefaultWriter,
} from "isomorphic-streams";
import { SyncPromise } from "./syncPromises.js";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -46,39 +43,35 @@ type SignatureAfterRow = {
export class IDBStorage {
db: IDBDatabase;
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
toLocalNode: OutgoingSyncQueue;
constructor(
db: IDBDatabase,
fromLocalNode: ReadableStream<SyncMessage>,
toLocalNode: WritableStream<SyncMessage>,
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
this.db = db;
this.fromLocalNode = fromLocalNode.getReader();
this.toLocalNode = toLocalNode.getWriter();
this.toLocalNode = toLocalNode;
void (async () => {
let done = false;
while (!done) {
const result = await this.fromLocalNode.read();
done = result.done;
if (result.value) {
// console.log(
// "IDB: handling msg",
// result.value.id,
// result.value.action
// );
await this.handleSyncMessage(result.value);
// console.log(
// "IDB: handled msg",
// result.value.id,
// result.value.action
// );
}
}
})();
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
static async asPeer(
@@ -89,23 +82,30 @@ export class IDBStorage {
localNodeName: "local",
},
): Promise<Peer> {
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
yield* Effect.promise(() =>
IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
return { ...storageAsPeer, priority: 100 };
}),
);
}
static async open(
fromLocalNode: ReadableStream<SyncMessage>,
toLocalNode: WritableStream<SyncMessage>,
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
const dbPromise = new Promise<IDBDatabase>((resolve, reject) => {
const request = indexedDB.open("jazz-storage", 4);
@@ -150,23 +150,6 @@ export class IDBStorage {
keyPath: ["ses", "idx"],
});
}
// if (ev.oldVersion !== 0 && ev.oldVersion <= 3) {
// // fix embarrassing off-by-one error for transaction indices
// console.log("Migration: fixing off-by-one error");
// const transaction = (
// ev.target as unknown as { transaction: IDBTransaction }
// ).transaction;
// const txsStore = transaction.objectStore("transactions");
// const txs = await promised(txsStore.getAll());
// for (const tx of txs) {
// await promised(txsStore.delete([tx.ses, tx.idx]));
// tx.idx -= 1;
// await promised(txsStore.add(tx));
// }
// console.log("Migration: fixing off-by-one error - done");
// }
};
});
@@ -409,29 +392,35 @@ export class IDBStorage {
),
).then(() => {
// we're done with IndexedDB stuff here so can use native Promises again
setTimeout(async () => {
await this.toLocalNode.write({
action: "known",
...ourKnown,
asDependencyOf,
});
setTimeout(() =>
Effect.runPromise(
Effect.gen(this, function* () {
yield* Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
asDependencyOf,
});
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new).length > 0,
);
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new)
.length > 0,
);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await this.toLocalNode.write(piece);
await new Promise((resolve) =>
setTimeout(resolve, 0),
);
}
}, 0);
for (const piece of nonEmptyNewContentPieces) {
yield* Queue.offer(
this.toLocalNode,
piece,
);
yield* Effect.yieldNow();
}
}),
),
);
return Promise.resolve();
});
@@ -456,13 +445,15 @@ export class IDBStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
void this.toLocalNode.write({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
});
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
throw new Error("Expected to be sent header first");
}
@@ -524,11 +515,13 @@ export class IDBStorage {
),
).then(() => {
if (invalidAssumptions) {
void this.toLocalNode.write({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
});
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
}
});
});

View File

@@ -1,5 +1,12 @@
# cojson-storage-sqlite
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
## 0.7.11
### Patch Changes

View File

@@ -1,15 +1,15 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "workspace:*",
"typescript": "^5.1.6",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.4"

View File

@@ -6,15 +6,12 @@ import {
SessionID,
MAX_RECOMMENDED_TX_SIZE,
AccountID,
IncomingSyncStream,
OutgoingSyncQueue,
} from "cojson";
import {
ReadableStream,
WritableStream,
ReadableStreamDefaultReader,
WritableStreamDefaultWriter,
} from "isomorphic-streams";
import Database, { Database as DatabaseT } from "better-sqlite3";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -46,30 +43,36 @@ type SignatureAfterRow = {
};
export class SQLiteStorage {
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
toLocalNode: OutgoingSyncQueue;
db: DatabaseT;
constructor(
db: DatabaseT,
fromLocalNode: ReadableStream<SyncMessage>,
toLocalNode: WritableStream<SyncMessage>,
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
this.db = db;
this.fromLocalNode = fromLocalNode.getReader();
this.toLocalNode = toLocalNode.getWriter();
this.toLocalNode = toLocalNode;
void (async () => {
let done = false;
while (!done) {
const result = await this.fromLocalNode.read();
done = result.done;
if (result.value) {
await this.handleSyncMessage(result.value);
}
}
})();
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
static async asPeer({
@@ -81,25 +84,32 @@ export class SQLiteStorage {
trace?: boolean;
localNodeName?: string;
}): Promise<Peer> {
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
yield* Effect.promise(() =>
SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
return { ...storageAsPeer, priority: 100 };
}),
);
}
static async open(
filename: string,
fromLocalNode: ReadableStream<SyncMessage>,
toLocalNode: WritableStream<SyncMessage>,
fromLocalNode: IncomingSyncStream,
toLocalNode: OutgoingSyncQueue,
) {
const db = Database(filename);
db.pragma("journal_mode = WAL");
@@ -431,11 +441,13 @@ export class SQLiteStorage {
);
}
await this.toLocalNode.write({
action: "known",
...ourKnown,
asDependencyOf,
});
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
asDependencyOf,
}),
);
const nonEmptyNewContentPieces = newContentPieces.filter(
(piece) => piece.header || Object.keys(piece.new).length > 0,
@@ -444,7 +456,7 @@ export class SQLiteStorage {
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await this.toLocalNode.write(piece);
await Effect.runPromise(Queue.offer(this.toLocalNode, piece));
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
@@ -466,13 +478,15 @@ export class SQLiteStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
await this.toLocalNode.write({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
});
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
return;
}
@@ -604,11 +618,13 @@ export class SQLiteStorage {
})();
if (invalidAssumptions) {
await this.toLocalNode.write({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
});
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
}
}

View File

@@ -1,94 +0,0 @@
import { WebSocket } from "ws";
import { WritableStream, ReadableStream } from "isomorphic-streams";
export function websocketReadableStream<T>(ws: WebSocket) {
ws.binaryType = "arraybuffer";
return new ReadableStream<T>({
start(controller) {
ws.addEventListener("message", (event) => {
if (typeof event.data !== "string")
return console.warn(
"Got non-string message from client",
event.data,
);
const msg = JSON.parse(event.data);
if (msg.type === "ping") {
// console.debug(
// "Got ping from",
// msg.dc,
// "latency",
// Date.now() - msg.time,
// "ms"
// );
return;
}
controller.enqueue(msg);
});
ws.addEventListener("close", () => {
try {
controller.close();
} catch (ignore) {
// will throw if already closed, with no way to check before-hand
}
});
ws.addEventListener("error", () =>
controller.error(new Error("The WebSocket errored!")),
);
},
cancel() {
ws.close();
},
});
}
export function websocketWritableStream<T>(ws: WebSocket) {
return new WritableStream<T>({
start(controller) {
ws.addEventListener("close", () =>
controller.error(
new Error("The WebSocket closed unexpectedly!"),
),
);
ws.addEventListener("error", () =>
controller.error(new Error("The WebSocket errored!")),
);
if (ws.readyState === WebSocket.OPEN) {
return;
}
return new Promise((resolve) =>
ws.addEventListener("open", resolve, { once: true }),
);
},
write(chunk) {
ws.send(JSON.stringify(chunk));
// Return immediately, since the web socket gives us no easy way to tell
// when the write completes.
},
close() {
return closeWS(1000);
},
abort(reason) {
return closeWS(4000, reason && reason.message);
},
});
function closeWS(code: number, reasonString?: string) {
return new Promise<void>((resolve, reject) => {
ws.onclose = (e) => {
if (e.wasClean) {
resolve();
} else {
reject(new Error("The connection was not closed cleanly"));
}
};
ws.close(code, reasonString);
});
}
}

View File

@@ -1,5 +1,12 @@
# cojson-transport-nodejs-ws
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
## 0.7.11
### Patch Changes

View File

@@ -1,15 +1,14 @@
{
"name": "cojson-transport-nodejs-ws",
"name": "cojson-transport-ws",
"type": "module",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"typescript": "^5.1.6",
"ws": "^8.14.2",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",

View File

@@ -0,0 +1,108 @@
import { DisconnectedError, Peer, PingTimeoutError, SyncMessage } from "cojson";
import { Either, Stream, Queue, Effect, Exit } from "effect";
interface AnyWebSocket {
addEventListener(
type: "close",
listener: (event: { code: number; reason: string }) => void,
): void;
addEventListener(
type: "message",
listener: (event: { data: string | unknown }) => void,
): void;
addEventListener(type: "open", listener: () => void): void;
close(): void;
send(data: string): void;
}
export function createWebSocketPeer(options: {
id: string;
websocket: AnyWebSocket;
role: Peer["role"];
}): Effect.Effect<Peer> {
return Effect.gen(function* () {
const ws = options.websocket;
const incoming =
yield* Queue.unbounded<
Either.Either<SyncMessage, DisconnectedError | PingTimeoutError>
>();
const outgoing = yield* Queue.unbounded<SyncMessage>();
ws.addEventListener("close", (event) => {
void Effect.runPromiseExit(
Queue.offer(
incoming,
Either.left(
new DisconnectedError(`${event.code}: ${event.reason}`),
),
),
).then((e) => {
if (Exit.isFailure(e) && !Exit.isInterrupted(e)) {
console.warn("Failed closing ws", e);
}
});
});
let pingTimeout: ReturnType<typeof setTimeout> | undefined;
ws.addEventListener("message", (event) => {
const msg = JSON.parse(event.data as string);
if (pingTimeout) {
clearTimeout(pingTimeout);
}
pingTimeout = setTimeout(() => {
console.debug("Ping timeout");
void Effect.runPromise(
Queue.offer(incoming, Either.left(new PingTimeoutError())),
);
try {
ws.close();
} catch (e) {
console.error(
"Error while trying to close ws on ping timeout",
e,
);
}
}, 2500);
if (msg.type === "ping") {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(globalThis as any).jazzPings =
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(globalThis as any).jazzPings || [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(globalThis as any).jazzPings.push({
received: Date.now(),
sent: msg.time,
dc: msg.dc,
});
return;
} else {
void Effect.runPromise(
Queue.offer(incoming, Either.right(msg)),
);
}
});
ws.addEventListener("open", () => {
void Stream.fromQueue(outgoing).pipe(
Stream.runForEach((msg) =>
Effect.sync(() => ws.send(JSON.stringify(msg))),
),
Effect.runPromise,
);
});
return {
id: options.id,
incoming: Stream.fromQueue(incoming, { shutdown: true }).pipe(
Stream.mapEffect((either) => either),
),
outgoing,
role: options.role,
};
});
}

View File

@@ -1,5 +1,11 @@
# cojson
## 0.7.14
### Patch Changes
- Use Effect Queues and Streams instead of custom queue implementation
## 0.7.11
### Patch Changes

View File

@@ -5,7 +5,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.11",
"version": "0.7.14",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",
@@ -23,8 +23,7 @@
"@noble/hashes": "^1.4.0",
"@scure/base": "^1.1.1",
"effect": "^3.1.5",
"hash-wasm": "^4.9.0",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"hash-wasm": "^4.9.0"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",

View File

@@ -41,7 +41,13 @@ import type {
BinaryCoStreamMeta,
} from "./coValues/coStream.js";
import type { JsonValue } from "./jsonValue.js";
import type { SyncMessage, Peer } from "./sync.js";
import type {
SyncMessage,
Peer,
IncomingSyncStream,
OutgoingSyncQueue,
} from "./sync.js";
import { DisconnectedError, PingTimeoutError } from "./sync.js";
import type { AgentSecret } from "./crypto/crypto.js";
import type {
AccountID,
@@ -117,9 +123,19 @@ export {
SyncMessage,
isRawCoID,
LSMStorage,
DisconnectedError,
PingTimeoutError,
};
export type { Value, FileSystem, FSErr, BlockFilename, WalFilename };
export type {
Value,
FileSystem,
FSErr,
BlockFilename,
WalFilename,
IncomingSyncStream,
OutgoingSyncQueue,
};
// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace CojsonInternalTypes {

View File

@@ -1,18 +1,13 @@
import {
ReadableStream,
WritableStream,
ReadableStreamDefaultReader,
WritableStreamDefaultWriter,
} from "isomorphic-streams";
import { Effect, Either, SynchronizedRef } from "effect";
import { Effect, Either, Queue, Stream, SynchronizedRef } from "effect";
import { RawCoID } from "../ids.js";
import { CoValueHeader, Transaction } from "../coValueCore.js";
import { Signature } from "../crypto/crypto.js";
import {
CoValueKnownState,
IncomingSyncStream,
NewContentMessage,
OutgoingSyncQueue,
Peer,
SyncMessage,
} from "../sync.js";
import { CoID, RawCoValue } from "../index.js";
import { connectedPeers } from "../streamUtils.js";
@@ -47,9 +42,6 @@ export type CoValueChunk = {
};
export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
fs: FS;
currentWal: SynchronizedRef.SynchronizedRef<WH | undefined>;
coValues: SynchronizedRef.SynchronizedRef<{
[id: RawCoID]: CoValueChunk | undefined;
@@ -61,44 +53,28 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
>();
constructor(
fs: FS,
fromLocalNode: ReadableStream<SyncMessage>,
toLocalNode: WritableStream<SyncMessage>,
public fs: FS,
public fromLocalNode: IncomingSyncStream,
public toLocalNode: OutgoingSyncQueue,
) {
this.fs = fs;
this.fromLocalNode = fromLocalNode.getReader();
this.toLocalNode = toLocalNode.getWriter();
this.coValues = SynchronizedRef.unsafeMake({});
this.currentWal = SynchronizedRef.unsafeMake<WH | undefined>(undefined);
void Effect.runPromise(
Effect.gen(this, function* () {
let done = false;
while (!done) {
const result = yield* Effect.promise(() =>
this.fromLocalNode.read(),
);
done = result.done;
if (result.value) {
if (result.value.action === "done") {
continue;
}
if (result.value.action === "content") {
yield* this.handleNewContent(result.value);
} else {
yield* this.sendNewContent(
result.value.id,
result.value,
undefined,
);
}
void this.fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.gen(this, function* () {
if (msg.action === "done") {
return;
}
}
return;
}),
if (msg.action === "content") {
yield* this.handleNewContent(msg);
} else {
yield* this.sendNewContent(msg.id, msg, undefined);
}
}),
),
Effect.runPromise,
);
setTimeout(() => this.compact(), 20000);
@@ -132,15 +108,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
}
if (!coValue) {
yield* Effect.promise(() =>
this.toLocalNode.write({
id: id,
action: "known",
header: false,
sessions: {},
asDependencyOf,
}),
);
yield* Queue.offer(this.toLocalNode, {
id: id,
action: "known",
header: false,
sessions: {},
asDependencyOf,
});
return coValues;
}
@@ -195,17 +169,15 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
const ourKnown: CoValueKnownState = chunkToKnownState(id, coValue);
yield* Effect.promise(() =>
this.toLocalNode.write({
action: "known",
...ourKnown,
asDependencyOf,
}),
);
yield* Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
asDependencyOf,
});
for (const message of newContentMessages) {
if (Object.keys(message.new).length === 0) continue;
yield* Effect.promise(() => this.toLocalNode.write(message));
yield* Queue.offer(this.toLocalNode, message);
}
return { ...coValues, [id]: coValue };
@@ -452,7 +424,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
setTimeout(() => this.compact(), 5000);
}
static asPeer<WH, RH, FS extends FileSystem<WH, RH>>({
static async asPeer<WH, RH, FS extends FileSystem<WH, RH>>({
fs,
trace,
localNodeName = "local",
@@ -460,15 +432,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
fs: FS;
trace?: boolean;
localNodeName?: string;
}): Peer {
const [localNodeAsPeer, storageAsPeer] = connectedPeers(
localNodeName,
"storage",
{
}): Promise<Peer> {
const [localNodeAsPeer, storageAsPeer] = await Effect.runPromise(
connectedPeers(localNodeName, "storage", {
peer1role: "client",
peer2role: "server",
trace,
},
}),
);
new LSMStorage(fs, localNodeAsPeer.incoming, localNodeAsPeer.outgoing);

View File

@@ -1,8 +1,4 @@
import {
ReadableStream,
TransformStream,
WritableStream,
} from "isomorphic-streams";
import { Console, Effect, Queue, Stream } from "effect";
import { Peer, PeerID, SyncMessage } from "./sync.js";
export function connectedPeers(
@@ -17,160 +13,54 @@ export function connectedPeers(
peer1role?: Peer["role"];
peer2role?: Peer["role"];
} = {},
): [Peer, Peer] {
const [inRx1, inTx1] = newStreamPair<SyncMessage>(peer1id + "_in");
const [outRx1, outTx1] = newStreamPair<SyncMessage>(peer1id + "_out");
): Effect.Effect<[Peer, Peer]> {
return Effect.gen(function* () {
const [from1to2Rx, from1to2Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
);
const [from2to1Rx, from2to1Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
);
const [inRx2, inTx2] = newStreamPair<SyncMessage>(peer2id + "_in");
const [outRx2, outTx2] = newStreamPair<SyncMessage>(peer2id + "_out");
const peer2AsPeer: Peer = {
id: peer2id,
incoming: from2to1Rx,
outgoing: from1to2Tx,
role: peer2role,
};
void outRx2
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void },
) {
trace &&
console.debug(
`${peer2id} -> ${peer1id}`,
JSON.stringify(
chunk,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
);
controller.enqueue(chunk);
},
}),
)
.pipeTo(inTx1);
const peer1AsPeer: Peer = {
id: peer1id,
incoming: from1to2Rx,
outgoing: from2to1Tx,
role: peer1role,
};
void outRx1
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void },
) {
trace &&
console.debug(
`${peer1id} -> ${peer2id}`,
JSON.stringify(
chunk,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
);
controller.enqueue(chunk);
},
}),
)
.pipeTo(inTx2);
const peer2AsPeer: Peer = {
id: peer2id,
incoming: inRx1,
outgoing: outTx1,
role: peer2role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: inRx2,
outgoing: outTx2,
role: peer1role,
};
return [peer1AsPeer, peer2AsPeer];
return [peer1AsPeer, peer2AsPeer];
});
}
export function newStreamPair<T>(
pairName?: string,
): [ReadableStream<T>, WritableStream<T>] {
let queueLength = 0;
let readerClosed = false;
export function newQueuePair(
options: { traceAs?: string } = {},
): Effect.Effect<[Stream.Stream<SyncMessage>, Queue.Enqueue<SyncMessage>]> {
return Effect.gen(function* () {
const queue = yield* Queue.unbounded<SyncMessage>();
let resolveEnqueue: (enqueue: (item: T) => void) => void;
const enqueuePromise = new Promise<(item: T) => void>((resolve) => {
resolveEnqueue = resolve;
});
let resolveClose: (close: () => void) => void;
const closePromise = new Promise<() => void>((resolve) => {
resolveClose = resolve;
});
let queueWasOverflowing = false;
function maybeReportQueueLength() {
if (queueLength >= 100) {
queueWasOverflowing = true;
if (queueLength % 100 === 0) {
console.warn(pairName, "overflowing queue length", queueLength);
}
if (options.traceAs) {
return [Stream.fromQueue(queue).pipe(Stream.tap((msg) => Console.debug(
options.traceAs,
JSON.stringify(
msg,
(k, v) =>
k === "changes" ||
k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
))), queue];
} else {
if (queueWasOverflowing) {
console.debug(pairName, "ok queue length", queueLength);
queueWasOverflowing = false;
}
return [Stream.fromQueue(queue), queue];
}
}
const readable = new ReadableStream<T>({
async start(controller) {
resolveEnqueue(controller.enqueue.bind(controller));
resolveClose(controller.close.bind(controller));
},
cancel(_reason) {
console.log("Manually closing reader");
readerClosed = true;
},
}).pipeThrough(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
new TransformStream<any, any>({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void },
) {
queueLength -= 1;
maybeReportQueueLength();
controller.enqueue(chunk);
},
}),
) as ReadableStream<T>;
let lastWritePromise = Promise.resolve();
const writable = new WritableStream<T>({
async write(chunk) {
queueLength += 1;
maybeReportQueueLength();
const enqueue = await enqueuePromise;
if (readerClosed) {
throw new Error("Reader closed");
} else {
// make sure write resolves before corresponding read, but make sure writes are still in order
await lastWritePromise;
lastWritePromise = new Promise((resolve) => {
enqueue(chunk);
resolve();
});
}
},
async abort(reason) {
console.debug("Manually closing writer", reason);
const close = await closePromise;
close();
},
});
return [readable, writable];
}

View File

@@ -1,13 +1,9 @@
import { Signature } from "./crypto/crypto.js";
import { CoValueHeader, Transaction } from "./coValueCore.js";
import { CoValueCore } from "./coValueCore.js";
import { LocalNode } from "./localNode.js";
import {
ReadableStream,
WritableStream,
WritableStreamDefaultWriter,
} from "isomorphic-streams";
import { LocalNode, newLoadingState } from "./localNode.js";
import { RawCoID, SessionID } from "./ids.js";
import { Effect, Queue, Stream } from "effect";
export type CoValueKnownState = {
id: RawCoID;
@@ -60,10 +56,27 @@ export type DoneMessage = {
export type PeerID = string;
export class DisconnectedError extends Error {
readonly _tag = "DisconnectedError";
constructor(public message: string) {
super(message);
}
}
export class PingTimeoutError extends Error {
readonly _tag = "PingTimeoutError";
}
export type IncomingSyncStream = Stream.Stream<
SyncMessage,
DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = Queue.Enqueue<SyncMessage>;
export interface Peer {
id: PeerID;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStream<SyncMessage>;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client";
delayOnError?: number;
priority?: number;
@@ -73,8 +86,8 @@ export interface PeerState {
id: PeerID;
optimisticKnownStates: { [id: RawCoID]: CoValueKnownState };
toldKnownState: Set<RawCoID>;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStreamDefaultWriter<SyncMessage>;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client";
delayOnError?: number;
priority?: number;
@@ -127,25 +140,24 @@ export class SyncManager {
});
}
async loadFromPeers(id: RawCoID, excludePeer?: PeerID) {
for (const peer of this.peersInPriorityOrder()) {
if (peer.id === excludePeer) {
continue;
}
if (peer.role !== "server") {
continue;
}
async loadFromPeers(id: RawCoID, forPeer?: PeerID) {
const eligiblePeers = this.peersInPriorityOrder().filter(
(peer) => peer.id !== forPeer && peer.role === "server",
);
for (const peer of eligiblePeers) {
// console.log("loading", id, "from", peer.id);
peer.outgoing
.write({
Effect.runPromise(
Queue.offer(peer.outgoing, {
action: "load",
id: id,
header: false,
sessions: {},
})
.catch((e) => {
console.error("Error writing to peer", e);
});
}),
).catch((e) => {
console.error("Error writing to peer", e);
});
const coValueEntry = this.local.coValues[id];
if (coValueEntry?.state !== "loading") {
continue;
@@ -297,7 +309,9 @@ export class SyncManager {
let lastYield = performance.now();
for (const [_i, piece] of newContentPieces.entries()) {
// console.log(
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${newContentPieces.length} header: ${!!piece.header}`,
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${
// newContentPieces.length
// } header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
await this.trySendToPeer(peer, piece);
@@ -328,7 +342,7 @@ export class SyncManager {
id: peer.id,
optimisticKnownStates: {},
incoming: peer.incoming,
outgoing: peer.outgoing.getWriter(),
outgoing: peer.outgoing,
toldKnownState: new Set(),
role: peer.role,
delayOnError: peer.delayOnError,
@@ -354,91 +368,55 @@ export class SyncManager {
void initialSync();
}
const readIncoming = async () => {
try {
for await (const msg of peerState.incoming) {
try {
// await this.handleSyncMessage(msg, peerState);
this.handleSyncMessage(msg, peerState).catch((e) => {
console.error(
new Date(),
`Error reading from peer ${peer.id}, handling msg`,
JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
),
e,
);
});
// await new Promise<void>((resolve) => {
// setTimeout(resolve, 0);
// });
} catch (e) {
console.error(
new Date(),
`Error reading from peer ${peer.id}, handling msg`,
JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
void Effect.runPromise(
peerState.incoming.pipe(
Stream.ensuring(
Effect.sync(() => {
console.log("Peer disconnected:", peer.id);
delete this.peers[peer.id];
}),
),
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg, peerState),
catch: (e) =>
new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" ||
k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
e,
);
if (peerState.delayOnError) {
await new Promise<void>((resolve) => {
setTimeout(resolve, peerState.delayOnError);
});
}
}
}
} catch (e) {
console.error(`Error reading from peer ${peer.id}`, e);
}
console.log("Peer disconnected:", peer.id);
delete this.peers[peer.id];
};
void readIncoming();
}).pipe(
Effect.timeoutFail({
duration: 10000,
onTimeout: () =>
new Error("Took >10s to process message"),
}),
),
),
Effect.catchAll((e) =>
Effect.logError(
"Error in peer",
peer.id,
e.message,
typeof e.cause === "object" &&
e.cause instanceof Error &&
e.cause.message,
),
),
),
);
}
trySendToPeer(peer: PeerState, msg: SyncMessage) {
if (!this.peers[peer.id]) {
// already disconnected, return to drain potential queue
return Promise.resolve();
}
return new Promise<void>((resolve) => {
const start = Date.now();
peer.outgoing
.write(msg)
.then(() => {
const end = Date.now();
if (end - start > 1000) {
// console.error(
// new Error(
// `Writing to peer "${peer.id}" took ${
// Math.round((Date.now() - start) / 100) / 10
// }s - this should never happen as write should resolve quickly or error`
// )
// );
} else {
resolve();
}
})
.catch((e) => {
console.error(
new Error(
`Error writing to peer ${peer.id}, disconnecting`,
{
cause: e,
},
),
);
delete this.peers[peer.id];
});
});
return Effect.runPromise(Queue.offer(peer.outgoing, msg));
}
async handleLoad(msg: LoadMessage, peer: PeerState) {
@@ -447,21 +425,50 @@ export class SyncManager {
if (!entry) {
// console.log(`Loading ${msg.id} from all peers except ${peer.id}`);
this.local
.loadCoValueCore(msg.id, {
dontLoadFrom: peer.id,
dontWaitFor: peer.id,
})
.catch((e) => {
console.error("Error loading coValue in handleLoad", e);
});
// special case: we should be able to solve this much more neatly
// with an explicit state machine in the future
const eligiblePeers = this.peersInPriorityOrder().filter(
(other) => other.id !== peer.id && peer.role === "server",
);
if (eligiblePeers.length === 0) {
if (msg.header || Object.keys(msg.sessions).length > 0) {
this.local.coValues[msg.id] = newLoadingState(
new Set([peer.id]),
);
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
});
}
return;
} else {
this.local
.loadCoValueCore(msg.id, {
dontLoadFrom: peer.id,
dontWaitFor: peer.id,
})
.catch((e) => {
console.error("Error loading coValue in handleLoad", e);
});
}
entry = this.local.coValues[msg.id]!;
}
if (entry.state === "loading") {
console.log(
"Waiting for loaded",
msg.id,
"after message from",
peer.id,
);
const loaded = await entry.done;
console.log("Loaded", msg.id, loaded);
if (loaded === "unavailable") {
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
peer.toldKnownState.add(msg.id);
@@ -508,7 +515,7 @@ export class SyncManager {
}
} else {
throw new Error(
"Expected coValue entry to be created, missing subscribe?",
`Expected coValue entry for ${msg.id} to be created on known state, missing subscribe?`,
);
}
}
@@ -549,7 +556,7 @@ export class SyncManager {
if (!entry) {
throw new Error(
"Expected coValue entry to be created, missing subscribe?",
`Expected coValue entry for ${msg.id} to be created on new content, missing subscribe?`,
);
}

View File

@@ -3,6 +3,7 @@ import { newRandomSessionID } from "../coValueCore.js";
import { LocalNode } from "../localNode.js";
import { connectedPeers } from "../streamUtils.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { Effect } from "effect";
const Crypto = await WasmCrypto.create();
@@ -52,11 +53,13 @@ test("Can create account with one node, and then load it on another", async () =
map.set("foo", "bar", "private");
expect(map.get("foo")).toEqual("bar");
const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {
const [node1asPeer, node2asPeer] = await Effect.runPromise(connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
});
}));
console.log("After connected peers")
node.syncManager.addPeer(node2asPeer);

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,21 @@
# jazz-browser-media-images
## 0.7.14
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.14
- jazz-browser@0.7.14
## 0.7.13
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
- jazz-browser@0.7.13
## 0.7.12
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser-media-images",
"version": "0.7.12",
"version": "0.7.14",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,22 @@
# jazz-browser
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
- jazz-tools@0.7.14
- cojson-storage-indexeddb@0.7.14
- cojson-transport-ws@0.7.14
## 0.7.13
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
## 0.7.12
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser",
"version": "0.7.12",
"version": "0.7.14",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",
@@ -9,8 +9,8 @@
"@scure/bip39": "^1.3.0",
"cojson": "workspace:*",
"cojson-storage-indexeddb": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.1.5",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae",
"jazz-tools": "workspace:*",
"typescript": "^5.1.6"
},

View File

@@ -1,11 +1,8 @@
import { ReadableStream, WritableStream } from "isomorphic-streams";
import {
CoValue,
ID,
Peer,
AgentID,
SessionID,
SyncMessage,
cojsonInternals,
InviteSecret,
Account,
@@ -13,10 +10,15 @@ import {
WasmCrypto,
CryptoProvider,
} from "jazz-tools";
import { AccountID, LSMStorage } from "cojson";
import {
AccountID,
LSMStorage,
} from "cojson";
import { AuthProvider } from "./auth/auth.js";
import { OPFSFilesystem } from "./OPFSFilesystem.js";
import { IDBStorage } from "cojson-storage-indexeddb";
import { Effect, Queue } from "effect";
import { createWebSocketPeer } from "cojson-transport-ws";
export * from "./auth/auth.js";
/** @category Context Creation */
@@ -29,7 +31,7 @@ export type BrowserContext<Acc extends Account> = {
/** @category Context Creation */
export async function createJazzBrowserContext<Acc extends Account>({
auth,
peer,
peer: peerAddr,
reconnectionTimeout: initialReconnectionTimeout = 500,
storage = "indexedDB",
crypto: customCrypto,
@@ -43,7 +45,13 @@ export async function createJazzBrowserContext<Acc extends Account>({
const crypto = customCrypto || (await WasmCrypto.create());
let sessionDone: () => void;
const firstWsPeer = createWebSocketPeer(peer);
const firstWsPeer = await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
);
let shouldTryToReconnect = true;
let currentReconnectionTimeout = initialReconnectionTimeout;
@@ -77,7 +85,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
while (shouldTryToReconnect) {
if (
Object.keys(me._raw.core.node.syncManager.peers).some(
(peerId) => peerId.includes(peer),
(peerId) => peerId.includes(peerAddr),
)
) {
// TODO: this might drain battery, use listeners instead
@@ -107,7 +115,13 @@ export async function createJazzBrowserContext<Acc extends Account>({
});
me._raw.core.node.syncManager.addPeer(
createWebSocketPeer(peer),
await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
),
);
}
}
@@ -124,9 +138,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
for (const peer of Object.values(
me._raw.core.node.syncManager.peers,
)) {
peer.outgoing
.close()
.catch((e) => console.error("Error while closing peer", e));
void Effect.runPromise(Queue.shutdown(peer.outgoing));
}
sessionDone?.();
},
@@ -207,140 +219,6 @@ export function getSessionHandleFor(
};
}
function websocketReadableStream<T>(ws: WebSocket) {
ws.binaryType = "arraybuffer";
return new ReadableStream<T>({
start(controller) {
let pingTimeout: ReturnType<typeof setTimeout> | undefined;
ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
if (pingTimeout) {
clearTimeout(pingTimeout);
}
pingTimeout = setTimeout(() => {
console.debug("Ping timeout");
try {
controller.close();
ws.close();
} catch (e) {
console.error(
"Error while trying to close ws on ping timeout",
e,
);
}
}, 2500);
if (msg.type === "ping") {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(window as any).jazzPings = (window as any).jazzPings || [];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(window as any).jazzPings.push({
received: Date.now(),
sent: msg.time,
dc: msg.dc,
});
return;
}
controller.enqueue(msg);
};
const closeListener = () => {
controller.close();
clearTimeout(pingTimeout);
};
ws.addEventListener("close", closeListener);
ws.addEventListener("error", () => {
controller.error(new Error("The WebSocket errored!"));
ws.removeEventListener("close", closeListener);
});
},
cancel() {
ws.close();
},
});
}
export function createWebSocketPeer(syncAddress: string): Peer {
const ws = new WebSocket(syncAddress);
const incoming = websocketReadableStream<SyncMessage>(ws);
const outgoing = websocketWritableStream<SyncMessage>(ws);
return {
id: syncAddress + "@" + new Date().toISOString(),
incoming,
outgoing,
role: "server",
};
}
function websocketWritableStream<T>(ws: WebSocket) {
const initialQueue = [] as T[];
let isOpen = false;
return new WritableStream<T>({
start(controller) {
ws.addEventListener("error", (event) => {
controller.error(
new Error("The WebSocket errored!" + JSON.stringify(event)),
);
});
ws.addEventListener("close", () => {
controller.error(
new Error("The server closed the connection unexpectedly!"),
);
});
ws.addEventListener("open", () => {
for (const item of initialQueue) {
ws.send(JSON.stringify(item));
}
isOpen = true;
});
},
async write(chunk) {
if (isOpen) {
ws.send(JSON.stringify(chunk));
// Return immediately, since the web socket gives us no easy way to tell
// when the write completes.
} else {
initialQueue.push(chunk);
}
},
close() {
return closeWS(1000);
},
abort(reason) {
return closeWS(4000, reason && reason.message);
},
});
function closeWS(code: number, reasonString?: string) {
return new Promise<void>((resolve, reject) => {
ws.addEventListener(
"close",
(e) => {
if (e.wasClean) {
resolve();
} else {
reject(
new Error("The connection was not closed cleanly"),
);
}
},
{ once: true },
);
ws.close(code, reasonString);
});
}
}
/** @category Invite Links */
export function createInviteLink<C extends CoValue>(
value: C,

View File

@@ -1,5 +1,21 @@
# jazz-autosub
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
- jazz-tools@0.7.14
- cojson-transport-ws@0.7.14
## 0.7.13
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
## 0.7.12
### Patch Changes

View File

@@ -5,10 +5,11 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.12",
"version": "0.7.14",
"dependencies": {
"cojson": "workspace:*",
"cojson-transport-nodejs-ws": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.1.5",
"jazz-tools": "workspace:*",
"ws": "^8.14.2"
},

View File

@@ -1,11 +1,8 @@
import {
websocketReadableStream,
websocketWritableStream,
} from "cojson-transport-nodejs-ws";
import { WebSocket } from "ws";
import { AgentSecret, Peer, SessionID, WasmCrypto } from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Account, CoValueClass, ID } from "jazz-tools";
import { Effect } from "effect";
import { WebSocket } from "ws";
/** @category Context Creation */
export async function startWorker<Acc extends Account>({
@@ -21,14 +18,13 @@ export async function startWorker<Acc extends Account>({
syncServer?: string;
accountSchema?: CoValueClass<Acc> & typeof Account;
}): Promise<{ worker: Acc }> {
const ws = new WebSocket(peer);
const wsPeer: Peer = {
id: "upstream",
role: "server",
incoming: websocketReadableStream(ws),
outgoing: websocketWritableStream(ws),
};
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
if (!accountID) {
throw new Error("No accountID provided");
@@ -52,17 +48,17 @@ export async function startWorker<Acc extends Account>({
crypto: await WasmCrypto.create(),
});
setInterval(() => {
setInterval(async () => {
if (!worker._raw.core.node.syncManager.peers["upstream"]) {
console.log(new Date(), "Reconnecting to upstream " + peer);
const ws = new WebSocket(peer);
const wsPeer: Peer = {
id: "upstream",
role: "server",
incoming: websocketReadableStream(ws),
outgoing: websocketWritableStream(ws),
};
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
worker._raw.core.node.syncManager.addPeer(wsPeer);
}

View File

@@ -1,5 +1,22 @@
# jazz-react
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
- jazz-tools@0.7.14
- jazz-browser@0.7.14
## 0.7.13
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
- jazz-browser@0.7.13
## 0.7.12
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-react",
"version": "0.7.12",
"version": "0.7.14",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,21 @@
# jazz-autosub
## 0.7.14
### Patch Changes
- Updated dependencies
- cojson@0.7.14
- jazz-tools@0.7.14
- cojson-transport-ws@0.7.14
## 0.7.13
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.13
## 0.7.12
### Patch Changes

View File

@@ -3,7 +3,7 @@
"bin": "./dist/index.js",
"type": "module",
"license": "MIT",
"version": "0.7.12",
"version": "0.7.14",
"scripts": {
"lint": "eslint . --ext ts,tsx",
"format": "prettier --write './src/**/*.{ts,tsx}'",
@@ -15,7 +15,7 @@
"@effect/platform-node": "^0.49.2",
"@effect/schema": "^0.66.16",
"cojson": "workspace:*",
"cojson-transport-nodejs-ws": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.1.5",
"fast-check": "^3.17.2",
"jazz-tools": "workspace:*",

View File

@@ -2,11 +2,8 @@
import { Command, Options } from "@effect/cli";
import { NodeContext, NodeRuntime } from "@effect/platform-node";
import { Console, Effect } from "effect";
import {
websocketReadableStream,
websocketWritableStream,
} from "cojson-transport-nodejs-ws";
import { WebSocket } from "ws";
import { createWebSocketPeer } from "cojson-transport-ws";
import { WebSocket } from "ws"
import {
Account,
WasmCrypto,
@@ -24,23 +21,20 @@ const peer = Options.text("peer")
const accountCreate = Command.make(
"create",
{ name, peer },
({ name, peer }) => {
({ name, peer: peerAddr }) => {
return Effect.gen(function* () {
const ws = new WebSocket(peer);
const crypto = yield* Effect.promise(() => WasmCrypto.create());
const peer = yield* createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peerAddr),
role: "server",
});
const account: Account = yield* Effect.promise(async () =>
Account.create({
creationProps: { name },
peersToLoadFrom: [
{
id: "upstream",
role: "server",
incoming: websocketReadableStream(ws),
outgoing: websocketWritableStream(ws),
},
],
peersToLoadFrom: [peer],
crypto,
}),
);
@@ -59,7 +53,11 @@ const accountCreate = Command.make(
),
);
const ws2 = new WebSocket(peer);
const peer2 = yield* createWebSocketPeer({
id: "upstream2",
websocket: new WebSocket(peerAddr),
role: "server",
});
yield* Effect.promise(async () =>
Account.become({
@@ -68,14 +66,7 @@ const accountCreate = Command.make(
sessionID: cojsonInternals.newRandomSessionID(
account.id as unknown as AccountID,
),
peersToLoadFrom: [
{
id: "upstream",
role: "server",
incoming: websocketReadableStream(ws2),
outgoing: websocketWritableStream(ws2),
},
],
peersToLoadFrom: [peer2],
crypto,
}),
);

View File

@@ -1,5 +1,19 @@
# jazz-autosub
## 0.7.14
### Patch Changes
- Use Effect Queues and Streams instead of custom queue implementation
- Updated dependencies
- cojson@0.7.14
## 0.7.13
### Patch Changes
- Fix CoList.toJSON()
## 0.7.12
### Patch Changes

View File

@@ -5,11 +5,10 @@
"types": "./src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.12",
"version": "0.7.14",
"dependencies": {
"@effect/schema": "^0.66.16",
"cojson": "workspace:*",
"cojson-transport-nodejs-ws": "workspace:*",
"effect": "^3.1.5",
"fast-check": "^3.17.2"
},

View File

@@ -12,7 +12,7 @@ import type {
SessionID,
} from "cojson";
import { Context, Effect, Stream } from "effect";
import type {
import {
CoMap,
CoValue,
CoValueClass,
@@ -61,9 +61,11 @@ export class Account extends CoValueBase implements CoValue {
ref: () => Profile,
optional: false,
} satisfies RefEncoded<Profile>,
root: "json" satisfies Schema,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} as any;
root: {
ref: () => CoMap,
optional: true,
} satisfies RefEncoded<CoMap>,
};
}
get _owner(): Account {
@@ -214,7 +216,7 @@ export class Account extends CoValueBase implements CoValue {
return this.fromNode(node) as A;
}
static createAs<A extends Account>(
static async createAs<A extends Account>(
this: CoValueClass<A> & typeof Account,
as: Account,
options: {
@@ -222,11 +224,11 @@ export class Account extends CoValueBase implements CoValue {
},
) {
// TODO: is there a cleaner way to do this?
const connectedPeers = cojsonInternals.connectedPeers(
const connectedPeers = await Effect.runPromise(cojsonInternals.connectedPeers(
"creatingAccount",
"createdAccount",
{ peer1role: "server", peer2role: "client" },
);
));
as._raw.core.node.syncManager.addPeer(connectedPeers[1]);

View File

@@ -311,7 +311,7 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
.map((e) => encodeSync(itemDescriptor.encoded)(e));
} else if (isRefEncoded(itemDescriptor)) {
return this.map((item, idx) =>
seenAbove?.includes((item as CoValue).id)
seenAbove?.includes((item as CoValue)?.id)
? { _circular: (item as CoValue).id }
: (item as unknown as CoValue)?.toJSON(idx + "", [
...(seenAbove || []),

View File

@@ -157,11 +157,11 @@ describe("CoList resolution", async () => {
test("Loading and availability", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = connectedPeers(
const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -216,11 +216,11 @@ describe("CoList resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = connectedPeers(
const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}

View File

@@ -253,11 +253,11 @@ describe("CoMap resolution", async () => {
test("Loading and availability", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondPeer] = connectedPeers(
const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -323,11 +323,11 @@ describe("CoMap resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondAsPeer] = connectedPeers(
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}

View File

@@ -83,10 +83,11 @@ describe("CoStream resolution", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -175,10 +176,11 @@ describe("CoStream resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
@@ -325,10 +327,11 @@ describe("BinaryCoStream loading & Subscription", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -357,30 +360,32 @@ describe("BinaryCoStream loading & Subscription", async () => {
});
test("Subscription", async () => {
const { me } = await initNodeAndStream();
const stream = BinaryCoStream.create({ owner: me });
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = await Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const { me } = yield* Effect.promise(() => initNodeAndStream());
const stream = BinaryCoStream.create({ owner: me });
const [initialAsPeer, secondAsPeer] = yield* connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = yield* Effect.promise(() =>
Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
}),
);
const queue = yield* $(Queue.unbounded<BinaryCoStream>());
BinaryCoStream.subscribe(

View File

@@ -14,6 +14,7 @@ import {
ID,
} from "../index.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect } from "effect";
class TestMap extends CoMap {
list = co.ref(TestList);
@@ -38,10 +39,10 @@ describe("Deep loading with depth arg", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
}));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -137,9 +138,8 @@ describe("Deep loading with depth arg", async () => {
throw new Error("map4 is undefined");
}
expect(map4.list[0]?.stream).not.toBe(null);
// TODO: we should expect null here, but apparently we don't even have the id/ref?
expect(map4.list[0]?.stream?.[me.id]?.value).not.toBeDefined();
expect(map4.list[0]?.stream?.byMe?.value).not.toBeDefined();
expect(map4.list[0]?.stream?.[me.id]?.value).toBe(null);
expect(map4.list[0]?.stream?.byMe?.value).toBe(null);
const map5 = await TestMap.load(map.id, meOnSecondPeer, {
list: [{ stream: [{}] }],
@@ -252,10 +252,10 @@ test("Deep loading a record-like coMap", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
}));
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}

9620
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff