Compare commits

..

17 Commits

Author SHA1 Message Date
Anselm
5c4ca9103c Release 2024-08-15 13:36:33 +01:00
Anselm
b4aad92907 Option to not expect pings 2024-08-15 13:36:08 +01:00
Anselm
56d1e095a1 Release 2024-08-15 13:02:39 +01:00
Anselm Eickhoff
6dee9aae49 Merge pull request #281 from gardencmp/anselm-jazz-190
Remove effect from jazz
2024-08-15 11:51:53 +01:00
Anselm Eickhoff
a10bff981e Merge pull request #284 from gardencmp/anselm-jazz-227
Remove effect from peer communication
2024-08-15 11:51:10 +01:00
Anselm Eickhoff
e333f7884a Merge pull request #313 from gardencmp/anselm-jazz-246
Remove effect from jazz-tools and dependent packages
2024-08-15 11:50:44 +01:00
Anselm
8ea7bf237b Remove effect from storage implementation 2024-08-15 11:13:10 +01:00
Anselm
5e8409fa08 Remove rest of effect use 2024-08-14 17:51:10 +01:00
Anselm
23354c1767 Progress on removing effect 2024-08-14 15:24:20 +01:00
Anselm Eickhoff
0efb69d0db Merge pull request #312 from pax-k/JAZZ-252/make-sure-castas-preserves-subscriptionscope
fix: preserve subscriptionScope for castAs in CoList and CoMap
2024-08-14 14:51:54 +01:00
pax-k
0462c4e41b fix: preserve subscriptionScope for castAs in CoList and CoMap 2024-08-14 16:50:12 +03:00
Anselm
70a5673197 More progress 2024-08-13 12:25:15 +01:00
Anselm Eickhoff
9ec3203485 Merge pull request #285 from gdorsi/main
docs: fix the get started links position
2024-08-12 17:46:41 +01:00
Guido D'Orsi
1a46f9b2e1 docs: fix the get started links position 2024-08-10 16:17:18 +02:00
Anselm
77bb26a8d7 Only use queable in cojson, rework tests 2024-08-09 16:44:50 +01:00
Anselm
2a36dcf592 WIP switch to queueable 2024-08-09 13:59:26 +01:00
Anselm
fc2bcadbe2 Remove effect schema from jazz schema 2024-08-08 18:18:17 +01:00
58 changed files with 2131 additions and 2217 deletions

View File

@@ -1,5 +1,20 @@
# jazz-example-chat
## 0.0.74
### Patch Changes
- jazz-react@0.7.27
## 0.0.73
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- jazz-react@0.7.26
- jazz-tools@0.7.26
## 0.0.72
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-chat",
"private": true,
"version": "0.0.72",
"version": "0.0.74",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -3,6 +3,7 @@ import { createJazzReactContext, DemoAuth } from "jazz-react";
import { createRoot } from "react-dom/client";
import { useIframeHashRouter } from "hash-slash";
import { ChatScreen } from "./chatScreen.tsx";
import { StrictMode } from "react";
export class Message extends CoMap {
text = co.string;
@@ -39,4 +40,4 @@ function App() {
}
createRoot(document.getElementById("root")!)
.render(<Jazz.Provider><App/></Jazz.Provider>);
.render(<StrictMode><Jazz.Provider><App/></Jazz.Provider></StrictMode>);

View File

@@ -1,5 +1,20 @@
# jazz-example-chat
## 0.0.53
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.0.52
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
## 0.0.51
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-inspector",
"private": true,
"version": "0.0.51",
"version": "0.0.53",
"type": "module",
"scripts": {
"dev": "vite",
@@ -19,7 +19,6 @@
"hash-slash": "workspace:*",
"cojson": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -10,7 +10,6 @@ import {
WasmCrypto,
} from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Effect } from "effect";
import { Trash2 } from "lucide-react";
import { Breadcrumbs } from "./breadcrumbs";
import { usePagePath } from "./use-page-path";
@@ -62,13 +61,11 @@ export default function CoJsonViewerApp() {
}
WasmCrypto.create().then(async (crypto) => {
const wsPeer = await Effect.runPromise(
createWebSocketPeer({
id: "mesh",
websocket: new WebSocket("wss://mesh.jazz.tools"),
role: "server",
}),
);
const wsPeer = createWebSocketPeer({
id: "mesh",
websocket: new WebSocket("wss://mesh.jazz.tools"),
role: "server",
});
const node = await LocalNode.withLoadedAccount({
accountID: currentAccount.id,
accountSecret: currentAccount.secret,

View File

@@ -1,5 +1,21 @@
# jazz-example-pets
## 0.0.92
### Patch Changes
- jazz-browser-media-images@0.7.27
- jazz-react@0.7.27
## 0.0.91
### Patch Changes
- Updated dependencies
- jazz-react@0.7.26
- jazz-tools@0.7.26
- jazz-browser-media-images@0.7.26
## 0.0.90
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.90",
"version": "0.0.92",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,5 +1,19 @@
# jazz-example-todo
## 0.0.91
### Patch Changes
- jazz-react@0.7.27
## 0.0.90
### Patch Changes
- Updated dependencies
- jazz-react@0.7.26
- jazz-tools@0.7.26
## 0.0.89
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.89",
"version": "0.0.91",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -268,11 +268,7 @@ Jazz Mesh is currently free &mdash; and it's set up as the default sync & storag
## Get Started
- <Link href="/docs" target="_blank">
Read the docs
</Link>
- <Link href="https://discord.gg/utDMjHYg42" target="_blank">
Join our Discord
</Link>
- <Link href="/docs" target="_blank">Read the docs</Link>
- <Link href="https://discord.gg/utDMjHYg42" target="_blank">Join our Discord</Link>
</Prose>

View File

@@ -1,5 +1,13 @@
# cojson-storage-indexeddb
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,13 +1,12 @@
{
"name": "cojson-storage-indexeddb",
"version": "0.7.23",
"version": "0.7.26",
"main": "dist/index.js",
"type": "module",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -10,7 +10,6 @@ import {
OutgoingSyncQueue,
} from "cojson";
import { SyncPromise } from "./syncPromises.js";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -53,11 +52,15 @@ export class IDBStorage {
this.db = db;
this.toLocalNode = toLocalNode;
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
const processMessages = async () => {
for await (const msg of fromLocalNode) {
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.handleSyncMessage(msg);
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
@@ -68,9 +71,13 @@ export class IDBStorage {
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in IndexedDB", e),
);
}
@@ -82,25 +89,18 @@ export class IDBStorage {
localNodeName: "local",
},
): Promise<Peer> {
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
yield* Effect.promise(() =>
IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
}),
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
return { ...storageAsPeer, priority: 100 };
}
static async open(
@@ -392,35 +392,40 @@ export class IDBStorage {
),
).then(() => {
// we're done with IndexedDB stuff here so can use native Promises again
setTimeout(() =>
Effect.runPromise(
Effect.gen(this, function* () {
yield* Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
asDependencyOf,
});
setTimeout(() => {
this.toLocalNode
.push({
action: "known",
...ourKnown,
asDependencyOf,
})
.catch((e) =>
console.error(
"Error sending known state",
e,
),
);
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new)
.length > 0,
);
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new).length > 0,
);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
yield* Queue.offer(
this.toLocalNode,
piece,
);
yield* Effect.yieldNow();
}
}),
),
);
for (const piece of nonEmptyNewContentPieces) {
this.toLocalNode
.push(piece)
.catch((e) =>
console.error(
"Error sending new content piece",
e,
),
);
}
});
return Promise.resolve();
});
@@ -445,16 +450,18 @@ export class IDBStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
throw new Error("Expected to be sent header first");
})
.catch((e) =>
console.error("Error sending known state", e),
);
return SyncPromise.resolve();
}
return this.makeRequest<IDBValidKey>(({ coValues }) =>
@@ -515,13 +522,18 @@ export class IDBStorage {
),
).then(() => {
if (invalidAssumptions) {
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
})
.catch((e) =>
console.error(
"Error sending known state",
e,
),
);
}
});
});

View File

@@ -1,5 +1,13 @@
# cojson-storage-sqlite
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,14 +1,13 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.7.23",
"version": "0.7.26",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -11,7 +11,6 @@ import {
} from "cojson";
import Database, { Database as DatabaseT } from "better-sqlite3";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -54,11 +53,15 @@ export class SQLiteStorage {
this.db = db;
this.toLocalNode = toLocalNode;
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
const processMessages = async () => {
for await (const msg of fromLocalNode) {
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.handleSyncMessage(msg);
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
@@ -69,9 +72,13 @@ export class SQLiteStorage {
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in sqlite", e),
);
}
@@ -84,26 +91,19 @@ export class SQLiteStorage {
trace?: boolean;
localNodeName?: string;
}): Promise<Peer> {
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
yield* Effect.promise(() =>
SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
}),
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
return { ...storageAsPeer, priority: 100 };
}
static async open(
@@ -441,13 +441,13 @@ export class SQLiteStorage {
);
}
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
asDependencyOf,
}),
);
})
.catch((e) => console.error("Error while pushing known", e));
const nonEmptyNewContentPieces = newContentPieces.filter(
(piece) => piece.header || Object.keys(piece.new).length > 0,
@@ -456,7 +456,11 @@ export class SQLiteStorage {
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await Effect.runPromise(Queue.offer(this.toLocalNode, piece));
this.toLocalNode
.push(piece)
.catch((e) =>
console.error("Error while pushing content piece", e),
);
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
@@ -478,15 +482,17 @@ export class SQLiteStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
})
.catch((e) =>
console.error("Error while pushing known", e),
);
return;
}
@@ -618,13 +624,13 @@ export class SQLiteStorage {
})();
if (invalidAssumptions) {
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
})
.catch((e) => console.error("Error while pushing known", e));
}
}

View File

@@ -1,5 +1,19 @@
# cojson-transport-nodejs-ws
## 0.7.27
### Patch Changes
- Option to not expect pings
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,13 +1,12 @@
{
"name": "cojson-transport-ws",
"type": "module",
"version": "0.7.23",
"version": "0.7.27",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,5 +1,10 @@
import { DisconnectedError, Peer, PingTimeoutError, SyncMessage } from "cojson";
import { Stream, Queue, Effect, Console } from "effect";
import {
DisconnectedError,
Peer,
PingTimeoutError,
SyncMessage,
cojsonInternals,
} from "cojson";
interface WebsocketEvents {
close: { code: number; reason: string };
@@ -15,6 +20,7 @@ interface AnyWebSocket {
addEventListener<K extends keyof WebsocketEvents>(
type: K,
listener: (event: WebsocketEvents[K]) => void,
options?: { once: boolean },
): void;
removeEventListener<K extends keyof WebsocketEvents>(
type: K,
@@ -22,6 +28,7 @@ interface AnyWebSocket {
): void;
close(): void;
send(data: string): void;
readyState: number;
}
const g: typeof globalThis & {
@@ -32,88 +39,80 @@ const g: typeof globalThis & {
}[];
} = globalThis;
export function createWebSocketPeer(options: {
export function createWebSocketPeer({
id,
websocket,
role,
expectPings = true,
}: {
id: string;
websocket: AnyWebSocket;
role: Peer["role"];
}): Effect.Effect<Peer> {
return Effect.gen(function* () {
const ws = options.websocket;
const ws_ = ws as unknown as Stream.EventListener<WebsocketEvents["message"]>;
expectPings?: boolean;
}): Peer {
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
const outgoing = yield* Queue.unbounded<SyncMessage>();
const closed = once(ws, "close").pipe(
Effect.flatMap(
(event) =>
new DisconnectedError({
message: `${event.code}: ${event.reason}`,
}),
),
Stream.fromEffect,
);
const isSyncMessage = (msg: unknown): msg is SyncMessage => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((msg as any)?.type === "ping") {
const ping = msg as PingMsg;
g.jazzPings ||= [];
g.jazzPings.push({
received: Date.now(),
sent: ping.time,
dc: ping.dc,
});
return false;
}
return true;
};
yield* Effect.forkDaemon(Effect.gen(function* () {
yield* once(ws, "open");
yield* Queue.take(outgoing).pipe(
Effect.andThen((message) => ws.send(JSON.stringify(message))),
Effect.forever,
websocket.addEventListener("close", function handleClose() {
incoming
.push("Disconnected")
.catch((e) =>
console.error("Error while pushing disconnect msg", e),
);
}));
type E = WebsocketEvents["message"];
const messages = Stream.fromEventListener<E>(ws_, "message").pipe(
Stream.timeoutFail(() => new PingTimeoutError(), "10 seconds"),
Stream.tapError((_e) =>
Console.warn("Ping timeout").pipe(
Effect.andThen(Effect.try(() => ws.close())),
Effect.catchAll((e) =>
Console.error(
"Error while trying to close ws on ping timeout",
e,
),
),
),
),
Stream.mergeLeft(closed),
Stream.map((_) => JSON.parse(_.data as string)),
Stream.filter(isSyncMessage),
Stream.buffer({ capacity: "unbounded" }),
Stream.onDone(() => Queue.shutdown(outgoing)),
);
return {
id: options.id,
incoming: messages,
outgoing,
role: options.role,
};
});
let pingTimeout: ReturnType<typeof setTimeout> | null = null;
websocket.addEventListener("message", function handleIncomingMsg(event) {
const msg = JSON.parse(event.data as string);
pingTimeout && clearTimeout(pingTimeout);
if (msg?.type === "ping") {
const ping = msg as PingMsg;
g.jazzPings ||= [];
g.jazzPings.push({
received: Date.now(),
sent: ping.time,
dc: ping.dc,
});
} else {
incoming
.push(msg)
.catch((e) =>
console.error("Error while pushing incoming msg", e),
);
}
if (expectPings) {
pingTimeout = setTimeout(() => {
incoming
.push("PingTimeout")
.catch((e) =>
console.error("Error while pushing ping timeout", e),
);
}, 10_000);
}
});
const websocketOpen = new Promise<void>((resolve) => {
websocket.addEventListener("open", resolve, { once: true });
});
return {
id,
incoming,
outgoing: {
async push(msg) {
await websocketOpen;
if (websocket.readyState === 1) {
websocket.send(JSON.stringify(msg));
}
},
close() {
if (websocket.readyState === 1) {
websocket.close();
}
},
},
role,
};
}
const once = <Event extends keyof WebsocketEvents>(
ws: AnyWebSocket,
event: Event,
) =>
Effect.async<WebsocketEvents[Event]>((register) => {
const cb = (msg: WebsocketEvents[Event]) => {
ws.removeEventListener(event, cb);
register(Effect.succeed(msg));
};
ws.addEventListener(event, cb);
});

View File

@@ -1,5 +1,11 @@
# cojson
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
## 0.7.23
### Patch Changes

View File

@@ -5,7 +5,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.23",
"version": "0.7.26",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",
@@ -18,12 +18,12 @@
},
"dependencies": {
"@hazae41/berith": "^1.2.6",
"@noble/curves": "^1.3.0",
"@noble/ciphers": "^0.1.3",
"@noble/curves": "^1.3.0",
"@noble/hashes": "^1.4.0",
"@scure/base": "^1.1.1",
"effect": "^3.5.2",
"hash-wasm": "^4.9.0"
"hash-wasm": "^4.9.0",
"queueable": "^5.3.2"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",

View File

@@ -18,7 +18,7 @@ import {
} from "./crypto/crypto.js";
import { WasmCrypto } from "./crypto/WasmCrypto.js";
import { PureJSCrypto } from "./crypto/PureJSCrypto.js";
import { connectedPeers } from "./streamUtils.js";
import { connectedPeers, Channel } from "./streamUtils.js";
import { ControlledAgent, RawControlledAccount } from "./coValues/account.js";
import type { Role } from "./permissions.js";
import { rawCoIDtoBytes, rawCoIDfromBytes, isRawCoID } from "./ids.js";
@@ -59,12 +59,7 @@ import type * as Media from "./media.js";
type Value = JsonValue | AnyRawCoValue;
import {
LSMStorage,
FSErr,
BlockFilename,
WalFilename,
} from "./storage/index.js";
import { LSMStorage, BlockFilename, WalFilename } from "./storage/index.js";
import { FileSystem } from "./storage/FileSystem.js";
/** @hidden */
@@ -84,6 +79,7 @@ export const cojsonInternals = {
accountHeaderForInitialAgentSecret,
idforHeader,
StreamingHash,
Channel,
};
export {
@@ -123,18 +119,17 @@ export {
SyncMessage,
isRawCoID,
LSMStorage,
DisconnectedError,
PingTimeoutError,
};
export type {
Value,
FileSystem,
FSErr,
BlockFilename,
WalFilename,
IncomingSyncStream,
OutgoingSyncQueue,
DisconnectedError,
PingTimeoutError,
};
// eslint-disable-next-line @typescript-eslint/no-namespace

View File

@@ -670,6 +670,10 @@ export class LocalNode {
return newNode;
}
gracefulShutdown() {
this.syncManager.gracefulShutdown();
}
}
/** @internal */

View File

@@ -1,4 +1,3 @@
import { Effect } from "effect";
import { CoValueChunk } from "./index.js";
import { RawCoID } from "../ids.js";
import { CryptoProvider, StreamingHash } from "../crypto/crypto.js";
@@ -11,150 +10,124 @@ export type WalEntry = { id: RawCoID } & CoValueChunk;
export type WalFilename = `wal-${number}.jsonl`;
export type FSErr = {
type: "fileSystemError";
error: Error;
};
export interface FileSystem<WriteHandle, ReadHandle> {
crypto: CryptoProvider;
createFile(filename: string): Effect.Effect<WriteHandle, FSErr>;
append(handle: WriteHandle, data: Uint8Array): Effect.Effect<void, FSErr>;
close(handle: ReadHandle | WriteHandle): Effect.Effect<void, FSErr>;
closeAndRename(
handle: WriteHandle,
filename: BlockFilename,
): Effect.Effect<void, FSErr>;
openToRead(
filename: string,
): Effect.Effect<{ handle: ReadHandle; size: number }, FSErr>;
createFile(filename: string): Promise<WriteHandle>;
append(handle: WriteHandle, data: Uint8Array): Promise<void>;
close(handle: ReadHandle | WriteHandle): Promise<void>;
closeAndRename(handle: WriteHandle, filename: BlockFilename): Promise<void>;
openToRead(filename: string): Promise<{ handle: ReadHandle; size: number }>;
read(
handle: ReadHandle,
offset: number,
length: number,
): Effect.Effect<Uint8Array, FSErr>;
listFiles(): Effect.Effect<string[], FSErr>;
removeFile(
filename: BlockFilename | WalFilename,
): Effect.Effect<void, FSErr>;
): Promise<Uint8Array>;
listFiles(): Promise<string[]>;
removeFile(filename: BlockFilename | WalFilename): Promise<void>;
}
export const textEncoder = new TextEncoder();
export const textDecoder = new TextDecoder();
export function readChunk<RH, FS extends FileSystem<unknown, RH>>(
export async function readChunk<RH, FS extends FileSystem<unknown, RH>>(
handle: RH,
header: { start: number; length: number },
fs: FS,
): Effect.Effect<CoValueChunk, FSErr> {
return Effect.gen(function* ($) {
const chunkBytes = yield* $(
fs.read(handle, header.start, header.length),
);
): Promise<CoValueChunk> {
const chunkBytes = await fs.read(handle, header.start, header.length);
const chunk = JSON.parse(textDecoder.decode(chunkBytes));
return chunk;
});
const chunk = JSON.parse(textDecoder.decode(chunkBytes));
return chunk;
}
export function readHeader<RH, FS extends FileSystem<unknown, RH>>(
export async function readHeader<RH, FS extends FileSystem<unknown, RH>>(
filename: string,
handle: RH,
size: number,
fs: FS,
): Effect.Effect<BlockHeader, FSErr> {
return Effect.gen(function* ($) {
const headerLength = Number(filename.match(/-H(\d+)\.jsonl$/)![1]!);
): Promise<BlockHeader> {
const headerLength = Number(filename.match(/-H(\d+)\.jsonl$/)![1]!);
const headerBytes = yield* $(
fs.read(handle, size - headerLength, headerLength),
);
const headerBytes = await fs.read(
handle,
size - headerLength,
headerLength,
);
const header = JSON.parse(textDecoder.decode(headerBytes));
return header;
});
const header = JSON.parse(textDecoder.decode(headerBytes));
return header;
}
export function writeBlock<WH, RH, FS extends FileSystem<WH, RH>>(
export async function writeBlock<WH, RH, FS extends FileSystem<WH, RH>>(
chunks: Map<RawCoID, CoValueChunk>,
level: number,
blockNumber: number,
fs: FS,
): Effect.Effect<BlockFilename, FSErr> {
): Promise<BlockFilename> {
if (chunks.size === 0) {
return Effect.die(new Error("No chunks to write"));
throw new Error("No chunks to write");
}
return Effect.gen(function* ($) {
const blockHeader: BlockHeader = [];
const blockHeader: BlockHeader = [];
let offset = 0;
let offset = 0;
const file = yield* $(
fs.createFile(
"wipBlock" +
Math.random().toString(36).substring(7) +
".tmp.jsonl",
),
);
const hash = new StreamingHash(fs.crypto);
const file = await fs.createFile(
"wipBlock" + Math.random().toString(36).substring(7) + ".tmp.jsonl",
);
const hash = new StreamingHash(fs.crypto);
const chunksSortedById = Array.from(chunks).sort(([id1], [id2]) =>
id1.localeCompare(id2),
);
const chunksSortedById = Array.from(chunks).sort(([id1], [id2]) =>
id1.localeCompare(id2),
);
for (const [id, chunk] of chunksSortedById) {
const encodedBytes = hash.update(chunk);
const encodedBytesWithNewline = new Uint8Array(
encodedBytes.length + 1,
);
encodedBytesWithNewline.set(encodedBytes);
encodedBytesWithNewline[encodedBytes.length] = 10;
yield* $(fs.append(file, encodedBytesWithNewline));
const length = encodedBytesWithNewline.length;
blockHeader.push({ id, start: offset, length });
offset += length;
}
for (const [id, chunk] of chunksSortedById) {
const encodedBytes = hash.update(chunk);
const encodedBytesWithNewline = new Uint8Array(encodedBytes.length + 1);
encodedBytesWithNewline.set(encodedBytes);
encodedBytesWithNewline[encodedBytes.length] = 10;
await fs.append(file, encodedBytesWithNewline);
const length = encodedBytesWithNewline.length;
blockHeader.push({ id, start: offset, length });
offset += length;
}
const headerBytes = textEncoder.encode(JSON.stringify(blockHeader));
yield* $(fs.append(file, headerBytes));
const headerBytes = textEncoder.encode(JSON.stringify(blockHeader));
await fs.append(file, headerBytes);
// console.log(
// "full file",
// yield* $(
// fs.read(file as unknown as RH, 0, offset + headerBytes.length),
// ),
// );
// console.log(
// "full file",
// yield* $(
// fs.read(file as unknown as RH, 0, offset + headerBytes.length),
// ),
// );
const filename: BlockFilename = `L${level}-${(
blockNumber + ""
).padStart(3, "0")}-${hash
.digest()
.replace("hash_", "")
.slice(0, 15)}-H${headerBytes.length}.jsonl`;
// console.log("renaming to" + filename);
yield* $(fs.closeAndRename(file, filename));
const filename: BlockFilename = `L${level}-${(blockNumber + "").padStart(
3,
"0",
)}-${hash.digest().replace("hash_", "").slice(0, 15)}-H${
headerBytes.length
}.jsonl`;
// console.log("renaming to" + filename);
await fs.closeAndRename(file, filename);
return filename;
return filename;
// console.log("Wrote block", filename, blockHeader);
// console.log("IDs in block", blockHeader.map(e => e.id));
});
// console.log("Wrote block", filename, blockHeader);
// console.log("IDs in block", blockHeader.map(e => e.id));
}
export function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
export async function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
handle: WH,
fs: FS,
id: RawCoID,
chunk: CoValueChunk,
): Effect.Effect<void, FSErr> {
return Effect.gen(function* ($) {
const walEntry: WalEntry = {
id,
...chunk,
};
const bytes = textEncoder.encode(JSON.stringify(walEntry) + "\n");
console.log("writing to WAL", handle, id, bytes.length);
yield* $(fs.append(handle, bytes));
});
) {
const walEntry: WalEntry = {
id,
...chunk,
};
const bytes = textEncoder.encode(JSON.stringify(walEntry) + "\n");
console.log("writing to WAL", handle, id, bytes.length);
return fs.append(handle, bytes);
}

View File

@@ -1,4 +1,3 @@
import { Either } from "effect";
import { RawCoID, SessionID } from "../ids.js";
import { MAX_RECOMMENDED_TX_SIZE } from "../index.js";
import { CoValueKnownState, NewContentMessage } from "../sync.js";
@@ -80,7 +79,7 @@ export function chunkToKnownState(id: RawCoID, chunk: CoValueChunk) {
export function mergeChunks(
chunkA: CoValueChunk,
chunkB: CoValueChunk,
): Either.Either<"nonContigous", CoValueChunk> {
): "nonContigous" | CoValueChunk {
const header = chunkA.header || chunkB.header;
const newSessions = { ...chunkA.sessionEntries };
@@ -133,9 +132,9 @@ export function mergeChunks(
}
newSessions[sessionID] = newEntries;
} else {
return Either.right("nonContigous" as const);
return "nonContigous" as const;
}
}
return Either.left({ header, sessionEntries: newSessions });
return { header, sessionEntries: newSessions };
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
import { Console, Effect, Queue, Stream } from "effect";
import { Peer, PeerID, SyncMessage } from "./sync.js";
import { Channel } from "queueable";
export { Channel } from "queueable";
export function connectedPeers(
peer1id: PeerID,
@@ -13,60 +14,57 @@ export function connectedPeers(
peer1role?: Peer["role"];
peer2role?: Peer["role"];
} = {},
): Effect.Effect<[Peer, Peer]> {
return Effect.gen(function* () {
const [from1to2Rx, from1to2Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
);
const [from2to1Rx, from2to1Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
);
): [Peer, Peer] {
const [from1to2Rx, from1to2Tx] = newQueuePair(
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
);
const [from2to1Rx, from2to1Tx] = newQueuePair(
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
);
const peer2AsPeer: Peer = {
id: peer2id,
incoming: from2to1Rx,
outgoing: from1to2Tx,
role: peer2role,
};
const peer2AsPeer: Peer = {
id: peer2id,
incoming: from2to1Rx,
outgoing: from1to2Tx,
role: peer2role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: from1to2Rx,
outgoing: from2to1Tx,
role: peer1role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: from1to2Rx,
outgoing: from2to1Tx,
role: peer1role,
};
return [peer1AsPeer, peer2AsPeer];
});
return [peer1AsPeer, peer2AsPeer];
}
export function newQueuePair(
options: { traceAs?: string } = {},
): Effect.Effect<[Stream.Stream<SyncMessage>, Queue.Enqueue<SyncMessage>]> {
return Effect.gen(function* () {
const queue = yield* Queue.unbounded<SyncMessage>();
): [AsyncIterable<SyncMessage>, Channel<SyncMessage>] {
const channel = new Channel<SyncMessage>();
if (options.traceAs) {
return [
Stream.fromQueue(queue).pipe(
Stream.tap((msg) =>
Console.debug(
options.traceAs,
JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
if (options.traceAs) {
return [
(async function* () {
for await (const msg of channel) {
console.debug(
options.traceAs,
JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
),
),
queue,
];
} else {
return [Stream.fromQueue(queue), queue];
}
});
);
yield msg;
}
})(),
channel,
];
} else {
return [channel.wrap(), channel];
}
}

View File

@@ -3,7 +3,6 @@ import { CoValueHeader, Transaction } from "./coValueCore.js";
import { CoValueCore } from "./coValueCore.js";
import { LocalNode, newLoadingState } from "./localNode.js";
import { RawCoID, SessionID } from "./ids.js";
import { Data, Effect, Queue, Stream } from "effect";
export type CoValueKnownState = {
id: RawCoID;
@@ -56,19 +55,17 @@ export type DoneMessage = {
export type PeerID = string;
export class DisconnectedError extends Data.TaggedError("DisconnectedError")<{
message: string;
}> {}
export type DisconnectedError = "Disconnected";
export class PingTimeoutError extends Error {
readonly _tag = "PingTimeoutError";
}
export type PingTimeoutError = "PingTimeout";
export type IncomingSyncStream = Stream.Stream<
SyncMessage,
DisconnectedError | PingTimeoutError
export type IncomingSyncStream = AsyncIterable<
SyncMessage | DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = Queue.Enqueue<SyncMessage>;
export type OutgoingSyncQueue = {
push: (msg: SyncMessage) => Promise<unknown>;
close: () => void;
};
export interface Peer {
id: PeerID;
@@ -144,15 +141,11 @@ export class SyncManager {
for (const peer of eligiblePeers) {
// console.log("loading", id, "from", peer.id);
Effect.runPromise(
Queue.offer(peer.outgoing, {
action: "load",
id: id,
header: false,
sessions: {},
}),
).catch((e) => {
console.error("Error writing to peer", e);
await peer.outgoing.push({
action: "load",
id: id,
header: false,
sessions: {},
});
const coValueEntry = this.local.coValues[id];
@@ -229,11 +222,13 @@ export class SyncManager {
}
if (entry.state === "loading") {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "load",
id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending load", e);
});
return;
}
@@ -246,9 +241,11 @@ export class SyncManager {
if (!peer.toldKnownState.has(id)) {
peer.toldKnownState.add(id);
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "load",
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending load", e);
});
}
}
@@ -273,10 +270,12 @@ export class SyncManager {
);
if (!peer.toldKnownState.has(id)) {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
asDependencyOf,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state", e);
});
peer.toldKnownState.add(id);
@@ -311,7 +310,9 @@ export class SyncManager {
// } header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
await this.trySendToPeer(peer, piece);
this.trySendToPeer(peer, piece).catch((e) => {
console.error("Error sending content piece", e);
});
if (performance.now() - lastYield > 10) {
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
@@ -365,55 +366,39 @@ export class SyncManager {
void initialSync();
}
void Effect.runPromise(
peerState.incoming.pipe(
Stream.ensuring(
Effect.sync(() => {
console.log("Peer disconnected:", peer.id);
delete this.peers[peer.id];
}),
),
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg, peerState),
catch: (e) =>
new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" ||
k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
}).pipe(
Effect.timeoutFail({
duration: 10000,
onTimeout: () =>
new Error("Took >10s to process message"),
}),
),
),
Effect.catchAll((e) =>
Effect.logError(
"Error in peer",
peer.id,
e.message,
typeof e.cause === "object" &&
e.cause instanceof Error &&
e.cause.message,
),
),
),
);
const processMessages = async () => {
for await (const msg of peerState.incoming) {
if (msg === "Disconnected") {
return;
}
if (msg === "PingTimeout") {
console.error("Ping timeout from peer", peer.id);
return;
}
try {
await this.handleSyncMessage(msg, peerState);
} catch (e) {
throw new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
);
}
}
};
processMessages().catch((e) => {
console.error("Error processing messages from peer", peer.id, e);
});
}
trySendToPeer(peer: PeerState, msg: SyncMessage) {
return Effect.runPromise(Queue.offer(peer.outgoing, msg));
return peer.outgoing.push(msg);
}
async handleLoad(msg: LoadMessage, peer: PeerState) {
@@ -439,7 +424,7 @@ export class SyncManager {
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
console.error("Error sending known state", e);
});
}
return;
@@ -470,11 +455,13 @@ export class SyncManager {
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
peer.toldKnownState.add(msg.id);
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
});
return;
@@ -684,10 +671,12 @@ export class SyncManager {
await this.syncCoValue(coValue);
if (invalidStateAssumed) {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
isCorrection: true,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state correction", e);
});
}
}
@@ -755,6 +744,12 @@ export class SyncManager {
}
}
}
gracefulShutdown() {
for (const peer of Object.values(this.peers)) {
peer.outgoing.close();
}
}
}
function knownStateIn(msg: LoadMessage | KnownStateMessage) {

View File

@@ -3,7 +3,6 @@ import { newRandomSessionID } from "../coValueCore.js";
import { LocalNode } from "../localNode.js";
import { connectedPeers } from "../streamUtils.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { Effect } from "effect";
const Crypto = await WasmCrypto.create();
@@ -53,13 +52,11 @@ test("Can create account with one node, and then load it on another", async () =
map.set("foo", "bar", "private");
expect(map.get("foo")).toEqual("bar");
const [node1asPeer, node2asPeer] = await Effect.runPromise(
connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
}),
);
const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
})
console.log("After connected peers");

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,19 @@
# jazz-browser-media-images
## 0.7.27
### Patch Changes
- jazz-browser@0.7.27
## 0.7.26
### Patch Changes
- Updated dependencies
- jazz-browser@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser-media-images",
"version": "0.7.25",
"version": "0.7.27",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,23 @@
# jazz-browser
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- cojson-storage-indexeddb@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser",
"version": "0.7.25",
"version": "0.7.27",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",
@@ -10,7 +10,6 @@
"cojson": "workspace:*",
"cojson-storage-indexeddb": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"jazz-tools": "workspace:*",
"typescript": "^5.1.6"
},

View File

@@ -1,13 +1,12 @@
import {
BlockFilename,
FSErr,
FileSystem,
WalFilename,
CryptoProvider,
} from "cojson";
import { Effect } from "effect";
import { BlockFilename, FileSystem, WalFilename, CryptoProvider } from "cojson";
export class OPFSFilesystem implements FileSystem<{id: number, filename: string}, {id: number, filename: string}> {
export class OPFSFilesystem
implements
FileSystem<
{ id: number; filename: string },
{ id: number; filename: string }
>
{
opfsWorker: Worker;
callbacks: Map<number, (event: MessageEvent) => void> = new Map();
nextRequestId = 0;
@@ -28,8 +27,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
};
}
listFiles(): Effect.Effect<string[], FSErr, never> {
return Effect.async((cb) => {
listFiles(): Promise<string[]> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("listFiles" + requestId + "_listFiles");
this.callbacks.set(requestId, (event) => {
@@ -39,7 +38,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"listFiles" + requestId + "_listFiles",
"listFilesEnd" + requestId + "_listFiles",
);
cb(Effect.succeed(event.data.fileNames));
resolve(event.data.fileNames);
});
this.opfsWorker.postMessage({ type: "listFiles", requestId });
});
@@ -47,17 +46,15 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
openToRead(
filename: string,
): Effect.Effect<{ handle: {id: number, filename: string}; size: number }, FSErr, never> {
return Effect.async((cb) => {
): Promise<{ handle: { id: number; filename: string }; size: number }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("openToRead" + "_" + filename);
this.callbacks.set(requestId, (event) => {
cb(
Effect.succeed({
handle: {id: event.data.handle, filename},
size: event.data.size,
}),
);
resolve({
handle: { id: event.data.handle, filename },
size: event.data.size,
});
performance.mark("openToReadEnd" + "_" + filename);
performance.measure(
"openToRead" + "_" + filename,
@@ -73,8 +70,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
createFile(filename: string): Effect.Effect<{id: number, filename: string}, FSErr, never> {
return Effect.async((cb) => {
createFile(filename: string): Promise<{ id: number; filename: string }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("createFile" + "_" + filename);
this.callbacks.set(requestId, (event) => {
@@ -84,7 +81,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"createFile" + "_" + filename,
"createFileEnd" + "_" + filename,
);
cb(Effect.succeed({id: event.data.handle, filename}));
resolve({ id: event.data.handle, filename });
});
this.opfsWorker.postMessage({
type: "createFile",
@@ -94,10 +91,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
openToWrite(
filename: string,
): Effect.Effect<{id: number, filename: string}, FSErr, never> {
return Effect.async((cb) => {
openToWrite(filename: string): Promise<{ id: number; filename: string }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("openToWrite" + "_" + filename);
this.callbacks.set(requestId, (event) => {
@@ -107,7 +102,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"openToWrite" + "_" + filename,
"openToWriteEnd" + "_" + filename,
);
cb(Effect.succeed({id: event.data.handle, filename}));
resolve({ id: event.data.handle, filename });
});
this.opfsWorker.postMessage({
type: "openToWrite",
@@ -118,10 +113,10 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
append(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
data: Uint8Array,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("append" + "_" + handle.filename);
this.callbacks.set(requestId, (_) => {
@@ -131,7 +126,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"append" + "_" + handle.filename,
"appendEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "append",
@@ -143,11 +138,11 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
read(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
offset: number,
length: number,
): Effect.Effect<Uint8Array, FSErr, never> {
return Effect.async((cb) => {
): Promise<Uint8Array> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("read" + "_" + handle.filename);
this.callbacks.set(requestId, (event) => {
@@ -157,7 +152,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"read" + "_" + handle.filename,
"readEnd" + "_" + handle.filename,
);
cb(Effect.succeed(event.data.data));
resolve(event.data.data);
});
this.opfsWorker.postMessage({
type: "read",
@@ -169,8 +164,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
close(handle: {id: number, filename: string}): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
close(handle: { id: number; filename: string }): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("close" + "_" + handle.filename);
this.callbacks.set(requestId, (_) => {
@@ -180,7 +175,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"close" + "_" + handle.filename,
"closeEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "close",
@@ -191,22 +186,20 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
closeAndRename(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
filename: BlockFilename,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("closeAndRename" + "_" + handle.filename);
this.callbacks.set(requestId, () => {
performance.mark(
"closeAndRenameEnd" + "_" + handle.filename,
);
performance.mark("closeAndRenameEnd" + "_" + handle.filename);
performance.measure(
"closeAndRename" + "_" + handle.filename,
"closeAndRename" + "_" + handle.filename,
"closeAndRenameEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "closeAndRename",
@@ -217,10 +210,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
removeFile(
filename: BlockFilename | WalFilename,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
removeFile(filename: BlockFilename | WalFilename): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("removeFile" + "_" + filename);
this.callbacks.set(requestId, () => {
@@ -230,7 +221,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"removeFile" + "_" + filename,
"removeFileEnd" + "_" + filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "removeFile",

View File

@@ -14,7 +14,6 @@ import { AccountID, LSMStorage } from "cojson";
import { AuthProvider } from "./auth/auth.js";
import { OPFSFilesystem } from "./OPFSFilesystem.js";
import { IDBStorage } from "cojson-storage-indexeddb";
import { Effect, Queue } from "effect";
import { createWebSocketPeer } from "cojson-transport-ws";
export * from "./auth/auth.js";
@@ -42,13 +41,11 @@ export async function createJazzBrowserContext<Acc extends Account>({
const crypto = customCrypto || (await WasmCrypto.create());
let sessionDone: () => void;
const firstWsPeer = await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
);
const firstWsPeer = createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
});
let shouldTryToReconnect = true;
let currentReconnectionTimeout = initialReconnectionTimeout;
@@ -112,13 +109,11 @@ export async function createJazzBrowserContext<Acc extends Account>({
});
me._raw.core.node.syncManager.addPeer(
await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
),
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
})
);
}
}
@@ -132,11 +127,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
shouldTryToReconnect = false;
window.removeEventListener("online", onOnline);
console.log("Cleaning up node");
for (const peer of Object.values(
me._raw.core.node.syncManager.peers,
)) {
void Effect.runPromise(Queue.shutdown(peer.outgoing));
}
me._raw.core.node.gracefulShutdown();
sessionDone?.();
},
};

View File

@@ -1,5 +1,22 @@
# jazz-autosub
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -5,11 +5,10 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.25",
"version": "0.7.27",
"dependencies": {
"cojson": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"jazz-tools": "workspace:*",
"ws": "^8.14.2"
},

View File

@@ -1,7 +1,6 @@
import { AgentSecret, Peer, SessionID, WasmCrypto } from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Account, CoValueClass, ID } from "jazz-tools";
import { Effect } from "effect";
import { WebSocket } from "ws";
/** @category Context Creation */
@@ -18,13 +17,11 @@ export async function startWorker<Acc extends Account>({
syncServer?: string;
accountSchema?: CoValueClass<Acc> & typeof Account;
}): Promise<{ worker: Acc }> {
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
const wsPeer: Peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
});
if (!accountID) {
throw new Error("No accountID provided");
@@ -52,13 +49,11 @@ export async function startWorker<Acc extends Account>({
if (!worker._raw.core.node.syncManager.peers["upstream"]) {
console.log(new Date(), "Reconnecting to upstream " + peer);
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
const wsPeer: Peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
});
worker._raw.core.node.syncManager.addPeer(wsPeer);
}

View File

@@ -1,5 +1,21 @@
# jazz-react
## 0.7.27
### Patch Changes
- jazz-browser@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- jazz-browser@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-react",
"version": "0.7.25",
"version": "0.7.27",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,21 @@
# jazz-autosub
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -3,7 +3,7 @@
"bin": "./dist/index.js",
"type": "module",
"license": "MIT",
"version": "0.7.25",
"version": "0.7.27",
"scripts": {
"lint": "eslint . --ext ts,tsx",
"format": "prettier --write './src/**/*.{ts,tsx}'",

View File

@@ -25,7 +25,7 @@ const accountCreate = Command.make(
return Effect.gen(function* () {
const crypto = yield* Effect.promise(() => WasmCrypto.create());
const peer = yield* createWebSocketPeer({
const peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peerAddr),
role: "server",
@@ -53,7 +53,7 @@ const accountCreate = Command.make(
),
);
const peer2 = yield* createWebSocketPeer({
const peer2 = createWebSocketPeer({
id: "upstream2",
websocket: new WebSocket(peerAddr),
role: "server",

View File

@@ -1,5 +1,13 @@
# jazz-autosub
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.25
### Patch Changes

View File

@@ -5,11 +5,9 @@
"types": "./src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.25",
"version": "0.7.26",
"dependencies": {
"@effect/schema": "^0.66.16",
"cojson": "workspace:*",
"effect": "^3.5.2",
"fast-check": "^3.17.2"
},
"scripts": {

View File

@@ -11,7 +11,6 @@ import type {
RawControlledAccount,
SessionID,
} from "cojson";
import { Context, Effect } from "effect";
import {
CoMap,
CoValue,
@@ -221,12 +220,10 @@ export class Account extends CoValueBase implements CoValue {
},
) {
// TODO: is there a cleaner way to do this?
const connectedPeers = await Effect.runPromise(
cojsonInternals.connectedPeers(
"creatingAccount",
"createdAccount",
{ peer1role: "server", peer2role: "client" },
),
const connectedPeers = cojsonInternals.connectedPeers(
"creatingAccount",
"createdAccount",
{ peer1role: "server", peer2role: "client" },
);
as._raw.core.node.syncManager.addPeer(connectedPeers[1]);
@@ -378,9 +375,6 @@ export const AccountAndGroupProxyHandler: ProxyHandler<Account | Group> = {
},
};
/** @category Identity & Permissions */
export class AccountCtx extends Context.Tag("Account")<AccountCtx, Account>() {}
/** @category Identity & Permissions */
export function isControlledAccount(account: Account): account is Account & {
isMe: true;

View File

@@ -1,4 +1,4 @@
import type { RawCoList } from "cojson";
import type { JsonValue, RawCoList } from "cojson";
import { RawAccount } from "cojson";
import type {
CoValue,
@@ -26,8 +26,8 @@ import {
makeRefs,
subscribeToCoValue,
subscribeToExistingCoValue,
subscriptionsScopes,
} from "../internal.js";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
/**
* CoLists are collaborative versions of plain arrays.
@@ -303,7 +303,7 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
} else if ("encoded" in itemDescriptor) {
return this._raw
.asArray()
.map((e) => encodeSync(itemDescriptor.encoded)(e));
.map((e) => itemDescriptor.encoded.encode(e));
} else if (isRefEncoded(itemDescriptor)) {
return this.map((item, idx) =>
seenAbove?.includes((item as CoValue)?.id)
@@ -444,16 +444,21 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
castAs<Cl extends CoValueClass & CoValueFromRaw<CoValue>>(
cl: Cl,
): InstanceType<Cl> {
return cl.fromRaw(this._raw) as InstanceType<Cl>;
const casted = cl.fromRaw(this._raw) as InstanceType<Cl>;
const subscriptionScope = subscriptionsScopes.get(this);
if (subscriptionScope) {
subscriptionsScopes.set(casted, subscriptionScope);
}
return casted;
}
}
function toRawItems<Item>(items: Item[], itemDescriptor: Schema) {
const rawItems =
itemDescriptor === "json"
? items
? (items as JsonValue[])
: "encoded" in itemDescriptor
? items?.map((e) => encodeSync(itemDescriptor.encoded)(e))
? items?.map((e) => itemDescriptor.encoded.encode(e))
: isRefEncoded(itemDescriptor)
? items?.map((v) => (v as unknown as CoValue).id)
: (() => {
@@ -472,7 +477,7 @@ const CoListProxyHandler: ProxyHandler<CoList> = {
} else if ("encoded" in itemDescriptor) {
return rawValue === undefined
? undefined
: decodeSync(itemDescriptor.encoded)(rawValue);
: itemDescriptor.encoded.decode(rawValue);
} else if (isRefEncoded(itemDescriptor)) {
return rawValue === undefined
? undefined
@@ -505,7 +510,7 @@ const CoListProxyHandler: ProxyHandler<CoList> = {
if (itemDescriptor === "json") {
rawValue = value;
} else if ("encoded" in itemDescriptor) {
rawValue = encodeSync(itemDescriptor.encoded)(value);
rawValue = itemDescriptor.encoded.encode(value);
} else if (isRefEncoded(itemDescriptor)) {
rawValue = value.id;
}

View File

@@ -1,6 +1,4 @@
import type { JsonValue, RawCoMap } from "cojson";
import type { Simplify } from "effect/Types";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
import type {
CoValue,
Schema,
@@ -36,6 +34,10 @@ type CoMapEdit<V> = {
madeAt: Date;
};
export type Simplify<A> = {
[K in keyof A]: A[K]
} extends infer B ? B : never
/**
* CoMaps are collaborative versions of plain objects, mapping string-like keys to values.
*
@@ -147,7 +149,7 @@ export class CoMap extends CoValueBase implements CoValue {
descriptor === "json"
? rawEdit.value
: "encoded" in descriptor
? decodeSync(descriptor.encoded)(rawEdit.value)
? descriptor.encoded.encode(rawEdit.value)
: new Ref(
rawEdit.value as ID<CoValue>,
target._loadedAs,
@@ -317,7 +319,7 @@ export class CoMap extends CoValueBase implements CoValue {
rawInit[key] = (initValue as unknown as CoValue).id;
}
} else if ("encoded" in descriptor) {
rawInit[key] = encodeSync(descriptor.encoded)(
rawInit[key] = descriptor.encoded.encode(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
initValue as any,
);
@@ -461,19 +463,19 @@ export class CoMap extends CoValueBase implements CoValue {
const tKey = key as keyof typeof newValues & keyof this;
const descriptor = (this._schema[tKey as string] ||
this._schema[ItemsSym]) as Schema;
if (tKey in this._schema) {
const newValue = newValues[tKey];
const currentValue = this[tKey];
if (descriptor === "json" || "encoded" in descriptor) {
if (currentValue !== newValue) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this as any)[tKey] = newValue;
}
}
else if (isRefEncoded(descriptor)) {
const currentId = (currentValue as CoValue | undefined)?.id;
} else if (isRefEncoded(descriptor)) {
const currentId = (currentValue as CoValue | undefined)
?.id;
const newId = (newValue as CoValue | undefined)?.id;
if (currentId !== newId) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -516,7 +518,7 @@ const CoMapProxyHandler: ProxyHandler<CoMap> = {
} else if ("encoded" in descriptor) {
return raw === undefined
? undefined
: decodeSync(descriptor.encoded)(raw);
: descriptor.encoded.decode(raw);
} else if (isRefEncoded(descriptor)) {
return raw === undefined
? undefined
@@ -550,7 +552,7 @@ const CoMapProxyHandler: ProxyHandler<CoMap> = {
if (descriptor === "json") {
target._raw.set(key, value);
} else if ("encoded" in descriptor) {
target._raw.set(key, encodeSync(descriptor.encoded)(value));
target._raw.set(key, descriptor.encoded.encode(value));
} else if (isRefEncoded(descriptor)) {
target._raw.set(key, value.id);
subscriptionsScopes

View File

@@ -35,7 +35,6 @@ import {
ensureCoValueLoaded,
subscribeToExistingCoValue,
} from "../internal.js";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
export type CoStreamEntry<Item> = SingleCoStreamEntry<Item> & {
all: IterableIterator<SingleCoStreamEntry<Item>>;
@@ -141,7 +140,7 @@ export class CoStream<Item = any> extends CoValueBase implements CoValue {
if (itemDescriptor === "json") {
this._raw.push(item as JsonValue);
} else if ("encoded" in itemDescriptor) {
this._raw.push(encodeSync(itemDescriptor.encoded)(item));
this._raw.push(itemDescriptor.encoded.encode(item));
} else if (isRefEncoded(itemDescriptor)) {
this._raw.push((item as unknown as CoValue).id);
}
@@ -153,7 +152,7 @@ export class CoStream<Item = any> extends CoValueBase implements CoValue {
itemDescriptor === "json"
? (v: unknown) => v
: "encoded" in itemDescriptor
? encodeSync(itemDescriptor.encoded)
? itemDescriptor.encoded.encode
: (v: unknown) => v && (v as CoValue).id;
return {
@@ -247,7 +246,7 @@ function entryFromRawEntry<Item>(
? (CoValue & Item) | null
: Item;
} else if ("encoded" in itemField) {
return decodeSync(itemField.encoded)(rawEntry.value);
return itemField.encoded.decode(rawEntry.value);
} else if (isRefEncoded(itemField)) {
return this.ref?.accessFrom(
accessFrom,

View File

@@ -122,7 +122,12 @@ export class CoValueBase implements CoValue {
castAs<Cl extends CoValueClass & CoValueFromRaw<CoValue>>(
cl: Cl,
): InstanceType<Cl> {
return cl.fromRaw(this._raw) as InstanceType<Cl>;
const casted = cl.fromRaw(this._raw) as InstanceType<Cl>;
const subscriptionScope = subscriptionsScopes.get(this);
if (subscriptionScope) {
subscriptionsScopes.set(casted, subscriptionScope);
}
return casted;
}
}

View File

@@ -5,7 +5,6 @@ import {
isCoValueClass,
CoValueFromRaw,
} from "../internal.js";
import type { Schema as EffectSchema, TypeId } from "@effect/schema/Schema";
export type CoMarker = { readonly __co: unique symbol };
/** @category Schema definition */
@@ -113,7 +112,7 @@ function ref<
}
export type JsonEncoded = "json";
export type EncodedAs<V> = { encoded: Encoder<V> };
export type EncodedAs<V> = { encoded: Encoder<V> | OptionalEncoder<V> };
export type RefEncoded<V extends CoValue> = {
ref: CoValueClass<V> | ((raw: RawCoValue) => CoValueClass<V>);
optional: boolean;
@@ -152,31 +151,23 @@ export type SchemaFor<Field> = NonNullable<Field> extends CoValue
? JsonEncoded
: EncodedAs<NonNullable<Field>>;
export type EffectSchemaWithInputAndOutput<A, I = A> = EffectSchema<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
never
> & {
[TypeId]: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
_A: (_: any) => A;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
_I: (_: any) => I;
};
export type Encoder<V> = {
encode: (value: V) => JsonValue;
decode: (value: JsonValue) => V;
};
export type OptionalEncoder<V> =
| Encoder<V>
| {
encode: (value: V | undefined) => JsonValue;
decode: (value: JsonValue) => V | undefined;
};
export type Encoder<V> = EffectSchemaWithInputAndOutput<V, JsonValue>;
export type OptionalEncoder<V> = EffectSchemaWithInputAndOutput<
V,
JsonValue | undefined
>;
import { Date } from "@effect/schema/Schema";
import { SchemaInit, ItemsSym, MembersSym } from "./symbols.js";
/** @category Schema definition */
export const Encoders = {
Date,
Date: {
encode: (value: Date) => value.toISOString(),
decode: (value: JsonValue) => new Date(value as string),
},
};

View File

@@ -1,12 +1,12 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
Account,
CoList,
WasmCrypto,
co,
cojsonInternals,
isControlledAccount,
} from "../index.js";
@@ -157,11 +157,13 @@ describe("CoList resolution", async () => {
test("Loading and availability", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -217,11 +219,13 @@ describe("CoList resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -236,63 +240,52 @@ describe("CoList resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestList>());
const queue = new cojsonInternals.Channel();
TestList.subscribe(
list.id,
meOnSecondPeer,
[],
(subscribedList) => {
console.log(
"subscribedList?.[0]?.[0]?.[0]",
subscribedList?.[0]?.[0]?.[0],
);
void Effect.runPromise(
Queue.offer(queue, subscribedList),
);
},
);
TestList.subscribe(list.id, meOnSecondPeer, [], (subscribedList) => {
console.log(
"subscribedList?.[0]?.[0]?.[0]",
subscribedList?.[0]?.[0]?.[0],
);
void queue.push(subscribedList);
});
const update1 = yield* $(Queue.take(queue));
expect(update1?.[0]).toBe(null);
const update1 = (await queue.next()).value;
expect(update1?.[0]).toBe(null);
const update2 = yield* $(Queue.take(queue));
expect(update2?.[0]).toBeDefined();
expect(update2?.[0]?.[0]).toBe(null);
const update2 = (await queue.next()).value;
expect(update2?.[0]).toBeDefined();
expect(update2?.[0]?.[0]).toBe(null);
const update3 = yield* $(Queue.take(queue));
expect(update3?.[0]?.[0]).toBeDefined();
expect(update3?.[0]?.[0]?.[0]).toBe("a");
expect(update3?.[0]?.[0]?.joined()).toBe("a,b");
const update3 = (await queue.next()).value;
expect(update3?.[0]?.[0]).toBeDefined();
expect(update3?.[0]?.[0]?.[0]).toBe("a");
expect(update3?.[0]?.[0]?.joined()).toBe("a,b");
update3[0]![0]![0] = "x";
update3[0]![0]![0] = "x";
const update4 = yield* $(Queue.take(queue));
expect(update4?.[0]?.[0]?.[0]).toBe("x");
const update4 = (await queue.next()).value;
expect(update4?.[0]?.[0]?.[0]).toBe("x");
// When assigning a new nested value, we get an update
// When assigning a new nested value, we get an update
const newTwiceNestedList = TwiceNestedList.create(["y", "z"], {
owner: meOnSecondPeer,
});
const newTwiceNestedList = TwiceNestedList.create(["y", "z"], {
owner: meOnSecondPeer,
});
const newNestedList = NestedList.create([newTwiceNestedList], {
owner: meOnSecondPeer,
});
const newNestedList = NestedList.create([newTwiceNestedList], {
owner: meOnSecondPeer,
});
update4[0] = newNestedList;
update4[0] = newNestedList;
const update5 = yield* $(Queue.take(queue));
expect(update5?.[0]?.[0]?.[0]).toBe("y");
expect(update5?.[0]?.[0]?.joined()).toBe("y,z");
const update5 = (await queue.next()).value;
expect(update5?.[0]?.[0]?.[0]).toBe("y");
expect(update5?.[0]?.[0]?.joined()).toBe("y,z");
// we get updates when the new nested value changes
newTwiceNestedList[0] = "w";
const update6 = yield* $(Queue.take(queue));
expect(update6?.[0]?.[0]?.[0]).toBe("w");
}),
);
// we get updates when the new nested value changes
newTwiceNestedList[0] = "w";
const update6 = (await queue.next()).value;
expect(update6?.[0]?.[0]?.[0]).toBe("w");
});
});

View File

@@ -1,7 +1,6 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
Account,
Encoders,
@@ -9,8 +8,8 @@ import {
co,
WasmCrypto,
isControlledAccount,
cojsonInternals,
} from "../index.js";
import { Schema } from "@effect/schema";
const Crypto = await WasmCrypto.create();
@@ -25,7 +24,10 @@ describe("Simple CoMap operations", async () => {
_height = co.number;
birthday = co.encoded(Encoders.Date);
name? = co.string;
nullable = co.optional.encoded(Schema.NullishOr(Schema.String));
nullable = co.optional.encoded<string | undefined>({
encode: (value: string | undefined) => value || null,
decode: (value: unknown) => (value as string) || undefined,
});
optionalDate = co.optional.encoded(Encoders.Date);
get roughColor() {
@@ -42,7 +44,7 @@ describe("Simple CoMap operations", async () => {
color: "red",
_height: 10,
birthday: birthday,
nullable: null,
nullable: undefined,
},
{ owner: me },
);
@@ -94,7 +96,7 @@ describe("Simple CoMap operations", async () => {
expect(map._raw.get("_height")).toEqual(20);
map.nullable = "not null";
map.nullable = null;
map.nullable = undefined;
delete map.nullable;
map.nullable = undefined;
@@ -282,12 +284,12 @@ describe("CoMap resolution", async () => {
test("Loading and availability", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
const [initialAsPeer, secondPeer] =
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -353,12 +355,12 @@ describe("CoMap resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
const [initialAsPeer, secondAsPeer] =
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -372,9 +374,8 @@ describe("CoMap resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestMap>());
const queue = new cojsonInternals.Channel<TestMap>();
TestMap.subscribe(
map.id,
@@ -385,22 +386,20 @@ describe("CoMap resolution", async () => {
"subscribedMap.nested?.twiceNested?.taste",
subscribedMap.nested?.twiceNested?.taste,
);
void Effect.runPromise(
Queue.offer(queue, subscribedMap),
);
void queue.push(subscribedMap);
},
);
const update1 = yield* $(Queue.take(queue));
const update1 = (await queue.next()).value;
expect(update1.nested).toEqual(null);
const update2 = yield* $(Queue.take(queue));
const update2 = (await queue.next()).value;
expect(update2.nested?.name).toEqual("nested");
map.nested!.name = "nestedUpdated";
const _ = yield* $(Queue.take(queue));
const update3 = yield* $(Queue.take(queue));
const _ = (await queue.next()).value;
const update3 = (await queue.next()).value;
expect(update3.nested?.name).toEqual("nestedUpdated");
const oldTwiceNested = update3.nested!.twiceNested;
@@ -424,23 +423,21 @@ describe("CoMap resolution", async () => {
update3.nested = newNested;
yield* $(Queue.take(queue));
// const update4 = yield* $(Queue.take(queue));
const update4b = yield* $(Queue.take(queue));
(await queue.next()).value;
// const update4 = (await queue.next()).value;
const update4b = (await queue.next()).value;
expect(update4b.nested?.name).toEqual("newNested");
expect(update4b.nested?.twiceNested?.taste).toEqual("sweet");
// we get updates when the new nested value changes
newTwiceNested.taste = "salty";
const update5 = yield* $(Queue.take(queue));
const update5 = (await queue.next()).value;
expect(update5.nested?.twiceNested?.taste).toEqual("salty");
newTwiceNested.taste = "umami";
const update6 = yield* $(Queue.take(queue));
const update6 = (await queue.next()).value;
expect(update6.nested?.twiceNested?.taste).toEqual("umami");
}),
);
});
class TestMapWithOptionalRef extends CoMap {

View File

@@ -1,7 +1,6 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
BinaryCoStream,
ID,
@@ -10,6 +9,7 @@ import {
co,
WasmCrypto,
isControlledAccount,
cojsonInternals,
} from "../index.js";
const Crypto = await WasmCrypto.create();
@@ -83,11 +83,13 @@ describe("CoStream resolution", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -176,12 +178,15 @@ describe("CoStream resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -195,78 +200,68 @@ describe("CoStream resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestStream>());
const queue = new cojsonInternals.Channel();
TestStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
console.log(
"subscribedStream[me.id]",
subscribedStream[me.id],
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value,
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value?.[
me.id
]?.value,
);
void Effect.runPromise(
Queue.offer(queue, subscribedStream),
);
},
TestStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
console.log("subscribedStream[me.id]", subscribedStream[me.id]);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value,
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]
?.value,
);
void queue.push(subscribedStream);
},
);
const update1 = yield* $(Queue.take(queue));
expect(update1[me.id]?.value).toEqual(null);
const update1 = (await queue.next()).value;
expect(update1[me.id]?.value).toEqual(null);
const update2 = yield* $(Queue.take(queue));
expect(update2[me.id]?.value).toBeDefined();
expect(update2[me.id]?.value?.[me.id]?.value).toBe(null);
const update2 = (await queue.next()).value;
expect(update2[me.id]?.value).toBeDefined();
expect(update2[me.id]?.value?.[me.id]?.value).toBe(null);
const update3 = yield* $(Queue.take(queue));
expect(update3[me.id]?.value?.[me.id]?.value).toBeDefined();
expect(
update3[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("milk");
const update3 = (await queue.next()).value;
expect(update3[me.id]?.value?.[me.id]?.value).toBeDefined();
expect(update3[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"milk",
);
update3[me.id]!.value![me.id]!.value!.push("bread");
update3[me.id]!.value![me.id]!.value!.push("bread");
const update4 = yield* $(Queue.take(queue));
expect(
update4[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("bread");
const update4 = (await queue.next()).value;
expect(update4[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"bread",
);
// When assigning a new nested stream, we get an update
const newTwiceNested = TwiceNestedStream.create(["butter"], {
owner: meOnSecondPeer,
});
// When assigning a new nested stream, we get an update
const newTwiceNested = TwiceNestedStream.create(["butter"], {
owner: meOnSecondPeer,
});
const newNested = NestedStream.create([newTwiceNested], {
owner: meOnSecondPeer,
});
const newNested = NestedStream.create([newTwiceNested], {
owner: meOnSecondPeer,
});
update4.push(newNested);
update4.push(newNested);
const update5 = yield* $(Queue.take(queue));
expect(
update5[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("butter");
const update5 = (await queue.next()).value;
expect(update5[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"butter",
);
// we get updates when the new nested stream changes
newTwiceNested.push("jam");
const update6 = yield* $(Queue.take(queue));
expect(
update6[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("jam");
}),
// we get updates when the new nested stream changes
newTwiceNested.push("jam");
const update6 = (await queue.next()).value;
expect(update6[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"jam",
);
});
});
@@ -327,11 +322,13 @@ describe("BinaryCoStream loading & Subscription", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -360,98 +357,83 @@ describe("BinaryCoStream loading & Subscription", async () => {
});
test("Subscription", async () => {
await Effect.runPromise(
Effect.gen(function* ($) {
const { me } = yield* Effect.promise(() => initNodeAndStream());
const { me } = await initNodeAndStream();
const stream = BinaryCoStream.create({ owner: me });
const stream = BinaryCoStream.create({ owner: me });
const [initialAsPeer, secondAsPeer] = yield* connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = yield* Effect.promise(() =>
Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
}),
);
const queue = yield* $(Queue.unbounded<BinaryCoStream>());
BinaryCoStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
void Effect.runPromise(
Queue.offer(queue, subscribedStream),
);
},
);
const update1 = yield* $(Queue.take(queue));
expect(update1.getChunks()).toBe(undefined);
stream.start({ mimeType: "text/plain" });
const update2 = yield* $(Queue.take(queue));
expect(update2.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([1, 2, 3]));
const update3 = yield* $(Queue.take(queue));
expect(update3.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3])],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([4, 5, 6]));
const update4 = yield* $(Queue.take(queue));
expect(update4.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
],
totalSizeBytes: undefined,
finished: false,
});
stream.end();
const update5 = yield* $(Queue.take(queue));
expect(update5.getChunks()).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
],
totalSizeBytes: undefined,
finished: true,
});
}),
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = await Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
});
const queue = new cojsonInternals.Channel();
BinaryCoStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
void queue.push(subscribedStream);
},
);
const update1 = (await queue.next()).value;
expect(update1.getChunks()).toBe(undefined);
stream.start({ mimeType: "text/plain" });
const update2 = (await queue.next()).value;
expect(update2.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([1, 2, 3]));
const update3 = (await queue.next()).value;
expect(update3.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3])],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([4, 5, 6]));
const update4 = (await queue.next()).value;
expect(update4.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
totalSizeBytes: undefined,
finished: false,
});
stream.end();
const update5 = (await queue.next()).value;
expect(update5.getChunks()).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
totalSizeBytes: undefined,
finished: true,
});
});
});

View File

@@ -14,7 +14,6 @@ import {
ID,
} from "../index.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect } from "effect";
class TestMap extends CoMap {
list = co.ref(TestList);
@@ -39,12 +38,11 @@ describe("Deep loading with depth arg", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -140,8 +138,8 @@ describe("Deep loading with depth arg", async () => {
throw new Error("map4 is undefined");
}
expect(map4.list[0]?.stream).not.toBe(null);
expect(map4.list[0]?.stream?.[me.id]?.value).toBe(null);
expect(map4.list[0]?.stream?.byMe?.value).toBe(null);
expect(map4.list[0]?.stream?.[me.id]).toBe(undefined)
expect(map4.list[0]?.stream?.byMe?.value).toBe(undefined);
const map5 = await TestMap.load(map.id, meOnSecondPeer, {
list: [{ stream: [{}] }],
@@ -254,15 +252,15 @@ test("Deep loading a record-like coMap", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
const meOnSecondPeer = await Account.become({
accountID: me.id,

43
pnpm-lock.yaml generated
View File

@@ -156,9 +156,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../../packages/cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
hash-slash:
specifier: workspace:*
version: link:../../packages/hash-slash
@@ -444,12 +441,12 @@ importers:
'@scure/base':
specifier: ^1.1.1
version: 1.1.5
effect:
specifier: ^3.5.2
version: 3.5.2
hash-wasm:
specifier: ^4.9.0
version: 4.11.0
queueable:
specifier: ^5.3.2
version: 5.3.2
devDependencies:
'@types/jest':
specifier: ^29.5.3
@@ -481,9 +478,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -506,9 +500,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -522,9 +513,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -559,9 +547,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
jazz-tools:
specifier: workspace:*
version: link:../jazz-tools
@@ -602,9 +587,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
jazz-tools:
specifier: workspace:*
version: link:../jazz-tools
@@ -689,15 +671,9 @@ importers:
packages/jazz-tools:
dependencies:
'@effect/schema':
specifier: ^0.66.16
version: 0.66.16(effect@3.5.2)(fast-check@3.17.2)
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
fast-check:
specifier: ^3.17.2
version: 3.17.2
@@ -2639,6 +2615,9 @@ packages:
fast-levenshtein@2.0.6:
resolution: {integrity: sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==}
fast-list@1.0.3:
resolution: {integrity: sha512-Lm56Ci3EqefHNdIneRFuzhpPcpVVBz9fgqVmG3UQIxAefJv1mEYsZ1WQLTWqmdqeGEwbI2t6fbZgp9TqTYARuA==}
fast-loops@1.1.3:
resolution: {integrity: sha512-8EZzEP0eKkEEVX+drtd9mtuQ+/QrlfW/5MlwcwK5Nds6EkZ/tRzEexkzUY2mIssnAyVLT+TKHuRXmFNNXYUd6g==}
@@ -3743,6 +3722,10 @@ packages:
queue-tick@1.0.1:
resolution: {integrity: sha512-kJt5qhMxoszgU/62PLP1CJytzd2NKetjSRnyuj31fDd3Rlcz3fzlFdFLD1SItunPwyqEOkca6GbV612BWfaBag==}
queueable@5.3.2:
resolution: {integrity: sha512-/2ZxV1PJh7J9Q/h9ewZ4fLMmDreUlbwrWsBnluvDoKW6Nw0gbWm5hN+kiWfdDMw1o/QTAFxV9wx4KpuN5IA7OA==}
engines: {node: '>=18'}
quick-lru@4.0.1:
resolution: {integrity: sha512-ARhCpm70fzdcvNQfPoy49IaanKkTlRWF2JMzqhcJbhSFRZv7nPTvZJdcY7301IPmvW+/p0RgIWnQDLJxifsQ7g==}
engines: {node: '>=8'}
@@ -6893,6 +6876,8 @@ snapshots:
fast-levenshtein@2.0.6: {}
fast-list@1.0.3: {}
fast-loops@1.1.3: {}
fast-querystring@1.1.2:
@@ -8010,6 +7995,10 @@ snapshots:
queue-tick@1.0.1: {}
queueable@5.3.2:
dependencies:
fast-list: 1.0.3
quick-lru@4.0.1: {}
quick-lru@5.1.1: {}