Compare commits

...

1 Commits

Author SHA1 Message Date
Guido D'Orsi
bc33e98dc4 Revert "perf: optimize queue processing under heavy load" 2025-01-21 16:15:15 +01:00
3 changed files with 14 additions and 70 deletions

View File

@@ -93,7 +93,7 @@ export class PeerState {
this.processing = true;
let entry: QueueEntry<SyncMessage> | undefined;
let entry: QueueEntry | undefined;
while ((entry = this.queue.pull())) {
// Awaiting the push to send one message at a time
// This way when the peer is "under pressure" we can enqueue all
@@ -130,7 +130,7 @@ export class PeerState {
}
private closeQueue() {
let entry: QueueEntry<SyncMessage> | undefined;
let entry: QueueEntry | undefined;
while ((entry = this.queue.pull())) {
// Using resolve here to avoid unnecessary noise in the logs
entry.resolve();

View File

@@ -18,12 +18,11 @@ function promiseWithResolvers<R>() {
};
}
export type QueueEntry<V> = {
msg: V;
export type QueueEntry = {
msg: SyncMessage;
promise: Promise<void>;
resolve: () => void;
reject: (_: unknown) => void;
next: QueueEntry<V> | undefined;
};
/**
@@ -34,68 +33,10 @@ type Tuple<T, N extends number, A extends unknown[] = []> = A extends {
}
? A
: Tuple<T, N, [...A, T]>;
type QueueTuple = Tuple<Queue<SyncMessage>, 8>;
class Queue<V> {
head: QueueEntry<V> | undefined = undefined;
tail: QueueEntry<V> | undefined = undefined;
push(msg: V) {
const { promise, resolve, reject } = promiseWithResolvers<void>();
const entry: QueueEntry<V> = {
msg,
promise,
resolve,
reject,
next: undefined,
};
if (this.head === undefined) {
this.head = entry;
} else {
if (this.tail === undefined) {
throw new Error("Tail is null but head is not");
}
this.tail.next = entry;
}
this.tail = entry;
return entry;
}
pull() {
const entry = this.head;
if (entry) {
this.head = entry.next;
}
if (this.head === undefined) {
this.tail = undefined;
}
return entry;
}
isNonEmpty() {
return this.head !== undefined;
}
}
type QueueTuple = Tuple<QueueEntry[], 8>;
export class PriorityBasedMessageQueue {
private queues: QueueTuple = [
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
new Queue(),
];
private queues: QueueTuple = [[], [], [], [], [], [], [], []];
queueSizeCounter = metrics
.getMeter("cojson")
.createUpDownCounter("jazz.messagequeue.size", {
@@ -111,19 +52,22 @@ export class PriorityBasedMessageQueue {
constructor(private defaultPriority: CoValuePriority) {}
public push(msg: SyncMessage) {
const { promise, resolve, reject } = promiseWithResolvers<void>();
const entry: QueueEntry = { msg, promise, resolve, reject };
const priority = "priority" in msg ? msg.priority : this.defaultPriority;
const entry = this.getQueue(priority).push(msg);
this.getQueue(priority).push(entry);
this.queueSizeCounter.add(1, {
priority,
});
return entry.promise;
return promise;
}
public pull() {
const priority = this.queues.findIndex((queue) => queue.isNonEmpty());
const priority = this.queues.findIndex((queue) => queue.length > 0);
if (priority === -1) {
return;
@@ -133,6 +77,6 @@ export class PriorityBasedMessageQueue {
priority,
});
return this.queues[priority]?.pull();
return this.queues[priority]?.shift();
}
}

View File

@@ -22,7 +22,7 @@ describe("PriorityBasedMessageQueue", () => {
const { queue } = setup();
expect(queue["defaultPriority"]).toBe(CO_VALUE_PRIORITY.MEDIUM);
expect(queue["queues"].length).toBe(8);
expect(queue["queues"].every((q) => !q.isNonEmpty())).toBe(true);
expect(queue["queues"].every((q) => q.length === 0)).toBe(true);
});
test("should push message with default priority", async () => {