Compare commits

..

27 Commits

Author SHA1 Message Date
Anselm
4f75dc8d97 Release 2024-08-15 16:17:31 +01:00
Anselm
e2c79cccb5 Remove noisy log 2024-08-15 16:17:03 +01:00
Anselm
c14a0e05be Release 2024-08-15 15:30:22 +01:00
Anselm
016dd3a5dd Fix ignoring server peers 2024-08-15 15:30:00 +01:00
Anselm
5c4ca9103c Release 2024-08-15 13:36:33 +01:00
Anselm
b4aad92907 Option to not expect pings 2024-08-15 13:36:08 +01:00
Anselm
56d1e095a1 Release 2024-08-15 13:02:39 +01:00
Anselm Eickhoff
6dee9aae49 Merge pull request #281 from gardencmp/anselm-jazz-190
Remove effect from jazz
2024-08-15 11:51:53 +01:00
Anselm Eickhoff
a10bff981e Merge pull request #284 from gardencmp/anselm-jazz-227
Remove effect from peer communication
2024-08-15 11:51:10 +01:00
Anselm Eickhoff
e333f7884a Merge pull request #313 from gardencmp/anselm-jazz-246
Remove effect from jazz-tools and dependent packages
2024-08-15 11:50:44 +01:00
Anselm
8ea7bf237b Remove effect from storage implementation 2024-08-15 11:13:10 +01:00
Anselm
5e8409fa08 Remove rest of effect use 2024-08-14 17:51:10 +01:00
Anselm
23354c1767 Progress on removing effect 2024-08-14 15:24:20 +01:00
Anselm Eickhoff
0efb69d0db Merge pull request #312 from pax-k/JAZZ-252/make-sure-castas-preserves-subscriptionscope
fix: preserve subscriptionScope for castAs in CoList and CoMap
2024-08-14 14:51:54 +01:00
pax-k
0462c4e41b fix: preserve subscriptionScope for castAs in CoList and CoMap 2024-08-14 16:50:12 +03:00
Anselm
70a5673197 More progress 2024-08-13 12:25:15 +01:00
Anselm Eickhoff
9ec3203485 Merge pull request #285 from gdorsi/main
docs: fix the get started links position
2024-08-12 17:46:41 +01:00
Guido D'Orsi
1a46f9b2e1 docs: fix the get started links position 2024-08-10 16:17:18 +02:00
Anselm
77bb26a8d7 Only use queable in cojson, rework tests 2024-08-09 16:44:50 +01:00
Anselm
2a36dcf592 WIP switch to queueable 2024-08-09 13:59:26 +01:00
Anselm
fc2bcadbe2 Remove effect schema from jazz schema 2024-08-08 18:18:17 +01:00
Anselm
46b0cc1adb Release 2024-08-08 14:44:00 +01:00
Anselm Eickhoff
d75d1c6a3f Merge pull request #279 from pax-k/JAZZ-219/implement-applydiff-on-comap-to-only-update-changed-fields
feat: Implement applyDiff on CoMap to only update changed fields
2024-08-08 13:50:17 +01:00
pax-k
13b236aeed feat: Implement applyDiff on CoMap to only update changed fields 2024-08-08 11:03:08 +03:00
Anselm Eickhoff
1c0a61b0b2 Merge pull request #271 from pax-k/document-max-recommended-tx-size
chore: document MAX_RECOMMENDED_TX_SIZE
2024-08-07 16:32:07 +01:00
pax-k
3f5ef7e799 chore: formatting 2024-08-06 19:35:14 +03:00
pax-k
e7a573fa94 chore: document MAX_RECOMMENDED_TX_SIZE 2024-08-06 19:22:41 +03:00
59 changed files with 2615 additions and 2215 deletions

View File

@@ -1,5 +1,46 @@
# jazz-example-chat
## 0.0.76
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- jazz-react@0.7.29
- jazz-tools@0.7.29
## 0.0.75
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- jazz-react@0.7.28
- jazz-tools@0.7.28
## 0.0.74
### Patch Changes
- jazz-react@0.7.27
## 0.0.73
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- jazz-react@0.7.26
- jazz-tools@0.7.26
## 0.0.72
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
- jazz-react@0.7.25
## 0.0.71
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-chat",
"private": true,
"version": "0.0.71",
"version": "0.0.76",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -3,6 +3,7 @@ import { createJazzReactContext, DemoAuth } from "jazz-react";
import { createRoot } from "react-dom/client";
import { useIframeHashRouter } from "hash-slash";
import { ChatScreen } from "./chatScreen.tsx";
import { StrictMode } from "react";
export class Message extends CoMap {
text = co.string;
@@ -39,4 +40,4 @@ function App() {
}
createRoot(document.getElementById("root")!)
.render(<Jazz.Provider><App/></Jazz.Provider>);
.render(<StrictMode><Jazz.Provider><App/></Jazz.Provider></StrictMode>);

View File

@@ -1,5 +1,36 @@
# jazz-example-chat
## 0.0.55
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- cojson-transport-ws@0.7.29
## 0.0.54
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- cojson-transport-ws@0.7.28
## 0.0.53
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.0.52
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
## 0.0.51
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-inspector",
"private": true,
"version": "0.0.51",
"version": "0.0.55",
"type": "module",
"scripts": {
"dev": "vite",
@@ -19,7 +19,6 @@
"hash-slash": "workspace:*",
"cojson": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"lucide-react": "^0.274.0",
"qrcode": "^1.5.3",
"react": "^18.2.0",

View File

@@ -10,7 +10,6 @@ import {
WasmCrypto,
} from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Effect } from "effect";
import { Trash2 } from "lucide-react";
import { Breadcrumbs } from "./breadcrumbs";
import { usePagePath } from "./use-page-path";
@@ -62,13 +61,11 @@ export default function CoJsonViewerApp() {
}
WasmCrypto.create().then(async (crypto) => {
const wsPeer = await Effect.runPromise(
createWebSocketPeer({
id: "mesh",
websocket: new WebSocket("wss://mesh.jazz.tools"),
role: "server",
}),
);
const wsPeer = createWebSocketPeer({
id: "mesh",
websocket: new WebSocket("wss://mesh.jazz.tools"),
role: "server",
});
const node = await LocalNode.withLoadedAccount({
accountID: currentAccount.id,
accountSecret: currentAccount.secret,

View File

@@ -1,5 +1,46 @@
# jazz-example-pets
## 0.0.94
### Patch Changes
- jazz-react@0.7.29
- jazz-tools@0.7.29
- jazz-browser-media-images@0.7.29
## 0.0.93
### Patch Changes
- jazz-react@0.7.28
- jazz-tools@0.7.28
- jazz-browser-media-images@0.7.28
## 0.0.92
### Patch Changes
- jazz-browser-media-images@0.7.27
- jazz-react@0.7.27
## 0.0.91
### Patch Changes
- Updated dependencies
- jazz-react@0.7.26
- jazz-tools@0.7.26
- jazz-browser-media-images@0.7.26
## 0.0.90
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
- jazz-browser-media-images@0.7.25
- jazz-react@0.7.25
## 0.0.89
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-pets",
"private": true,
"version": "0.0.89",
"version": "0.0.94",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -1,5 +1,41 @@
# jazz-example-todo
## 0.0.93
### Patch Changes
- jazz-react@0.7.29
- jazz-tools@0.7.29
## 0.0.92
### Patch Changes
- jazz-react@0.7.28
- jazz-tools@0.7.28
## 0.0.91
### Patch Changes
- jazz-react@0.7.27
## 0.0.90
### Patch Changes
- Updated dependencies
- jazz-react@0.7.26
- jazz-tools@0.7.26
## 0.0.89
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
- jazz-react@0.7.25
## 0.0.88
### Patch Changes

View File

@@ -1,7 +1,7 @@
{
"name": "jazz-example-todo",
"private": true,
"version": "0.0.88",
"version": "0.0.93",
"type": "module",
"scripts": {
"dev": "vite",

View File

@@ -268,11 +268,7 @@ Jazz Mesh is currently free &mdash; and it's set up as the default sync & storag
## Get Started
- <Link href="/docs" target="_blank">
Read the docs
</Link>
- <Link href="https://discord.gg/utDMjHYg42" target="_blank">
Join our Discord
</Link>
- <Link href="/docs" target="_blank">Read the docs</Link>
- <Link href="https://discord.gg/utDMjHYg42" target="_blank">Join our Discord</Link>
</Prose>

View File

@@ -1,5 +1,27 @@
# cojson-storage-indexeddb
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,13 +1,12 @@
{
"name": "cojson-storage-indexeddb",
"version": "0.7.23",
"version": "0.7.29",
"main": "dist/index.js",
"type": "module",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -10,7 +10,6 @@ import {
OutgoingSyncQueue,
} from "cojson";
import { SyncPromise } from "./syncPromises.js";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -53,11 +52,15 @@ export class IDBStorage {
this.db = db;
this.toLocalNode = toLocalNode;
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
const processMessages = async () => {
for await (const msg of fromLocalNode) {
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.handleSyncMessage(msg);
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
@@ -68,9 +71,13 @@ export class IDBStorage {
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in IndexedDB", e),
);
}
@@ -82,25 +89,18 @@ export class IDBStorage {
localNodeName: "local",
},
): Promise<Peer> {
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
yield* Effect.promise(() =>
IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
}),
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await IDBStorage.open(
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
return { ...storageAsPeer, priority: 100 };
}
static async open(
@@ -392,35 +392,40 @@ export class IDBStorage {
),
).then(() => {
// we're done with IndexedDB stuff here so can use native Promises again
setTimeout(() =>
Effect.runPromise(
Effect.gen(this, function* () {
yield* Queue.offer(this.toLocalNode, {
action: "known",
...ourKnown,
asDependencyOf,
});
setTimeout(() => {
this.toLocalNode
.push({
action: "known",
...ourKnown,
asDependencyOf,
})
.catch((e) =>
console.error(
"Error sending known state",
e,
),
);
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new)
.length > 0,
);
const nonEmptyNewContentPieces =
newContentPieces.filter(
(piece) =>
piece.header ||
Object.keys(piece.new).length > 0,
);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
yield* Queue.offer(
this.toLocalNode,
piece,
);
yield* Effect.yieldNow();
}
}),
),
);
for (const piece of nonEmptyNewContentPieces) {
this.toLocalNode
.push(piece)
.catch((e) =>
console.error(
"Error sending new content piece",
e,
),
);
}
});
return Promise.resolve();
});
@@ -445,16 +450,18 @@ export class IDBStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
throw new Error("Expected to be sent header first");
})
.catch((e) =>
console.error("Error sending known state", e),
);
return SyncPromise.resolve();
}
return this.makeRequest<IDBValidKey>(({ coValues }) =>
@@ -515,13 +522,18 @@ export class IDBStorage {
),
).then(() => {
if (invalidAssumptions) {
void Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
})
.catch((e) =>
console.error(
"Error sending known state",
e,
),
);
}
});
});

View File

@@ -1,5 +1,27 @@
# cojson-storage-sqlite
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,14 +1,13 @@
{
"name": "cojson-storage-sqlite",
"type": "module",
"version": "0.7.23",
"version": "0.7.29",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"better-sqlite3": "^8.5.2",
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"devDependencies": {

View File

@@ -11,7 +11,6 @@ import {
} from "cojson";
import Database, { Database as DatabaseT } from "better-sqlite3";
import { Effect, Queue, Stream } from "effect";
type CoValueRow = {
id: CojsonInternalTypes.RawCoID;
@@ -54,11 +53,15 @@ export class SQLiteStorage {
this.db = db;
this.toLocalNode = toLocalNode;
void fromLocalNode.pipe(
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg),
catch: (e) =>
const processMessages = async () => {
for await (const msg of fromLocalNode) {
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
}
await this.handleSyncMessage(msg);
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
@@ -69,9 +72,13 @@ export class SQLiteStorage {
)}`,
{ cause: e },
),
}),
),
Effect.runPromise,
);
}
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in sqlite", e),
);
}
@@ -84,26 +91,19 @@ export class SQLiteStorage {
trace?: boolean;
localNodeName?: string;
}): Promise<Peer> {
return Effect.runPromise(
Effect.gen(function* () {
const [localNodeAsPeer, storageAsPeer] =
yield* cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
yield* Effect.promise(() =>
SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
),
);
return { ...storageAsPeer, priority: 100 };
}),
const [localNodeAsPeer, storageAsPeer] = cojsonInternals.connectedPeers(
localNodeName,
"storage",
{ peer1role: "client", peer2role: "server", trace },
);
await SQLiteStorage.open(
filename,
localNodeAsPeer.incoming,
localNodeAsPeer.outgoing,
);
return { ...storageAsPeer, priority: 100 };
}
static async open(
@@ -441,13 +441,13 @@ export class SQLiteStorage {
);
}
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
asDependencyOf,
}),
);
})
.catch((e) => console.error("Error while pushing known", e));
const nonEmptyNewContentPieces = newContentPieces.filter(
(piece) => piece.header || Object.keys(piece.new).length > 0,
@@ -456,7 +456,11 @@ export class SQLiteStorage {
// console.log(theirKnown.id, nonEmptyNewContentPieces);
for (const piece of nonEmptyNewContentPieces) {
await Effect.runPromise(Queue.offer(this.toLocalNode, piece));
this.toLocalNode
.push(piece)
.catch((e) =>
console.error("Error while pushing content piece", e),
);
await new Promise((resolve) => setTimeout(resolve, 0));
}
}
@@ -478,15 +482,17 @@ export class SQLiteStorage {
const header = msg.header;
if (!header) {
console.error("Expected to be sent header first");
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
id: msg.id,
header: false,
sessions: {},
isCorrection: true,
}),
);
})
.catch((e) =>
console.error("Error while pushing known", e),
);
return;
}
@@ -618,13 +624,13 @@ export class SQLiteStorage {
})();
if (invalidAssumptions) {
await Effect.runPromise(
Queue.offer(this.toLocalNode, {
this.toLocalNode
.push({
action: "known",
...ourKnown,
isCorrection: invalidAssumptions,
}),
);
})
.catch((e) => console.error("Error while pushing known", e));
}
}

View File

@@ -1,5 +1,33 @@
# cojson-transport-nodejs-ws
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
## 0.7.27
### Patch Changes
- Option to not expect pings
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.23
### Patch Changes

View File

@@ -1,13 +1,12 @@
{
"name": "cojson-transport-ws",
"type": "module",
"version": "0.7.23",
"version": "0.7.29",
"main": "dist/index.js",
"types": "src/index.ts",
"license": "MIT",
"dependencies": {
"cojson": "workspace:*",
"effect": "^3.5.2",
"typescript": "^5.1.6"
},
"scripts": {

View File

@@ -1,5 +1,10 @@
import { DisconnectedError, Peer, PingTimeoutError, SyncMessage } from "cojson";
import { Stream, Queue, Effect, Console } from "effect";
import {
DisconnectedError,
Peer,
PingTimeoutError,
SyncMessage,
cojsonInternals,
} from "cojson";
interface WebsocketEvents {
close: { code: number; reason: string };
@@ -15,6 +20,7 @@ interface AnyWebSocket {
addEventListener<K extends keyof WebsocketEvents>(
type: K,
listener: (event: WebsocketEvents[K]) => void,
options?: { once: boolean },
): void;
removeEventListener<K extends keyof WebsocketEvents>(
type: K,
@@ -22,6 +28,7 @@ interface AnyWebSocket {
): void;
close(): void;
send(data: string): void;
readyState: number;
}
const g: typeof globalThis & {
@@ -32,88 +39,80 @@ const g: typeof globalThis & {
}[];
} = globalThis;
export function createWebSocketPeer(options: {
export function createWebSocketPeer({
id,
websocket,
role,
expectPings = true,
}: {
id: string;
websocket: AnyWebSocket;
role: Peer["role"];
}): Effect.Effect<Peer> {
return Effect.gen(function* () {
const ws = options.websocket;
const ws_ = ws as unknown as Stream.EventListener<WebsocketEvents["message"]>;
expectPings?: boolean;
}): Peer {
const incoming = new cojsonInternals.Channel<
SyncMessage | DisconnectedError | PingTimeoutError
>();
const outgoing = yield* Queue.unbounded<SyncMessage>();
const closed = once(ws, "close").pipe(
Effect.flatMap(
(event) =>
new DisconnectedError({
message: `${event.code}: ${event.reason}`,
}),
),
Stream.fromEffect,
);
const isSyncMessage = (msg: unknown): msg is SyncMessage => {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((msg as any)?.type === "ping") {
const ping = msg as PingMsg;
g.jazzPings ||= [];
g.jazzPings.push({
received: Date.now(),
sent: ping.time,
dc: ping.dc,
});
return false;
}
return true;
};
yield* Effect.forkDaemon(Effect.gen(function* () {
yield* once(ws, "open");
yield* Queue.take(outgoing).pipe(
Effect.andThen((message) => ws.send(JSON.stringify(message))),
Effect.forever,
websocket.addEventListener("close", function handleClose() {
incoming
.push("Disconnected")
.catch((e) =>
console.error("Error while pushing disconnect msg", e),
);
}));
type E = WebsocketEvents["message"];
const messages = Stream.fromEventListener<E>(ws_, "message").pipe(
Stream.timeoutFail(() => new PingTimeoutError(), "10 seconds"),
Stream.tapError((_e) =>
Console.warn("Ping timeout").pipe(
Effect.andThen(Effect.try(() => ws.close())),
Effect.catchAll((e) =>
Console.error(
"Error while trying to close ws on ping timeout",
e,
),
),
),
),
Stream.mergeLeft(closed),
Stream.map((_) => JSON.parse(_.data as string)),
Stream.filter(isSyncMessage),
Stream.buffer({ capacity: "unbounded" }),
Stream.onDone(() => Queue.shutdown(outgoing)),
);
return {
id: options.id,
incoming: messages,
outgoing,
role: options.role,
};
});
let pingTimeout: ReturnType<typeof setTimeout> | null = null;
websocket.addEventListener("message", function handleIncomingMsg(event) {
const msg = JSON.parse(event.data as string);
pingTimeout && clearTimeout(pingTimeout);
if (msg?.type === "ping") {
const ping = msg as PingMsg;
g.jazzPings ||= [];
g.jazzPings.push({
received: Date.now(),
sent: ping.time,
dc: ping.dc,
});
} else {
incoming
.push(msg)
.catch((e) =>
console.error("Error while pushing incoming msg", e),
);
}
if (expectPings) {
pingTimeout = setTimeout(() => {
incoming
.push("PingTimeout")
.catch((e) =>
console.error("Error while pushing ping timeout", e),
);
}, 10_000);
}
});
const websocketOpen = new Promise<void>((resolve) => {
websocket.addEventListener("open", resolve, { once: true });
});
return {
id,
incoming,
outgoing: {
async push(msg) {
await websocketOpen;
if (websocket.readyState === 1) {
websocket.send(JSON.stringify(msg));
}
},
close() {
if (websocket.readyState === 1) {
websocket.close();
}
},
},
role,
};
}
const once = <Event extends keyof WebsocketEvents>(
ws: AnyWebSocket,
event: Event,
) =>
Effect.async<WebsocketEvents[Event]>((register) => {
const cb = (msg: WebsocketEvents[Event]) => {
ws.removeEventListener(event, cb);
register(Effect.succeed(msg));
};
ws.addEventListener(event, cb);
});

View File

@@ -1,5 +1,23 @@
# cojson
## 0.7.29
### Patch Changes
- Remove noisy log
## 0.7.28
### Patch Changes
- Fix ignoring server peers
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
## 0.7.23
### Patch Changes

View File

@@ -5,7 +5,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.23",
"version": "0.7.29",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",
@@ -18,12 +18,12 @@
},
"dependencies": {
"@hazae41/berith": "^1.2.6",
"@noble/curves": "^1.3.0",
"@noble/ciphers": "^0.1.3",
"@noble/curves": "^1.3.0",
"@noble/hashes": "^1.4.0",
"@scure/base": "^1.1.1",
"effect": "^3.5.2",
"hash-wasm": "^4.9.0"
"hash-wasm": "^4.9.0",
"queueable": "^5.3.2"
},
"scripts": {
"dev": "tsc --watch --sourceMap --outDir dist",

View File

@@ -26,6 +26,13 @@ import { expectGroup } from "./typeUtils/expectGroup.js";
import { isAccountID } from "./typeUtils/isAccountID.js";
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
/**
In order to not block other concurrently syncing CoValues we introduce a maximum size of transactions,
since they are the smallest unit of progress that can be synced within a CoValue.
This is particularly important for storing binary data in CoValues, since they are likely to be at least on the order of megabytes.
This also means that we want to keep signatures roughly after each MAX_RECOMMENDED_TX size chunk,
to be able to verify partially loaded CoValues or CoValues that are still being created (like a video live stream).
**/
export const MAX_RECOMMENDED_TX_SIZE = 100 * 1024;
export type CoValueHeader = {
@@ -383,7 +390,7 @@ export class CoValueCore {
0,
);
if (sizeOfTxsSinceLastInbetweenSignature > 100 * 1024) {
if (sizeOfTxsSinceLastInbetweenSignature > MAX_RECOMMENDED_TX_SIZE) {
// console.log(
// "Saving inbetween signature for tx ",
// sessionID,

View File

@@ -18,7 +18,7 @@ import {
} from "./crypto/crypto.js";
import { WasmCrypto } from "./crypto/WasmCrypto.js";
import { PureJSCrypto } from "./crypto/PureJSCrypto.js";
import { connectedPeers } from "./streamUtils.js";
import { connectedPeers, Channel } from "./streamUtils.js";
import { ControlledAgent, RawControlledAccount } from "./coValues/account.js";
import type { Role } from "./permissions.js";
import { rawCoIDtoBytes, rawCoIDfromBytes, isRawCoID } from "./ids.js";
@@ -59,12 +59,7 @@ import type * as Media from "./media.js";
type Value = JsonValue | AnyRawCoValue;
import {
LSMStorage,
FSErr,
BlockFilename,
WalFilename,
} from "./storage/index.js";
import { LSMStorage, BlockFilename, WalFilename } from "./storage/index.js";
import { FileSystem } from "./storage/FileSystem.js";
/** @hidden */
@@ -84,6 +79,7 @@ export const cojsonInternals = {
accountHeaderForInitialAgentSecret,
idforHeader,
StreamingHash,
Channel,
};
export {
@@ -123,18 +119,17 @@ export {
SyncMessage,
isRawCoID,
LSMStorage,
DisconnectedError,
PingTimeoutError,
};
export type {
Value,
FileSystem,
FSErr,
BlockFilename,
WalFilename,
IncomingSyncStream,
OutgoingSyncQueue,
DisconnectedError,
PingTimeoutError,
};
// eslint-disable-next-line @typescript-eslint/no-namespace

View File

@@ -670,6 +670,10 @@ export class LocalNode {
return newNode;
}
gracefulShutdown() {
this.syncManager.gracefulShutdown();
}
}
/** @internal */

View File

@@ -1,4 +1,3 @@
import { Effect } from "effect";
import { CoValueChunk } from "./index.js";
import { RawCoID } from "../ids.js";
import { CryptoProvider, StreamingHash } from "../crypto/crypto.js";
@@ -11,150 +10,124 @@ export type WalEntry = { id: RawCoID } & CoValueChunk;
export type WalFilename = `wal-${number}.jsonl`;
export type FSErr = {
type: "fileSystemError";
error: Error;
};
export interface FileSystem<WriteHandle, ReadHandle> {
crypto: CryptoProvider;
createFile(filename: string): Effect.Effect<WriteHandle, FSErr>;
append(handle: WriteHandle, data: Uint8Array): Effect.Effect<void, FSErr>;
close(handle: ReadHandle | WriteHandle): Effect.Effect<void, FSErr>;
closeAndRename(
handle: WriteHandle,
filename: BlockFilename,
): Effect.Effect<void, FSErr>;
openToRead(
filename: string,
): Effect.Effect<{ handle: ReadHandle; size: number }, FSErr>;
createFile(filename: string): Promise<WriteHandle>;
append(handle: WriteHandle, data: Uint8Array): Promise<void>;
close(handle: ReadHandle | WriteHandle): Promise<void>;
closeAndRename(handle: WriteHandle, filename: BlockFilename): Promise<void>;
openToRead(filename: string): Promise<{ handle: ReadHandle; size: number }>;
read(
handle: ReadHandle,
offset: number,
length: number,
): Effect.Effect<Uint8Array, FSErr>;
listFiles(): Effect.Effect<string[], FSErr>;
removeFile(
filename: BlockFilename | WalFilename,
): Effect.Effect<void, FSErr>;
): Promise<Uint8Array>;
listFiles(): Promise<string[]>;
removeFile(filename: BlockFilename | WalFilename): Promise<void>;
}
export const textEncoder = new TextEncoder();
export const textDecoder = new TextDecoder();
export function readChunk<RH, FS extends FileSystem<unknown, RH>>(
export async function readChunk<RH, FS extends FileSystem<unknown, RH>>(
handle: RH,
header: { start: number; length: number },
fs: FS,
): Effect.Effect<CoValueChunk, FSErr> {
return Effect.gen(function* ($) {
const chunkBytes = yield* $(
fs.read(handle, header.start, header.length),
);
): Promise<CoValueChunk> {
const chunkBytes = await fs.read(handle, header.start, header.length);
const chunk = JSON.parse(textDecoder.decode(chunkBytes));
return chunk;
});
const chunk = JSON.parse(textDecoder.decode(chunkBytes));
return chunk;
}
export function readHeader<RH, FS extends FileSystem<unknown, RH>>(
export async function readHeader<RH, FS extends FileSystem<unknown, RH>>(
filename: string,
handle: RH,
size: number,
fs: FS,
): Effect.Effect<BlockHeader, FSErr> {
return Effect.gen(function* ($) {
const headerLength = Number(filename.match(/-H(\d+)\.jsonl$/)![1]!);
): Promise<BlockHeader> {
const headerLength = Number(filename.match(/-H(\d+)\.jsonl$/)![1]!);
const headerBytes = yield* $(
fs.read(handle, size - headerLength, headerLength),
);
const headerBytes = await fs.read(
handle,
size - headerLength,
headerLength,
);
const header = JSON.parse(textDecoder.decode(headerBytes));
return header;
});
const header = JSON.parse(textDecoder.decode(headerBytes));
return header;
}
export function writeBlock<WH, RH, FS extends FileSystem<WH, RH>>(
export async function writeBlock<WH, RH, FS extends FileSystem<WH, RH>>(
chunks: Map<RawCoID, CoValueChunk>,
level: number,
blockNumber: number,
fs: FS,
): Effect.Effect<BlockFilename, FSErr> {
): Promise<BlockFilename> {
if (chunks.size === 0) {
return Effect.die(new Error("No chunks to write"));
throw new Error("No chunks to write");
}
return Effect.gen(function* ($) {
const blockHeader: BlockHeader = [];
const blockHeader: BlockHeader = [];
let offset = 0;
let offset = 0;
const file = yield* $(
fs.createFile(
"wipBlock" +
Math.random().toString(36).substring(7) +
".tmp.jsonl",
),
);
const hash = new StreamingHash(fs.crypto);
const file = await fs.createFile(
"wipBlock" + Math.random().toString(36).substring(7) + ".tmp.jsonl",
);
const hash = new StreamingHash(fs.crypto);
const chunksSortedById = Array.from(chunks).sort(([id1], [id2]) =>
id1.localeCompare(id2),
);
const chunksSortedById = Array.from(chunks).sort(([id1], [id2]) =>
id1.localeCompare(id2),
);
for (const [id, chunk] of chunksSortedById) {
const encodedBytes = hash.update(chunk);
const encodedBytesWithNewline = new Uint8Array(
encodedBytes.length + 1,
);
encodedBytesWithNewline.set(encodedBytes);
encodedBytesWithNewline[encodedBytes.length] = 10;
yield* $(fs.append(file, encodedBytesWithNewline));
const length = encodedBytesWithNewline.length;
blockHeader.push({ id, start: offset, length });
offset += length;
}
for (const [id, chunk] of chunksSortedById) {
const encodedBytes = hash.update(chunk);
const encodedBytesWithNewline = new Uint8Array(encodedBytes.length + 1);
encodedBytesWithNewline.set(encodedBytes);
encodedBytesWithNewline[encodedBytes.length] = 10;
await fs.append(file, encodedBytesWithNewline);
const length = encodedBytesWithNewline.length;
blockHeader.push({ id, start: offset, length });
offset += length;
}
const headerBytes = textEncoder.encode(JSON.stringify(blockHeader));
yield* $(fs.append(file, headerBytes));
const headerBytes = textEncoder.encode(JSON.stringify(blockHeader));
await fs.append(file, headerBytes);
// console.log(
// "full file",
// yield* $(
// fs.read(file as unknown as RH, 0, offset + headerBytes.length),
// ),
// );
// console.log(
// "full file",
// yield* $(
// fs.read(file as unknown as RH, 0, offset + headerBytes.length),
// ),
// );
const filename: BlockFilename = `L${level}-${(
blockNumber + ""
).padStart(3, "0")}-${hash
.digest()
.replace("hash_", "")
.slice(0, 15)}-H${headerBytes.length}.jsonl`;
// console.log("renaming to" + filename);
yield* $(fs.closeAndRename(file, filename));
const filename: BlockFilename = `L${level}-${(blockNumber + "").padStart(
3,
"0",
)}-${hash.digest().replace("hash_", "").slice(0, 15)}-H${
headerBytes.length
}.jsonl`;
// console.log("renaming to" + filename);
await fs.closeAndRename(file, filename);
return filename;
return filename;
// console.log("Wrote block", filename, blockHeader);
// console.log("IDs in block", blockHeader.map(e => e.id));
});
// console.log("Wrote block", filename, blockHeader);
// console.log("IDs in block", blockHeader.map(e => e.id));
}
export function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
export async function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
handle: WH,
fs: FS,
id: RawCoID,
chunk: CoValueChunk,
): Effect.Effect<void, FSErr> {
return Effect.gen(function* ($) {
const walEntry: WalEntry = {
id,
...chunk,
};
const bytes = textEncoder.encode(JSON.stringify(walEntry) + "\n");
console.log("writing to WAL", handle, id, bytes.length);
yield* $(fs.append(handle, bytes));
});
) {
const walEntry: WalEntry = {
id,
...chunk,
};
const bytes = textEncoder.encode(JSON.stringify(walEntry) + "\n");
console.log("writing to WAL", handle, id, bytes.length);
return fs.append(handle, bytes);
}

View File

@@ -1,4 +1,3 @@
import { Either } from "effect";
import { RawCoID, SessionID } from "../ids.js";
import { MAX_RECOMMENDED_TX_SIZE } from "../index.js";
import { CoValueKnownState, NewContentMessage } from "../sync.js";
@@ -80,7 +79,7 @@ export function chunkToKnownState(id: RawCoID, chunk: CoValueChunk) {
export function mergeChunks(
chunkA: CoValueChunk,
chunkB: CoValueChunk,
): Either.Either<"nonContigous", CoValueChunk> {
): "nonContigous" | CoValueChunk {
const header = chunkA.header || chunkB.header;
const newSessions = { ...chunkA.sessionEntries };
@@ -133,9 +132,9 @@ export function mergeChunks(
}
newSessions[sessionID] = newEntries;
} else {
return Either.right("nonContigous" as const);
return "nonContigous" as const;
}
}
return Either.left({ header, sessionEntries: newSessions });
return { header, sessionEntries: newSessions };
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,6 @@
import { Console, Effect, Queue, Stream } from "effect";
import { Peer, PeerID, SyncMessage } from "./sync.js";
import { Channel } from "queueable";
export { Channel } from "queueable";
export function connectedPeers(
peer1id: PeerID,
@@ -13,60 +14,57 @@ export function connectedPeers(
peer1role?: Peer["role"];
peer2role?: Peer["role"];
} = {},
): Effect.Effect<[Peer, Peer]> {
return Effect.gen(function* () {
const [from1to2Rx, from1to2Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
);
const [from2to1Rx, from2to1Tx] = yield* newQueuePair(
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
);
): [Peer, Peer] {
const [from1to2Rx, from1to2Tx] = newQueuePair(
trace ? { traceAs: `${peer1id} -> ${peer2id}` } : undefined,
);
const [from2to1Rx, from2to1Tx] = newQueuePair(
trace ? { traceAs: `${peer2id} -> ${peer1id}` } : undefined,
);
const peer2AsPeer: Peer = {
id: peer2id,
incoming: from2to1Rx,
outgoing: from1to2Tx,
role: peer2role,
};
const peer2AsPeer: Peer = {
id: peer2id,
incoming: from2to1Rx,
outgoing: from1to2Tx,
role: peer2role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: from1to2Rx,
outgoing: from2to1Tx,
role: peer1role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: from1to2Rx,
outgoing: from2to1Tx,
role: peer1role,
};
return [peer1AsPeer, peer2AsPeer];
});
return [peer1AsPeer, peer2AsPeer];
}
export function newQueuePair(
options: { traceAs?: string } = {},
): Effect.Effect<[Stream.Stream<SyncMessage>, Queue.Enqueue<SyncMessage>]> {
return Effect.gen(function* () {
const queue = yield* Queue.unbounded<SyncMessage>();
): [AsyncIterable<SyncMessage>, Channel<SyncMessage>] {
const channel = new Channel<SyncMessage>();
if (options.traceAs) {
return [
Stream.fromQueue(queue).pipe(
Stream.tap((msg) =>
Console.debug(
options.traceAs,
JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
if (options.traceAs) {
return [
(async function* () {
for await (const msg of channel) {
console.debug(
options.traceAs,
JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
2,
),
),
),
queue,
];
} else {
return [Stream.fromQueue(queue), queue];
}
});
);
yield msg;
}
})(),
channel,
];
} else {
return [channel.wrap(), channel];
}
}

View File

@@ -3,7 +3,6 @@ import { CoValueHeader, Transaction } from "./coValueCore.js";
import { CoValueCore } from "./coValueCore.js";
import { LocalNode, newLoadingState } from "./localNode.js";
import { RawCoID, SessionID } from "./ids.js";
import { Data, Effect, Queue, Stream } from "effect";
export type CoValueKnownState = {
id: RawCoID;
@@ -56,19 +55,17 @@ export type DoneMessage = {
export type PeerID = string;
export class DisconnectedError extends Data.TaggedError("DisconnectedError")<{
message: string;
}> {}
export type DisconnectedError = "Disconnected";
export class PingTimeoutError extends Error {
readonly _tag = "PingTimeoutError";
}
export type PingTimeoutError = "PingTimeout";
export type IncomingSyncStream = Stream.Stream<
SyncMessage,
DisconnectedError | PingTimeoutError
export type IncomingSyncStream = AsyncIterable<
SyncMessage | DisconnectedError | PingTimeoutError
>;
export type OutgoingSyncQueue = Queue.Enqueue<SyncMessage>;
export type OutgoingSyncQueue = {
push: (msg: SyncMessage) => Promise<unknown>;
close: () => void;
};
export interface Peer {
id: PeerID;
@@ -144,15 +141,11 @@ export class SyncManager {
for (const peer of eligiblePeers) {
// console.log("loading", id, "from", peer.id);
Effect.runPromise(
Queue.offer(peer.outgoing, {
action: "load",
id: id,
header: false,
sessions: {},
}),
).catch((e) => {
console.error("Error writing to peer", e);
await peer.outgoing.push({
action: "load",
id: id,
header: false,
sessions: {},
});
const coValueEntry = this.local.coValues[id];
@@ -229,11 +222,13 @@ export class SyncManager {
}
if (entry.state === "loading") {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "load",
id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending load", e);
});
return;
}
@@ -246,9 +241,11 @@ export class SyncManager {
if (!peer.toldKnownState.has(id)) {
peer.toldKnownState.add(id);
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "load",
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending load", e);
});
}
}
@@ -273,10 +270,12 @@ export class SyncManager {
);
if (!peer.toldKnownState.has(id)) {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
asDependencyOf,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state", e);
});
peer.toldKnownState.add(id);
@@ -311,7 +310,9 @@ export class SyncManager {
// } header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
await this.trySendToPeer(peer, piece);
this.trySendToPeer(peer, piece).catch((e) => {
console.error("Error sending content piece", e);
});
if (performance.now() - lastYield > 10) {
await new Promise<void>((resolve) => {
setTimeout(resolve, 0);
@@ -365,55 +366,39 @@ export class SyncManager {
void initialSync();
}
void Effect.runPromise(
peerState.incoming.pipe(
Stream.ensuring(
Effect.sync(() => {
console.log("Peer disconnected:", peer.id);
delete this.peers[peer.id];
}),
),
Stream.runForEach((msg) =>
Effect.tryPromise({
try: () => this.handleSyncMessage(msg, peerState),
catch: (e) =>
new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" ||
k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
}).pipe(
Effect.timeoutFail({
duration: 10000,
onTimeout: () =>
new Error("Took >10s to process message"),
}),
),
),
Effect.catchAll((e) =>
Effect.logError(
"Error in peer",
peer.id,
e.message,
typeof e.cause === "object" &&
e.cause instanceof Error &&
e.cause.message,
),
),
),
);
const processMessages = async () => {
for await (const msg of peerState.incoming) {
if (msg === "Disconnected") {
return;
}
if (msg === "PingTimeout") {
console.error("Ping timeout from peer", peer.id);
return;
}
try {
await this.handleSyncMessage(msg, peerState);
} catch (e) {
throw new Error(
`Error reading from peer ${
peer.id
}, handling msg\n\n${JSON.stringify(msg, (k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
);
}
}
};
processMessages().catch((e) => {
console.error("Error processing messages from peer", peer.id, e);
});
}
trySendToPeer(peer: PeerState, msg: SyncMessage) {
return Effect.runPromise(Queue.offer(peer.outgoing, msg));
return peer.outgoing.push(msg);
}
async handleLoad(msg: LoadMessage, peer: PeerState) {
@@ -426,7 +411,7 @@ export class SyncManager {
// special case: we should be able to solve this much more neatly
// with an explicit state machine in the future
const eligiblePeers = this.peersInPriorityOrder().filter(
(other) => other.id !== peer.id && peer.role === "server",
(other) => other.id !== peer.id && other.role === "server",
);
if (eligiblePeers.length === 0) {
if (msg.header || Object.keys(msg.sessions).length > 0) {
@@ -439,7 +424,7 @@ export class SyncManager {
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
console.error("Error sending known state", e);
});
}
return;
@@ -465,16 +450,18 @@ export class SyncManager {
peer.id,
);
const loaded = await entry.done;
console.log("Loaded", msg.id, loaded);
// console.log("Loaded", msg.id, loaded);
if (loaded === "unavailable") {
peer.optimisticKnownStates[msg.id] = knownStateIn(msg);
peer.toldKnownState.add(msg.id);
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
id: msg.id,
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
});
return;
@@ -684,10 +671,12 @@ export class SyncManager {
await this.syncCoValue(coValue);
if (invalidStateAssumed) {
await this.trySendToPeer(peer, {
this.trySendToPeer(peer, {
action: "known",
isCorrection: true,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state correction", e);
});
}
}
@@ -755,6 +744,12 @@ export class SyncManager {
}
}
}
gracefulShutdown() {
for (const peer of Object.values(this.peers)) {
peer.outgoing.close();
}
}
}
function knownStateIn(msg: LoadMessage | KnownStateMessage) {

View File

@@ -3,7 +3,6 @@ import { newRandomSessionID } from "../coValueCore.js";
import { LocalNode } from "../localNode.js";
import { connectedPeers } from "../streamUtils.js";
import { WasmCrypto } from "../crypto/WasmCrypto.js";
import { Effect } from "effect";
const Crypto = await WasmCrypto.create();
@@ -53,13 +52,11 @@ test("Can create account with one node, and then load it on another", async () =
map.set("foo", "bar", "private");
expect(map.get("foo")).toEqual("bar");
const [node1asPeer, node2asPeer] = await Effect.runPromise(
connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
}),
);
const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {
trace: true,
peer1role: "server",
peer2role: "client",
})
console.log("After connected peers");

File diff suppressed because it is too large Load Diff

View File

@@ -1,5 +1,41 @@
# jazz-browser-media-images
## 0.7.29
### Patch Changes
- jazz-browser@0.7.29
- jazz-tools@0.7.29
## 0.7.28
### Patch Changes
- jazz-browser@0.7.28
- jazz-tools@0.7.28
## 0.7.27
### Patch Changes
- jazz-browser@0.7.27
## 0.7.26
### Patch Changes
- Updated dependencies
- jazz-browser@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
- jazz-browser@0.7.25
## 0.7.24
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser-media-images",
"version": "0.7.24",
"version": "0.7.29",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,50 @@
# jazz-browser
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- cojson-storage-indexeddb@0.7.29
- cojson-transport-ws@0.7.29
- jazz-tools@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- cojson-storage-indexeddb@0.7.28
- cojson-transport-ws@0.7.28
- jazz-tools@0.7.28
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- cojson-storage-indexeddb@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
## 0.7.24
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-browser",
"version": "0.7.24",
"version": "0.7.29",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",
@@ -10,7 +10,6 @@
"cojson": "workspace:*",
"cojson-storage-indexeddb": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"jazz-tools": "workspace:*",
"typescript": "^5.1.6"
},

View File

@@ -1,13 +1,12 @@
import {
BlockFilename,
FSErr,
FileSystem,
WalFilename,
CryptoProvider,
} from "cojson";
import { Effect } from "effect";
import { BlockFilename, FileSystem, WalFilename, CryptoProvider } from "cojson";
export class OPFSFilesystem implements FileSystem<{id: number, filename: string}, {id: number, filename: string}> {
export class OPFSFilesystem
implements
FileSystem<
{ id: number; filename: string },
{ id: number; filename: string }
>
{
opfsWorker: Worker;
callbacks: Map<number, (event: MessageEvent) => void> = new Map();
nextRequestId = 0;
@@ -28,8 +27,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
};
}
listFiles(): Effect.Effect<string[], FSErr, never> {
return Effect.async((cb) => {
listFiles(): Promise<string[]> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("listFiles" + requestId + "_listFiles");
this.callbacks.set(requestId, (event) => {
@@ -39,7 +38,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"listFiles" + requestId + "_listFiles",
"listFilesEnd" + requestId + "_listFiles",
);
cb(Effect.succeed(event.data.fileNames));
resolve(event.data.fileNames);
});
this.opfsWorker.postMessage({ type: "listFiles", requestId });
});
@@ -47,17 +46,15 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
openToRead(
filename: string,
): Effect.Effect<{ handle: {id: number, filename: string}; size: number }, FSErr, never> {
return Effect.async((cb) => {
): Promise<{ handle: { id: number; filename: string }; size: number }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("openToRead" + "_" + filename);
this.callbacks.set(requestId, (event) => {
cb(
Effect.succeed({
handle: {id: event.data.handle, filename},
size: event.data.size,
}),
);
resolve({
handle: { id: event.data.handle, filename },
size: event.data.size,
});
performance.mark("openToReadEnd" + "_" + filename);
performance.measure(
"openToRead" + "_" + filename,
@@ -73,8 +70,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
createFile(filename: string): Effect.Effect<{id: number, filename: string}, FSErr, never> {
return Effect.async((cb) => {
createFile(filename: string): Promise<{ id: number; filename: string }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("createFile" + "_" + filename);
this.callbacks.set(requestId, (event) => {
@@ -84,7 +81,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"createFile" + "_" + filename,
"createFileEnd" + "_" + filename,
);
cb(Effect.succeed({id: event.data.handle, filename}));
resolve({ id: event.data.handle, filename });
});
this.opfsWorker.postMessage({
type: "createFile",
@@ -94,10 +91,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
openToWrite(
filename: string,
): Effect.Effect<{id: number, filename: string}, FSErr, never> {
return Effect.async((cb) => {
openToWrite(filename: string): Promise<{ id: number; filename: string }> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("openToWrite" + "_" + filename);
this.callbacks.set(requestId, (event) => {
@@ -107,7 +102,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"openToWrite" + "_" + filename,
"openToWriteEnd" + "_" + filename,
);
cb(Effect.succeed({id: event.data.handle, filename}));
resolve({ id: event.data.handle, filename });
});
this.opfsWorker.postMessage({
type: "openToWrite",
@@ -118,10 +113,10 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
append(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
data: Uint8Array,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("append" + "_" + handle.filename);
this.callbacks.set(requestId, (_) => {
@@ -131,7 +126,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"append" + "_" + handle.filename,
"appendEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "append",
@@ -143,11 +138,11 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
read(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
offset: number,
length: number,
): Effect.Effect<Uint8Array, FSErr, never> {
return Effect.async((cb) => {
): Promise<Uint8Array> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("read" + "_" + handle.filename);
this.callbacks.set(requestId, (event) => {
@@ -157,7 +152,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"read" + "_" + handle.filename,
"readEnd" + "_" + handle.filename,
);
cb(Effect.succeed(event.data.data));
resolve(event.data.data);
});
this.opfsWorker.postMessage({
type: "read",
@@ -169,8 +164,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
close(handle: {id: number, filename: string}): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
close(handle: { id: number; filename: string }): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("close" + "_" + handle.filename);
this.callbacks.set(requestId, (_) => {
@@ -180,7 +175,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"close" + "_" + handle.filename,
"closeEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "close",
@@ -191,22 +186,20 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
}
closeAndRename(
handle: {id: number, filename: string},
handle: { id: number; filename: string },
filename: BlockFilename,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("closeAndRename" + "_" + handle.filename);
this.callbacks.set(requestId, () => {
performance.mark(
"closeAndRenameEnd" + "_" + handle.filename,
);
performance.mark("closeAndRenameEnd" + "_" + handle.filename);
performance.measure(
"closeAndRename" + "_" + handle.filename,
"closeAndRename" + "_" + handle.filename,
"closeAndRenameEnd" + "_" + handle.filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "closeAndRename",
@@ -217,10 +210,8 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
});
}
removeFile(
filename: BlockFilename | WalFilename,
): Effect.Effect<void, FSErr, never> {
return Effect.async((cb) => {
removeFile(filename: BlockFilename | WalFilename): Promise<void> {
return new Promise((resolve) => {
const requestId = this.nextRequestId++;
performance.mark("removeFile" + "_" + filename);
this.callbacks.set(requestId, () => {
@@ -230,7 +221,7 @@ export class OPFSFilesystem implements FileSystem<{id: number, filename: string}
"removeFile" + "_" + filename,
"removeFileEnd" + "_" + filename,
);
cb(Effect.succeed(undefined));
resolve(undefined);
});
this.opfsWorker.postMessage({
type: "removeFile",

View File

@@ -14,7 +14,6 @@ import { AccountID, LSMStorage } from "cojson";
import { AuthProvider } from "./auth/auth.js";
import { OPFSFilesystem } from "./OPFSFilesystem.js";
import { IDBStorage } from "cojson-storage-indexeddb";
import { Effect, Queue } from "effect";
import { createWebSocketPeer } from "cojson-transport-ws";
export * from "./auth/auth.js";
@@ -42,13 +41,11 @@ export async function createJazzBrowserContext<Acc extends Account>({
const crypto = customCrypto || (await WasmCrypto.create());
let sessionDone: () => void;
const firstWsPeer = await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
);
const firstWsPeer = createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
});
let shouldTryToReconnect = true;
let currentReconnectionTimeout = initialReconnectionTimeout;
@@ -112,13 +109,11 @@ export async function createJazzBrowserContext<Acc extends Account>({
});
me._raw.core.node.syncManager.addPeer(
await Effect.runPromise(
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
}),
),
createWebSocketPeer({
websocket: new WebSocket(peerAddr),
id: peerAddr + "@" + new Date().toISOString(),
role: "server",
})
);
}
}
@@ -132,11 +127,7 @@ export async function createJazzBrowserContext<Acc extends Account>({
shouldTryToReconnect = false;
window.removeEventListener("online", onOnline);
console.log("Cleaning up node");
for (const peer of Object.values(
me._raw.core.node.syncManager.peers,
)) {
void Effect.runPromise(Queue.shutdown(peer.outgoing));
}
me._raw.core.node.gracefulShutdown();
sessionDone?.();
},
};

View File

@@ -1,5 +1,47 @@
# jazz-autosub
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- cojson-transport-ws@0.7.29
- jazz-tools@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- cojson-transport-ws@0.7.28
- jazz-tools@0.7.28
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
## 0.7.24
### Patch Changes

View File

@@ -5,11 +5,10 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.24",
"version": "0.7.29",
"dependencies": {
"cojson": "workspace:*",
"cojson-transport-ws": "workspace:*",
"effect": "^3.5.2",
"jazz-tools": "workspace:*",
"ws": "^8.14.2"
},

View File

@@ -1,7 +1,6 @@
import { AgentSecret, Peer, SessionID, WasmCrypto } from "cojson";
import { createWebSocketPeer } from "cojson-transport-ws";
import { Account, CoValueClass, ID } from "jazz-tools";
import { Effect } from "effect";
import { WebSocket } from "ws";
/** @category Context Creation */
@@ -18,13 +17,11 @@ export async function startWorker<Acc extends Account>({
syncServer?: string;
accountSchema?: CoValueClass<Acc> & typeof Account;
}): Promise<{ worker: Acc }> {
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
const wsPeer: Peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
});
if (!accountID) {
throw new Error("No accountID provided");
@@ -52,13 +49,11 @@ export async function startWorker<Acc extends Account>({
if (!worker._raw.core.node.syncManager.peers["upstream"]) {
console.log(new Date(), "Reconnecting to upstream " + peer);
const wsPeer: Peer = await Effect.runPromise(
createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
}),
);
const wsPeer: Peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peer),
role: "server",
});
worker._raw.core.node.syncManager.addPeer(wsPeer);
}

View File

@@ -1,5 +1,47 @@
# jazz-react
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- jazz-browser@0.7.29
- jazz-tools@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- jazz-browser@0.7.28
- jazz-tools@0.7.28
## 0.7.27
### Patch Changes
- jazz-browser@0.7.27
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
- jazz-browser@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
- jazz-browser@0.7.25
## 0.7.24
### Patch Changes

View File

@@ -1,6 +1,6 @@
{
"name": "jazz-react",
"version": "0.7.24",
"version": "0.7.29",
"type": "module",
"main": "dist/index.js",
"types": "src/index.ts",

View File

@@ -1,5 +1,46 @@
# jazz-autosub
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
- cojson-transport-ws@0.7.29
- jazz-tools@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
- cojson-transport-ws@0.7.28
- jazz-tools@0.7.28
## 0.7.27
### Patch Changes
- Updated dependencies
- cojson-transport-ws@0.7.27
## 0.7.26
### Patch Changes
- Updated dependencies
- cojson@0.7.26
- cojson-transport-ws@0.7.26
- jazz-tools@0.7.26
## 0.7.25
### Patch Changes
- Updated dependencies
- jazz-tools@0.7.25
## 0.7.24
### Patch Changes

View File

@@ -3,7 +3,7 @@
"bin": "./dist/index.js",
"type": "module",
"license": "MIT",
"version": "0.7.24",
"version": "0.7.29",
"scripts": {
"lint": "eslint . --ext ts,tsx",
"format": "prettier --write './src/**/*.{ts,tsx}'",

View File

@@ -25,7 +25,7 @@ const accountCreate = Command.make(
return Effect.gen(function* () {
const crypto = yield* Effect.promise(() => WasmCrypto.create());
const peer = yield* createWebSocketPeer({
const peer = createWebSocketPeer({
id: "upstream",
websocket: new WebSocket(peerAddr),
role: "server",
@@ -53,7 +53,7 @@ const accountCreate = Command.make(
),
);
const peer2 = yield* createWebSocketPeer({
const peer2 = createWebSocketPeer({
id: "upstream2",
websocket: new WebSocket(peerAddr),
role: "server",

View File

@@ -1,5 +1,33 @@
# jazz-autosub
## 0.7.29
### Patch Changes
- Updated dependencies
- cojson@0.7.29
## 0.7.28
### Patch Changes
- Updated dependencies
- cojson@0.7.28
## 0.7.26
### Patch Changes
- Remove Effect from jazz/cojson internals
- Updated dependencies
- cojson@0.7.26
## 0.7.25
### Patch Changes
- Implement applyDiff on CoMap to only update changed fields
## 0.7.24
### Patch Changes

View File

@@ -5,11 +5,9 @@
"types": "./src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.7.24",
"version": "0.7.29",
"dependencies": {
"@effect/schema": "^0.66.16",
"cojson": "workspace:*",
"effect": "^3.5.2",
"fast-check": "^3.17.2"
},
"scripts": {

View File

@@ -11,7 +11,6 @@ import type {
RawControlledAccount,
SessionID,
} from "cojson";
import { Context, Effect } from "effect";
import {
CoMap,
CoValue,
@@ -221,12 +220,10 @@ export class Account extends CoValueBase implements CoValue {
},
) {
// TODO: is there a cleaner way to do this?
const connectedPeers = await Effect.runPromise(
cojsonInternals.connectedPeers(
"creatingAccount",
"createdAccount",
{ peer1role: "server", peer2role: "client" },
),
const connectedPeers = cojsonInternals.connectedPeers(
"creatingAccount",
"createdAccount",
{ peer1role: "server", peer2role: "client" },
);
as._raw.core.node.syncManager.addPeer(connectedPeers[1]);
@@ -378,9 +375,6 @@ export const AccountAndGroupProxyHandler: ProxyHandler<Account | Group> = {
},
};
/** @category Identity & Permissions */
export class AccountCtx extends Context.Tag("Account")<AccountCtx, Account>() {}
/** @category Identity & Permissions */
export function isControlledAccount(account: Account): account is Account & {
isMe: true;

View File

@@ -1,4 +1,4 @@
import type { RawCoList } from "cojson";
import type { JsonValue, RawCoList } from "cojson";
import { RawAccount } from "cojson";
import type {
CoValue,
@@ -26,8 +26,8 @@ import {
makeRefs,
subscribeToCoValue,
subscribeToExistingCoValue,
subscriptionsScopes,
} from "../internal.js";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
/**
* CoLists are collaborative versions of plain arrays.
@@ -303,7 +303,7 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
} else if ("encoded" in itemDescriptor) {
return this._raw
.asArray()
.map((e) => encodeSync(itemDescriptor.encoded)(e));
.map((e) => itemDescriptor.encoded.encode(e));
} else if (isRefEncoded(itemDescriptor)) {
return this.map((item, idx) =>
seenAbove?.includes((item as CoValue)?.id)
@@ -444,16 +444,21 @@ export class CoList<Item = any> extends Array<Item> implements CoValue {
castAs<Cl extends CoValueClass & CoValueFromRaw<CoValue>>(
cl: Cl,
): InstanceType<Cl> {
return cl.fromRaw(this._raw) as InstanceType<Cl>;
const casted = cl.fromRaw(this._raw) as InstanceType<Cl>;
const subscriptionScope = subscriptionsScopes.get(this);
if (subscriptionScope) {
subscriptionsScopes.set(casted, subscriptionScope);
}
return casted;
}
}
function toRawItems<Item>(items: Item[], itemDescriptor: Schema) {
const rawItems =
itemDescriptor === "json"
? items
? (items as JsonValue[])
: "encoded" in itemDescriptor
? items?.map((e) => encodeSync(itemDescriptor.encoded)(e))
? items?.map((e) => itemDescriptor.encoded.encode(e))
: isRefEncoded(itemDescriptor)
? items?.map((v) => (v as unknown as CoValue).id)
: (() => {
@@ -472,7 +477,7 @@ const CoListProxyHandler: ProxyHandler<CoList> = {
} else if ("encoded" in itemDescriptor) {
return rawValue === undefined
? undefined
: decodeSync(itemDescriptor.encoded)(rawValue);
: itemDescriptor.encoded.decode(rawValue);
} else if (isRefEncoded(itemDescriptor)) {
return rawValue === undefined
? undefined
@@ -505,7 +510,7 @@ const CoListProxyHandler: ProxyHandler<CoList> = {
if (itemDescriptor === "json") {
rawValue = value;
} else if ("encoded" in itemDescriptor) {
rawValue = encodeSync(itemDescriptor.encoded)(value);
rawValue = itemDescriptor.encoded.encode(value);
} else if (isRefEncoded(itemDescriptor)) {
rawValue = value.id;
}

View File

@@ -1,6 +1,4 @@
import type { JsonValue, RawCoMap } from "cojson";
import type { Simplify } from "effect/Types";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
import type {
CoValue,
Schema,
@@ -36,6 +34,10 @@ type CoMapEdit<V> = {
madeAt: Date;
};
export type Simplify<A> = {
[K in keyof A]: A[K]
} extends infer B ? B : never
/**
* CoMaps are collaborative versions of plain objects, mapping string-like keys to values.
*
@@ -147,7 +149,7 @@ export class CoMap extends CoValueBase implements CoValue {
descriptor === "json"
? rawEdit.value
: "encoded" in descriptor
? decodeSync(descriptor.encoded)(rawEdit.value)
? descriptor.encoded.encode(rawEdit.value)
: new Ref(
rawEdit.value as ID<CoValue>,
target._loadedAs,
@@ -317,7 +319,7 @@ export class CoMap extends CoValueBase implements CoValue {
rawInit[key] = (initValue as unknown as CoValue).id;
}
} else if ("encoded" in descriptor) {
rawInit[key] = encodeSync(descriptor.encoded)(
rawInit[key] = descriptor.encoded.encode(
// eslint-disable-next-line @typescript-eslint/no-explicit-any
initValue as any,
);
@@ -454,6 +456,37 @@ export class CoMap extends CoValueBase implements CoValue {
): () => void {
return subscribeToExistingCoValue(this, depth, listener);
}
applyDiff(newValues: Partial<CoMapInit<this>>) {
for (const key in newValues) {
if (Object.prototype.hasOwnProperty.call(newValues, key)) {
const tKey = key as keyof typeof newValues & keyof this;
const descriptor = (this._schema[tKey as string] ||
this._schema[ItemsSym]) as Schema;
if (tKey in this._schema) {
const newValue = newValues[tKey];
const currentValue = this[tKey];
if (descriptor === "json" || "encoded" in descriptor) {
if (currentValue !== newValue) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this as any)[tKey] = newValue;
}
} else if (isRefEncoded(descriptor)) {
const currentId = (currentValue as CoValue | undefined)
?.id;
const newId = (newValue as CoValue | undefined)?.id;
if (currentId !== newId) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
(this as any)[tKey] = newValue;
}
}
}
}
}
return this;
}
}
export type CoKeys<Map extends object> = Exclude<
@@ -485,7 +518,7 @@ const CoMapProxyHandler: ProxyHandler<CoMap> = {
} else if ("encoded" in descriptor) {
return raw === undefined
? undefined
: decodeSync(descriptor.encoded)(raw);
: descriptor.encoded.decode(raw);
} else if (isRefEncoded(descriptor)) {
return raw === undefined
? undefined
@@ -519,7 +552,7 @@ const CoMapProxyHandler: ProxyHandler<CoMap> = {
if (descriptor === "json") {
target._raw.set(key, value);
} else if ("encoded" in descriptor) {
target._raw.set(key, encodeSync(descriptor.encoded)(value));
target._raw.set(key, descriptor.encoded.encode(value));
} else if (isRefEncoded(descriptor)) {
target._raw.set(key, value.id);
subscriptionsScopes

View File

@@ -35,7 +35,6 @@ import {
ensureCoValueLoaded,
subscribeToExistingCoValue,
} from "../internal.js";
import { encodeSync, decodeSync } from "@effect/schema/Schema";
export type CoStreamEntry<Item> = SingleCoStreamEntry<Item> & {
all: IterableIterator<SingleCoStreamEntry<Item>>;
@@ -141,7 +140,7 @@ export class CoStream<Item = any> extends CoValueBase implements CoValue {
if (itemDescriptor === "json") {
this._raw.push(item as JsonValue);
} else if ("encoded" in itemDescriptor) {
this._raw.push(encodeSync(itemDescriptor.encoded)(item));
this._raw.push(itemDescriptor.encoded.encode(item));
} else if (isRefEncoded(itemDescriptor)) {
this._raw.push((item as unknown as CoValue).id);
}
@@ -153,7 +152,7 @@ export class CoStream<Item = any> extends CoValueBase implements CoValue {
itemDescriptor === "json"
? (v: unknown) => v
: "encoded" in itemDescriptor
? encodeSync(itemDescriptor.encoded)
? itemDescriptor.encoded.encode
: (v: unknown) => v && (v as CoValue).id;
return {
@@ -247,7 +246,7 @@ function entryFromRawEntry<Item>(
? (CoValue & Item) | null
: Item;
} else if ("encoded" in itemField) {
return decodeSync(itemField.encoded)(rawEntry.value);
return itemField.encoded.decode(rawEntry.value);
} else if (isRefEncoded(itemField)) {
return this.ref?.accessFrom(
accessFrom,

View File

@@ -122,7 +122,12 @@ export class CoValueBase implements CoValue {
castAs<Cl extends CoValueClass & CoValueFromRaw<CoValue>>(
cl: Cl,
): InstanceType<Cl> {
return cl.fromRaw(this._raw) as InstanceType<Cl>;
const casted = cl.fromRaw(this._raw) as InstanceType<Cl>;
const subscriptionScope = subscriptionsScopes.get(this);
if (subscriptionScope) {
subscriptionsScopes.set(casted, subscriptionScope);
}
return casted;
}
}

View File

@@ -5,7 +5,6 @@ import {
isCoValueClass,
CoValueFromRaw,
} from "../internal.js";
import type { Schema as EffectSchema, TypeId } from "@effect/schema/Schema";
export type CoMarker = { readonly __co: unique symbol };
/** @category Schema definition */
@@ -113,7 +112,7 @@ function ref<
}
export type JsonEncoded = "json";
export type EncodedAs<V> = { encoded: Encoder<V> };
export type EncodedAs<V> = { encoded: Encoder<V> | OptionalEncoder<V> };
export type RefEncoded<V extends CoValue> = {
ref: CoValueClass<V> | ((raw: RawCoValue) => CoValueClass<V>);
optional: boolean;
@@ -152,31 +151,23 @@ export type SchemaFor<Field> = NonNullable<Field> extends CoValue
? JsonEncoded
: EncodedAs<NonNullable<Field>>;
export type EffectSchemaWithInputAndOutput<A, I = A> = EffectSchema<
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
any,
never
> & {
[TypeId]: {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
_A: (_: any) => A;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
_I: (_: any) => I;
};
export type Encoder<V> = {
encode: (value: V) => JsonValue;
decode: (value: JsonValue) => V;
};
export type OptionalEncoder<V> =
| Encoder<V>
| {
encode: (value: V | undefined) => JsonValue;
decode: (value: JsonValue) => V | undefined;
};
export type Encoder<V> = EffectSchemaWithInputAndOutput<V, JsonValue>;
export type OptionalEncoder<V> = EffectSchemaWithInputAndOutput<
V,
JsonValue | undefined
>;
import { Date } from "@effect/schema/Schema";
import { SchemaInit, ItemsSym, MembersSym } from "./symbols.js";
/** @category Schema definition */
export const Encoders = {
Date,
Date: {
encode: (value: Date) => value.toISOString(),
decode: (value: JsonValue) => new Date(value as string),
},
};

View File

@@ -1,12 +1,12 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
Account,
CoList,
WasmCrypto,
co,
cojsonInternals,
isControlledAccount,
} from "../index.js";
@@ -157,11 +157,13 @@ describe("CoList resolution", async () => {
test("Loading and availability", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -217,11 +219,13 @@ describe("CoList resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, list } = await initNodeAndList();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -236,63 +240,52 @@ describe("CoList resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestList>());
const queue = new cojsonInternals.Channel();
TestList.subscribe(
list.id,
meOnSecondPeer,
[],
(subscribedList) => {
console.log(
"subscribedList?.[0]?.[0]?.[0]",
subscribedList?.[0]?.[0]?.[0],
);
void Effect.runPromise(
Queue.offer(queue, subscribedList),
);
},
);
TestList.subscribe(list.id, meOnSecondPeer, [], (subscribedList) => {
console.log(
"subscribedList?.[0]?.[0]?.[0]",
subscribedList?.[0]?.[0]?.[0],
);
void queue.push(subscribedList);
});
const update1 = yield* $(Queue.take(queue));
expect(update1?.[0]).toBe(null);
const update1 = (await queue.next()).value;
expect(update1?.[0]).toBe(null);
const update2 = yield* $(Queue.take(queue));
expect(update2?.[0]).toBeDefined();
expect(update2?.[0]?.[0]).toBe(null);
const update2 = (await queue.next()).value;
expect(update2?.[0]).toBeDefined();
expect(update2?.[0]?.[0]).toBe(null);
const update3 = yield* $(Queue.take(queue));
expect(update3?.[0]?.[0]).toBeDefined();
expect(update3?.[0]?.[0]?.[0]).toBe("a");
expect(update3?.[0]?.[0]?.joined()).toBe("a,b");
const update3 = (await queue.next()).value;
expect(update3?.[0]?.[0]).toBeDefined();
expect(update3?.[0]?.[0]?.[0]).toBe("a");
expect(update3?.[0]?.[0]?.joined()).toBe("a,b");
update3[0]![0]![0] = "x";
update3[0]![0]![0] = "x";
const update4 = yield* $(Queue.take(queue));
expect(update4?.[0]?.[0]?.[0]).toBe("x");
const update4 = (await queue.next()).value;
expect(update4?.[0]?.[0]?.[0]).toBe("x");
// When assigning a new nested value, we get an update
// When assigning a new nested value, we get an update
const newTwiceNestedList = TwiceNestedList.create(["y", "z"], {
owner: meOnSecondPeer,
});
const newTwiceNestedList = TwiceNestedList.create(["y", "z"], {
owner: meOnSecondPeer,
});
const newNestedList = NestedList.create([newTwiceNestedList], {
owner: meOnSecondPeer,
});
const newNestedList = NestedList.create([newTwiceNestedList], {
owner: meOnSecondPeer,
});
update4[0] = newNestedList;
update4[0] = newNestedList;
const update5 = yield* $(Queue.take(queue));
expect(update5?.[0]?.[0]?.[0]).toBe("y");
expect(update5?.[0]?.[0]?.joined()).toBe("y,z");
const update5 = (await queue.next()).value;
expect(update5?.[0]?.[0]?.[0]).toBe("y");
expect(update5?.[0]?.[0]?.joined()).toBe("y,z");
// we get updates when the new nested value changes
newTwiceNestedList[0] = "w";
const update6 = yield* $(Queue.take(queue));
expect(update6?.[0]?.[0]?.[0]).toBe("w");
}),
);
// we get updates when the new nested value changes
newTwiceNestedList[0] = "w";
const update6 = (await queue.next()).value;
expect(update6?.[0]?.[0]?.[0]).toBe("w");
});
});

View File

@@ -1,7 +1,6 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
Account,
Encoders,
@@ -9,8 +8,8 @@ import {
co,
WasmCrypto,
isControlledAccount,
cojsonInternals,
} from "../index.js";
import { Schema } from "@effect/schema";
const Crypto = await WasmCrypto.create();
@@ -25,7 +24,10 @@ describe("Simple CoMap operations", async () => {
_height = co.number;
birthday = co.encoded(Encoders.Date);
name? = co.string;
nullable = co.optional.encoded(Schema.NullishOr(Schema.String));
nullable = co.optional.encoded<string | undefined>({
encode: (value: string | undefined) => value || null,
decode: (value: unknown) => (value as string) || undefined,
});
optionalDate = co.optional.encoded(Encoders.Date);
get roughColor() {
@@ -42,7 +44,7 @@ describe("Simple CoMap operations", async () => {
color: "red",
_height: 10,
birthday: birthday,
nullable: null,
nullable: undefined,
},
{ owner: me },
);
@@ -94,7 +96,7 @@ describe("Simple CoMap operations", async () => {
expect(map._raw.get("_height")).toEqual(20);
map.nullable = "not null";
map.nullable = null;
map.nullable = undefined;
delete map.nullable;
map.nullable = undefined;
@@ -282,12 +284,12 @@ describe("CoMap resolution", async () => {
test("Loading and availability", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
const [initialAsPeer, secondPeer] =
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -353,12 +355,12 @@ describe("CoMap resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, map } = await initNodeAndMap();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
const [initialAsPeer, secondAsPeer] =
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -372,9 +374,8 @@ describe("CoMap resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestMap>());
const queue = new cojsonInternals.Channel<TestMap>();
TestMap.subscribe(
map.id,
@@ -385,22 +386,20 @@ describe("CoMap resolution", async () => {
"subscribedMap.nested?.twiceNested?.taste",
subscribedMap.nested?.twiceNested?.taste,
);
void Effect.runPromise(
Queue.offer(queue, subscribedMap),
);
void queue.push(subscribedMap);
},
);
const update1 = yield* $(Queue.take(queue));
const update1 = (await queue.next()).value;
expect(update1.nested).toEqual(null);
const update2 = yield* $(Queue.take(queue));
const update2 = (await queue.next()).value;
expect(update2.nested?.name).toEqual("nested");
map.nested!.name = "nestedUpdated";
const _ = yield* $(Queue.take(queue));
const update3 = yield* $(Queue.take(queue));
const _ = (await queue.next()).value;
const update3 = (await queue.next()).value;
expect(update3.nested?.name).toEqual("nestedUpdated");
const oldTwiceNested = update3.nested!.twiceNested;
@@ -424,23 +423,21 @@ describe("CoMap resolution", async () => {
update3.nested = newNested;
yield* $(Queue.take(queue));
// const update4 = yield* $(Queue.take(queue));
const update4b = yield* $(Queue.take(queue));
(await queue.next()).value;
// const update4 = (await queue.next()).value;
const update4b = (await queue.next()).value;
expect(update4b.nested?.name).toEqual("newNested");
expect(update4b.nested?.twiceNested?.taste).toEqual("sweet");
// we get updates when the new nested value changes
newTwiceNested.taste = "salty";
const update5 = yield* $(Queue.take(queue));
const update5 = (await queue.next()).value;
expect(update5.nested?.twiceNested?.taste).toEqual("salty");
newTwiceNested.taste = "umami";
const update6 = yield* $(Queue.take(queue));
const update6 = (await queue.next()).value;
expect(update6.nested?.twiceNested?.taste).toEqual("umami");
}),
);
});
class TestMapWithOptionalRef extends CoMap {
@@ -577,3 +574,163 @@ describe("CoMap resolution", async () => {
]);
});
});
describe("CoMap applyDiff", async () => {
const me = await Account.create({
creationProps: { name: "Tester McTesterson" },
crypto: Crypto,
});
class TestMap extends CoMap {
name = co.string;
age = co.number;
isActive = co.boolean;
birthday = co.encoded(Encoders.Date);
nested = co.ref(NestedMap);
optionalField = co.optional.string;
}
class NestedMap extends CoMap {
value = co.string;
}
test("Basic applyDiff", () => {
const map = TestMap.create(
{
name: "Alice",
age: 30,
isActive: true,
birthday: new Date("1990-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const newValues = {
name: "Bob",
age: 35,
isActive: false,
};
map.applyDiff(newValues);
expect(map.name).toEqual("Bob");
expect(map.age).toEqual(35);
expect(map.isActive).toEqual(false);
expect(map.birthday).toEqual(new Date("1990-01-01"));
expect(map.nested?.value).toEqual("original");
});
test("applyDiff with nested changes", () => {
const map = TestMap.create(
{
name: "Charlie",
age: 25,
isActive: true,
birthday: new Date("1995-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const newValues = {
name: "David",
nested: NestedMap.create({ value: "updated" }, { owner: me }),
};
map.applyDiff(newValues);
expect(map.name).toEqual("David");
expect(map.age).toEqual(25);
expect(map.nested?.value).toEqual("updated");
});
test("applyDiff with encoded fields", () => {
const map = TestMap.create(
{
name: "Eve",
age: 28,
isActive: true,
birthday: new Date("1993-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const newValues = {
birthday: new Date("1993-06-15"),
};
map.applyDiff(newValues);
expect(map.birthday).toEqual(new Date("1993-06-15"));
});
test("applyDiff with optional fields", () => {
const map = TestMap.create(
{
name: "Frank",
age: 40,
isActive: true,
birthday: new Date("1980-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const newValues = {
optionalField: "New optional value",
};
map.applyDiff(newValues);
expect(map.optionalField).toEqual("New optional value");
map.applyDiff({ optionalField: undefined });
expect(map.optionalField).toBeUndefined();
});
test("applyDiff with no changes", () => {
const map = TestMap.create(
{
name: "Grace",
age: 35,
isActive: true,
birthday: new Date("1985-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const originalJSON = map.toJSON();
map.applyDiff({});
expect(map.toJSON()).toEqual(originalJSON);
});
test("applyDiff with invalid field", () => {
const map = TestMap.create(
{
name: "Henry",
age: 45,
isActive: false,
birthday: new Date("1975-01-01"),
nested: NestedMap.create({ value: "original" }, { owner: me }),
},
{ owner: me },
);
const newValues = {
name: "Ian",
invalidField: "This should be ignored",
};
// eslint-disable-next-line @typescript-eslint/no-explicit-any
map.applyDiff(newValues as any);
expect(map.name).toEqual("Ian");
// eslint-disable-next-line @typescript-eslint/no-explicit-any
expect((map as any).invalidField).toBeUndefined();
});
});

View File

@@ -1,7 +1,6 @@
import { expect, describe, test } from "vitest";
import { connectedPeers } from "cojson/src/streamUtils.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect, Queue } from "effect";
import {
BinaryCoStream,
ID,
@@ -10,6 +9,7 @@ import {
co,
WasmCrypto,
isControlledAccount,
cojsonInternals,
} from "../index.js";
const Crypto = await WasmCrypto.create();
@@ -83,11 +83,13 @@ describe("CoStream resolution", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -176,12 +178,15 @@ describe("CoStream resolution", async () => {
test("Subscription & auto-resolution", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -195,78 +200,68 @@ describe("CoStream resolution", async () => {
crypto: Crypto,
});
await Effect.runPromise(
Effect.gen(function* ($) {
const queue = yield* $(Queue.unbounded<TestStream>());
const queue = new cojsonInternals.Channel();
TestStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
console.log(
"subscribedStream[me.id]",
subscribedStream[me.id],
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value,
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value?.[
me.id
]?.value,
);
void Effect.runPromise(
Queue.offer(queue, subscribedStream),
);
},
TestStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
console.log("subscribedStream[me.id]", subscribedStream[me.id]);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value,
);
console.log(
"subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]?.value",
subscribedStream[me.id]?.value?.[me.id]?.value?.[me.id]
?.value,
);
void queue.push(subscribedStream);
},
);
const update1 = yield* $(Queue.take(queue));
expect(update1[me.id]?.value).toEqual(null);
const update1 = (await queue.next()).value;
expect(update1[me.id]?.value).toEqual(null);
const update2 = yield* $(Queue.take(queue));
expect(update2[me.id]?.value).toBeDefined();
expect(update2[me.id]?.value?.[me.id]?.value).toBe(null);
const update2 = (await queue.next()).value;
expect(update2[me.id]?.value).toBeDefined();
expect(update2[me.id]?.value?.[me.id]?.value).toBe(null);
const update3 = yield* $(Queue.take(queue));
expect(update3[me.id]?.value?.[me.id]?.value).toBeDefined();
expect(
update3[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("milk");
const update3 = (await queue.next()).value;
expect(update3[me.id]?.value?.[me.id]?.value).toBeDefined();
expect(update3[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"milk",
);
update3[me.id]!.value![me.id]!.value!.push("bread");
update3[me.id]!.value![me.id]!.value!.push("bread");
const update4 = yield* $(Queue.take(queue));
expect(
update4[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("bread");
const update4 = (await queue.next()).value;
expect(update4[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"bread",
);
// When assigning a new nested stream, we get an update
const newTwiceNested = TwiceNestedStream.create(["butter"], {
owner: meOnSecondPeer,
});
// When assigning a new nested stream, we get an update
const newTwiceNested = TwiceNestedStream.create(["butter"], {
owner: meOnSecondPeer,
});
const newNested = NestedStream.create([newTwiceNested], {
owner: meOnSecondPeer,
});
const newNested = NestedStream.create([newTwiceNested], {
owner: meOnSecondPeer,
});
update4.push(newNested);
update4.push(newNested);
const update5 = yield* $(Queue.take(queue));
expect(
update5[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("butter");
const update5 = (await queue.next()).value;
expect(update5[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"butter",
);
// we get updates when the new nested stream changes
newTwiceNested.push("jam");
const update6 = yield* $(Queue.take(queue));
expect(
update6[me.id]?.value?.[me.id]?.value?.[me.id]?.value,
).toBe("jam");
}),
// we get updates when the new nested stream changes
newTwiceNested.push("jam");
const update6 = (await queue.next()).value;
expect(update6[me.id]?.value?.[me.id]?.value?.[me.id]?.value).toBe(
"jam",
);
});
});
@@ -327,11 +322,13 @@ describe("BinaryCoStream loading & Subscription", async () => {
test("Loading and availability", async () => {
const { me, stream } = await initNodeAndStream();
const [initialAsPeer, secondAsPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{
peer1role: "server",
peer2role: "client",
}),
},
);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
@@ -360,98 +357,83 @@ describe("BinaryCoStream loading & Subscription", async () => {
});
test("Subscription", async () => {
await Effect.runPromise(
Effect.gen(function* ($) {
const { me } = yield* Effect.promise(() => initNodeAndStream());
const { me } = await initNodeAndStream();
const stream = BinaryCoStream.create({ owner: me });
const stream = BinaryCoStream.create({ owner: me });
const [initialAsPeer, secondAsPeer] = yield* connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = yield* Effect.promise(() =>
Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
}),
);
const queue = yield* $(Queue.unbounded<BinaryCoStream>());
BinaryCoStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
void Effect.runPromise(
Queue.offer(queue, subscribedStream),
);
},
);
const update1 = yield* $(Queue.take(queue));
expect(update1.getChunks()).toBe(undefined);
stream.start({ mimeType: "text/plain" });
const update2 = yield* $(Queue.take(queue));
expect(update2.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([1, 2, 3]));
const update3 = yield* $(Queue.take(queue));
expect(update3.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3])],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([4, 5, 6]));
const update4 = yield* $(Queue.take(queue));
expect(update4.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
],
totalSizeBytes: undefined,
finished: false,
});
stream.end();
const update5 = yield* $(Queue.take(queue));
expect(update5.getChunks()).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
],
totalSizeBytes: undefined,
finished: true,
});
}),
const [initialAsPeer, secondAsPeer] = connectedPeers(
"initial",
"second",
{ peer1role: "server", peer2role: "client" },
);
me._raw.core.node.syncManager.addPeer(secondAsPeer);
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
const meOnSecondPeer = await Account.become({
accountID: me.id,
accountSecret: me._raw.agentSecret,
peersToLoadFrom: [initialAsPeer],
// eslint-disable-next-line @typescript-eslint/no-explicit-any
sessionID: newRandomSessionID(me.id as any),
crypto: Crypto,
});
const queue = new cojsonInternals.Channel();
BinaryCoStream.subscribe(
stream.id,
meOnSecondPeer,
[],
(subscribedStream) => {
void queue.push(subscribedStream);
},
);
const update1 = (await queue.next()).value;
expect(update1.getChunks()).toBe(undefined);
stream.start({ mimeType: "text/plain" });
const update2 = (await queue.next()).value;
expect(update2.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([1, 2, 3]));
const update3 = (await queue.next()).value;
expect(update3.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3])],
totalSizeBytes: undefined,
finished: false,
});
stream.push(new Uint8Array([4, 5, 6]));
const update4 = (await queue.next()).value;
expect(update4.getChunks({ allowUnfinished: true })).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
totalSizeBytes: undefined,
finished: false,
});
stream.end();
const update5 = (await queue.next()).value;
expect(update5.getChunks()).toEqual({
mimeType: "text/plain",
fileName: undefined,
chunks: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
totalSizeBytes: undefined,
finished: true,
});
});
});

View File

@@ -14,7 +14,6 @@ import {
ID,
} from "../index.js";
import { newRandomSessionID } from "cojson/src/coValueCore.js";
import { Effect } from "effect";
class TestMap extends CoMap {
list = co.ref(TestList);
@@ -39,12 +38,11 @@ describe("Deep loading with depth arg", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
@@ -140,8 +138,8 @@ describe("Deep loading with depth arg", async () => {
throw new Error("map4 is undefined");
}
expect(map4.list[0]?.stream).not.toBe(null);
expect(map4.list[0]?.stream?.[me.id]?.value).toBe(null);
expect(map4.list[0]?.stream?.byMe?.value).toBe(null);
expect(map4.list[0]?.stream?.[me.id]).toBe(undefined)
expect(map4.list[0]?.stream?.byMe?.value).toBe(undefined);
const map5 = await TestMap.load(map.id, meOnSecondPeer, {
list: [{ stream: [{}] }],
@@ -254,15 +252,15 @@ test("Deep loading a record-like coMap", async () => {
crypto: Crypto,
});
const [initialAsPeer, secondPeer] = await Effect.runPromise(
connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
}),
);
const [initialAsPeer, secondPeer] = connectedPeers("initial", "second", {
peer1role: "server",
peer2role: "client",
});
if (!isControlledAccount(me)) {
throw "me is not a controlled account";
}
me._raw.core.node.syncManager.addPeer(secondPeer);
const meOnSecondPeer = await Account.become({
accountID: me.id,

43
pnpm-lock.yaml generated
View File

@@ -156,9 +156,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../../packages/cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
hash-slash:
specifier: workspace:*
version: link:../../packages/hash-slash
@@ -444,12 +441,12 @@ importers:
'@scure/base':
specifier: ^1.1.1
version: 1.1.5
effect:
specifier: ^3.5.2
version: 3.5.2
hash-wasm:
specifier: ^4.9.0
version: 4.11.0
queueable:
specifier: ^5.3.2
version: 5.3.2
devDependencies:
'@types/jest':
specifier: ^29.5.3
@@ -481,9 +478,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -506,9 +500,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -522,9 +513,6 @@ importers:
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
typescript:
specifier: ^5.1.6
version: 5.3.3
@@ -559,9 +547,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
jazz-tools:
specifier: workspace:*
version: link:../jazz-tools
@@ -602,9 +587,6 @@ importers:
cojson-transport-ws:
specifier: workspace:*
version: link:../cojson-transport-ws
effect:
specifier: ^3.5.2
version: 3.5.2
jazz-tools:
specifier: workspace:*
version: link:../jazz-tools
@@ -689,15 +671,9 @@ importers:
packages/jazz-tools:
dependencies:
'@effect/schema':
specifier: ^0.66.16
version: 0.66.16(effect@3.5.2)(fast-check@3.17.2)
cojson:
specifier: workspace:*
version: link:../cojson
effect:
specifier: ^3.5.2
version: 3.5.2
fast-check:
specifier: ^3.17.2
version: 3.17.2
@@ -2639,6 +2615,9 @@ packages:
fast-levenshtein@2.0.6:
resolution: {integrity: sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==}
fast-list@1.0.3:
resolution: {integrity: sha512-Lm56Ci3EqefHNdIneRFuzhpPcpVVBz9fgqVmG3UQIxAefJv1mEYsZ1WQLTWqmdqeGEwbI2t6fbZgp9TqTYARuA==}
fast-loops@1.1.3:
resolution: {integrity: sha512-8EZzEP0eKkEEVX+drtd9mtuQ+/QrlfW/5MlwcwK5Nds6EkZ/tRzEexkzUY2mIssnAyVLT+TKHuRXmFNNXYUd6g==}
@@ -3743,6 +3722,10 @@ packages:
queue-tick@1.0.1:
resolution: {integrity: sha512-kJt5qhMxoszgU/62PLP1CJytzd2NKetjSRnyuj31fDd3Rlcz3fzlFdFLD1SItunPwyqEOkca6GbV612BWfaBag==}
queueable@5.3.2:
resolution: {integrity: sha512-/2ZxV1PJh7J9Q/h9ewZ4fLMmDreUlbwrWsBnluvDoKW6Nw0gbWm5hN+kiWfdDMw1o/QTAFxV9wx4KpuN5IA7OA==}
engines: {node: '>=18'}
quick-lru@4.0.1:
resolution: {integrity: sha512-ARhCpm70fzdcvNQfPoy49IaanKkTlRWF2JMzqhcJbhSFRZv7nPTvZJdcY7301IPmvW+/p0RgIWnQDLJxifsQ7g==}
engines: {node: '>=8'}
@@ -6893,6 +6876,8 @@ snapshots:
fast-levenshtein@2.0.6: {}
fast-list@1.0.3: {}
fast-loops@1.1.3: {}
fast-querystring@1.1.2:
@@ -8010,6 +7995,10 @@ snapshots:
queue-tick@1.0.1: {}
queueable@5.3.2:
dependencies:
fast-list: 1.0.3
quick-lru@4.0.1: {}
quick-lru@5.1.1: {}