Compare commits

...

2 Commits

Author SHA1 Message Date
Guido D'Orsi
5863badbb0 feat: make it possible to customize the logger in cojson 2025-01-20 12:34:31 +01:00
Guido D'Orsi
909165d813 test: cover nested SchemaUnion 2025-01-18 00:27:33 +01:00
27 changed files with 364 additions and 188 deletions

View File

@@ -0,0 +1,8 @@
---
"cojson-storage-sqlite": patch
"cojson-transport-ws": patch
"cojson-storage": patch
"cojson": patch
---
Wrap all the console logs with a logger class to make it possible to customize the logger

View File

@@ -1,5 +1,10 @@
import { Database as DatabaseT } from "better-sqlite3";
import { CojsonInternalTypes, OutgoingSyncQueue, SessionID } from "cojson";
import {
CojsonInternalTypes,
OutgoingSyncQueue,
SessionID,
logger,
} from "cojson";
import RawCoID = CojsonInternalTypes.RawCoID;
import Signature = CojsonInternalTypes.Signature;
import Transaction = CojsonInternalTypes.Transaction;
@@ -48,7 +53,7 @@ export class SQLiteClient implements DBClientInterface {
header: parsedHeader,
};
} catch (e) {
console.warn(coValueId, "Invalid JSON in header", e, coValueRow?.header);
logger.warn(coValueId, "Invalid JSON in header", e, coValueRow?.header);
return;
}
}
@@ -75,7 +80,7 @@ export class SQLiteClient implements DBClientInterface {
tx: JSON.parse(transactionRow.tx) as Transaction,
}));
} catch (e) {
console.warn("Invalid JSON in transaction", e);
logger.warn("Invalid JSON in transaction", e);
return [];
}
}

View File

@@ -4,6 +4,7 @@ import {
OutgoingSyncQueue,
Peer,
cojsonInternals,
logger,
} from "cojson";
import { SyncManager, TransactionRow } from "cojson-storage";
import { SQLiteClient } from "./sqliteClient.js";
@@ -40,25 +41,23 @@ export class SQLiteNode {
await new Promise((resolve) => setTimeout(resolve, 0));
}
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
logger.error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
e,
);
}
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in sqlite", e),
);
processMessages().catch((e) =>
logger.error("Error in processMessages in sqlite", e),
);
};
}
static async asPeer({
@@ -97,10 +96,7 @@ export class SQLiteNode {
db.pragma("user_version") as [{ user_version: number }]
)[0].user_version as number;
console.log("DB version", oldVersion);
if (oldVersion === 0) {
console.log("Migration 0 -> 1: Basic schema");
db.prepare(
`CREATE TABLE IF NOT EXISTS transactions (
ses INTEGER,
@@ -138,15 +134,10 @@ export class SQLiteNode {
).run();
db.pragma("user_version = 1");
console.log("Migration 0 -> 1: Basic schema - done");
}
if (oldVersion <= 1) {
// fix embarrassing off-by-one error for transaction indices
console.log(
"Migration 1 -> 2: Fix off-by-one error for transaction indices",
);
const txs = db
.prepare(`SELECT * FROM transactions`)
.all() as TransactionRow[];
@@ -163,14 +154,9 @@ export class SQLiteNode {
}
db.pragma("user_version = 2");
console.log(
"Migration 1 -> 2: Fix off-by-one error for transaction indices - done",
);
}
if (oldVersion <= 2) {
console.log("Migration 2 -> 3: Add signatureAfter");
db.prepare(
`CREATE TABLE IF NOT EXISTS signatureAfter (
ses INTEGER,
@@ -185,7 +171,6 @@ export class SQLiteNode {
).run();
db.pragma("user_version = 3");
console.log("Migration 2 -> 3: Add signatureAfter - done!!");
}
return new SQLiteNode(db, fromLocalNode, toLocalNode);

View File

@@ -6,6 +6,7 @@ import {
SyncMessage,
cojsonInternals,
emptyKnownState,
logger,
} from "cojson";
import { collectNewTxs, getDependedOnCoValues } from "./syncUtils.js";
import { DBClientInterface, StoredSessionRow } from "./types.js";
@@ -314,7 +315,7 @@ export class SyncManager {
return this.toLocalNode
.push(msg)
.catch((e) =>
console.error(`Error sending ${msg.action} state, id ${msg.id}`, e),
logger.error(`Error sending ${msg.action} state, id ${msg.id}`, e),
);
}
}

View File

@@ -4,6 +4,7 @@ import {
PingTimeoutError,
SyncMessage,
cojsonInternals,
logger,
} from "cojson";
import { BatchedOutgoingMessages } from "./BatchedOutgoingMessages.js";
import { deserializeMessages } from "./serialization.js";
@@ -136,7 +137,7 @@ export function createWebSocketPeer({
function handleClose() {
incoming
.push("Disconnected")
.catch((e) => console.error("Error while pushing disconnect msg", e));
.catch((e) => logger.error("Error while pushing disconnect msg", e));
emitClosedEvent();
}
@@ -145,7 +146,7 @@ export function createWebSocketPeer({
const pingTimeout = createPingTimeoutListener(expectPings, () => {
incoming
.push("PingTimeout")
.catch((e) => console.error("Error while pushing ping timeout", e));
.catch((e) => logger.error("Error while pushing ping timeout", e));
emitClosedEvent();
});
@@ -156,14 +157,13 @@ export function createWebSocketPeer({
function handleIncomingMsg(event: { data: unknown }) {
if (event.data === "") {
console.log("client", id, "sent empty message");
return;
}
const result = deserializeMessages(event.data);
if (!result.ok) {
console.error(
logger.warn(
"Error while deserializing messages",
event.data,
result.error,
@@ -184,7 +184,7 @@ export function createWebSocketPeer({
if (msg && "action" in msg) {
incoming
.push(msg)
.catch((e) => console.error("Error while pushing incoming msg", e));
.catch((e) => logger.error("Error while pushing incoming msg", e));
}
}
}
@@ -197,7 +197,6 @@ export function createWebSocketPeer({
outgoing: {
push: outgoingMessages.sendMessage,
close() {
console.log("Trying to close", id, websocket.readyState);
outgoingMessages.close();
websocket.removeEventListener("message", handleIncomingMsg);

View File

@@ -1,4 +1,4 @@
import { SyncMessage } from "cojson";
import { SyncMessage, logger } from "cojson";
import { PingMsg } from "./types.js";
export function addMessageToBacklog(backlog: string, message: SyncMessage) {
@@ -24,7 +24,7 @@ export function deserializeMessages(messages: unknown) {
| PingMsg[],
} as const;
} catch (e) {
console.error("Error while deserializing messages", e);
logger.error("Error while deserializing messages", e);
return {
ok: false,
error: e,

View File

@@ -5,6 +5,7 @@ import {
} from "./PriorityBasedMessageQueue.js";
import { TryAddTransactionsError } from "./coValueCore.js";
import { RawCoID } from "./ids.js";
import { logger } from "./logger.js";
import { CO_VALUE_PRIORITY } from "./priority.js";
import { Peer, SyncMessage } from "./sync.js";
@@ -137,7 +138,7 @@ export class PeerState {
}
gracefulShutdown() {
console.debug("Gracefully closing", this.id);
logger.debug("Gracefully closing", this.id);
this.closeQueue();
this.peer.outgoing.close();
this.closed = true;

View File

@@ -43,12 +43,7 @@ export function bytesToBase64url(bytes: Uint8Array) {
let base64 = decoder.decode(new Uint8Array(encoded.buffer, 0, n));
if (k === 1) base64 += "==";
if (k === 2) base64 += "=";
// const after = performance.now();
// console.log(
// "bytesToBase64url bandwidth in MB/s for length",
// (1000 * bytes.length / (after - before)) / (1024 * 1024),
// bytes.length
// );
return base64;
}

View File

@@ -24,6 +24,7 @@ import {
import { Stringified, parseJSON, stableStringify } from "./jsonStringify.js";
import { JsonObject, JsonValue } from "./jsonValue.js";
import { LocalNode, ResolveAccountAgentError } from "./localNode.js";
import { logger } from "./logger.js";
import {
PermissionsDef as RulesetDef,
determineValidTransactions,
@@ -209,16 +210,10 @@ export class CoValueCore {
.andThen((agent) => {
const signerID = this.crypto.getAgentSignerID(agent);
// const beforeHash = performance.now();
const { expectedNewHash, newStreamingHash } = this.expectedNewHashAfter(
sessionID,
newTransactions,
);
// const afterHash = performance.now();
// console.log(
// "Hashing took",
// afterHash - beforeHash
// );
if (givenExpectedNewHash && givenExpectedNewHash !== expectedNewHash) {
return err({
@@ -391,7 +386,6 @@ export class CoValueCore {
streamingHash.update(transaction);
const after = performance.now();
if (after - before > 1) {
// console.log("Hashing blocked for", after - before);
await new Promise((resolve) => setTimeout(resolve, 0));
before = performance.now();
}
@@ -540,7 +534,7 @@ export class CoValueCore {
}
if (!decryptedChanges) {
console.error("Failed to decrypt transaction despite having key");
logger.error("Failed to decrypt transaction despite having key");
continue;
}
@@ -685,7 +679,7 @@ export class CoValueCore {
if (secret) {
return secret as KeySecret;
} else {
console.error(
logger.warn(
`Encrypting ${encryptingKeyID} key didn't decrypt ${keyID}`,
);
}
@@ -725,7 +719,7 @@ export class CoValueCore {
if (secret) {
return secret as KeySecret;
} else {
console.error(
logger.warn(
`Encrypting parent ${parentKey.id} key didn't decrypt ${keyID}`,
);
}

View File

@@ -1,6 +1,7 @@
import { PeerState } from "./PeerState.js";
import { CoValueCore } from "./coValueCore.js";
import { RawCoID } from "./ids.js";
import { logger } from "./logger.js";
import { PeerID } from "./sync.js";
export const CO_VALUE_LOADING_MAX_RETRIES = 5;
@@ -282,7 +283,7 @@ async function loadCoValueFromPeers(
...coValueEntry.state.coValue.knownState(),
})
.catch((err) => {
console.error(`Failed to push load message to peer ${peer.id}`, err);
logger.warn(`Failed to push load message to peer ${peer.id}`, err);
});
} else {
/**
@@ -296,14 +297,14 @@ async function loadCoValueFromPeers(
sessions: {},
})
.catch((err) => {
console.error(`Failed to push load message to peer ${peer.id}`, err);
logger.warn(`Failed to push load message to peer ${peer.id}`, err);
});
}
if (coValueEntry.state.type === "loading") {
const timeout = setTimeout(() => {
if (coValueEntry.state.type === "loading") {
console.error("Failed to load coValue from peer", peer.id);
logger.warn("Failed to load coValue from peer", peer.id);
coValueEntry.dispatch({
type: "not-found-in-peer",
peerId: peer.id,
@@ -356,7 +357,7 @@ function sleep(ms: number) {
function getPeersWithoutErrors(peers: PeerState[], coValueId: RawCoID) {
return peers.filter((p) => {
if (p.erroredCoValues.has(coValueId)) {
console.error(
logger.warn(
`Skipping load on errored coValue ${coValueId} from peer ${p.id}`,
);
return false;

View File

@@ -16,6 +16,7 @@ import {
import { AgentID } from "../ids.js";
import { JsonObject } from "../jsonValue.js";
import { LocalNode } from "../localNode.js";
import { logger } from "../logger.js";
import type { AccountRole } from "../permissions.js";
import { RawCoMap } from "./coMap.js";
import { InviteSecret, RawGroup } from "./group.js";
@@ -59,7 +60,7 @@ export class RawAccount<
);
if (agents.length !== 1) {
console.warn("Account has " + agents.length + " agents", this.id);
logger.warn("Account has " + agents.length + " agents", this.id);
}
this._cachedCurrentAgentID = agents[0];

View File

@@ -133,10 +133,6 @@ export class RawCoListView<
change.before.txIndex
]?.[change.before.changeIdx];
if (!beforeEntry) {
// console.error(
// "Insertion before missing op " +
// change.before
// );
continue;
}
beforeEntry.predecessors.splice(0, 0, {
@@ -156,9 +152,6 @@ export class RawCoListView<
change.after.txIndex
]?.[change.after.changeIdx];
if (!afterEntry) {
// console.error(
// "Insertion after missing op " + change.after
// );
continue;
}
afterEntry.successors.push({

View File

@@ -3,6 +3,7 @@ import { CoID, RawCoValue } from "../coValue.js";
import { CoValueCore } from "../coValueCore.js";
import { AgentID, SessionID, TransactionID } from "../ids.js";
import { JsonObject, JsonValue } from "../jsonValue.js";
import { logger } from "../logger.js";
import { CoValueKnownState } from "../sync.js";
import { accountOrAgentIDfromSessionID } from "../typeUtils/accountOrAgentIDfromSessionID.js";
import { isAccountID } from "../typeUtils/isAccountID.js";
@@ -309,7 +310,7 @@ export class RawBinaryCoStreamView<
const start = items[0];
if (start?.type !== "start") {
console.error("Invalid binary stream start", start);
logger.error("Invalid binary stream start", start);
return;
}
@@ -328,7 +329,7 @@ export class RawBinaryCoStreamView<
}
if (item.type !== "chunk") {
console.error("Invalid binary stream chunk", item);
logger.error("Invalid binary stream chunk", item);
return undefined;
}
@@ -382,7 +383,6 @@ export class RawBinaryCoStream<
chunk: Uint8Array,
privacy: "private" | "trusting" = "private",
): void {
// const before = performance.now();
this.push(
{
type: "chunk",
@@ -391,11 +391,6 @@ export class RawBinaryCoStream<
privacy,
false,
);
// const after = performance.now();
// console.log(
// "pushBinaryStreamChunk bandwidth in MB/s",
// (1000 * chunk.length) / (after - before) / (1024 * 1024)
// );
}
endBinaryStream(privacy: "private" | "trusting" = "private") {

View File

@@ -13,6 +13,7 @@ import {
isParentGroupReference,
} from "../ids.js";
import { JsonObject } from "../jsonValue.js";
import { logger } from "../logger.js";
import { AccountRole, Role } from "../permissions.js";
import { expectGroup } from "../typeUtils/expectGroup.js";
import {
@@ -153,7 +154,7 @@ export class RawGroup<
child.state.type === "unavailable"
) {
child.loadFromPeers(peers).catch(() => {
console.error(`Failed to load child group ${id}`);
logger.error(`Failed to load child group ${id}`);
});
}
@@ -321,7 +322,7 @@ export class RawGroup<
const secret = this.core.getReadKey(keyID);
if (!secret) {
console.error("Can't find key", keyID);
logger.error("Can't find key", keyID);
continue;
}

View File

@@ -7,6 +7,7 @@ import { base64URLtoBytes, bytesToBase64url } from "../base64url.js";
import { RawCoID, TransactionID } from "../ids.js";
import { Stringified, stableStringify } from "../jsonStringify.js";
import { JsonValue } from "../jsonValue.js";
import { logger } from "../logger.js";
import {
CryptoProvider,
Encrypted,
@@ -192,7 +193,7 @@ export class PureJSCrypto extends CryptoProvider<Blake3State> {
try {
return JSON.parse(textDecoder.decode(plaintext));
} catch (e) {
console.error("Failed to decrypt/parse sealed message", e);
logger.error("Failed to decrypt/parse sealed message", e);
return undefined;
}
}

View File

@@ -15,6 +15,7 @@ import { base64URLtoBytes, bytesToBase64url } from "../base64url.js";
import { RawCoID, TransactionID } from "../ids.js";
import { Stringified, stableStringify } from "../jsonStringify.js";
import { JsonValue } from "../jsonValue.js";
import { logger } from "../logger.js";
import {
CryptoProvider,
Encrypted,
@@ -240,7 +241,7 @@ export class WasmCrypto extends CryptoProvider<Uint8Array> {
try {
return JSON.parse(textDecoder.decode(plaintext));
} catch (e) {
console.error("Failed to decrypt/parse sealed message", e);
logger.error("Failed to decrypt/parse sealed message", e);
return undefined;
}
}

View File

@@ -4,6 +4,7 @@ import { AgentID, RawCoID, TransactionID } from "../ids.js";
import { SessionID } from "../ids.js";
import { Stringified, parseJSON, stableStringify } from "../jsonStringify.js";
import { JsonValue } from "../jsonValue.js";
import { logger } from "../logger.js";
export type SignerSecret = `signerSecret_z${string}`;
export type SignerID = `signer_z${string}`;
@@ -159,7 +160,7 @@ export abstract class CryptoProvider<Blake3State = any> {
try {
return parseJSON(this.decryptRaw(encrypted, keySecret, nOnceMaterial));
} catch (e) {
console.error("Decryption error", e);
logger.error("Decryption error", e);
return undefined;
}
}
@@ -305,10 +306,7 @@ export class StreamingHash {
update(value: JsonValue): Uint8Array {
const encoded = textEncoder.encode(stableStringify(value));
// const before = performance.now();
this.state = this.crypto.blake3IncrementalUpdate(this.state, encoded);
// const after = performance.now();
// console.log(`Hashing throughput in MB/s`, 1000 * (encoded.length / (after - before)) / (1024 * 1024));
return encoded;
}

View File

@@ -74,6 +74,7 @@ import {
type Value = JsonValue | AnyRawCoValue;
import { logger } from "./logger.js";
import { getPriorityFromHeader } from "./priority.js";
import { FileSystem } from "./storage/FileSystem.js";
import { BlockFilename, LSMStorage, WalFilename } from "./storage/index.js";
@@ -141,6 +142,7 @@ export {
emptyKnownState,
RawCoPlainText,
stringifyOpID,
logger,
};
export type {

View File

@@ -27,6 +27,7 @@ import {
} from "./coValues/group.js";
import { AgentSecret, CryptoProvider } from "./crypto/crypto.js";
import { AgentID, RawCoID, SessionID, isAgentID } from "./ids.js";
import { logger } from "./logger.js";
import { Peer, PeerID, SyncManager } from "./sync.js";
import { expectGroup } from "./typeUtils/expectGroup.js";
@@ -230,7 +231,7 @@ export class LocalNode {
return node;
} catch (e) {
console.error("Error withLoadedAccount", e);
logger.error("Error withLoadedAccount", e);
throw e;
}
}
@@ -269,7 +270,7 @@ export class LocalNode {
this.syncManager.getServerAndStoragePeers(skipLoadingFromPeer);
await entry.loadFromPeers(peers).catch((e) => {
console.error("Error loading from peers", id, e);
logger.error("Error loading from peers", id, e);
});
}
@@ -311,8 +312,6 @@ export class LocalNode {
let stopped = false;
let unsubscribe!: () => void;
// console.log("Subscribing to " + id);
this.load(id)
.then((coValue) => {
if (stopped) {
@@ -325,11 +324,10 @@ export class LocalNode {
unsubscribe = coValue.subscribe(callback);
})
.catch((e) => {
console.error("Error subscribing to ", id, e);
logger.error("Error subscribing to ", id, e);
});
return () => {
console.log("Unsubscribing from " + id);
stopped = true;
unsubscribe?.();
};
@@ -390,9 +388,7 @@ export class LocalNode {
(existingRole === "reader" && inviteRole === "readerInvite") ||
(existingRole && inviteRole === "writeOnlyInvite")
) {
console.debug(
"Not accepting invite that would replace or downgrade role",
);
logger.debug("Not accepting invite that would replace or downgrade role");
return;
}

View File

@@ -0,0 +1,78 @@
// Log severity levels. Numeric values are ordered from least to most
// severe so a logger can gate output with a simple numeric comparison
// (emit when configuredLevel <= messageLevel).
export enum LogLevel {
  DEBUG = 0,
  INFO = 1,
  WARN = 2,
  ERROR = 3,
  // Highest value: suppresses all output, including errors.
  NONE = 4,
}
/**
 * Pluggable logging backend. Implementations receive a human-readable
 * message plus optional extra context values (errors, ids, payloads)
 * and decide how/where to emit them.
 *
 * `unknown[]` (rather than `any[]`) keeps the contract honest: a
 * backend must narrow extra values before doing anything type-specific
 * with them, while callers may still pass anything.
 */
export interface LogSystem {
  debug(message: string, ...args: unknown[]): void;
  info(message: string, ...args: unknown[]): void;
  warn(message: string, ...args: unknown[]): void;
  error(message: string, ...args: unknown[]): void;
}
/**
 * Default LogSystem that forwards each message to the matching method
 * of the global `console` (debug/info/warn/error), preserving the
 * per-level styling and filtering that browser/Node consoles provide.
 */
export class ConsoleLogSystem implements LogSystem {
  debug(message: string, ...args: unknown[]) {
    console.debug(message, ...args);
  }
  info(message: string, ...args: unknown[]) {
    console.info(message, ...args);
  }
  warn(message: string, ...args: unknown[]) {
    console.warn(message, ...args);
  }
  error(message: string, ...args: unknown[]) {
    console.error(message, ...args);
  }
}
/**
 * Minimal leveled logger. Messages below the configured level are
 * dropped; the rest are forwarded unchanged to the configured
 * LogSystem backend (console-backed by default).
 *
 * Both the threshold and the backend can be swapped at runtime via
 * `setLevel` / `setLogSystem`, which is what lets library consumers
 * customize logging without touching call sites.
 */
export class Logger {
  constructor(
    // Default threshold: INFO — debug output is opt-in.
    private level: LogLevel = LogLevel.INFO,
    private logSystem: LogSystem = new ConsoleLogSystem(),
  ) {}

  /** Change the minimum severity that will be emitted. */
  setLevel(level: LogLevel) {
    this.level = level;
  }

  /** Replace the backend, e.g. to capture logs in tests or ship them elsewhere. */
  setLogSystem(logSystem: LogSystem) {
    this.logSystem = logSystem;
  }

  /** True when messages of the given severity pass the configured threshold. */
  private shouldLog(level: LogLevel): boolean {
    return this.level <= level;
  }

  debug(message: string, ...args: unknown[]) {
    if (this.shouldLog(LogLevel.DEBUG)) {
      this.logSystem.debug(message, ...args);
    }
  }

  info(message: string, ...args: unknown[]) {
    if (this.shouldLog(LogLevel.INFO)) {
      this.logSystem.info(message, ...args);
    }
  }

  warn(message: string, ...args: unknown[]) {
    if (this.shouldLog(LogLevel.WARN)) {
      this.logSystem.warn(message, ...args);
    }
  }

  error(message: string, ...args: unknown[]) {
    if (this.shouldLog(LogLevel.ERROR)) {
      this.logSystem.error(message, ...args);
    }
  }
}
// Shared default logger instance (INFO level, console-backed).
// Consumers customize logging globally by calling setLevel/setLogSystem
// on this instance rather than constructing their own Logger.
export const logger = new Logger();

View File

@@ -14,6 +14,7 @@ import {
} from "./ids.js";
import { parseJSON } from "./jsonStringify.js";
import { JsonValue } from "./jsonValue.js";
import { logger } from "./logger.js";
import { accountOrAgentIDfromSessionID } from "./typeUtils/accountOrAgentIDfromSessionID.js";
import { expectGroup } from "./typeUtils/expectGroup.js";
@@ -46,7 +47,7 @@ function logPermissionError(...args: unknown[]) {
return;
}
console.warn(...args);
logger.warn("Permission error", ...args);
}
export function determineValidTransactions(
@@ -204,7 +205,6 @@ function determineValidTransactionsForGroup(
const writeKeys = new Set<string>();
for (const { sessionID, txIndex, tx } of allTransactionsSorted) {
// console.log("before", { memberState, validTransactions });
const transactor = accountOrAgentIDfromSessionID(sessionID);
if (tx.privacy === "private") {
@@ -458,8 +458,6 @@ function determineValidTransactionsForGroup(
memberState[affectedMember] = change.value;
validTransactions.push({ txID: { sessionID, txIndex }, tx });
// console.log("after", { memberState, validTransactions });
}
return { validTransactions, memberState };
@@ -473,7 +471,7 @@ function agentInAccountOrMemberInGroup(
return groupAtTime.currentAgentID().match(
(agentID) => agentID,
(e) => {
console.error(
logger.error(
"Error while determining current agent ID in valid transactions",
e,
);

View File

@@ -87,26 +87,15 @@ export async function writeBlock<WH, RH, FS extends FileSystem<WH, RH>>(
const headerBytes = textEncoder.encode(JSON.stringify(blockHeader));
await fs.append(file, headerBytes);
// console.log(
// "full file",
// yield* $(
// fs.read(file as unknown as RH, 0, offset + headerBytes.length),
// ),
// );
const filename: BlockFilename = `L${level}-${(blockNumber + "").padStart(
3,
"0",
)}-${hash.digest().replace("hash_", "").slice(0, 15)}-H${
headerBytes.length
}.jsonl`;
// console.log("renaming to" + filename);
await fs.closeAndRename(file, filename);
return filename;
// console.log("Wrote block", filename, blockHeader);
// console.log("IDs in block", blockHeader.map(e => e.id));
}
export async function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
@@ -120,6 +109,5 @@ export async function writeToWal<WH, RH, FS extends FileSystem<WH, RH>>(
...chunk,
};
const bytes = textEncoder.encode(JSON.stringify(walEntry) + "\n");
console.log("writing to WAL", handle, id, bytes.length);
return fs.append(handle, bytes);
}

View File

@@ -2,6 +2,7 @@ import { CoID, RawCoValue } from "../coValue.js";
import { CoValueHeader, Transaction } from "../coValueCore.js";
import { Signature } from "../crypto/crypto.js";
import { RawCoID } from "../ids.js";
import { logger } from "../logger.js";
import { connectedPeers } from "../streamUtils.js";
import {
CoValueKnownState,
@@ -68,7 +69,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
const processMessages = async () => {
for await (const msg of fromLocalNode) {
console.log("Storage msg start", nMsg);
try {
if (msg === "Disconnected" || msg === "PingTimeout") {
throw new Error("Unexpected Disconnected message");
@@ -83,32 +83,29 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
await this.sendNewContent(msg.id, msg, undefined);
}
} catch (e) {
console.error(
new Error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
{ cause: e },
),
logger.error(
`Error reading from localNode, handling msg\n\n${JSON.stringify(
msg,
(k, v) =>
k === "changes" || k === "encryptedChanges"
? v.slice(0, 20) + "..."
: v,
)}`,
e,
);
}
console.log("Storage msg end", nMsg);
nMsg++;
}
};
processMessages().catch((e) =>
console.error("Error in processMessages in storage", e),
logger.error("Error in processMessages in storage", e),
);
setTimeout(
() =>
this.compact().catch((e) => {
console.error("Error while compacting", e);
logger.error("Error while compacting", e);
}),
20000,
);
@@ -134,7 +131,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
sessions: {},
asDependencyOf,
})
.catch((e) => console.error("Error while pushing known", e));
.catch((e) => logger.error("Error while pushing known", e));
return;
}
@@ -190,13 +187,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
...ourKnown,
asDependencyOf,
})
.catch((e) => console.error("Error while pushing known", e));
.catch((e) => logger.error("Error while pushing known", e));
for (const message of newContentMessages) {
if (Object.keys(message.new).length === 0) continue;
this.toLocalNode
.push(message)
.catch((e) => console.error("Error while pushing new content", e));
.catch((e) => logger.error("Error while pushing new content", e));
}
this.coValues[id] = coValue;
@@ -232,20 +229,19 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
if (!coValue) {
if (newContent.header) {
// console.log("Creating in WAL", newContent.id);
await this.withWAL((wal) =>
writeToWal(wal, this.fs, newContent.id, newContentAsChunk),
);
this.coValues[newContent.id] = newContentAsChunk;
} else {
console.warn("Incontiguous incoming update for " + newContent.id);
logger.warn("Incontiguous incoming update for " + newContent.id);
return;
}
} else {
const merged = mergeChunks(coValue, newContentAsChunk);
if (merged === "nonContigous") {
console.warn(
logger.warn(
"Non-contigous new content for " + newContent.id,
Object.entries(coValue.sessionEntries).map(([session, entries]) =>
entries.map((entry) => ({
@@ -264,7 +260,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
),
);
} else {
// console.log("Appending to WAL", newContent.id);
await this.withWAL((wal) =>
writeToWal(wal, this.fs, newContent.id, newContentAsChunk),
);
@@ -301,8 +296,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
const { handle, size } = await this.getBlockHandle(blockFile, fs);
// console.log("Attempting to load", id, blockFile);
if (!cachedHeader) {
cachedHeader = {};
const header = await readHeader(blockFile, handle, size, fs);
@@ -317,15 +310,13 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
}
const headerEntry = cachedHeader[id];
// console.log("Header entry", id, headerEntry);
if (headerEntry) {
const nextChunk = await readChunk(handle, headerEntry, fs);
if (result) {
const merged = mergeChunks(result, nextChunk);
if (merged === "nonContigous") {
console.warn(
logger.warn(
"Non-contigous chunks while loading " + id,
result,
nextChunk,
@@ -354,7 +345,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
const coValues = new Map<RawCoID, CoValueChunk>();
console.log("Compacting WAL files", walFiles);
if (walFiles.length === 0) return;
const oldWal = this.currentWal;
@@ -385,7 +375,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
if (existingChunk) {
const merged = mergeChunks(existingChunk, chunk);
if (merged === "nonContigous") {
console.log(
logger.info(
"Non-contigous chunks in " + chunk.id + ", " + fileName,
existingChunk,
chunk,
@@ -411,8 +401,6 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
return acc;
}, 0);
console.log([...coValues.keys()], fileNames, highestBlockNumber);
await writeBlock(coValues, MAX_N_LEVELS, highestBlockNumber + 1, this.fs);
for (const walFile of walFiles) {
@@ -438,15 +426,11 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
blockFilesByLevelInOrder[level]!.push(blockFile);
}
console.log(blockFilesByLevelInOrder);
for (let level = MAX_N_LEVELS; level > 0; level--) {
const nBlocksDesired = Math.pow(2, level);
const blocksInLevel = blockFilesByLevelInOrder[level];
if (blocksInLevel && blocksInLevel.length > nBlocksDesired) {
console.log("Compacting blocks in level", level, blocksInLevel);
const coValues = new Map<RawCoID, CoValueChunk>();
for (const blockFile of blocksInLevel) {
@@ -465,7 +449,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
if (existingChunk) {
const merged = mergeChunks(existingChunk, chunk);
if (merged === "nonContigous") {
console.log(
logger.info(
"Non-contigous chunks in " + entry.id + ", " + blockFile,
existingChunk,
chunk,
@@ -517,7 +501,7 @@ export class LSMStorage<WH, RH, FS extends FileSystem<WH, RH>> {
setTimeout(
() =>
this.compact().catch((e) => {
console.error("Error while compacting", e);
logger.error("Error while compacting", e);
}),
5000,
);

View File

@@ -6,6 +6,7 @@ import { CoValueCore } from "./coValueCore.js";
import { Signature } from "./crypto/crypto.js";
import { RawCoID, SessionID } from "./ids.js";
import { LocalNode } from "./localNode.js";
import { logger } from "./logger.js";
import { CoValuePriority } from "./priority.js";
export type CoValueKnownState = {
@@ -150,7 +151,7 @@ export class SyncManager {
async handleSyncMessage(msg: SyncMessage, peer: PeerState) {
if (peer.erroredCoValues.has(msg.id)) {
console.error(
logger.warn(
`Skipping message ${msg.action} on errored coValue ${msg.id} from peer ${peer.id}`,
);
return;
@@ -182,7 +183,7 @@ export class SyncManager {
if (entry.state.type !== "available") {
entry.loadFromPeers([peer]).catch((e: unknown) => {
console.error("Error sending load", e);
logger.error("Error sending load", e);
});
return;
}
@@ -199,7 +200,7 @@ export class SyncManager {
action: "load",
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending load", e);
logger.error("Error sending load", e);
});
}
}
@@ -229,7 +230,7 @@ export class SyncManager {
asDependencyOf,
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending known state", e);
logger.error("Error sending known state", e);
});
peer.toldKnownState.add(id);
@@ -256,15 +257,8 @@ export class SyncManager {
const sendPieces = async () => {
let lastYield = performance.now();
for (const [_i, piece] of newContentPieces.entries()) {
// console.log(
// `${id} -> ${peer.id}: Sending content piece ${i + 1}/${
// newContentPieces.length
// } header: ${!!piece.header}`,
// // Object.values(piece.new).map((s) => s.newTransactions)
// );
this.trySendToPeer(peer, piece).catch((e: unknown) => {
console.error("Error sending content piece", e);
logger.error("Error sending content piece", e);
});
if (performance.now() - lastYield > 10) {
@@ -277,7 +271,7 @@ export class SyncManager {
};
sendPieces().catch((e) => {
console.error("Error sending new content piece, retrying", e);
logger.error("Error sending new content piece, retrying", e);
peer.optimisticKnownStates.dispatch({
type: "SET",
id,
@@ -337,7 +331,7 @@ export class SyncManager {
return;
}
if (msg === "PingTimeout") {
console.error("Ping timeout from peer", peer.id);
logger.error("Ping timeout from peer", peer.id);
return;
}
try {
@@ -360,13 +354,13 @@ export class SyncManager {
processMessages()
.then(() => {
if (peer.crashOnClose) {
console.error("Unexepcted close from peer", peer.id);
logger.warn("Unexepcted close from peer", peer.id);
this.local.crashed = new Error("Unexpected close from peer");
throw new Error("Unexpected close from peer");
}
})
.catch((e) => {
console.error("Error processing messages from peer", peer.id, e);
logger.error("Error processing messages from peer", peer.id, e);
if (peer.crashOnClose) {
this.local.crashed = e;
throw new Error(e);
@@ -406,13 +400,13 @@ export class SyncManager {
// where we can get informations about the coValue
if (msg.header || Object.keys(msg.sessions).length > 0) {
entry.loadFromPeers([peer]).catch((e) => {
console.error("Error loading coValue in handleLoad", e);
logger.error("Error loading coValue in handleLoad", e);
});
}
return;
} else {
this.local.loadCoValueCore(msg.id, peer.id).catch((e) => {
console.error("Error loading coValue in handleLoad", e);
logger.error("Error loading coValue in handleLoad", e);
});
}
}
@@ -439,7 +433,7 @@ export class SyncManager {
header: false,
sessions: {},
}).catch((e) => {
console.error("Error sending known state back", e);
logger.error("Error sending known state back", e);
});
return;
@@ -449,7 +443,7 @@ export class SyncManager {
await this.sendNewContentIncludingDependencies(msg.id, peer);
})
.catch((e) => {
console.error("Error loading coValue in handleLoad loading state", e);
logger.error("Error loading coValue in handleLoad loading state", e);
});
}
@@ -484,7 +478,7 @@ export class SyncManager {
peer.role === "storage" ? undefined : peer.id,
)
.catch((e) => {
console.error(
logger.error(
`Error loading coValue ${msg.id} to create loading state, as dependency of ${msg.asDependencyOf}`,
e,
);
@@ -521,7 +515,7 @@ export class SyncManager {
if (entry.state.type !== "available") {
if (!msg.header) {
console.error("Expected header to be sent in first message");
logger.error("Expected header to be sent in first message");
return;
}
@@ -584,7 +578,7 @@ export class SyncManager {
: t.changes.length,
)
.reduce((a, b) => a + b, 0);
console.log(
logger.debug(
`Adding incoming transactions took ${(after - before).toFixed(
2,
)}ms for ${totalTxLength} bytes = bandwidth: ${(
@@ -602,7 +596,7 @@ export class SyncManager {
// );
if (result.isErr()) {
console.error(
logger.error(
"Failed to add transactions from",
peer.id,
result.error,
@@ -633,7 +627,7 @@ export class SyncManager {
isCorrection: true,
...coValue.knownState(),
}).catch((e) => {
console.error("Error sending known state correction", e);
logger.error("Error sending known state correction", e);
});
} else {
/**
@@ -647,7 +641,7 @@ export class SyncManager {
action: "known",
...coValue.knownState(),
}).catch((e: unknown) => {
console.error("Error sending known state", e);
logger.error("Error sending known state", e);
});
}
@@ -681,9 +675,6 @@ export class SyncManager {
const done = new Promise<void>((resolve) => {
queueMicrotask(async () => {
delete this.requestedSyncs[coValue.id];
// if (entry.nRequestsThisTick >= 2) {
// console.log("Syncing", coValue.id, "for", entry.nRequestsThisTick, "requests");
// }
await this.actuallySyncCoValue(coValue);
resolve();
});

View File

@@ -49,12 +49,9 @@ describe("PeerState", () => {
test("should perform graceful shutdown", () => {
const { mockPeer, peerState } = setup();
const consoleSpy = vi.spyOn(console, "debug").mockImplementation(() => {});
peerState.gracefulShutdown();
expect(mockPeer.outgoing.close).toHaveBeenCalled();
expect(peerState.closed).toBe(true);
expect(consoleSpy).toHaveBeenCalledWith("Gracefully closing", "test-peer");
consoleSpy.mockRestore();
});
test("should empty the queue when closing", async () => {

View File

@@ -0,0 +1,145 @@
import { describe, expect, test, vi } from "vitest";
import { LogLevel, Logger } from "../logger";
describe("Logger", () => {
  // Builds a LogSystem stub whose four channel methods are independent vitest spies.
  const makeLogSystem = () => ({
    debug: vi.fn(),
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  });

  describe("Log Level Filtering", () => {
    test("should respect log level hierarchy", () => {
      const sink = makeLogSystem();
      const log = new Logger(LogLevel.WARN, sink);

      log.debug("Debug message");
      log.info("Info message");
      log.warn("Warning message");
      log.error("Error message");

      // Levels below WARN are filtered out; WARN and above pass through.
      expect(sink.debug).not.toHaveBeenCalled();
      expect(sink.info).not.toHaveBeenCalled();
      expect(sink.warn).toHaveBeenCalledWith("Warning message");
      expect(sink.error).toHaveBeenCalledWith("Error message");
    });

    test("should pass additional arguments to log system", () => {
      const sink = makeLogSystem();
      const log = new Logger(LogLevel.DEBUG, sink);
      const extras = [{ foo: "bar" }, 42, "extra"];

      log.debug("Debug message", ...extras);

      // Trailing arguments are forwarded to the log system untouched.
      expect(sink.debug).toHaveBeenCalledWith("Debug message", ...extras);
    });
  });

  describe("Log System Configuration", () => {
    test("should allow changing log level at runtime", () => {
      const sink = makeLogSystem();
      const log = new Logger(LogLevel.ERROR, sink);

      log.warn("Warning 1"); // Should not log
      expect(sink.warn).not.toHaveBeenCalled();

      log.setLevel(LogLevel.WARN);

      log.warn("Warning 2"); // Should log
      expect(sink.warn).toHaveBeenCalledWith("Warning 2");
    });

    test("should allow changing log system at runtime", () => {
      const firstSink = makeLogSystem();
      const secondSink = makeLogSystem();
      const log = new Logger(LogLevel.INFO, firstSink);

      log.info("Message 1");
      expect(firstSink.info).toHaveBeenCalledWith("Message 1");
      expect(secondSink.info).not.toHaveBeenCalled();

      log.setLogSystem(secondSink);

      // After swapping, only the new sink receives messages.
      log.info("Message 2");
      expect(secondSink.info).toHaveBeenCalledWith("Message 2");
      expect(firstSink.info).toHaveBeenCalledTimes(1);
    });
  });

  describe("Default Console Log System", () => {
    test("should use console methods by default", () => {
      // Spy on the real console so the default log system can be observed.
      const consoleSpy = {
        debug: vi.spyOn(console, "debug").mockImplementation(() => {}),
        info: vi.spyOn(console, "info").mockImplementation(() => {}),
        warn: vi.spyOn(console, "warn").mockImplementation(() => {}),
        error: vi.spyOn(console, "error").mockImplementation(() => {}),
      };

      const log = new Logger();
      log.setLevel(LogLevel.DEBUG);

      const testMessage = "Test message";
      const testArgs = [{ data: "test" }, 123];

      log.debug(testMessage, ...testArgs);
      log.info(testMessage, ...testArgs);
      log.warn(testMessage, ...testArgs);
      log.error(testMessage, ...testArgs);

      expect(consoleSpy.debug).toHaveBeenCalledWith(testMessage, ...testArgs);
      expect(consoleSpy.info).toHaveBeenCalledWith(testMessage, ...testArgs);
      expect(consoleSpy.warn).toHaveBeenCalledWith(testMessage, ...testArgs);
      expect(consoleSpy.error).toHaveBeenCalledWith(testMessage, ...testArgs);

      // Cleanup
      for (const spy of Object.values(consoleSpy)) {
        spy.mockRestore();
      }
    });
  });

  describe("Log Level NONE", () => {
    test("should not log anything when level is NONE", () => {
      const sink = makeLogSystem();
      const log = new Logger(LogLevel.NONE, sink);

      log.debug("Debug message");
      log.info("Info message");
      log.warn("Warning message");
      log.error("Error message");

      // NONE disables every channel, including errors.
      expect(sink.debug).not.toHaveBeenCalled();
      expect(sink.info).not.toHaveBeenCalled();
      expect(sink.warn).not.toHaveBeenCalled();
      expect(sink.error).not.toHaveBeenCalled();
    });
  });
});

View File

@@ -13,11 +13,29 @@ class BaseWidget extends CoMap {
type = co.string;
}
class ButtonWidget extends BaseWidget {
// "button" variant of BaseWidget whose discriminating `color` field is "red";
// the ButtonWidget SchemaUnion resolves to this class when raw.get("color") === "red".
class RedButtonWidget extends BaseWidget {
  type = co.literal("button");
  color = co.literal("red");
  label = co.string;
}
// "button" variant of BaseWidget whose discriminating `color` field is "blue";
// the ButtonWidget SchemaUnion resolves to this class when raw.get("color") === "blue".
class BlueButtonWidget extends BaseWidget {
  type = co.literal("button");
  color = co.literal("blue");
  label = co.string;
}
// Nested SchemaUnion: picks the concrete button class from the raw CoMap's
// "color" field, throwing for colors that have no registered subclass.
const ButtonWidget = SchemaUnion.Of<BaseWidget>((raw) => {
  const colorValue = raw.get("color");
  if (colorValue === "red") {
    return RedButtonWidget;
  }
  if (colorValue === "blue") {
    return BlueButtonWidget;
  }
  throw new Error(`Unknown button color: ${raw.get("color")}`);
});
class SliderWidget extends BaseWidget {
type = co.literal("slider");
min = co.number;
@@ -57,8 +75,8 @@ describe("SchemaUnion", () => {
});
it("should instantiate the correct subclass based on schema and provided data", async () => {
const buttonWidget = ButtonWidget.create(
{ type: "button", label: "Submit" },
const buttonWidget = RedButtonWidget.create(
{ type: "button", color: "red", label: "Submit" },
{ owner: me },
);
const sliderWidget = SliderWidget.create(
@@ -89,14 +107,14 @@ describe("SchemaUnion", () => {
{},
);
expect(loadedButtonWidget).toBeInstanceOf(ButtonWidget);
expect(loadedButtonWidget).toBeInstanceOf(RedButtonWidget);
expect(loadedSliderWidget).toBeInstanceOf(SliderWidget);
expect(loadedCheckboxWidget).toBeInstanceOf(CheckboxWidget);
});
it("should integrate with subscribeToCoValue correctly", async () => {
const buttonWidget = ButtonWidget.create(
{ type: "button", label: "Submit" },
const buttonWidget = BlueButtonWidget.create(
{ type: "button", color: "blue", label: "Submit" },
{ owner: me },
);
let currentValue = "Submit";
@@ -106,7 +124,7 @@ describe("SchemaUnion", () => {
me,
{},
(value: BaseWidget) => {
if (value instanceof ButtonWidget) {
if (value instanceof BlueButtonWidget) {
expect(value.label).toBe(currentValue);
} else {
throw new Error("Unexpected widget type");