Compare commits

...

8 Commits

Author SHA1 Message Date
Giordano Ricci
c1193c3c63 add explainer for counters restarts 2025-04-08 11:43:48 +01:00
Giordano Ricci
f30130b92f reduce possible priority values 2025-04-08 11:43:48 +01:00
Giordano Ricci
1a77233ecb force metric creation on queue creation 2025-04-08 11:43:48 +01:00
Giordano Ricci
09a95b8542 move queue initialization into constructor 2025-04-08 11:43:48 +01:00
Giordano Ricci
f46329ac68 move meter 2025-04-08 11:43:48 +01:00
Giordano Ricci
c551839179 extract queue meter logic, add tests 2025-04-08 11:43:47 +01:00
Giordano Ricci
a0683f9d21 add role attribute to queue metrics 2025-04-08 11:43:47 +01:00
Giordano Ricci
a56958c69e WIP: queue counters 2025-04-08 11:43:47 +01:00
6 changed files with 229 additions and 87 deletions

View File

@@ -0,0 +1,5 @@
---
"cojson": patch
---
Add jazz.messagequeue.pushed/pulled counters, remove jazz.messagequeue.size gauge

View File

@@ -126,7 +126,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);
@@ -214,7 +214,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);
@@ -243,7 +243,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);
@@ -269,7 +269,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
const stream: SyncMessage[] = [];
@@ -316,7 +316,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
const stream: SyncMessage[] = [];
@@ -365,7 +365,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);
@@ -411,7 +411,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);
@@ -450,7 +450,7 @@ describe("createWebSocketPeer", () => {
action: "content",
id: "co_zlow",
new: {},
priority: 1,
priority: 6,
};
void peer.outgoing.push(message1);

View File

@@ -10,10 +10,21 @@ import { CO_VALUE_PRIORITY } from "./priority.js";
import { Peer, SyncMessage } from "./sync.js";
export class PeerState {
private queue: PriorityBasedMessageQueue;
constructor(
private peer: Peer,
knownStates: PeerKnownStates | undefined,
) {
/**
* We set as default priority HIGH to handle all the messages without a
* priority property as HIGH priority.
*
* This way we consider all the non-content messsages as HIGH priority.
*/
this.queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.HIGH, {
peerRole: peer.role,
});
this.optimisticKnownStates = knownStates?.clone() ?? new PeerKnownStates();
// We assume that exchanges with storage peers are always successful
@@ -76,13 +87,6 @@ export class PeerState {
return this.peer.role === "server" || this.peer.role === "storage";
}
/**
* We set as default priority HIGH to handle all the messages without a
* priority property as HIGH priority.
*
* This way we consider all the non-content messsages as HIGH priority.
*/
private queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.HIGH);
private processing = false;
public closed = false;

View File

@@ -1,5 +1,5 @@
import { ValueType, metrics } from "@opentelemetry/api";
import type { CoValuePriority } from "./priority.js";
import { Counter, ValueType, metrics } from "@opentelemetry/api";
import { CO_VALUE_PRIORITY, type CoValuePriority } from "./priority.js";
import type { SyncMessage } from "./sync.js";
function promiseWithResolvers<R>() {
@@ -34,7 +34,7 @@ type Tuple<T, N extends number, A extends unknown[] = []> = A extends {
? A
: Tuple<T, N, [...A, T]>;
type QueueTuple = Tuple<LinkedList<QueueEntry>, 8>;
type QueueTuple = Tuple<LinkedList<QueueEntry>, 3>;
type LinkedListNode<T> = {
value: T;
@@ -46,6 +46,8 @@ type LinkedListNode<T> = {
* as our queues can grow very large when the system is under pressure.
*/
export class LinkedList<T> {
constructor(private meter?: QueueMeter) {}
head: LinkedListNode<T> | undefined = undefined;
tail: LinkedListNode<T> | undefined = undefined;
length = 0;
@@ -64,6 +66,7 @@ export class LinkedList<T> {
}
this.length++;
this.meter?.push();
}
shift() {
@@ -82,34 +85,83 @@ export class LinkedList<T> {
this.length--;
this.meter?.pull();
return value;
}
}
export class PriorityBasedMessageQueue {
private queues: QueueTuple = [
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
new LinkedList<QueueEntry>(),
];
queueSizeCounter = metrics
.getMeter("cojson")
.createUpDownCounter("jazz.messagequeue.size", {
description: "Size of the message queue",
valueType: ValueType.INT,
unit: "entry",
});
class QueueMeter {
private pullCounter: Counter;
private pushCounter: Counter;
private getQueue(priority: CoValuePriority) {
return this.queues[priority];
constructor(
prefix: string,
private attrs?: Record<string, string | number>,
) {
this.pullCounter = metrics
.getMeter("cojosn")
.createCounter(`${prefix}.pulled`, {
description: "Number of messages pulled from the queue",
valueType: ValueType.INT,
unit: "1",
});
this.pushCounter = metrics
.getMeter("cojosn")
.createCounter(`${prefix}.pushed`, {
description: "Number of messages pushed to the queue",
valueType: ValueType.INT,
unit: "1",
});
/**
* This makes sure that those metrics are generated (and emitted) as soon as the queue is created.
* This is to avoid edge cases where one series reset is delayed, which would cause spikes or dips
* when queried - and it also more correctly represents the actual state of the queue after a restart.
*/
this.pullCounter.add(0, this.attrs);
this.pushCounter.add(0, this.attrs);
}
constructor(private defaultPriority: CoValuePriority) {}
public pull() {
this.pullCounter.add(1, this.attrs);
}
public push() {
this.pushCounter.add(1, this.attrs);
}
}
function meteredList<T>(attrs?: Record<string, string | number>) {
return new LinkedList<T>(new QueueMeter("jazz.messagequeue", attrs));
}
const PRIORITY_TO_QUEUE_INDEX = {
[CO_VALUE_PRIORITY.HIGH]: 0,
[CO_VALUE_PRIORITY.MEDIUM]: 1,
[CO_VALUE_PRIORITY.LOW]: 2,
} as const;
export class PriorityBasedMessageQueue {
private queues: QueueTuple;
constructor(
private defaultPriority: CoValuePriority,
/**
* Optional attributes to be added to the generated metrics.
* By default the metrics will have the priority as an attribute.
*/
attrs?: Record<string, string | number>,
) {
this.queues = [
meteredList({ priority: CO_VALUE_PRIORITY.HIGH, ...attrs }),
meteredList({ priority: CO_VALUE_PRIORITY.MEDIUM, ...attrs }),
meteredList({ priority: CO_VALUE_PRIORITY.LOW, ...attrs }),
];
}
private getQueue(priority: CoValuePriority) {
return this.queues[PRIORITY_TO_QUEUE_INDEX[priority]];
}
public push(msg: SyncMessage) {
const { promise, resolve, reject } = promiseWithResolvers<void>();
@@ -119,24 +171,12 @@ export class PriorityBasedMessageQueue {
this.getQueue(priority).push(entry);
this.queueSizeCounter.add(1, {
priority,
});
return promise;
}
public pull() {
const priority = this.queues.findIndex((queue) => queue.length > 0);
if (priority === -1) {
return;
}
this.queueSizeCounter.add(-1, {
priority,
});
return this.queues[priority]?.shift();
}
}

View File

@@ -7,7 +7,7 @@ import { type CoValueHeader } from "./coValueCore.js";
* The priority value is handled as weight in the weighed round robin algorithm
* used to determine the order in which messages are sent.
*
* Follows the HTTP urgency range and order:
* Loosely follows the HTTP urgency range and order, but limited to 3 values:
* - https://www.rfc-editor.org/rfc/rfc9218.html#name-urgency
*/
export const CO_VALUE_PRIORITY = {
@@ -16,7 +16,7 @@ export const CO_VALUE_PRIORITY = {
LOW: 6,
} as const;
export type CoValuePriority = 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7;
export type CoValuePriority = 0 | 3 | 6;
export function getPriorityFromHeader(
header: CoValueHeader | undefined | boolean,

View File

@@ -7,9 +7,9 @@ import {
tearDownTestMetricReader,
} from "./testUtils.js";
function setup() {
function setup(attrs?: Record<string, string | number>) {
const metricReader = createTestMetricReader();
const queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.MEDIUM);
const queue = new PriorityBasedMessageQueue(CO_VALUE_PRIORITY.MEDIUM, attrs);
return { queue, metricReader };
}
@@ -18,10 +18,133 @@ describe("PriorityBasedMessageQueue", () => {
tearDownTestMetricReader();
});
describe("meteredQueue", () => {
test("should corretly count pushes", async () => {
const { queue, metricReader } = setup();
const message: SyncMessage = {
action: "load",
id: "co_ztest-id",
header: false,
sessions: {},
};
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(0);
void queue.push(message);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(1);
void queue.push(message);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(2);
});
test("should corretly count pulls", async () => {
const { queue, metricReader } = setup();
const message: SyncMessage = {
action: "load",
id: "co_ztest-id",
header: false,
sessions: {},
};
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(0);
void queue.push(message);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(0);
void queue.pull();
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(1);
// We only have one item in the queue, so this should not change the metric value
void queue.pull();
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
}),
).toBe(1);
});
test("should corretly set custom attributes to the metrics", async () => {
const { queue, metricReader } = setup({ role: "server" });
const message: SyncMessage = {
action: "load",
id: "co_ztest-id",
header: false,
sessions: {},
};
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "server",
}),
).toBe(0);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "client",
}),
).toBeUndefined();
void queue.push(message);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "server",
}),
).toBe(1);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "server",
}),
).toBe(0);
void queue.pull();
expect(
await metricReader.getMetricValue("jazz.messagequeue.pushed", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "server",
}),
).toBe(1);
expect(
await metricReader.getMetricValue("jazz.messagequeue.pulled", {
priority: CO_VALUE_PRIORITY.MEDIUM,
role: "server",
}),
).toBe(1);
});
});
test("should initialize with correct properties", () => {
const { queue } = setup();
expect(queue["defaultPriority"]).toBe(CO_VALUE_PRIORITY.MEDIUM);
expect(queue["queues"].length).toBe(8);
expect(queue["queues"].length).toBe(3);
expect(queue["queues"].every((q) => !q.length)).toBe(true);
});
@@ -52,7 +175,7 @@ describe("PriorityBasedMessageQueue", () => {
});
test("should pull messages in priority order", async () => {
const { queue, metricReader } = setup();
const { queue } = setup();
const lowPriorityMsg: SyncMessage = {
action: "content",
id: "co_zlow",
@@ -73,42 +196,12 @@ describe("PriorityBasedMessageQueue", () => {
};
void queue.push(lowPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: lowPriorityMsg.priority,
}),
).toBe(1);
void queue.push(mediumPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: mediumPriorityMsg.priority,
}),
).toBe(1);
void queue.push(highPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: highPriorityMsg.priority,
}),
).toBe(1);
expect(queue.pull()?.msg).toEqual(highPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: highPriorityMsg.priority,
}),
).toBe(0);
expect(queue.pull()?.msg).toEqual(mediumPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: mediumPriorityMsg.priority,
}),
).toBe(0);
expect(queue.pull()?.msg).toEqual(lowPriorityMsg);
expect(
await metricReader.getMetricValue("jazz.messagequeue.size", {
priority: lowPriorityMsg.priority,
}),
).toBe(0);
});
test("should return undefined when pulling from empty queue", () => {