Various small fixes

This commit is contained in:
Anselm
2023-08-15 17:55:46 +01:00
parent e7d45798e6
commit bb9c4f8ff8
7 changed files with 139 additions and 135 deletions

View File

@@ -5,7 +5,7 @@
"types": "src/index.ts",
"type": "module",
"license": "MIT",
"version": "0.0.12",
"version": "0.0.14",
"devDependencies": {
"@types/jest": "^29.5.3",
"@typescript-eslint/eslint-plugin": "^6.2.1",

View File

@@ -1,6 +1,6 @@
import { newRandomSessionID } from "./coValue.js";
import { LocalNode } from "./node.js";
import { connectedPeers } from "./testUtils.js";
import { connectedPeers } from "./streamUtils.js";
test("Can create a node while creating a new account with profile", async () => {
const { node, accountID, accountSecret, sessionID } =

View File

@@ -2,6 +2,7 @@ import { CoValue, newRandomSessionID } from "./coValue.js";
import { LocalNode } from "./node.js";
import { CoMap } from "./contentTypes/coMap.js";
import { agentSecretFromBytes, agentSecretToBytes } from "./crypto.js";
import { connectedPeers } from "./streamUtils.js";
import type { SessionID } from "./ids.js";
import type { CoID, ContentType } from "./contentType.js";
@@ -15,6 +16,7 @@ const internals = {
agentSecretFromBytes,
agentSecretToBytes,
newRandomSessionID,
connectedPeers
};
export { LocalNode, CoValue, CoMap, internals };

130
src/streamUtils.ts Normal file
View File

@@ -0,0 +1,130 @@
import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams";
import { Peer, PeerID, SyncMessage } from "./sync.js";
export function connectedPeers(
peer1id: PeerID,
peer2id: PeerID,
{
trace = false, peer1role = "peer", peer2role = "peer",
}: {
trace?: boolean;
peer1role?: Peer["role"];
peer2role?: Peer["role"];
} = {}
): [Peer, Peer] {
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
void outRx2
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void; }
) {
trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2));
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx1);
void outRx1
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void; }
) {
trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2));
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx2);
const peer2AsPeer: Peer = {
id: peer2id,
incoming: inRx1,
outgoing: outTx1,
role: peer2role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: inRx2,
outgoing: outTx2,
role: peer1role,
};
return [peer1AsPeer, peer2AsPeer];
}
export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];
let resolveNextItemReady: () => void = () => { };
let nextItemReady: Promise<void> = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
let writerClosed = false;
let readerClosed = false;
const readable = new ReadableStream<T>({
async pull(controller) {
let retriesLeft = 3;
while (retriesLeft > 0) {
if (writerClosed) {
controller.close();
return;
}
retriesLeft--;
if (queue.length > 0) {
controller.enqueue(queue.shift()!);
if (queue.length === 0) {
nextItemReady = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
}
return;
} else {
await nextItemReady;
}
}
throw new Error(
"Should only use one retry to get next item in queue."
);
},
cancel(reason) {
console.log("Manually closing reader");
readerClosed = true;
},
});
const writable = new WritableStream<T>({
write(chunk, controller) {
if (readerClosed) {
console.log("Reader closed, not writing chunk", chunk);
throw new Error("Reader closed, not writing chunk");
}
queue.push(chunk);
if (queue.length === 1) {
// make sure that await write resolves before corresponding read
setTimeout(() => resolveNextItemReady());
}
},
abort(reason) {
console.log("Manually closing writer");
writerClosed = true;
resolveNextItemReady();
return Promise.resolve();
},
});
return [readable, writable];
}

View File

@@ -10,11 +10,13 @@ import {
TransformStream,
} from "isomorphic-streams";
import {
connectedPeers,
newStreamPair,
randomAnonymousAccountAndSessionID,
shouldNotResolve,
} from "./testUtils.js";
import {
connectedPeers,
newStreamPair
} from "./streamUtils.js";
import { AccountID } from "./account.js";
test("Node replies with initial tx and header to empty subscribe", async () => {

View File

@@ -285,7 +285,7 @@ export class SyncManager {
trySendToPeer(peer: PeerState, msg: SyncMessage) {
return peer.outgoing.write(msg).catch((e) => {
console.error("Error writing to peer, disconnecting", e);
console.error(new Error("Error writing to peer, disconnecting", {cause: e}));
delete this.peers[peer.id];
});
}

View File

@@ -4,8 +4,6 @@ import { LocalNode } from "./node.js";
import { expectTeamContent } from "./permissions.js";
import { AnonymousControlledAccount } from "./account.js";
import { SessionID } from "./ids.js";
import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams";
import { Peer, PeerID, SyncMessage } from "./sync.js";
export function randomAnonymousAccountAndSessionID(): [AnonymousControlledAccount, SessionID] {
const agentSecret = newRandomAgentSecret();
@@ -80,71 +78,6 @@ export function teamWithTwoAdminsHighLevel() {
return { admin, node, team, otherAdmin };
}
export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
const queue: T[] = [];
let resolveNextItemReady: () => void = () => {};
let nextItemReady: Promise<void> = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
let writerClosed = false;
let readerClosed = false;
const readable = new ReadableStream<T>({
async pull(controller) {
let retriesLeft = 3;
while (retriesLeft > 0) {
if (writerClosed) {
controller.close();
return;
}
retriesLeft--;
if (queue.length > 0) {
controller.enqueue(queue.shift()!);
if (queue.length === 0) {
nextItemReady = new Promise((resolve) => {
resolveNextItemReady = resolve;
});
}
return;
} else {
await nextItemReady;
}
}
throw new Error(
"Should only use one retry to get next item in queue."
);
},
cancel(reason) {
console.log("Manually closing reader");
readerClosed = true;
},
});
const writable = new WritableStream<T>({
write(chunk, controller) {
if (readerClosed) {
console.log("Reader closed, not writing chunk", chunk);
throw new Error("Reader closed, not writing chunk");
}
queue.push(chunk);
if (queue.length === 1) {
// make sure that await write resolves before corresponding read
process.nextTick(() => resolveNextItemReady());
}
},
abort(reason) {
console.log("Manually closing writer");
writerClosed = true;
resolveNextItemReady();
return Promise.resolve();
},
});
return [readable, writable];
}
export function shouldNotResolve<T>(
promise: Promise<T>,
ops: { timeout: number }
@@ -164,66 +97,3 @@ export function shouldNotResolve<T>(
});
}
export function connectedPeers(
peer1id: PeerID,
peer2id: PeerID,
{
trace = false,
peer1role = "peer",
peer2role = "peer",
}: {
trace?: boolean;
peer1role?: Peer["role"];
peer2role?: Peer["role"];
} = {}
): [Peer, Peer] {
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
void outRx2
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void }
) {
trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2));
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx1);
void outRx1
.pipeThrough(
new TransformStream({
transform(
chunk: SyncMessage,
controller: { enqueue: (msg: SyncMessage) => void }
) {
trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2));
controller.enqueue(chunk);
},
})
)
.pipeTo(inTx2);
const peer2AsPeer: Peer = {
id: peer2id,
incoming: inRx1,
outgoing: outTx1,
role: peer2role,
};
const peer1AsPeer: Peer = {
id: peer1id,
incoming: inRx2,
outgoing: outTx2,
role: peer1role,
};
return [peer1AsPeer, peer2AsPeer];
}