Compare commits

8 Commits: jazz-react...cojson@0.7

| Author | SHA1 | Date |
|---|---|---|
| | a862cb8819 | |
| | 4246aed7db | |
| | 41554e0e0b | |
| | 93c4d8155e | |
| | 33db0fd654 | |
| | 478ded93de | |
| | 89ad1fb79d | |
| | a35353c987 | |
@@ -12,7 +12,7 @@
"jazz-react",
"jazz-nodejs",
"jazz-run",
"cojson-transport-nodejs-ws",
"cojson-transport-ws",
"cojson-storage-indexeddb",
"cojson-storage-sqlite"
]
@@ -1,5 +1,22 @@
# jazz-example-chat

## 0.0.61

### Patch Changes

- Updated dependencies
    - cojson@0.7.14
    - jazz-tools@0.7.14
    - jazz-react@0.7.14

## 0.0.60

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13
    - jazz-react@0.7.13

## 0.0.59

### Patch Changes
@@ -1,7 +1,7 @@
{
"name": "jazz-example-chat",
"private": true,
"version": "0.0.59",
"version": "0.0.61",
"type": "module",
"scripts": {
"dev": "vite",
@@ -1,5 +1,23 @@
# jazz-example-pets

## 0.0.79

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.14
    - jazz-react@0.7.14
    - jazz-browser-media-images@0.7.14

## 0.0.78

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13
    - jazz-browser-media-images@0.7.13
    - jazz-react@0.7.13

## 0.0.77

### Patch Changes
@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.77",
"version": "0.0.79",
"type": "module",
"scripts": {
"dev": "vite",
@@ -1,5 +1,21 @@
# jazz-example-todo

## 0.0.78

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.14
    - jazz-react@0.7.14

## 0.0.77

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13
    - jazz-react@0.7.13

## 0.0.76

### Patch Changes
@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.76",
"version": "0.0.78",
"type": "module",
"scripts": {
"dev": "vite",
@@ -1,5 +1,12 @@
# cojson-storage-indexeddb

## 0.7.14

### Patch Changes

- Updated dependencies
    - cojson@0.7.14

## 0.7.11

### Patch Changes
@@ -1,14 +1,14 @@
{
"name": "cojson-storage-indexeddb",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"type": "module",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"typescript": "^5.1.6",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"devDependencies": {
"@vitest/browser": "^0.34.1",
@@ -6,14 +6,11 @@ import {
|
||||
CojsonInternalTypes,
|
||||
MAX_RECOMMENDED_TX_SIZE,
|
||||
AccountID,
|
||||
IncomingSyncStream,
|
||||
OutgoingSyncQueue,
|
||||
} from "cojson";
|
||||
import {
|
||||
ReadableStream,
|
||||
WritableStream,
|
||||
ReadableStreamDefaultReader,
|
||||
WritableStreamDefaultWriter,
|
||||
} from "isomorphic-streams";
|
||||
import { SyncPromise } from "./syncPromises.js";
|
||||
import { Effect, Queue, Stream } from "effect";
|
||||
|
||||
type CoValueRow = {
|
||||
id: CojsonInternalTypes.RawCoID;
|
||||
@@ -46,39 +43,35 @@ type SignatureAfterRow = {
|
||||
|
||||
export class IDBStorage {
|
||||
db: IDBDatabase;
|
||||
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
|
||||
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
|
||||
toLocalNode: OutgoingSyncQueue;
|
||||
|
||||
constructor(
|
||||
db: IDBDatabase,
|
||||
fromLocalNode: ReadableStream<SyncMessage>,
|
||||
toLocalNode: WritableStream<SyncMessage>,
|
||||
fromLocalNode: IncomingSyncStream,
|
||||
toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
this.db = db;
|
||||
this.fromLocalNode = fromLocalNode.getReader();
|
||||
this.toLocalNode = toLocalNode.getWriter();
|
||||
this.toLocalNode = toLocalNode;
|
||||
|
||||
void (async () => {
|
||||
let done = false;
|
||||
while (!done) {
|
||||
const result = await this.fromLocalNode.read();
|
||||
done = result.done;
|
||||
|
||||
if (result.value) {
|
||||
// console.log(
|
||||
// "IDB: handling msg",
|
||||
// result.value.id,
|
||||
// result.value.action
|
||||
// );
|
||||
await this.handleSyncMessage(result.value);
|
||||
// console.log(
|
||||
// "IDB: handled msg",
|
||||
// result.value.id,
|
||||
// result.value.action
|
||||
// );
|
||||
}
|
||||
}
|
||||
})();
|
||||
void fromLocalNode.pipe(
|
||||
Stream.runForEach((msg) =>
|
||||
Effect.tryPromise({
|
||||
try: () => this.handleSyncMessage(msg),
|
||||
catch: (e) =>
|
||||
new Error(
|
||||
`Error reading from localNode, handling msg\n\n${JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
),
|
||||
}),
|
||||
),
|
||||
Effect.runPromise,
|
||||
);
|
||||
}
|
||||
|
||||
static async asPeer(
|
||||
@@ -89,23 +82,30 @@ export class IDBStorage {
|
||||
localNodeName: "local",
|
||||
},
|
||||
): Promise<Peer> {
|
||||
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
|
||||
localNodeName,
|
||||
"storage",
|
||||
{ peer1role: "client", peer2role: "server", trace },
|
||||
);
|
||||
return Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const [localNodeAsPeer, storageAsPeer] =
|
||||
yield* cojsonInternals.connectedPeers(
|
||||
localNodeName,
|
||||
"storage",
|
||||
{ peer1role: "client", peer2role: "server", trace },
|
||||
);
|
||||
|
||||
await IDBStorage.open(
|
||||
localNodeAsPeer.incoming,
|
||||
localNodeAsPeer.outgoing,
|
||||
);
|
||||
yield* Effect.promise(() =>
|
||||
IDBStorage.open(
|
||||
localNodeAsPeer.incoming,
|
||||
localNodeAsPeer.outgoing,
|
||||
),
|
||||
);
|
||||
|
||||
return { ...storageAsPeer, priority: 100 };
|
||||
return { ...storageAsPeer, priority: 100 };
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
static async open(
|
||||
fromLocalNode: ReadableStream<SyncMessage>,
|
||||
toLocalNode: WritableStream<SyncMessage>,
|
||||
fromLocalNode: IncomingSyncStream,
|
||||
toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
const dbPromise = new Promise<IDBDatabase>((resolve, reject) => {
|
||||
const request = indexedDB.open("jazz-storage", 4);
|
||||
@@ -150,23 +150,6 @@ export class IDBStorage {
|
||||
keyPath: ["ses", "idx"],
|
||||
});
|
||||
}
|
||||
// if (ev.oldVersion !== 0 && ev.oldVersion <= 3) {
|
||||
// // fix embarrassing off-by-one error for transaction indices
|
||||
// console.log("Migration: fixing off-by-one error");
|
||||
// const transaction = (
|
||||
// ev.target as unknown as { transaction: IDBTransaction }
|
||||
// ).transaction;
|
||||
|
||||
// const txsStore = transaction.objectStore("transactions");
|
||||
// const txs = await promised(txsStore.getAll());
|
||||
|
||||
// for (const tx of txs) {
|
||||
// await promised(txsStore.delete([tx.ses, tx.idx]));
|
||||
// tx.idx -= 1;
|
||||
// await promised(txsStore.add(tx));
|
||||
// }
|
||||
// console.log("Migration: fixing off-by-one error - done");
|
||||
// }
|
||||
};
|
||||
});
|
||||
|
||||
@@ -409,29 +392,35 @@ export class IDBStorage {
|
||||
),
|
||||
).then(() => {
|
||||
// we're done with IndexedDB stuff here so can use native Promises again
|
||||
setTimeout(async () => {
|
||||
await this.toLocalNode.write({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
});
|
||||
setTimeout(() =>
|
||||
Effect.runPromise(
|
||||
Effect.gen(this, function* () {
|
||||
yield* Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
});
|
||||
|
||||
const nonEmptyNewContentPieces =
|
||||
newContentPieces.filter(
|
||||
(piece) =>
|
||||
piece.header ||
|
||||
Object.keys(piece.new).length > 0,
|
||||
);
|
||||
const nonEmptyNewContentPieces =
|
||||
newContentPieces.filter(
|
||||
(piece) =>
|
||||
piece.header ||
|
||||
Object.keys(piece.new)
|
||||
.length > 0,
|
||||
);
|
||||
|
||||
// console.log(theirKnown.id, nonEmptyNewContentPieces);
|
||||
// console.log(theirKnown.id, nonEmptyNewContentPieces);
|
||||
|
||||
for (const piece of nonEmptyNewContentPieces) {
|
||||
await this.toLocalNode.write(piece);
|
||||
await new Promise((resolve) =>
|
||||
setTimeout(resolve, 0),
|
||||
);
|
||||
}
|
||||
}, 0);
|
||||
for (const piece of nonEmptyNewContentPieces) {
|
||||
yield* Queue.offer(
|
||||
this.toLocalNode,
|
||||
piece,
|
||||
);
|
||||
yield* Effect.yieldNow();
|
||||
}
|
||||
}),
|
||||
),
|
||||
);
|
||||
|
||||
return Promise.resolve();
|
||||
});
|
||||
@@ -456,13 +445,15 @@ export class IDBStorage {
|
||||
const header = msg.header;
|
||||
if (!header) {
|
||||
console.error("Expected to be sent header first");
|
||||
void this.toLocalNode.write({
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
isCorrection: true,
|
||||
});
|
||||
void Effect.runPromise(
|
||||
Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
isCorrection: true,
|
||||
}),
|
||||
);
|
||||
throw new Error("Expected to be sent header first");
|
||||
}
|
||||
|
||||
@@ -524,11 +515,13 @@ export class IDBStorage {
|
||||
),
|
||||
).then(() => {
|
||||
if (invalidAssumptions) {
|
||||
void this.toLocalNode.write({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
isCorrection: invalidAssumptions,
|
||||
});
|
||||
void Effect.runPromise(
|
||||
Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
isCorrection: invalidAssumptions,
|
||||
}),
|
||||
);
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,5 +1,12 @@
# cojson-storage-sqlite

## 0.7.14

### Patch Changes

- Updated dependencies
    - cojson@0.7.14

## 0.7.11

### Patch Changes
@@ -1,15 +1,15 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "workspace:*",
"typescript": "^5.1.6",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"devDependencies": {
"@types/better-sqlite3": "^7.6.4"
@@ -6,15 +6,12 @@ import {
|
||||
SessionID,
|
||||
MAX_RECOMMENDED_TX_SIZE,
|
||||
AccountID,
|
||||
IncomingSyncStream,
|
||||
OutgoingSyncQueue,
|
||||
} from "cojson";
|
||||
import {
|
||||
ReadableStream,
|
||||
WritableStream,
|
||||
ReadableStreamDefaultReader,
|
||||
WritableStreamDefaultWriter,
|
||||
} from "isomorphic-streams";
|
||||
|
||||
import Database, { Database as DatabaseT } from "better-sqlite3";
|
||||
import { Effect, Queue, Stream } from "effect";
|
||||
|
||||
type CoValueRow = {
|
||||
id: CojsonInternalTypes.RawCoID;
|
||||
@@ -46,30 +43,36 @@ type SignatureAfterRow = {
|
||||
};
|
||||
|
||||
export class SQLiteStorage {
|
||||
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
|
||||
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
|
||||
toLocalNode: OutgoingSyncQueue;
|
||||
db: DatabaseT;
|
||||
|
||||
constructor(
|
||||
db: DatabaseT,
|
||||
fromLocalNode: ReadableStream<SyncMessage>,
|
||||
toLocalNode: WritableStream<SyncMessage>,
|
||||
fromLocalNode: IncomingSyncStream,
|
||||
toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
this.db = db;
|
||||
this.fromLocalNode = fromLocalNode.getReader();
|
||||
this.toLocalNode = toLocalNode.getWriter();
|
||||
this.toLocalNode = toLocalNode;
|
||||
|
||||
void (async () => {
|
||||
let done = false;
|
||||
while (!done) {
|
||||
const result = await this.fromLocalNode.read();
|
||||
done = result.done;
|
||||
|
||||
if (result.value) {
|
||||
await this.handleSyncMessage(result.value);
|
||||
}
|
||||
}
|
||||
})();
|
||||
void fromLocalNode.pipe(
|
||||
Stream.runForEach((msg) =>
|
||||
Effect.tryPromise({
|
||||
try: () => this.handleSyncMessage(msg),
|
||||
catch: (e) =>
|
||||
new Error(
|
||||
`Error reading from localNode, handling msg\n\n${JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
),
|
||||
}),
|
||||
),
|
||||
Effect.runPromise,
|
||||
);
|
||||
}
|
||||
|
||||
static async asPeer({
|
||||
@@ -81,25 +84,32 @@ export class SQLiteStorage {
|
||||
trace?: boolean;
|
||||
localNodeName?: string;
|
||||
}): Promise<Peer> {
|
||||
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
|
||||
localNodeName,
|
||||
"storage",
|
||||
{ peer1role: "client", peer2role: "server", trace },
|
||||
);
|
||||
return Effect.runPromise(
|
||||
Effect.gen(function* () {
|
||||
const [localNodeAsPeer, storageAsPeer] =
|
||||
yield* cojsonInternals.connectedPeers(
|
||||
localNodeName,
|
||||
"storage",
|
||||
{ peer1role: "client", peer2role: "server", trace },
|
||||
);
|
||||
|
||||
await SQLiteStorage.open(
|
||||
filename,
|
||||
localNodeAsPeer.incoming,
|
||||
localNodeAsPeer.outgoing,
|
||||
);
|
||||
yield* Effect.promise(() =>
|
||||
SQLiteStorage.open(
|
||||
filename,
|
||||
localNodeAsPeer.incoming,
|
||||
localNodeAsPeer.outgoing,
|
||||
),
|
||||
);
|
||||
|
||||
return { ...storageAsPeer, priority: 100 };
|
||||
return { ...storageAsPeer, priority: 100 };
|
||||
}),
|
||||
);
|
||||
}
|
||||
|
||||
static async open(
|
||||
filename: string,
|
||||
fromLocalNode: ReadableStream<SyncMessage>,
|
||||
toLocalNode: WritableStream<SyncMessage>,
|
||||
fromLocalNode: IncomingSyncStream,
|
||||
toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
const db = Database(filename);
|
||||
db.pragma("journal_mode = WAL");
|
||||
@@ -431,11 +441,13 @@ export class SQLiteStorage {
|
||||
);
|
||||
}
|
||||
|
||||
await this.toLocalNode.write({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
});
|
||||
await Effect.runPromise(
|
||||
Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
}),
|
||||
);
|
||||
|
||||
const nonEmptyNewContentPieces = newContentPieces.filter(
|
||||
(piece) => piece.header || Object.keys(piece.new).length > 0,
|
||||
@@ -444,7 +456,7 @@ export class SQLiteStorage {
|
||||
// console.log(theirKnown.id, nonEmptyNewContentPieces);
|
||||
|
||||
for (const piece of nonEmptyNewContentPieces) {
|
||||
await this.toLocalNode.write(piece);
|
||||
await Effect.runPromise(Queue.offer(this.toLocalNode, piece));
|
||||
await new Promise((resolve) => setTimeout(resolve, 0));
|
||||
}
|
||||
}
|
||||
@@ -466,13 +478,15 @@ export class SQLiteStorage {
|
||||
const header = msg.header;
|
||||
if (!header) {
|
||||
console.error("Expected to be sent header first");
|
||||
await this.toLocalNode.write({
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
isCorrection: true,
|
||||
});
|
||||
await Effect.runPromise(
|
||||
Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
isCorrection: true,
|
||||
}),
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -604,11 +618,13 @@ export class SQLiteStorage {
|
||||
})();
|
||||
|
||||
if (invalidAssumptions) {
|
||||
await this.toLocalNode.write({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
isCorrection: invalidAssumptions,
|
||||
});
|
||||
await Effect.runPromise(
|
||||
Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
isCorrection: invalidAssumptions,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,94 +0,0 @@
|
||||
import { WebSocket } from "ws";
|
||||
import { WritableStream, ReadableStream } from "isomorphic-streams";
|
||||
|
||||
export function websocketReadableStream<T>(ws: WebSocket) {
|
||||
ws.binaryType = "arraybuffer";
|
||||
|
||||
return new ReadableStream<T>({
|
||||
start(controller) {
|
||||
ws.addEventListener("message", (event) => {
|
||||
if (typeof event.data !== "string")
|
||||
return console.warn(
|
||||
"Got non-string message from client",
|
||||
event.data,
|
||||
);
|
||||
const msg = JSON.parse(event.data);
|
||||
if (msg.type === "ping") {
|
||||
// console.debug(
|
||||
// "Got ping from",
|
||||
// msg.dc,
|
||||
// "latency",
|
||||
// Date.now() - msg.time,
|
||||
// "ms"
|
||||
// );
|
||||
return;
|
||||
}
|
||||
controller.enqueue(msg);
|
||||
});
|
||||
ws.addEventListener("close", () => {
|
||||
try {
|
||||
controller.close();
|
||||
} catch (ignore) {
|
||||
// will throw if already closed, with no way to check before-hand
|
||||
}
|
||||
});
|
||||
ws.addEventListener("error", () =>
|
||||
controller.error(new Error("The WebSocket errored!")),
|
||||
);
|
||||
},
|
||||
|
||||
cancel() {
|
||||
ws.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export function websocketWritableStream<T>(ws: WebSocket) {
|
||||
return new WritableStream<T>({
|
||||
start(controller) {
|
||||
ws.addEventListener("close", () =>
|
||||
controller.error(
|
||||
new Error("The WebSocket closed unexpectedly!"),
|
||||
),
|
||||
);
|
||||
ws.addEventListener("error", () =>
|
||||
controller.error(new Error("The WebSocket errored!")),
|
||||
);
|
||||
|
||||
if (ws.readyState === WebSocket.OPEN) {
|
||||
return;
|
||||
}
|
||||
|
||||
return new Promise((resolve) =>
|
||||
ws.addEventListener("open", resolve, { once: true }),
|
||||
);
|
||||
},
|
||||
|
||||
write(chunk) {
|
||||
ws.send(JSON.stringify(chunk));
|
||||
// Return immediately, since the web socket gives us no easy way to tell
|
||||
// when the write completes.
|
||||
},
|
||||
|
||||
close() {
|
||||
return closeWS(1000);
|
||||
},
|
||||
|
||||
abort(reason) {
|
||||
return closeWS(4000, reason && reason.message);
|
||||
},
|
||||
});
|
||||
|
||||
function closeWS(code: number, reasonString?: string) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
ws.onclose = (e) => {
|
||||
if (e.wasClean) {
|
||||
resolve();
|
||||
} else {
|
||||
reject(new Error("The connection was not closed cleanly"));
|
||||
}
|
||||
};
|
||||
ws.close(code, reasonString);
|
||||
});
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,12 @@
# cojson-transport-nodejs-ws

## 0.7.14

### Patch Changes

- Updated dependencies
    - cojson@0.7.14

## 0.7.11

### Patch Changes
@@ -1,15 +1,14 @@
{
"name": "cojson-transport-nodejs-ws",
"name": "cojson-transport-ws",
"type": "module",
"version": "0.7.11",
"version": "0.7.14",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"typescript": "^5.1.6",
"ws": "^8.14.2",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"effect": "^3.1.5",
"typescript": "^5.1.6"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",
packages/cojson-transport-ws/src/index.ts (new file, 108 lines)

@@ -0,0 +1,108 @@
import { DisconnectedError, Peer, PingTimeoutError, SyncMessage } from "cojson";
import { Either, Stream, Queue, Effect, Exit } from "effect";

interface AnyWebSocket {
    addEventListener(
        type: "close",
        listener: (event: { code: number; reason: string }) => void,
    ): void;
    addEventListener(
        type: "message",
        listener: (event: { data: string | unknown }) => void,
    ): void;
    addEventListener(type: "open", listener: () => void): void;
    close(): void;
    send(data: string): void;
}

export function createWebSocketPeer(options: {
    id: string;
    websocket: AnyWebSocket;
    role: Peer["role"];
}): Effect.Effect<Peer> {
    return Effect.gen(function* () {
        const ws = options.websocket;

        const incoming =
            yield* Queue.unbounded<
                Either.Either<SyncMessage, DisconnectedError | PingTimeoutError>
            >();
        const outgoing = yield* Queue.unbounded<SyncMessage>();

        ws.addEventListener("close", (event) => {
            void Effect.runPromiseExit(
                Queue.offer(
                    incoming,
                    Either.left(
                        new DisconnectedError(`${event.code}: ${event.reason}`),
                    ),
                ),
            ).then((e) => {
                if (Exit.isFailure(e) && !Exit.isInterrupted(e)) {
                    console.warn("Failed closing ws", e);
                }
            });
        });

        let pingTimeout: ReturnType<typeof setTimeout> | undefined;

        ws.addEventListener("message", (event) => {
            const msg = JSON.parse(event.data as string);

            if (pingTimeout) {
                clearTimeout(pingTimeout);
            }

            pingTimeout = setTimeout(() => {
                console.debug("Ping timeout");
                void Effect.runPromise(
                    Queue.offer(incoming, Either.left(new PingTimeoutError())),
                );
                try {
                    ws.close();
                } catch (e) {
                    console.error(
                        "Error while trying to close ws on ping timeout",
                        e,
                    );
                }
            }, 2500);

            if (msg.type === "ping") {
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                (globalThis as any).jazzPings =
                    // eslint-disable-next-line @typescript-eslint/no-explicit-any
                    (globalThis as any).jazzPings || [];
                // eslint-disable-next-line @typescript-eslint/no-explicit-any
                (globalThis as any).jazzPings.push({
                    received: Date.now(),
                    sent: msg.time,
                    dc: msg.dc,
                });
                return;
            } else {
                void Effect.runPromise(
                    Queue.offer(incoming, Either.right(msg)),
                );
            }
        });

        ws.addEventListener("open", () => {
            void Stream.fromQueue(outgoing).pipe(
                Stream.runForEach((msg) =>
                    Effect.sync(() => ws.send(JSON.stringify(msg))),
                ),
                Effect.runPromise,
            );
        });

        return {
            id: options.id,
            incoming: Stream.fromQueue(incoming, { shutdown: true }).pipe(
                Stream.mapEffect((either) => either),
            ),
            outgoing,
            role: options.role,
        };
    });
}
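For orientation, here is a minimal usage sketch of the new `createWebSocketPeer` helper. It is not taken from the repository itself; it assumes a Node environment with the `ws` package (whose client matches the `AnyWebSocket` shape above) and mirrors how jazz-nodejs and jazz-browser call it later in this diff. The sync-server URL is a placeholder.

```ts
import { WebSocket } from "ws";
import { Effect } from "effect";
import { createWebSocketPeer } from "cojson-transport-ws";

// createWebSocketPeer returns an Effect<Peer>, so it has to be run
// before the resulting peer can be handed to a node's sync manager.
const peer = await Effect.runPromise(
    createWebSocketPeer({
        id: "upstream@" + new Date().toISOString(),
        websocket: new WebSocket("wss://sync.example.com"), // placeholder URL
        role: "server",
    }),
);

// e.g. node.syncManager.addPeer(peer); // as done in jazz-nodejs / jazz-browser below
```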
@@ -1,5 +1,11 @@
# cojson

## 0.7.14

### Patch Changes

- Use Effect Queues and Streams instead of custom queue implementation

## 0.7.11

### Patch Changes
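The 0.7.14 entry above is terse; as a rough illustration of the pattern it refers to (not the actual cojson internals), the sketch below shows how an Effect `Queue` plus `Stream.fromQueue` can stand in for a hand-rolled message queue: a producer offers messages, a consumer drains them with `Stream.runForEach`. The `Message` shape is a stand-in for cojson's real `SyncMessage` type.

```ts
import { Effect, Fiber, Queue, Stream } from "effect";

// Stand-in for cojson's SyncMessage; the real type is richer.
type Message = { action: string; id: string };

const program = Effect.gen(function* () {
    // An unbounded Effect Queue takes the place of the old custom queue.
    const queue = yield* Queue.unbounded<Message>();

    // Consumer: view the queue as a Stream and handle each message in order.
    const consumer = Stream.fromQueue(queue).pipe(
        Stream.take(2), // only here so the example terminates
        Stream.runForEach((msg) =>
            Effect.sync(() => console.log("handling", msg.action, msg.id)),
        ),
    );
    const fiber = yield* Effect.fork(consumer);

    // Producer: whoever holds the queue offers messages into it.
    yield* Queue.offer(queue, { action: "load", id: "co_example" });
    yield* Queue.offer(queue, { action: "known", id: "co_example" });

    yield* Fiber.join(fiber);
});

void Effect.runPromise(program);
```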
@@ -5,7 +5,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.11",
"version": "0.7.14",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",
@@ -23,8 +23,7 @@
"@noble/hashes": "^1.4.0",
"@scure/base": "^1.1.1",
"effect": "^3.1.5",
"hash-wasm": "^4.9.0",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae"
"hash-wasm": "^4.9.0"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",
@@ -41,7 +41,13 @@ import type {
BinaryCoStreamMeta,
} from "./coValues/coStream.js";
import type { JsonValue } from "./jsonValue.js";
import type { SyncMessage, Peer } from "./sync.js";
import type {
SyncMessage,
Peer,
IncomingSyncStream,
OutgoingSyncQueue,
} from "./sync.js";
import { DisconnectedError, PingTimeoutError } from "./sync.js";
import type { AgentSecret } from "./crypto/crypto.js";
import type {
AccountID,
@@ -117,9 +123,19 @@ export {
SyncMessage,
isRawCoID,
LSMStorage,
DisconnectedError,
PingTimeoutError,
};

export type { Value, FileSystem, FSErr, BlockFilename, WalFilename };
export type {
Value,
FileSystem,
FSErr,
BlockFilename,
WalFilename,
IncomingSyncStream,
OutgoingSyncQueue,
};

// eslint-disable-next-line @typescript-eslint/no-namespace
export namespace CojsonInternalTypes {
@@ -1,18 +1,13 @@
|
||||
import {
|
||||
ReadableStream,
|
||||
WritableStream,
|
||||
ReadableStreamDefaultReader,
|
||||
WritableStreamDefaultWriter,
|
||||
} from "isomorphic-streams";
|
||||
import { Effect, Either, SynchronizedRef } from "effect";
|
||||
import { Effect, Either, Queue, Stream, SynchronizedRef } from "effect";
|
||||
import { RawCoID } from "../ids.js";
|
||||
import { CoValueHeader, Transaction } from "../coValueCore.js";
|
||||
import { Signature } from "../crypto/crypto.js";
|
||||
import {
|
||||
CoValueKnownState,
|
||||
IncomingSyncStream,
|
||||
NewContentMessage,
|
||||
OutgoingSyncQueue,
|
||||
Peer,
|
||||
SyncMessage,
|
||||
} from "../sync.js";
|
||||
import { CoID, RawCoValue } from "../index.js";
|
||||
import { connectedPeers } from "../streamUtils.js";
|
||||
@@ -47,9 +42,6 @@ export type CoValueChunk = {
|
||||
};
|
||||
|
||||
export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
fromLocalNode!: ReadableStreamDefaultReader<SyncMessage>;
|
||||
toLocalNode: WritableStreamDefaultWriter<SyncMessage>;
|
||||
fs: FS;
|
||||
currentWal: SynchronizedRef.SynchronizedRef<WH | undefined>;
|
||||
coValues: SynchronizedRef.SynchronizedRef<{
|
||||
[id: RawCoID]: CoValueChunk | undefined;
|
||||
@@ -61,44 +53,28 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
>();
|
||||
|
||||
constructor(
|
||||
fs: FS,
|
||||
fromLocalNode: ReadableStream<SyncMessage>,
|
||||
toLocalNode: WritableStream<SyncMessage>,
|
||||
public fs: FS,
|
||||
public fromLocalNode: IncomingSyncStream,
|
||||
public toLocalNode: OutgoingSyncQueue,
|
||||
) {
|
||||
this.fs = fs;
|
||||
this.fromLocalNode = fromLocalNode.getReader();
|
||||
this.toLocalNode = toLocalNode.getWriter();
|
||||
this.coValues = SynchronizedRef.unsafeMake({});
|
||||
this.currentWal = SynchronizedRef.unsafeMake<WH | undefined>(undefined);
|
||||
|
||||
void Effect.runPromise(
|
||||
Effect.gen(this, function* () {
|
||||
let done = false;
|
||||
while (!done) {
|
||||
const result = yield* Effect.promise(() =>
|
||||
this.fromLocalNode.read(),
|
||||
);
|
||||
done = result.done;
|
||||
|
||||
if (result.value) {
|
||||
if (result.value.action === "done") {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (result.value.action === "content") {
|
||||
yield* this.handleNewContent(result.value);
|
||||
} else {
|
||||
yield* this.sendNewContent(
|
||||
result.value.id,
|
||||
result.value,
|
||||
undefined,
|
||||
);
|
||||
}
|
||||
void this.fromLocalNode.pipe(
|
||||
Stream.runForEach((msg) =>
|
||||
Effect.gen(this, function* () {
|
||||
if (msg.action === "done") {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
return;
|
||||
}),
|
||||
if (msg.action === "content") {
|
||||
yield* this.handleNewContent(msg);
|
||||
} else {
|
||||
yield* this.sendNewContent(msg.id, msg, undefined);
|
||||
}
|
||||
}),
|
||||
),
|
||||
Effect.runPromise,
|
||||
);
|
||||
|
||||
setTimeout(() => this.compact(), 20000);
|
||||
@@ -132,15 +108,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
}
|
||||
|
||||
if (!coValue) {
|
||||
yield* Effect.promise(() =>
|
||||
this.toLocalNode.write({
|
||||
id: id,
|
||||
action: "known",
|
||||
header: false,
|
||||
sessions: {},
|
||||
asDependencyOf,
|
||||
}),
|
||||
);
|
||||
yield* Queue.offer(this.toLocalNode, {
|
||||
id: id,
|
||||
action: "known",
|
||||
header: false,
|
||||
sessions: {},
|
||||
asDependencyOf,
|
||||
});
|
||||
|
||||
return coValues;
|
||||
}
|
||||
@@ -195,17 +169,15 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
|
||||
const ourKnown: CoValueKnownState = chunkToKnownState(id, coValue);
|
||||
|
||||
yield* Effect.promise(() =>
|
||||
this.toLocalNode.write({
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
}),
|
||||
);
|
||||
yield* Queue.offer(this.toLocalNode, {
|
||||
action: "known",
|
||||
...ourKnown,
|
||||
asDependencyOf,
|
||||
});
|
||||
|
||||
for (const message of newContentMessages) {
|
||||
if (Object.keys(message.new).length === 0) continue;
|
||||
yield* Effect.promise(() => this.toLocalNode.write(message));
|
||||
yield* Queue.offer(this.toLocalNode, message);
|
||||
}
|
||||
|
||||
return { ...coValues, [id]: coValue };
|
||||
@@ -452,7 +424,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
setTimeout(() => this.compact(), 5000);
|
||||
}
|
||||
|
||||
static asPeer<WH, RH, FS extends FileSystem<WH, RH>>({
|
||||
static async asPeer<WH, RH, FS extends FileSystem<WH, RH>>({
|
||||
fs,
|
||||
trace,
|
||||
localNodeName = "local",
|
||||
@@ -460,15 +432,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
|
||||
fs: FS;
|
||||
trace?: boolean;
|
||||
localNodeName?: string;
|
||||
}): Peer {
|
||||
const [localNodeAsPeer, storageAsPeer] = connectedPeers(
|
||||
localNodeName,
|
||||
"storage",
|
||||
{
|
||||
}): Promise<Peer> {
|
||||
const [localNodeAsPeer, storageAsPeer] = await Effect.runPromise(
|
||||
connectedPeers(localNodeName, "storage", {
|
||||
peer1role: "client",
|
||||
peer2role: "server",
|
||||
trace,
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
new LSMStorage(fs, localNodeAsPeer.incoming, localNodeAsPeer.outgoing);
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
import {
|
||||
ReadableStream,
|
||||
TransformStream,
|
||||
WritableStream,
|
||||
} from "isomorphic-streams";
|
||||
import { Console, Effect, Queue, Stream } from "effect";
|
||||
import { Peer, PeerID, SyncMessage } from "./sync.js";
|
||||
|
||||
export function connectedPeers(
|
||||
@@ -17,160 +13,54 @@ export function connectedPeers(
|
||||
peer1role?: Peer["role"];
|
||||
peer2role?: Peer["role"];
|
||||
} = {},
|
||||
): [Peer, Peer] {
|
||||
const [inRx1, inTx1] = newStreamPair<SyncMessage>(peer1id + "_in");
|
||||
const [outRx1, outTx1] = newStreamPair<SyncMessage>(peer1id + "_out");
|
||||
): Effect.Effect<[Peer, Peer]> {
|
||||
return Effect.gen(function* () {
|
||||
const [from1to2Rx, from1to2Tx] = yield* newQueuePair(
|
||||
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
|
||||
);
|
||||
const [from2to1Rx, from2to1Tx] = yield* newQueuePair(
|
||||
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
|
||||
);
|
||||
|
||||
const [inRx2, inTx2] = newStreamPair<SyncMessage>(peer2id + "_in");
|
||||
const [outRx2, outTx2] = newStreamPair<SyncMessage>(peer2id + "_out");
|
||||
const peer2AsPeer: Peer = {
|
||||
id: peer2id,
|
||||
incoming: from2to1Rx,
|
||||
outgoing: from1to2Tx,
|
||||
role: peer2role,
|
||||
};
|
||||
|
||||
void outRx2
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void },
|
||||
) {
|
||||
trace &&
|
||||
console.debug(
|
||||
`${peer2id} -> ${peer1id}`,
|
||||
JSON.stringify(
|
||||
chunk,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
2,
|
||||
),
|
||||
);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
}),
|
||||
)
|
||||
.pipeTo(inTx1);
|
||||
const peer1AsPeer: Peer = {
|
||||
id: peer1id,
|
||||
incoming: from1to2Rx,
|
||||
outgoing: from2to1Tx,
|
||||
role: peer1role,
|
||||
};
|
||||
|
||||
void outRx1
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void },
|
||||
) {
|
||||
trace &&
|
||||
console.debug(
|
||||
`${peer1id} -> ${peer2id}`,
|
||||
JSON.stringify(
|
||||
chunk,
|
||||
(k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
2,
|
||||
),
|
||||
);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
}),
|
||||
)
|
||||
.pipeTo(inTx2);
|
||||
|
||||
const peer2AsPeer: Peer = {
|
||||
id: peer2id,
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
role: peer2role,
|
||||
};
|
||||
|
||||
const peer1AsPeer: Peer = {
|
||||
id: peer1id,
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
role: peer1role,
|
||||
};
|
||||
|
||||
return [peer1AsPeer, peer2AsPeer];
|
||||
return [peer1AsPeer, peer2AsPeer];
|
||||
});
|
||||
}
|
||||
|
||||
export function newStreamPair<T>(
|
||||
pairName?: string,
|
||||
): [ReadableStream<T>, WritableStream<T>] {
|
||||
let queueLength = 0;
|
||||
let readerClosed = false;
|
||||
export function newQueuePair(
|
||||
options: { traceAs?: string } = {},
|
||||
): Effect.Effect<[Stream.Stream<SyncMessage>, Queue.Enqueue<SyncMessage>]> {
|
||||
return Effect.gen(function* () {
|
||||
const queue = yield* Queue.unbounded<SyncMessage>();
|
||||
|
||||
let resolveEnqueue: (enqueue: (item: T) => void) => void;
|
||||
const enqueuePromise = new Promise<(item: T) => void>((resolve) => {
|
||||
resolveEnqueue = resolve;
|
||||
});
|
||||
|
||||
let resolveClose: (close: () => void) => void;
|
||||
const closePromise = new Promise<() => void>((resolve) => {
|
||||
resolveClose = resolve;
|
||||
});
|
||||
|
||||
let queueWasOverflowing = false;
|
||||
|
||||
function maybeReportQueueLength() {
|
||||
if (queueLength >= 100) {
|
||||
queueWasOverflowing = true;
|
||||
if (queueLength % 100 === 0) {
|
||||
console.warn(pairName, "overflowing queue length", queueLength);
|
||||
}
|
||||
if (options.traceAs) {
|
||||
return [Stream.fromQueue(queue).pipe(Stream.tap((msg) => Console.debug(
|
||||
options.traceAs,
|
||||
JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" ||
|
||||
k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
2,
|
||||
),
|
||||
))), queue];
|
||||
} else {
|
||||
if (queueWasOverflowing) {
|
||||
console.debug(pairName, "ok queue length", queueLength);
|
||||
queueWasOverflowing = false;
|
||||
}
|
||||
return [Stream.fromQueue(queue), queue];
|
||||
}
|
||||
}
|
||||
|
||||
const readable = new ReadableStream<T>({
|
||||
async start(controller) {
|
||||
resolveEnqueue(controller.enqueue.bind(controller));
|
||||
resolveClose(controller.close.bind(controller));
|
||||
},
|
||||
|
||||
cancel(_reason) {
|
||||
console.log("Manually closing reader");
|
||||
readerClosed = true;
|
||||
},
|
||||
}).pipeThrough(
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
new TransformStream<any, any>({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void },
|
||||
) {
|
||||
queueLength -= 1;
|
||||
maybeReportQueueLength();
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
}),
|
||||
) as ReadableStream<T>;
|
||||
|
||||
let lastWritePromise = Promise.resolve();
|
||||
|
||||
const writable = new WritableStream<T>({
|
||||
async write(chunk) {
|
||||
queueLength += 1;
|
||||
maybeReportQueueLength();
|
||||
const enqueue = await enqueuePromise;
|
||||
if (readerClosed) {
|
||||
throw new Error("Reader closed");
|
||||
} else {
|
||||
// make sure write resolves before corresponding read, but make sure writes are still in order
|
||||
await lastWritePromise;
|
||||
lastWritePromise = new Promise((resolve) => {
|
||||
enqueue(chunk);
|
||||
resolve();
|
||||
});
|
||||
}
|
||||
},
|
||||
async abort(reason) {
|
||||
console.debug("Manually closing writer", reason);
|
||||
const close = await closePromise;
|
||||
close();
|
||||
},
|
||||
});
|
||||
|
||||
return [readable, writable];
|
||||
}
|
||||
|
||||
@@ -1,13 +1,9 @@
import { Signature } from "./crypto/crypto.js";
import { CoValueHeader, Transaction } from "./coValueCore.js";
import { CoValueCore } from "./coValueCore.js";
import { LocalNode } from "./localNode.js";
import {
ReadableStream,
WritableStream,
WritableStreamDefaultWriter,
} from "isomorphic-streams";
import { LocalNode, newLoadingState } from "./localNode.js";
import { RawCoID, SessionID } from "./ids.js";
import { Effect, Queue, Stream } from "effect";

export type CoValueKnownState = {
id: RawCoID;
@@ -60,10 +56,27 @@ export type DoneMessage = {

export type PeerID = string;

export class DisconnectedError extends Error {
readonly _tag = "DisconnectedError";
constructor(public message: string) {
super(message);
}
}

export class PingTimeoutError extends Error {
readonly _tag = "PingTimeoutError";
}

export type IncomingSyncStream = Stream.Stream<
SyncMessage,
DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = Queue.Enqueue<SyncMessage>;

export interface Peer {
id: PeerID;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStream<SyncMessage>;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client";
delayOnError?: number;
priority?: number;
@@ -73,8 +86,8 @@ export interface PeerState {
id: PeerID;
optimisticKnownStates: { [id: RawCoID]: CoValueKnownState };
toldKnownState: Set<RawCoID>;
incoming: ReadableStream<SyncMessage>;
outgoing: WritableStreamDefaultWriter<SyncMessage>;
incoming: IncomingSyncStream;
outgoing: OutgoingSyncQueue;
role: "peer" | "server" | "client";
delayOnError?: number;
priority?: number;
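To make the new `Peer` shape concrete, here is a small sketch, written for this document rather than taken from the repo, of building a peer whose `incoming` stream and `outgoing` queue are backed by plain Effect queues (roughly what `connectedPeers`/`newQueuePair` do in streamUtils.ts). The helper name `makeLoopbackPeer` is hypothetical.

```ts
import { Effect, Queue, Stream } from "effect";
import type { IncomingSyncStream, OutgoingSyncQueue, Peer, SyncMessage } from "cojson";

// Hypothetical helper: `toPeer` feeds the peer's `incoming` stream; `fromPeer`
// receives whatever the local node enqueues on `outgoing` (a Queue.Enqueue).
const makeLoopbackPeer = (id: string) =>
    Effect.gen(function* () {
        const toPeer = yield* Queue.unbounded<SyncMessage>();
        const fromPeer = yield* Queue.unbounded<SyncMessage>();

        // A Stream drawn from a queue never fails on its own, so it fits the
        // IncomingSyncStream error channel; a Queue is itself a Queue.Enqueue.
        const incoming: IncomingSyncStream = Stream.fromQueue(toPeer);
        const outgoing: OutgoingSyncQueue = fromPeer;

        const peer: Peer = { id, incoming, outgoing, role: "server", priority: 100 };
        return { peer, toPeer, fromPeer };
    });
```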
@@ -127,25 +140,24 @@ export class SyncManager {
|
||||
});
|
||||
}
|
||||
|
||||
async loadFromPeers(id: RawCoID, excludePeer?: PeerID) {
|
||||
for (const peer of this.peersInPriorityOrder()) {
|
||||
if (peer.id === excludePeer) {
|
||||
continue;
|
||||
}
|
||||
if (peer.role !== "server") {
|
||||
continue;
|
||||
}
|
||||
async loadFromPeers(id: RawCoID, forPeer?: PeerID) {
|
||||
const eligiblePeers = this.peersInPriorityOrder().filter(
|
||||
(peer) => peer.id !== forPeer && peer.role === "server",
|
||||
);
|
||||
|
||||
for (const peer of eligiblePeers) {
|
||||
// console.log("loading", id, "from", peer.id);
|
||||
peer.outgoing
|
||||
.write({
|
||||
Effect.runPromise(
|
||||
Queue.offer(peer.outgoing, {
|
||||
action: "load",
|
||||
id: id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error writing to peer", e);
|
||||
});
|
||||
}),
|
||||
).catch((e) => {
|
||||
console.error("Error writing to peer", e);
|
||||
});
|
||||
|
||||
const coValueEntry = this.local.coValues[id];
|
||||
if (coValueEntry?.state !== "loading") {
|
||||
continue;
|
||||
@@ -297,7 +309,9 @@ export class SyncManager {
|
||||
let lastYield = performance.now();
|
||||
for (const [_i, piece] of newContentPieces.entries()) {
|
||||
// console.log(
|
||||
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${newContentPieces.length} header: ${!!piece.header}`,
|
||||
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${
|
||||
// newContentPieces.length
|
||||
// } header: ${!!piece.header}`,
|
||||
// // Object.values(piece.new).map((s) => s.newTransactions)
|
||||
// );
|
||||
await this.trySendToPeer(peer, piece);
|
||||
@@ -328,7 +342,7 @@ export class SyncManager {
|
||||
id: peer.id,
|
||||
optimisticKnownStates: {},
|
||||
incoming: peer.incoming,
|
||||
outgoing: peer.outgoing.getWriter(),
|
||||
outgoing: peer.outgoing,
|
||||
toldKnownState: new Set(),
|
||||
role: peer.role,
|
||||
delayOnError: peer.delayOnError,
|
||||
@@ -354,91 +368,55 @@ export class SyncManager {
|
||||
void initialSync();
|
||||
}
|
||||
|
||||
const readIncoming = async () => {
|
||||
try {
|
||||
for await (const msg of peerState.incoming) {
|
||||
try {
|
||||
// await this.handleSyncMessage(msg, peerState);
|
||||
this.handleSyncMessage(msg, peerState).catch((e) => {
|
||||
console.error(
|
||||
new Date(),
|
||||
`Error reading from peer ${peer.id}, handling msg`,
|
||||
JSON.stringify(msg, (k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
),
|
||||
e,
|
||||
);
|
||||
});
|
||||
// await new Promise<void>((resolve) => {
|
||||
// setTimeout(resolve, 0);
|
||||
// });
|
||||
} catch (e) {
|
||||
console.error(
|
||||
new Date(),
|
||||
`Error reading from peer ${peer.id}, handling msg`,
|
||||
JSON.stringify(msg, (k, v) =>
|
||||
k === "changes" || k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
void Effect.runPromise(
|
||||
peerState.incoming.pipe(
|
||||
Stream.ensuring(
|
||||
Effect.sync(() => {
|
||||
console.log("Peer disconnected:", peer.id);
|
||||
delete this.peers[peer.id];
|
||||
}),
|
||||
),
|
||||
Stream.runForEach((msg) =>
|
||||
Effect.tryPromise({
|
||||
try: () => this.handleSyncMessage(msg, peerState),
|
||||
catch: (e) =>
|
||||
new Error(
|
||||
`Error reading from peer ${
|
||||
peer.id
|
||||
}, handling msg\n\n${JSON.stringify(
|
||||
msg,
|
||||
(k, v) =>
|
||||
k === "changes" ||
|
||||
k === "encryptedChanges"
|
||||
? v.slice(0, 20) + "..."
|
||||
: v,
|
||||
)}`,
|
||||
{ cause: e },
|
||||
),
|
||||
e,
|
||||
);
|
||||
if (peerState.delayOnError) {
|
||||
await new Promise<void>((resolve) => {
|
||||
setTimeout(resolve, peerState.delayOnError);
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e) {
|
||||
console.error(`Error reading from peer ${peer.id}`, e);
|
||||
}
|
||||
|
||||
console.log("Peer disconnected:", peer.id);
|
||||
delete this.peers[peer.id];
|
||||
};
|
||||
|
||||
void readIncoming();
|
||||
}).pipe(
|
||||
Effect.timeoutFail({
|
||||
duration: 10000,
|
||||
onTimeout: () =>
|
||||
new Error("Took >10s to process message"),
|
||||
}),
|
||||
),
|
||||
),
|
||||
Effect.catchAll((e) =>
|
||||
Effect.logError(
|
||||
"Error in peer",
|
||||
peer.id,
|
||||
e.message,
|
||||
typeof e.cause === "object" &&
|
||||
e.cause instanceof Error &&
|
||||
e.cause.message,
|
||||
),
|
||||
),
|
||||
),
|
||||
);
|
||||
}
|
||||
|
||||
trySendToPeer(peer: PeerState, msg: SyncMessage) {
|
||||
if (!this.peers[peer.id]) {
|
||||
// already disconnected, return to drain potential queue
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const start = Date.now();
|
||||
peer.outgoing
|
||||
.write(msg)
|
||||
.then(() => {
|
||||
const end = Date.now();
|
||||
if (end - start > 1000) {
|
||||
// console.error(
|
||||
// new Error(
|
||||
// `Writing to peer "${peer.id}" took ${
|
||||
// Math.round((Date.now() - start) / 100) / 10
|
||||
// }s - this should never happen as write should resolve quickly or error`
|
||||
// )
|
||||
// );
|
||||
} else {
|
||||
resolve();
|
||||
}
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error(
|
||||
new Error(
|
||||
`Error writing to peer ${peer.id}, disconnecting`,
|
||||
{
|
||||
cause: e,
|
||||
},
|
||||
),
|
||||
);
|
||||
delete this.peers[peer.id];
|
||||
});
|
||||
});
|
||||
return Effect.runPromise(Queue.offer(peer.outgoing, msg));
|
||||
}
|
||||
|
||||
async handleLoad(msg: LoadMessage, peer: PeerState) {
|
||||
@@ -447,21 +425,50 @@ export class SyncManager {
|
||||
|
||||
if (!entry) {
|
||||
// console.log(`Loading ${msg.id} from all peers except ${peer.id}`);
|
||||
this.local
|
||||
.loadCoValueCore(msg.id, {
|
||||
dontLoadFrom: peer.id,
|
||||
dontWaitFor: peer.id,
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error loading coValue in handleLoad", e);
|
||||
});
|
||||
|
||||
// special case: we should be able to solve this much more neatly
|
||||
// with an explicit state machine in the future
|
||||
const eligiblePeers = this.peersInPriorityOrder().filter(
|
||||
(other) => other.id !== peer.id && peer.role === "server",
|
||||
);
|
||||
if (eligiblePeers.length === 0) {
|
||||
if (msg.header || Object.keys(msg.sessions).length > 0) {
|
||||
this.local.coValues[msg.id] = newLoadingState(
|
||||
new Set([peer.id]),
|
||||
);
|
||||
this.trySendToPeer(peer, {
|
||||
action: "known",
|
||||
id: msg.id,
|
||||
header: false,
|
||||
sessions: {},
|
||||
}).catch((e) => {
|
||||
console.error("Error sending known state back", e);
|
||||
});
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
this.local
|
||||
.loadCoValueCore(msg.id, {
|
||||
dontLoadFrom: peer.id,
|
||||
dontWaitFor: peer.id,
|
||||
})
|
||||
.catch((e) => {
|
||||
console.error("Error loading coValue in handleLoad", e);
|
||||
});
|
||||
}
|
||||
|
||||
entry = this.local.coValues[msg.id]!;
|
||||
}
|
||||
|
||||
if (entry.state === "loading") {
|
||||
console.log(
|
||||
"Waiting for loaded",
|
||||
msg.id,
|
||||
"after message from",
|
||||
peer.id,
|
||||
);
|
||||
const loaded = await entry.done;
|
||||
|
||||
console.log("Loaded", msg.id, loaded);
|
||||
if (loaded === "unavailable") {
|
||||
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
|
||||
peer.toldKnownState.add(msg.id);
|
||||
@@ -508,7 +515,7 @@ export class SyncManager {
|
||||
}
|
||||
} else {
|
||||
throw new Error(
|
||||
"Expected coValue entry to be created, missing subscribe?",
|
||||
`Expected coValue entry for ${msg.id} to be created on known state, missing subscribe?`,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -549,7 +556,7 @@ export class SyncManager {
|
||||
|
||||
if (!entry) {
|
||||
throw new Error(
|
||||
"Expected coValue entry to be created, missing subscribe?",
|
||||
`Expected coValue entry for ${msg.id} to be created on new content, missing subscribe?`,
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ import { newRandomSessionID } from "../coValueCore.js";
import { LocalNode } from "../localNode.js";
import { connectedPeers } from "../streamUtils.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { Effect } from "effect";

const Crypto = await WasmCrypto.create();

@@ -52,11 +53,13 @@ test("Can create account with one node, and then load it on another", async () =
map.set("foo", "bar", "private");
expect(map.get("foo")).toEqual("bar");

const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {
const [node1asPeer, node2asPeer] = await Effect.runPromise(connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
});
}));

console.log("After connected peers")

node.syncManager.addPeer(node2asPeer);

File diff suppressed because it is too large
@@ -1,5 +1,21 @@
# jazz-browser-media-images

## 0.7.14

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.14
    - jazz-browser@0.7.14

## 0.7.13

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13
    - jazz-browser@0.7.13

## 0.7.12

### Patch Changes
@@ -1,6 +1,6 @@
{
"name": "jazz-browser-media-images",
"version": "0.7.12",
"version": "0.7.14",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",
@@ -1,5 +1,22 @@
# jazz-browser

## 0.7.14

### Patch Changes

- Updated dependencies
    - cojson@0.7.14
    - jazz-tools@0.7.14
    - cojson-storage-indexeddb@0.7.14
    - cojson-transport-ws@0.7.14

## 0.7.13

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13

## 0.7.12

### Patch Changes
@@ -1,6 +1,6 @@
{
"name": "jazz-browser",
"version": "0.7.12",
"version": "0.7.14",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",
@@ -9,8 +9,8 @@
"@scure/bip39": "^1.3.0",
"cojson": "workspace:*",
"cojson-storage-indexeddb": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.1.5",
"isomorphic-streams": "https://github.com/sgwilym/isomorphic-streams.git#aa9394781bfc92f8d7c981be7daf8af4b4cd4fae",
"jazz-tools": "workspace:*",
"typescript": "^5.1.6"
},
@@ -1,11 +1,8 @@
|
||||
import { ReadableStream, WritableStream } from "isomorphic-streams";
|
||||
import {
|
||||
CoValue,
|
||||
ID,
|
||||
Peer,
|
||||
AgentID,
|
||||
SessionID,
|
||||
SyncMessage,
|
||||
cojsonInternals,
|
||||
InviteSecret,
|
||||
Account,
|
||||
@@ -13,10 +10,15 @@ import {
|
||||
WasmCrypto,
|
||||
CryptoProvider,
|
||||
} from "jazz-tools";
|
||||
import { AccountID, LSMStorage } from "cojson";
|
||||
import {
|
||||
AccountID,
|
||||
LSMStorage,
|
||||
} from "cojson";
|
||||
import { AuthProvider } from "./auth/auth.js";
|
||||
import { OPFSFilesystem } from "./OPFSFilesystem.js";
|
||||
import { IDBStorage } from "cojson-storage-indexeddb";
|
||||
import { Effect, Queue } from "effect";
|
||||
import { createWebSocketPeer } from "cojson-transport-ws";
|
||||
export * from "./auth/auth.js";
|
||||
|
||||
/** @category Context Creation */
|
||||
@@ -29,7 +31,7 @@ export type BrowserContext<Acc extends Account> = {
|
||||
/** @category Context Creation */
|
||||
export async function createJazzBrowserContext<Acc extends Account>({
|
||||
auth,
|
||||
peer,
|
||||
peer: peerAddr,
|
||||
reconnectionTimeout: initialReconnectionTimeout = 500,
|
||||
storage = "indexedDB",
|
||||
crypto: customCrypto,
|
||||
@@ -43,7 +45,13 @@ export async function createJazzBrowserContext<Acc extends Account>({
|
||||
const crypto = customCrypto || (await WasmCrypto.create());
|
||||
let sessionDone: () => void;
|
||||
|
||||
const firstWsPeer = createWebSocketPeer(peer);
|
||||
const firstWsPeer = await Effect.runPromise(
|
||||
createWebSocketPeer({
|
||||
websocket: new WebSocket(peerAddr),
|
||||
id: peerAddr + "@" + new Date().toISOString(),
|
||||
role: "server",
|
||||
}),
|
||||
);
|
||||
let shouldTryToReconnect = true;
|
||||
|
||||
let currentReconnectionTimeout = initialReconnectionTimeout;
|
||||
@@ -77,7 +85,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
|
||||
while (shouldTryToReconnect) {
|
||||
if (
|
||||
Object.keys(me._raw.core.node.syncManager.peers).some(
|
||||
(peerId) => peerId.includes(peer),
|
||||
(peerId) => peerId.includes(peerAddr),
|
||||
)
|
||||
) {
|
||||
// TODO: this might drain battery, use listeners instead
|
||||
@@ -107,7 +115,13 @@ export async function createJazzBrowserContext<Acc extends Account>({
|
||||
});
|
||||
|
||||
me._raw.core.node.syncManager.addPeer(
|
||||
createWebSocketPeer(peer),
|
||||
await Effect.runPromise(
|
||||
createWebSocketPeer({
|
||||
websocket: new WebSocket(peerAddr),
|
||||
id: peerAddr + "@" + new Date().toISOString(),
|
||||
role: "server",
|
||||
}),
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -124,9 +138,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
|
||||
for (const peer of Object.values(
|
||||
me._raw.core.node.syncManager.peers,
|
||||
)) {
|
||||
peer.outgoing
|
||||
.close()
|
||||
.catch((e) => console.error("Error while closing peer", e));
|
||||
void Effect.runPromise(Queue.shutdown(peer.outgoing));
|
||||
}
|
||||
sessionDone?.();
|
||||
},
|
||||
@@ -207,140 +219,6 @@ export function getSessionHandleFor(
|
||||
};
|
||||
}
|
||||
|
||||
function websocketReadableStream<T>(ws: WebSocket) {
|
||||
ws.binaryType = "arraybuffer";
|
||||
|
||||
return new ReadableStream<T>({
|
||||
start(controller) {
|
||||
let pingTimeout: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
ws.onmessage = (event) => {
|
||||
const msg = JSON.parse(event.data);
|
||||
|
||||
if (pingTimeout) {
|
||||
clearTimeout(pingTimeout);
|
||||
}
|
||||
|
||||
pingTimeout = setTimeout(() => {
|
||||
console.debug("Ping timeout");
|
||||
try {
|
||||
controller.close();
|
||||
ws.close();
|
||||
} catch (e) {
|
||||
console.error(
|
||||
"Error while trying to close ws on ping timeout",
|
||||
e,
|
||||
);
|
||||
}
|
||||
}, 2500);
|
||||
|
||||
if (msg.type === "ping") {
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(window as any).jazzPings = (window as any).jazzPings || [];
|
||||
// eslint-disable-next-line @typescript-eslint/no-explicit-any
|
||||
(window as any).jazzPings.push({
|
||||
received: Date.now(),
|
||||
sent: msg.time,
|
||||
dc: msg.dc,
|
||||
});
|
||||
return;
|
||||
}
|
||||
controller.enqueue(msg);
|
||||
};
|
||||
const closeListener = () => {
|
||||
controller.close();
|
||||
clearTimeout(pingTimeout);
|
||||
};
|
||||
ws.addEventListener("close", closeListener);
|
||||
ws.addEventListener("error", () => {
|
||||
controller.error(new Error("The WebSocket errored!"));
|
||||
ws.removeEventListener("close", closeListener);
|
||||
});
|
||||
},
|
||||
|
||||
cancel() {
|
||||
ws.close();
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
export function createWebSocketPeer(syncAddress: string): Peer {
|
||||
const ws = new WebSocket(syncAddress);
|
||||
|
||||
const incoming = websocketReadableStream<SyncMessage>(ws);
|
||||
const outgoing = websocketWritableStream<SyncMessage>(ws);
|
||||
|
||||
return {
|
||||
id: syncAddress + "@" + new Date().toISOString(),
|
||||
incoming,
|
||||
outgoing,
|
||||
role: "server",
|
||||
};
|
||||
}
|
||||
|
||||
function websocketWritableStream<T>(ws: WebSocket) {
|
||||
const initialQueue = [] as T[];
|
||||
let isOpen = false;
|
||||
|
||||
return new WritableStream<T>({
|
||||
start(controller) {
|
||||
ws.addEventListener("error", (event) => {
|
||||
controller.error(
|
||||
new Error("The WebSocket errored!" + JSON.stringify(event)),
|
||||
);
|
||||
});
|
||||
ws.addEventListener("close", () => {
|
||||
controller.error(
|
||||
new Error("The server closed the connection unexpectedly!"),
|
||||
);
|
||||
});
|
||||
ws.addEventListener("open", () => {
|
||||
for (const item of initialQueue) {
|
||||
ws.send(JSON.stringify(item));
|
||||
}
|
||||
isOpen = true;
|
||||
});
|
||||
},
|
||||
|
||||
async write(chunk) {
|
||||
if (isOpen) {
|
||||
ws.send(JSON.stringify(chunk));
|
||||
// Return immediately, since the web socket gives us no easy way to tell
|
||||
// when the write completes.
|
||||
} else {
|
||||
initialQueue.push(chunk);
|
||||
}
|
||||
},
|
||||
|
||||
close() {
|
||||
return closeWS(1000);
|
||||
},
|
||||
|
||||
abort(reason) {
|
||||
return closeWS(4000, reason && reason.message);
|
||||
},
|
||||
});
|
||||
|
||||
function closeWS(code: number, reasonString?: string) {
|
||||
return new Promise<void>((resolve, reject) => {
|
||||
ws.addEventListener(
|
||||
"close",
|
||||
(e) => {
|
||||
if (e.wasClean) {
|
||||
resolve();
|
||||
} else {
|
||||
reject(
|
||||
new Error("The connection was not closed cleanly"),
|
||||
);
|
||||
}
|
||||
},
|
||||
{ once: true },
|
||||
);
|
||||
ws.close(code, reasonString);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/** @category Invite Links */
|
||||
export function createInviteLink<C extends CoValue>(
|
||||
value: C,
|
||||
|
||||
@@ -1,5 +1,21 @@
# jazz-autosub

## 0.7.14

### Patch Changes

- Updated dependencies
    - cojson@0.7.14
    - jazz-tools@0.7.14
    - cojson-transport-ws@0.7.14

## 0.7.13

### Patch Changes

- Updated dependencies
    - jazz-tools@0.7.13

## 0.7.12

### Patch Changes
@@ -5,10 +5,11 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.12",
"version": "0.7.14",
"dependencies": {
"cojson": "workspace:*",
"cojson-transport-nodejs-ws": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.1.5",
"jazz-tools": "workspace:*",
"ws": "^8.14.2"
},
@@ -1,11 +1,8 @@
import {
    websocketReadableStream,
    websocketWritableStream,
} from "cojson-transport-nodejs-ws";
import { WebSocket } from "ws";

import { AgentSecret, Peer, SessionID, WasmCrypto } from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Account, CoValueClass, ID } from "jazz-tools";
import { Effect } from "effect";
import { WebSocket } from "ws";

/** @category Context Creation */
export async function startWorker<Acc extends Account>({
@@ -21,14 +18,13 @@ export async function startWorker<Acc extends Account>({
    syncServer?: string;
    accountSchema?: CoValueClass<Acc> & typeof Account;
}): Promise<{ worker: Acc }> {
    const ws = new WebSocket(peer);

    const wsPeer: Peer = {
        id: "upstream",
        role: "server",
        incoming: websocketReadableStream(ws),
        outgoing: websocketWritableStream(ws),
    };
    const wsPeer: Peer = await Effect.runPromise(
        createWebSocketPeer({
            id: "upstream",
            websocket: new WebSocket(peer),
            role: "server",
        }),
    );

    if (!accountID) {
        throw new Error("No accountID provided");
@@ -52,17 +48,17 @@ export async function startWorker<Acc extends Account>({
        crypto: await WasmCrypto.create(),
    });

    setInterval(() => {
    setInterval(async () => {
        if (!worker._raw.core.node.syncManager.peers["upstream"]) {
            console.log(new Date(), "Reconnecting to upstream " + peer);
            const ws = new WebSocket(peer);

            const wsPeer: Peer = {
                id: "upstream",
                role: "server",
                incoming: websocketReadableStream(ws),
                outgoing: websocketWritableStream(ws),
            };
            const wsPeer: Peer = await Effect.runPromise(
                createWebSocketPeer({
                    id: "upstream",
                    websocket: new WebSocket(peer),
                    role: "server",
                }),
            );

            worker._raw.core.node.syncManager.addPeer(wsPeer);
        }
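Read as a whole, the hunk above swaps the hand-built peer object for `createWebSocketPeer` from `cojson-transport-ws`. After the change, the reconnect loop reduces to roughly this shape (a sketch assembled from the added lines above, not a verbatim copy of the file):

```ts
// Sketch of the post-change reconnect loop, assembled from the hunk above.
setInterval(async () => {
    // Only reconnect when the upstream peer has dropped out of the sync manager.
    if (!worker._raw.core.node.syncManager.peers["upstream"]) {
        console.log(new Date(), "Reconnecting to upstream " + peer);

        const wsPeer: Peer = await Effect.runPromise(
            createWebSocketPeer({
                id: "upstream",
                websocket: new WebSocket(peer),
                role: "server",
            }),
        );

        worker._raw.core.node.syncManager.addPeer(wsPeer);
    }
}, 5000); // the interval length is an assumption; the hunk does not show it
```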

@@ -1,5 +1,22 @@
# jazz-react

## 0.7.14

### Patch Changes

- Updated dependencies
  - cojson@0.7.14
  - jazz-tools@0.7.14
  - jazz-browser@0.7.14

## 0.7.13

### Patch Changes

- Updated dependencies
  - jazz-tools@0.7.13
  - jazz-browser@0.7.13

## 0.7.12

### Patch Changes

@@ -1,6 +1,6 @@
{
    "name": "jazz-react",
    "version": "0.7.12",
    "version": "0.7.14",
    "type": "module",
    "main": "dist/index.js",
    "types": "src/index.ts",

@@ -1,5 +1,21 @@
# jazz-autosub

## 0.7.14

### Patch Changes

- Updated dependencies
  - cojson@0.7.14
  - jazz-tools@0.7.14
  - cojson-transport-ws@0.7.14

## 0.7.13

### Patch Changes

- Updated dependencies
  - jazz-tools@0.7.13

## 0.7.12

### Patch Changes

@@ -3,7 +3,7 @@
    "bin": "./dist/index.js",
    "type": "module",
    "license": "MIT",
    "version": "0.7.12",
    "version": "0.7.14",
    "scripts": {
        "lint": "eslint . --ext ts,tsx",
        "format": "prettier --write './src/**/*.{ts,tsx}'",
@@ -15,7 +15,7 @@
        "@effect/platform-node": "^0.49.2",
        "@effect/schema": "^0.66.16",
        "cojson": "workspace:*",
        "cojson-transport-nodejs-ws": "workspace:*",
        "cojson-transport-ws": "workspace:*",
        "effect": "^3.1.5",
        "fast-check": "^3.17.2",
        "jazz-tools": "workspace:*",

@@ -2,11 +2,8 @@
import { Command, Options } from "@effect/cli";
import { NodeContext, NodeRuntime } from "@effect/platform-node";
import { Console, Effect } from "effect";
import {
    websocketReadableStream,
    websocketWritableStream,
} from "cojson-transport-nodejs-ws";
import { WebSocket } from "ws";
import { createWebSocketPeer } from "cojson-transport-ws";
import { WebSocket } from "ws"
import {
    Account,
    WasmCrypto,
@@ -24,23 +21,20 @@ const peer = Options.text("peer")
const accountCreate = Command.make(
    "create",
    { name, peer },
    ({ name, peer }) => {
    ({ name, peer: peerAddr }) => {
        return Effect.gen(function* () {
            const ws = new WebSocket(peer);

            const crypto = yield* Effect.promise(() => WasmCrypto.create());

            const peer = yield* createWebSocketPeer({
                id: "upstream",
                websocket: new WebSocket(peerAddr),
                role: "server",
            });

            const account: Account = yield* Effect.promise(async () =>
                Account.create({
                    creationProps: { name },
                    peersToLoadFrom: [
                        {
                            id: "upstream",
                            role: "server",
                            incoming: websocketReadableStream(ws),
                            outgoing: websocketWritableStream(ws),
                        },
                    ],
                    peersToLoadFrom: [peer],
                    crypto,
                }),
            );
@@ -59,7 +53,11 @@ const accountCreate = Command.make(
                ),
            );

            const ws2 = new WebSocket(peer);
            const peer2 = yield* createWebSocketPeer({
                id: "upstream2",
                websocket: new WebSocket(peerAddr),
                role: "server",
            });

            yield* Effect.promise(async () =>
                Account.become({
@@ -68,14 +66,7 @@ const accountCreate = Command.make(
                    sessionID: cojsonInternals.newRandomSessionID(
                        account.id as unknown as AccountID,
                    ),
                    peersToLoadFrom: [
                        {
                            id: "upstream",
                            role: "server",
                            incoming: websocketReadableStream(ws),
                            outgoing: websocketWritableStream(ws2),
                        },
                    ],
                    peersToLoadFrom: [peer2],
                    crypto,
                }),
            );
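Taken together, these hunks rewrite the `create` command so that the peer comes from `createWebSocketPeer` and is passed directly through `peersToLoadFrom`. A condensed sketch of the post-change flow, assembled only from the added lines above; the parts of the command the hunks do not show are elided with comments:

```ts
// Condensed from the added lines above; "..." marks code not shown in the hunks.
const accountCreate = Command.make(
    "create",
    { name, peer },
    ({ name, peer: peerAddr }) => {
        return Effect.gen(function* () {
            const crypto = yield* Effect.promise(() => WasmCrypto.create());

            // One peer for the initial account creation...
            const peer = yield* createWebSocketPeer({
                id: "upstream",
                websocket: new WebSocket(peerAddr),
                role: "server",
            });

            const account: Account = yield* Effect.promise(async () =>
                Account.create({
                    creationProps: { name },
                    peersToLoadFrom: [peer],
                    crypto,
                }),
            );

            // ... (code between the hunks elided) ...

            // ...and a second peer to re-attach as the newly created account.
            const peer2 = yield* createWebSocketPeer({
                id: "upstream2",
                websocket: new WebSocket(peerAddr),
                role: "server",
            });

            yield* Effect.promise(async () =>
                Account.become({
                    // ... other options elided ...
                    sessionID: cojsonInternals.newRandomSessionID(
                        account.id as unknown as AccountID,
                    ),
                    peersToLoadFrom: [peer2],
                    crypto,
                }),
            );

            // ... (rest of the command elided) ...
        });
    },
);
```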

@@ -1,5 +1,19 @@
# jazz-autosub

## 0.7.14

### Patch Changes

- Use Effect Queues and Streams instead of custom queue implementation (see the sketch below)
- Updated dependencies
  - cojson@0.7.14
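The entry above refers to the Effect primitives that the reworked code builds on; the test changes later in this diff use the same pattern (`Queue.unbounded` plus `yield*` inside `Effect.gen`). A minimal, self-contained sketch of that pattern, deliberately unrelated to any specific cojson API:

```ts
import { Effect, Queue, Stream } from "effect";

// Minimal illustration of the Queue + Stream pattern referenced above:
// offer values into an unbounded queue, then drain it as a Stream.
const program = Effect.gen(function* () {
    const queue = yield* Queue.unbounded<string>();

    yield* Queue.offer(queue, "hello");
    yield* Queue.offer(queue, "world");

    // Take exactly two items back out and collect them.
    const items = yield* Stream.fromQueue(queue).pipe(
        Stream.take(2),
        Stream.runCollect,
    );

    return items;
});

Effect.runPromise(program).then(console.log);
```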

## 0.7.13

### Patch Changes

- Fix CoList.toJSON()

## 0.7.12

### Patch Changes

@@ -5,11 +5,10 @@
    "types": "./src/index.ts",
    "type": "module",
    "license": "MIT",
    "version": "0.7.12",
    "version": "0.7.14",
    "dependencies": {
        "@effect/schema": "^0.66.16",
        "cojson": "workspace:*",
        "cojson-transport-nodejs-ws": "workspace:*",
        "effect": "^3.1.5",
        "fast-check": "^3.17.2"
    },

@@ -12,7 +12,7 @@ import type {
    SessionID,
} from "cojson";
import { Context, Effect, Stream } from "effect";
import type {
import {
    CoMap,
    CoValue,
    CoValueClass,
@@ -61,9 +61,11 @@ export class Account extends CoValueBase implements CoValue {
            ref: () => Profile,
            optional: false,
        } satisfies RefEncoded<Profile>,
        root: "json" satisfies Schema,
        // eslint-disable-next-line @typescript-eslint/no-explicit-any
    } as any;
        root: {
            ref: () => CoMap,
            optional: true,
        } satisfies RefEncoded<CoMap>,
    };
    }

    get _owner(): Account {
@@ -214,7 +216,7 @@ export class Account extends CoValueBase implements CoValue {
        return this.fromNode(node) as A;
    }

    static createAs<A extends Account>(
    static async createAs<A extends Account>(
        this: CoValueClass<A> & typeof Account,
        as: Account,
        options: {
@@ -222,11 +224,11 @@ export class Account extends CoValueBase implements CoValue {
        },
    ) {
        // TODO: is there a cleaner way to do this?
        const connectedPeers = cojsonInternals.connectedPeers(
        const connectedPeers = await Effect.runPromise(cojsonInternals.connectedPeers(
            "creatingAccount",
            "createdAccount",
            { peer1role: "server", peer2role: "client" },
        );
        ));

        as._raw.core.node.syncManager.addPeer(connectedPeers[1]);
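This is the pattern that repeats through the test changes below: `connectedPeers` now returns an Effect, so call sites either wrap it in `Effect.runPromise` (as here) or `yield*` it inside `Effect.gen`. A condensed sketch of the two call styles, using the same arguments that appear in the hunks:

```ts
import { cojsonInternals } from "cojson";
import { Effect } from "effect";

// Promise-style call site (inside an async function), as in createAs above:
const [peer1, peer2] = await Effect.runPromise(
    cojsonInternals.connectedPeers("creatingAccount", "createdAccount", {
        peer1role: "server",
        peer2role: "client",
    }),
);

// Effect-style call site, as in the rewritten "Subscription" test further down:
const program = Effect.gen(function* () {
    const [a, b] = yield* cojsonInternals.connectedPeers("initial", "second", {
        peer1role: "server",
        peer2role: "client",
    });
    return [a, b] as const;
});
```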

@@ -311,7 +311,7 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
                .map((e) => encodeSync(itemDescriptor.encoded)(e));
        } else if (isRefEncoded(itemDescriptor)) {
            return this.map((item, idx) =>
                seenAbove?.includes((item as CoValue).id)
                seenAbove?.includes((item as CoValue)?.id)
                    ? { _circular: (item as CoValue).id }
                    : (item as unknown as CoValue)?.toJSON(idx + "", [
                          ...(seenAbove || []),

@@ -157,11 +157,11 @@ describe("CoList resolution", async () => {
    test("Loading and availability", async () => {
        const { me, list } = await initNodeAndList();

        const [initialAsPeer, secondPeer] = connectedPeers(
        const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        );
        ));
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
        }
@@ -216,11 +216,11 @@ describe("CoList resolution", async () => {
    test("Subscription & auto-resolution", async () => {
        const { me, list } = await initNodeAndList();

        const [initialAsPeer, secondPeer] = connectedPeers(
        const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        );
        ));
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
        }

@@ -253,11 +253,11 @@ describe("CoMap resolution", async () => {

    test("Loading and availability", async () => {
        const { me, map } = await initNodeAndMap();
        const [initialAsPeer, secondPeer] = connectedPeers(
        const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        );
        ));
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
        }
@@ -323,11 +323,11 @@ describe("CoMap resolution", async () => {
    test("Subscription & auto-resolution", async () => {
        const { me, map } = await initNodeAndMap();

        const [initialAsPeer, secondAsPeer] = connectedPeers(
        const [initialAsPeer, secondAsPeer] = await Effect.runPromise(connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        );
        ));
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
        }

@@ -83,10 +83,11 @@ describe("CoStream resolution", async () => {

    test("Loading and availability", async () => {
        const { me, stream } = await initNodeAndStream();
        const [initialAsPeer, secondPeer] = connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        const [initialAsPeer, secondPeer] = await Effect.runPromise(
            connectedPeers("initial", "second", {
                peer1role: "server",
                peer2role: "client",
            }),
        );
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
@@ -175,10 +176,11 @@ describe("CoStream resolution", async () => {
    test("Subscription & auto-resolution", async () => {
        const { me, stream } = await initNodeAndStream();

        const [initialAsPeer, secondAsPeer] = connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
            connectedPeers("initial", "second", {
                peer1role: "server",
                peer2role: "client",
            }),
        );
        me._raw.core.node.syncManager.addPeer(secondAsPeer);
        if (!isControlledAccount(me)) {
@@ -325,10 +327,11 @@ describe("BinaryCoStream loading & Subscription", async () => {

    test("Loading and availability", async () => {
        const { me, stream } = await initNodeAndStream();
        const [initialAsPeer, secondAsPeer] = connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
            connectedPeers("initial", "second", {
                peer1role: "server",
                peer2role: "client",
            }),
        );
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
@@ -357,30 +360,32 @@ describe("BinaryCoStream loading & Subscription", async () => {
    });

    test("Subscription", async () => {
        const { me } = await initNodeAndStream();

        const stream = BinaryCoStream.create({ owner: me });

        const [initialAsPeer, secondAsPeer] = connectedPeers(
            "initial",
            "second",
            { peer1role: "server", peer2role: "client" },
        );
        me._raw.core.node.syncManager.addPeer(secondAsPeer);
        if (!isControlledAccount(me)) {
            throw "me is not a controlled account";
        }
        const meOnSecondPeer = await Account.become({
            accountID: me.id,
            accountSecret: me._raw.agentSecret,
            peersToLoadFrom: [initialAsPeer],
            // eslint-disable-next-line @typescript-eslint/no-explicit-any
            sessionID: newRandomSessionID(me.id as any),
            crypto: Crypto,
        });

        await Effect.runPromise(
            Effect.gen(function* ($) {
                const { me } = yield* Effect.promise(() => initNodeAndStream());

                const stream = BinaryCoStream.create({ owner: me });

                const [initialAsPeer, secondAsPeer] = yield* connectedPeers(
                    "initial",
                    "second",
                    { peer1role: "server", peer2role: "client" },
                );
                me._raw.core.node.syncManager.addPeer(secondAsPeer);
                if (!isControlledAccount(me)) {
                    throw "me is not a controlled account";
                }
                const meOnSecondPeer = yield* Effect.promise(() =>
                    Account.become({
                        accountID: me.id,
                        accountSecret: me._raw.agentSecret,
                        peersToLoadFrom: [initialAsPeer],
                        // eslint-disable-next-line @typescript-eslint/no-explicit-any
                        sessionID: newRandomSessionID(me.id as any),
                        crypto: Crypto,
                    }),
                );

                const queue = yield* $(Queue.unbounded<BinaryCoStream>());

                BinaryCoStream.subscribe(

@@ -14,6 +14,7 @@ import {
    ID,
} from "../index.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect } from "effect";

class TestMap extends CoMap {
    list = co.ref(TestList);
@@ -38,10 +39,10 @@ describe("Deep loading with depth arg", async () => {
        crypto: Crypto,
    });

    const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
    const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers("initial", "second", {
        peer1role: "server",
        peer2role: "client",
    });
    }));
    if (!isControlledAccount(me)) {
        throw "me is not a controlled account";
    }
@@ -137,9 +138,8 @@ describe("Deep loading with depth arg", async () => {
        throw new Error("map4 is undefined");
    }
    expect(map4.list[0]?.stream).not.toBe(null);
    // TODO: we should expect null here, but apparently we don't even have the id/ref?
    expect(map4.list[0]?.stream?.[me.id]?.value).not.toBeDefined();
    expect(map4.list[0]?.stream?.byMe?.value).not.toBeDefined();
    expect(map4.list[0]?.stream?.[me.id]?.value).toBe(null);
    expect(map4.list[0]?.stream?.byMe?.value).toBe(null);

    const map5 = await TestMap.load(map.id, meOnSecondPeer, {
        list: [{ stream: [{}] }],
@@ -252,10 +252,10 @@ test("Deep loading a record-like coMap", async () => {
        crypto: Crypto,
    });

    const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
    const [initialAsPeer, secondPeer] = await Effect.runPromise(connectedPeers("initial", "second", {
        peer1role: "server",
        peer2role: "client",
    });
    }));
    if (!isControlledAccount(me)) {
        throw "me is not a controlled account";
    }

9620 pnpm-lock.yaml (generated): file diff suppressed because it is too large.