Merge pull request #12 from gardencmp/anselm/gar-102-implement-account-profiles-integrate-with-local-auth
Implement accounts & profiles
This commit is contained in:
67
src/account.test.ts
Normal file
67
src/account.test.ts
Normal file
@@ -0,0 +1,67 @@
|
||||
import { newRandomSessionID } from "./coValue.js";
|
||||
import { LocalNode } from "./node.js";
|
||||
import { connectedPeers } from "./testUtils.js";
|
||||
|
||||
test("Can create a node while creating a new account with profile", async () => {
|
||||
const { node, accountID, accountSecret, sessionID } =
|
||||
LocalNode.withNewlyCreatedAccount("Hermes Puggington");
|
||||
|
||||
expect(node).not.toBeNull();
|
||||
expect(accountID).not.toBeNull();
|
||||
expect(accountSecret).not.toBeNull();
|
||||
expect(sessionID).not.toBeNull();
|
||||
|
||||
expect(node.expectProfileLoaded(accountID).get("name")).toEqual(
|
||||
"Hermes Puggington"
|
||||
);
|
||||
expect((await node.loadProfile(accountID)).get("name")).toEqual(
|
||||
"Hermes Puggington"
|
||||
);
|
||||
});
|
||||
|
||||
test("A node with an account can create teams and and objects within them", async () => {
|
||||
const { node, accountID } =
|
||||
LocalNode.withNewlyCreatedAccount("Hermes Puggington");
|
||||
|
||||
const team = await node.createTeam();
|
||||
expect(team).not.toBeNull();
|
||||
|
||||
let map = team.createMap();
|
||||
map = map.edit((edit) => {
|
||||
edit.set("foo", "bar", "private");
|
||||
expect(edit.get("foo")).toEqual("bar");
|
||||
});
|
||||
|
||||
expect(map.get("foo")).toEqual("bar");
|
||||
|
||||
expect(map.getLastEditor("foo")).toEqual(accountID);
|
||||
});
|
||||
|
||||
test("Can create account with one node, and then load it on another", async () => {
|
||||
const { node, accountID, accountSecret } =
|
||||
LocalNode.withNewlyCreatedAccount("Hermes Puggington");
|
||||
|
||||
const team = await node.createTeam();
|
||||
expect(team).not.toBeNull();
|
||||
|
||||
let map = team.createMap();
|
||||
map = map.edit((edit) => {
|
||||
edit.set("foo", "bar", "private");
|
||||
expect(edit.get("foo")).toEqual("bar");
|
||||
});
|
||||
|
||||
const [node1asPeer, node2asPeer] = connectedPeers("node1", "node2", {trace: true, peer1role: "server", peer2role: "client"});
|
||||
|
||||
node.sync.addPeer(node2asPeer);
|
||||
|
||||
const node2 = await LocalNode.withLoadedAccount(
|
||||
accountID,
|
||||
accountSecret,
|
||||
newRandomSessionID(accountID),
|
||||
[node1asPeer]
|
||||
);
|
||||
|
||||
const map2 = await node2.load(map.id);
|
||||
|
||||
expect(map2.get("foo")).toEqual("bar");
|
||||
});
|
||||
@@ -1,33 +1,50 @@
|
||||
import { CoValueHeader } from './coValue.js';
|
||||
import { CoID } from './contentType.js';
|
||||
import { AgentSecret, SealerID, SealerSecret, SignerID, SignerSecret, getAgentID, getAgentSealerID, getAgentSealerSecret, getAgentSignerID, getAgentSignerSecret } from './crypto.js';
|
||||
import { AgentID } from './ids.js';
|
||||
import { CoMap, LocalNode } from './index.js';
|
||||
import { Team, TeamContent } from './permissions.js';
|
||||
import { CoValueHeader } from "./coValue.js";
|
||||
import { CoID } from "./contentType.js";
|
||||
import {
|
||||
AgentSecret,
|
||||
SealerID,
|
||||
SealerSecret,
|
||||
SignerID,
|
||||
SignerSecret,
|
||||
getAgentID,
|
||||
getAgentSealerID,
|
||||
getAgentSealerSecret,
|
||||
getAgentSignerID,
|
||||
getAgentSignerSecret,
|
||||
} from "./crypto.js";
|
||||
import { AgentID } from "./ids.js";
|
||||
import { CoMap, LocalNode } from "./index.js";
|
||||
import { Team, TeamContent } from "./permissions.js";
|
||||
|
||||
export function accountHeaderForInitialAgentSecret(agentSecret: AgentSecret): CoValueHeader {
|
||||
export function accountHeaderForInitialAgentSecret(
|
||||
agentSecret: AgentSecret
|
||||
): CoValueHeader {
|
||||
const agent = getAgentID(agentSecret);
|
||||
return {
|
||||
type: "comap",
|
||||
ruleset: {type: "team", initialAdmin: agent},
|
||||
ruleset: { type: "team", initialAdmin: agent },
|
||||
meta: {
|
||||
type: "account"
|
||||
type: "account",
|
||||
},
|
||||
createdAt: null,
|
||||
uniqueness: null,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
export class Account extends Team {
|
||||
get id(): AccountID {
|
||||
return this.teamMap.id;
|
||||
return this.teamMap.id as AccountID;
|
||||
}
|
||||
|
||||
getCurrentAgentID(): AgentID {
|
||||
const agents = this.teamMap.keys().filter((k): k is AgentID => k.startsWith("sealer_"));
|
||||
const agents = this.teamMap
|
||||
.keys()
|
||||
.filter((k): k is AgentID => k.startsWith("sealer_"));
|
||||
|
||||
if (agents.length !== 1) {
|
||||
throw new Error("Expected exactly one agent in account, got " + agents.length);
|
||||
throw new Error(
|
||||
"Expected exactly one agent in account, got " + agents.length
|
||||
);
|
||||
}
|
||||
|
||||
return agents[0]!;
|
||||
@@ -45,10 +62,17 @@ export interface GeneralizedControlledAccount {
|
||||
currentSealerSecret: () => SealerSecret;
|
||||
}
|
||||
|
||||
export class ControlledAccount extends Account implements GeneralizedControlledAccount {
|
||||
export class ControlledAccount
|
||||
extends Account
|
||||
implements GeneralizedControlledAccount
|
||||
{
|
||||
agentSecret: AgentSecret;
|
||||
|
||||
constructor(agentSecret: AgentSecret, teamMap: CoMap<TeamContent, AccountMeta>, node: LocalNode) {
|
||||
constructor(
|
||||
agentSecret: AgentSecret,
|
||||
teamMap: CoMap<AccountContent, AccountMeta>,
|
||||
node: LocalNode
|
||||
) {
|
||||
super(teamMap, node);
|
||||
|
||||
this.agentSecret = agentSecret;
|
||||
@@ -75,7 +99,9 @@ export class ControlledAccount extends Account implements GeneralizedControlledA
|
||||
}
|
||||
}
|
||||
|
||||
export class AnonymousControlledAccount implements GeneralizedControlledAccount {
|
||||
export class AnonymousControlledAccount
|
||||
implements GeneralizedControlledAccount
|
||||
{
|
||||
agentSecret: AgentSecret;
|
||||
|
||||
constructor(agentSecret: AgentSecret) {
|
||||
@@ -107,9 +133,20 @@ export class AnonymousControlledAccount implements GeneralizedControlledAccount
|
||||
}
|
||||
}
|
||||
|
||||
export type AccountMeta = {type: "account"};
|
||||
export type AccountID = CoID<CoMap<TeamContent, AccountMeta>>;
|
||||
export type AccountContent = TeamContent & { profile: CoID<Profile> };
|
||||
export type AccountMeta = { type: "account" };
|
||||
export type AccountID = CoID<CoMap<AccountContent, AccountMeta>>;
|
||||
|
||||
export type AccountIDOrAgentID = AgentID | AccountID;
|
||||
export type AccountOrAgentID = AgentID | Account;
|
||||
export type AccountOrAgentSecret = AgentSecret | Account;
|
||||
|
||||
export function isAccountID(id: AccountIDOrAgentID): id is AccountID {
|
||||
return id.startsWith("co_");
|
||||
}
|
||||
|
||||
export type ProfileContent = {
|
||||
name: string;
|
||||
};
|
||||
export type ProfileMeta = { type: "profile" };
|
||||
export type Profile = CoMap<ProfileContent, ProfileMeta>;
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
import { JsonObject, JsonValue } from '../jsonValue.js';
|
||||
import { TransactionID } from '../ids.js';
|
||||
import { CoID } from '../contentType.js';
|
||||
import { CoValue } from '../coValue.js';
|
||||
import { CoValue, accountOrAgentIDfromSessionID } from '../coValue.js';
|
||||
import { AccountID, isAccountID } from '../account.js';
|
||||
|
||||
type MapOp<K extends string, V extends JsonValue> = {
|
||||
txID: TransactionID;
|
||||
@@ -106,6 +107,19 @@ export class CoMap<
|
||||
}
|
||||
}
|
||||
|
||||
getLastEditor<K extends MapK<M>>(key: K): AccountID | undefined {
|
||||
const tx = this.getLastTxID(key);
|
||||
if (!tx) {
|
||||
return undefined;
|
||||
}
|
||||
const accountID = accountOrAgentIDfromSessionID(tx.sessionID);
|
||||
if (isAccountID(accountID)) {
|
||||
return accountID;
|
||||
} else {
|
||||
return undefined;
|
||||
}
|
||||
}
|
||||
|
||||
getLastTxID<K extends MapK<M>>(key: K): TransactionID | undefined {
|
||||
const ops = this.ops[key];
|
||||
if (!ops) {
|
||||
|
||||
105
src/node.ts
105
src/node.ts
@@ -1,4 +1,5 @@
|
||||
import {
|
||||
AgentSecret,
|
||||
createdNowUnique,
|
||||
getAgentID,
|
||||
getAgentSealerID,
|
||||
@@ -9,7 +10,7 @@ import {
|
||||
} from "./crypto.js";
|
||||
import { CoValue, CoValueHeader, newRandomSessionID } from "./coValue.js";
|
||||
import { Team, TeamContent, expectTeamContent } from "./permissions.js";
|
||||
import { SyncManager } from "./sync.js";
|
||||
import { Peer, SyncManager } from "./sync.js";
|
||||
import { AgentID, RawCoID, SessionID, isAgentID } from "./ids.js";
|
||||
import { CoID, ContentType } from "./contentType.js";
|
||||
import {
|
||||
@@ -20,6 +21,11 @@ import {
|
||||
GeneralizedControlledAccount,
|
||||
ControlledAccount,
|
||||
AnonymousControlledAccount,
|
||||
AccountID,
|
||||
Profile,
|
||||
AccountContent,
|
||||
ProfileContent,
|
||||
ProfileMeta,
|
||||
} from "./account.js";
|
||||
import { CoMap } from "./index.js";
|
||||
|
||||
@@ -37,6 +43,52 @@ export class LocalNode {
|
||||
this.ownSessionID = ownSessionID;
|
||||
}
|
||||
|
||||
static withNewlyCreatedAccount(name: string): {
|
||||
node: LocalNode;
|
||||
accountID: AccountID;
|
||||
accountSecret: AgentSecret;
|
||||
sessionID: SessionID;
|
||||
} {
|
||||
const throwawayAgent = newRandomAgentSecret();
|
||||
const setupNode = new LocalNode(
|
||||
new AnonymousControlledAccount(throwawayAgent),
|
||||
newRandomSessionID(getAgentID(throwawayAgent))
|
||||
);
|
||||
|
||||
const account = setupNode.createAccount(name);
|
||||
|
||||
const nodeWithAccount = account.node.testWithDifferentAccount(
|
||||
account,
|
||||
newRandomSessionID(account.id)
|
||||
);
|
||||
|
||||
return {
|
||||
node: nodeWithAccount,
|
||||
accountID: account.id,
|
||||
accountSecret: account.agentSecret,
|
||||
sessionID: nodeWithAccount.ownSessionID,
|
||||
};
|
||||
}
|
||||
|
||||
static async withLoadedAccount(accountID: AccountID, accountSecret: AgentSecret, sessionID: SessionID, peersToLoadFrom: Peer[]): Promise<LocalNode> {
|
||||
const loadingNode = new LocalNode(new AnonymousControlledAccount(accountSecret), newRandomSessionID(accountID));
|
||||
|
||||
const accountPromise = loadingNode.load(accountID);
|
||||
|
||||
for (const peer of peersToLoadFrom) {
|
||||
loadingNode.sync.addPeer(peer);
|
||||
}
|
||||
|
||||
const account = await accountPromise;
|
||||
|
||||
// since this is all synchronous, we can just swap out nodes for the SyncManager
|
||||
const node = loadingNode.testWithDifferentAccount(new ControlledAccount(accountSecret, account, loadingNode), sessionID);
|
||||
node.sync = loadingNode.sync;
|
||||
node.sync.local = node;
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
createCoValue(header: CoValueHeader): CoValue {
|
||||
const coValue = new CoValue(header, this);
|
||||
this.coValues[coValue.id] = { state: "loaded", coValue: coValue };
|
||||
@@ -65,6 +117,16 @@ export class LocalNode {
|
||||
return (await this.loadCoValue(id)).getCurrentContent() as T;
|
||||
}
|
||||
|
||||
async loadProfile(id: AccountID): Promise<Profile> {
|
||||
const account = await this.load<CoMap<AccountContent>>(id);
|
||||
const profileID = account.get("profile");
|
||||
|
||||
if (!profileID) {
|
||||
throw new Error(`Account ${id} has no profile`);
|
||||
}
|
||||
return (await this.loadCoValue(profileID)).getCurrentContent() as Profile;
|
||||
}
|
||||
|
||||
expectCoValueLoaded(id: RawCoID, expectation?: string): CoValue {
|
||||
const entry = this.coValues[id];
|
||||
if (!entry) {
|
||||
@@ -82,7 +144,20 @@ export class LocalNode {
|
||||
return entry.coValue;
|
||||
}
|
||||
|
||||
createAccount(_publicNickname: string): ControlledAccount {
|
||||
expectProfileLoaded(id: AccountID, expectation?: string): Profile {
|
||||
const account = this.expectCoValueLoaded(id, expectation);
|
||||
const profileID = expectTeamContent(account.getCurrentContent()).get("profile");
|
||||
if (!profileID) {
|
||||
throw new Error(
|
||||
`${
|
||||
expectation ? expectation + ": " : ""
|
||||
}Account ${id} has no profile`
|
||||
);
|
||||
}
|
||||
return this.expectCoValueLoaded(profileID, expectation).getCurrentContent() as Profile;
|
||||
}
|
||||
|
||||
createAccount(name: string): ControlledAccount {
|
||||
const agentSecret = newRandomAgentSecret();
|
||||
|
||||
const account = this.createCoValue(
|
||||
@@ -92,7 +167,9 @@ export class LocalNode {
|
||||
newRandomSessionID(getAgentID(agentSecret))
|
||||
);
|
||||
|
||||
expectTeamContent(account.getCurrentContent()).edit((editable) => {
|
||||
const accountAsTeam = new Team(expectTeamContent(account.getCurrentContent()), account.node);
|
||||
|
||||
accountAsTeam.teamMap.edit((editable) => {
|
||||
editable.set(getAgentID(agentSecret), "admin", "trusting");
|
||||
|
||||
const readKey = newRandomKeySecret();
|
||||
@@ -111,14 +188,26 @@ export class LocalNode {
|
||||
"trusting"
|
||||
);
|
||||
|
||||
editable.set('readKey', readKey.id, "trusting");
|
||||
editable.set("readKey", readKey.id, "trusting");
|
||||
});
|
||||
|
||||
return new ControlledAccount(
|
||||
const controlledAccount = new ControlledAccount(
|
||||
agentSecret,
|
||||
account.getCurrentContent() as CoMap<TeamContent, AccountMeta>,
|
||||
this
|
||||
account.getCurrentContent() as CoMap<AccountContent, AccountMeta>,
|
||||
account.node
|
||||
);
|
||||
|
||||
const profile = accountAsTeam.createMap<ProfileContent, ProfileMeta>({ type: "profile" });
|
||||
|
||||
profile.edit((editable) => {
|
||||
editable.set("name", name, "trusting");
|
||||
});
|
||||
|
||||
accountAsTeam.teamMap.edit((editable) => {
|
||||
editable.set("profile", profile.id, "trusting");
|
||||
});
|
||||
|
||||
return controlledAccount;
|
||||
}
|
||||
|
||||
resolveAccountAgent(id: AccountIDOrAgentID, expectation?: string): AgentID {
|
||||
@@ -177,7 +266,7 @@ export class LocalNode {
|
||||
"trusting"
|
||||
);
|
||||
|
||||
editable.set('readKey', readKey.id, "trusting");
|
||||
editable.set("readKey", readKey.id, "trusting");
|
||||
});
|
||||
|
||||
return new Team(teamContent, this);
|
||||
|
||||
@@ -20,7 +20,7 @@ import {
|
||||
} from "./coValue.js";
|
||||
import { LocalNode } from "./node.js";
|
||||
import { RawCoID, SessionID, TransactionID, isAgentID } from "./ids.js";
|
||||
import { AccountIDOrAgentID, GeneralizedControlledAccount } from "./account.js";
|
||||
import { AccountIDOrAgentID, GeneralizedControlledAccount, Profile } from "./account.js";
|
||||
|
||||
export type PermissionsDef =
|
||||
| { type: "team"; initialAdmin: AccountIDOrAgentID }
|
||||
@@ -77,7 +77,8 @@ export function determineValidTransactions(
|
||||
|
||||
const change = tx.changes[0] as
|
||||
| MapOpPayload<AccountIDOrAgentID, Role>
|
||||
| MapOpPayload<"readKey", JsonValue>;
|
||||
| MapOpPayload<"readKey", JsonValue>
|
||||
| MapOpPayload<"profile", CoID<Profile>>;
|
||||
if (tx.changes.length !== 1) {
|
||||
console.warn("Team transaction must have exactly one change");
|
||||
continue;
|
||||
@@ -94,6 +95,14 @@ export function determineValidTransactions(
|
||||
continue;
|
||||
}
|
||||
|
||||
validTransactions.push({ txID: { sessionID, txIndex }, tx });
|
||||
continue;
|
||||
} else if (change.key === 'profile') {
|
||||
if (memberState[transactor] !== "admin") {
|
||||
console.warn("Only admins can set profile");
|
||||
continue;
|
||||
}
|
||||
|
||||
validTransactions.push({ txID: { sessionID, txIndex }, tx });
|
||||
continue;
|
||||
} else if (isKeyForKeyField(change.key) || isKeyForAccountField(change.key)) {
|
||||
@@ -205,6 +214,7 @@ export function determineValidTransactions(
|
||||
}
|
||||
|
||||
export type TeamContent = {
|
||||
profile: CoID<Profile> | null;
|
||||
[key: AccountIDOrAgentID]: Role;
|
||||
readKey: KeyID;
|
||||
[revelationFor: `${KeyID}_for_${AccountIDOrAgentID}`]: Sealed<KeySecret>;
|
||||
@@ -343,7 +353,7 @@ export class Team {
|
||||
}
|
||||
|
||||
createMap<M extends { [key: string]: JsonValue }, Meta extends JsonObject | null>(
|
||||
meta?: M
|
||||
meta?: Meta
|
||||
): CoMap<M, Meta> {
|
||||
return this.node
|
||||
.createCoValue({
|
||||
|
||||
148
src/sync.test.ts
148
src/sync.test.ts
@@ -9,7 +9,7 @@ import {
|
||||
WritableStream,
|
||||
TransformStream,
|
||||
} from "isomorphic-streams";
|
||||
import { randomAnonymousAccountAndSessionID } from "./testUtils.js";
|
||||
import { connectedPeers, newStreamPair, randomAnonymousAccountAndSessionID, shouldNotResolve } from "./testUtils.js";
|
||||
import { AccountID } from "./account.js";
|
||||
|
||||
test("Node replies with initial tx and header to empty subscribe", async () => {
|
||||
@@ -1124,150 +1124,4 @@ function admStateEx(adminID: AccountID) {
|
||||
};
|
||||
}
|
||||
|
||||
function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
|
||||
const queue: T[] = [];
|
||||
let resolveNextItemReady: () => void = () => {};
|
||||
let nextItemReady: Promise<void> = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
|
||||
let writerClosed = false;
|
||||
let readerClosed = false;
|
||||
|
||||
const readable = new ReadableStream<T>({
|
||||
async pull(controller) {
|
||||
let retriesLeft = 3;
|
||||
while (retriesLeft > 0) {
|
||||
if (writerClosed) {
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
retriesLeft--;
|
||||
if (queue.length > 0) {
|
||||
controller.enqueue(queue.shift()!);
|
||||
if (queue.length === 0) {
|
||||
nextItemReady = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
await nextItemReady;
|
||||
}
|
||||
}
|
||||
throw new Error(
|
||||
"Should only use one retry to get next item in queue."
|
||||
);
|
||||
},
|
||||
|
||||
cancel(reason) {
|
||||
console.log("Manually closing reader");
|
||||
readerClosed = true;
|
||||
},
|
||||
});
|
||||
|
||||
const writable = new WritableStream<T>({
|
||||
write(chunk, controller) {
|
||||
if (readerClosed) {
|
||||
console.log("Reader closed, not writing chunk", chunk);
|
||||
throw new Error("Reader closed, not writing chunk");
|
||||
}
|
||||
queue.push(chunk);
|
||||
if (queue.length === 1) {
|
||||
// make sure that await write resolves before corresponding read
|
||||
process.nextTick(() => resolveNextItemReady());
|
||||
}
|
||||
},
|
||||
abort(reason) {
|
||||
console.log("Manually closing writer");
|
||||
writerClosed = true;
|
||||
resolveNextItemReady();
|
||||
return Promise.resolve();
|
||||
},
|
||||
});
|
||||
|
||||
return [readable, writable];
|
||||
}
|
||||
|
||||
function shouldNotResolve<T>(
|
||||
promise: Promise<T>,
|
||||
ops: { timeout: number }
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
promise
|
||||
.then((v) =>
|
||||
reject(
|
||||
new Error(
|
||||
"Should not have resolved, but resolved to " +
|
||||
JSON.stringify(v)
|
||||
)
|
||||
)
|
||||
)
|
||||
.catch(reject);
|
||||
setTimeout(resolve, ops.timeout);
|
||||
});
|
||||
}
|
||||
|
||||
function connectedPeers(
|
||||
peer1id: PeerID,
|
||||
peer2id: PeerID,
|
||||
{
|
||||
trace = false,
|
||||
peer1role = "peer",
|
||||
peer2role = "peer",
|
||||
}: {
|
||||
trace?: boolean;
|
||||
peer1role?: Peer["role"];
|
||||
peer2role?: Peer["role"];
|
||||
} = {}
|
||||
): [Peer, Peer] {
|
||||
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
|
||||
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
|
||||
|
||||
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
|
||||
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
|
||||
|
||||
void outRx2
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void }
|
||||
) {
|
||||
trace && console.log(`${peer2id} -> ${peer1id}`, chunk);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx1);
|
||||
|
||||
void outRx1
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void }
|
||||
) {
|
||||
trace && console.log(`${peer1id} -> ${peer2id}`, chunk);
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx2);
|
||||
|
||||
const peer2AsPeer: Peer = {
|
||||
id: peer2id,
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
role: peer2role,
|
||||
};
|
||||
|
||||
const peer1AsPeer: Peer = {
|
||||
id: peer1id,
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
role: peer1role,
|
||||
};
|
||||
|
||||
return [peer1AsPeer, peer2AsPeer];
|
||||
}
|
||||
|
||||
150
src/testUtils.ts
150
src/testUtils.ts
@@ -4,6 +4,8 @@ import { LocalNode } from "./node.js";
|
||||
import { expectTeamContent } from "./permissions.js";
|
||||
import { AnonymousControlledAccount } from "./account.js";
|
||||
import { SessionID } from "./ids.js";
|
||||
import { ReadableStream, TransformStream, WritableStream } from "isomorphic-streams";
|
||||
import { Peer, PeerID, SyncMessage } from "./sync.js";
|
||||
|
||||
export function randomAnonymousAccountAndSessionID(): [AnonymousControlledAccount, SessionID] {
|
||||
const agentSecret = newRandomAgentSecret();
|
||||
@@ -76,4 +78,152 @@ export function teamWithTwoAdminsHighLevel() {
|
||||
team.addMember(otherAdmin.id, "admin");
|
||||
|
||||
return { admin, node, team, otherAdmin };
|
||||
}
|
||||
|
||||
export function newStreamPair<T>(): [ReadableStream<T>, WritableStream<T>] {
|
||||
const queue: T[] = [];
|
||||
let resolveNextItemReady: () => void = () => {};
|
||||
let nextItemReady: Promise<void> = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
|
||||
let writerClosed = false;
|
||||
let readerClosed = false;
|
||||
|
||||
const readable = new ReadableStream<T>({
|
||||
async pull(controller) {
|
||||
let retriesLeft = 3;
|
||||
while (retriesLeft > 0) {
|
||||
if (writerClosed) {
|
||||
controller.close();
|
||||
return;
|
||||
}
|
||||
retriesLeft--;
|
||||
if (queue.length > 0) {
|
||||
controller.enqueue(queue.shift()!);
|
||||
if (queue.length === 0) {
|
||||
nextItemReady = new Promise((resolve) => {
|
||||
resolveNextItemReady = resolve;
|
||||
});
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
await nextItemReady;
|
||||
}
|
||||
}
|
||||
throw new Error(
|
||||
"Should only use one retry to get next item in queue."
|
||||
);
|
||||
},
|
||||
|
||||
cancel(reason) {
|
||||
console.log("Manually closing reader");
|
||||
readerClosed = true;
|
||||
},
|
||||
});
|
||||
|
||||
const writable = new WritableStream<T>({
|
||||
write(chunk, controller) {
|
||||
if (readerClosed) {
|
||||
console.log("Reader closed, not writing chunk", chunk);
|
||||
throw new Error("Reader closed, not writing chunk");
|
||||
}
|
||||
queue.push(chunk);
|
||||
if (queue.length === 1) {
|
||||
// make sure that await write resolves before corresponding read
|
||||
process.nextTick(() => resolveNextItemReady());
|
||||
}
|
||||
},
|
||||
abort(reason) {
|
||||
console.log("Manually closing writer");
|
||||
writerClosed = true;
|
||||
resolveNextItemReady();
|
||||
return Promise.resolve();
|
||||
},
|
||||
});
|
||||
|
||||
return [readable, writable];
|
||||
}
|
||||
|
||||
export function shouldNotResolve<T>(
|
||||
promise: Promise<T>,
|
||||
ops: { timeout: number }
|
||||
): Promise<void> {
|
||||
return new Promise((resolve, reject) => {
|
||||
promise
|
||||
.then((v) =>
|
||||
reject(
|
||||
new Error(
|
||||
"Should not have resolved, but resolved to " +
|
||||
JSON.stringify(v)
|
||||
)
|
||||
)
|
||||
)
|
||||
.catch(reject);
|
||||
setTimeout(resolve, ops.timeout);
|
||||
});
|
||||
}
|
||||
|
||||
export function connectedPeers(
|
||||
peer1id: PeerID,
|
||||
peer2id: PeerID,
|
||||
{
|
||||
trace = false,
|
||||
peer1role = "peer",
|
||||
peer2role = "peer",
|
||||
}: {
|
||||
trace?: boolean;
|
||||
peer1role?: Peer["role"];
|
||||
peer2role?: Peer["role"];
|
||||
} = {}
|
||||
): [Peer, Peer] {
|
||||
const [inRx1, inTx1] = newStreamPair<SyncMessage>();
|
||||
const [outRx1, outTx1] = newStreamPair<SyncMessage>();
|
||||
|
||||
const [inRx2, inTx2] = newStreamPair<SyncMessage>();
|
||||
const [outRx2, outTx2] = newStreamPair<SyncMessage>();
|
||||
|
||||
void outRx2
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void }
|
||||
) {
|
||||
trace && console.log(`${peer2id} -> ${peer1id}`, JSON.stringify(chunk, null, 2));
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx1);
|
||||
|
||||
void outRx1
|
||||
.pipeThrough(
|
||||
new TransformStream({
|
||||
transform(
|
||||
chunk: SyncMessage,
|
||||
controller: { enqueue: (msg: SyncMessage) => void }
|
||||
) {
|
||||
trace && console.log(`${peer1id} -> ${peer2id}`, JSON.stringify(chunk, null, 2));
|
||||
controller.enqueue(chunk);
|
||||
},
|
||||
})
|
||||
)
|
||||
.pipeTo(inTx2);
|
||||
|
||||
const peer2AsPeer: Peer = {
|
||||
id: peer2id,
|
||||
incoming: inRx1,
|
||||
outgoing: outTx1,
|
||||
role: peer2role,
|
||||
};
|
||||
|
||||
const peer1AsPeer: Peer = {
|
||||
id: peer1id,
|
||||
incoming: inRx2,
|
||||
outgoing: outTx2,
|
||||
role: peer1role,
|
||||
};
|
||||
|
||||
return [peer1AsPeer, peer2AsPeer];
|
||||
}
|
||||
Reference in New Issue
Block a user