Skip to content

Commit 88326b3

Browse files
committed
fix(v3): prevent saturated queues from dominating dequeue attempts by adding a cooloff period of successive failed dequeues
1 parent 5e4756f commit 88326b3

File tree

1 file changed

+51
-2
lines changed

1 file changed

+51
-2
lines changed

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ import {
4343
MARQS_RETRY_PRIORITY_TIMESTAMP_OFFSET,
4444
MARQS_SCHEDULED_REQUEUE_AVAILABLE_AT_THRESHOLD_IN_MS,
4545
} from "./constants.server";
46-
import { setInterval } from "node:timers/promises";
46+
import { setInterval as setIntervalAsync } from "node:timers/promises";
4747
import { tryCatch } from "@trigger.dev/core/utils";
4848
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
4949
import z from "zod";
@@ -78,6 +78,8 @@ export type MarQSOptions = {
7878
subscriber?: MessageQueueSubscriber;
7979
sharedWorkerQueueConsumerIntervalMs?: number;
8080
sharedWorkerQueueMaxMessageCount?: number;
81+
sharedWorkerQueueCooloffPeriodMs?: number;
82+
sharedWorkerQueueCooloffCountThreshold?: number;
8183
eagerDequeuingEnabled?: boolean;
8284
workerOptions: {
8385
pollIntervalMs?: number;
@@ -107,6 +109,9 @@ export class MarQS {
107109
public keys: MarQSKeyProducer;
108110
#rebalanceWorkers: Array<AsyncWorker> = [];
109111
private worker: Worker<typeof workerCatalog>;
112+
private queueDequeueCooloffPeriod: Map<string, number> = new Map();
113+
private queueDequeueCooloffCounts: Map<string, number> = new Map();
114+
private clearCooloffPeriodInterval: NodeJS.Timeout;
110115

111116
constructor(private readonly options: MarQSOptions) {
112117
this.redis = options.redis;
@@ -116,6 +121,12 @@ export class MarQS {
116121
this.#startRebalanceWorkers();
117122
this.#registerCommands();
118123

124+
// This will prevent these cooloff maps from growing indefinitely
125+
this.clearCooloffPeriodInterval = setInterval(() => {
126+
this.queueDequeueCooloffCounts.clear();
127+
this.queueDequeueCooloffPeriod.clear();
128+
}, 60_000 * 10); // 10 minutes
129+
119130
this.worker = new Worker({
120131
name: "marqs-worker",
121132
redisOptions: options.workerOptions.redisOptions,
@@ -737,7 +748,7 @@ export class MarQS {
737748
let processedCount = 0;
738749

739750
try {
740-
for await (const _ of setInterval(
751+
for await (const _ of setIntervalAsync(
741752
this.options.sharedWorkerQueueConsumerIntervalMs ?? 500,
742753
null,
743754
{
@@ -821,6 +832,7 @@ export class MarQS {
821832
let attemptedEnvs = 0;
822833
let attemptedQueues = 0;
823834
let messageCount = 0;
835+
let coolOffPeriodCount = 0;
824836

825837
// Try each queue in order, attempt to dequeue a message from each queue, keep going until we've tried all the queues
826838
for (const env of envQueues) {
@@ -829,6 +841,20 @@ export class MarQS {
829841
for (const messageQueue of env.queues) {
830842
attemptedQueues++;
831843

844+
const cooloffPeriod = this.queueDequeueCooloffPeriod.get(messageQueue);
845+
846+
// If the queue is in a cooloff period, skip attempting to dequeue from it
847+
if (cooloffPeriod) {
848+
// If the cooloff period is still active, skip attempting to dequeue from it
849+
if (cooloffPeriod > Date.now()) {
850+
coolOffPeriodCount++;
851+
continue;
852+
} else {
853+
// If the cooloff period is over, delete the cooloff period and attempt to dequeue from the queue
854+
this.queueDequeueCooloffPeriod.delete(messageQueue);
855+
}
856+
}
857+
832858
await this.#trace(
833859
"attemptDequeue",
834860
async (attemptDequeueSpan) => {
@@ -862,10 +888,32 @@ export class MarQS {
862888
);
863889

864890
if (!messages || messages.length === 0) {
891+
const cooloffCount = this.queueDequeueCooloffCounts.get(messageQueue) ?? 0;
892+
893+
const cooloffCountThreshold = Math.max(
894+
10,
895+
this.options.sharedWorkerQueueCooloffCountThreshold ?? 10
896+
); // minimum of 10
897+
898+
if (cooloffCount >= cooloffCountThreshold) {
899+
// If no messages were dequeued, set a cooloff period for the queue
900+
// This is to prevent the queue from being dequeued too frequently
901+
// and to give other queues a chance to dequeue messages more frequently
902+
this.queueDequeueCooloffPeriod.set(
903+
messageQueue,
904+
Date.now() + (this.options.sharedWorkerQueueCooloffPeriodMs ?? 10_000) // defaults to 10 seconds
905+
);
906+
this.queueDequeueCooloffCounts.delete(messageQueue);
907+
} else {
908+
this.queueDequeueCooloffCounts.set(messageQueue, cooloffCount + 1);
909+
}
910+
865911
attemptDequeueSpan.setAttribute("message_count", 0);
866912
return null; // Try next queue if no message was dequeued
867913
}
868914

915+
this.queueDequeueCooloffCounts.delete(messageQueue);
916+
869917
messageCount += messages.length;
870918

871919
attemptDequeueSpan.setAttribute("message_count", messages.length);
@@ -916,6 +964,7 @@ export class MarQS {
916964
span.setAttribute("attempted_queues", attemptedQueues);
917965
span.setAttribute("attempted_envs", attemptedEnvs);
918966
span.setAttribute("message_count", messageCount);
967+
span.setAttribute("cooloff_period_count", coolOffPeriodCount);
919968

920969
return;
921970
},

0 commit comments

Comments
 (0)