Skip to content

Commit 5e4756f

Browse files
authored
fix(v3): eagerly dequeue messages from a queue when that queue is added to or removed from (v4 backport) (#2438)
* fix(v3): eagerly dequeue messages from a queue when that queue is added to or removed from (v4 backport) * fixed configuration
1 parent c16cc57 commit 5e4756f

File tree

2 files changed

+161
-0
lines changed

2 files changed

+161
-0
lines changed

apps/webapp/app/env.server.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,15 @@ const EnvironmentSchema = z.object({
413413
MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS: z.coerce.number().int().default(250),
414414
MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT: z.coerce.number().int().default(10),
415415

416+
MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED: z.string().default("0"),
417+
MARQS_WORKER_ENABLED: z.string().default("0"),
418+
MARQS_WORKER_COUNT: z.coerce.number().int().default(2),
419+
MARQS_WORKER_CONCURRENCY_LIMIT: z.coerce.number().int().default(50),
420+
MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER: z.coerce.number().int().default(5),
421+
MARQS_WORKER_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
422+
MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS: z.coerce.number().int().default(100),
423+
MARQS_WORKER_SHUTDOWN_TIMEOUT_MS: z.coerce.number().int().default(60_000),
424+
416425
PROD_TASK_HEARTBEAT_INTERVAL_MS: z.coerce.number().int().optional(),
417426

418427
VERBOSE_GRAPHILE_LOGGING: z.string().default("false"),

apps/webapp/app/v3/marqs/index.server.ts

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import {
88
trace,
99
Tracer,
1010
} from "@opentelemetry/api";
11+
import { type RedisOptions } from "@internal/redis";
1112
import {
1213
SEMATTRS_MESSAGE_ID,
1314
SEMATTRS_MESSAGING_SYSTEM,
@@ -44,6 +45,9 @@ import {
4445
} from "./constants.server";
4546
import { setInterval } from "node:timers/promises";
4647
import { tryCatch } from "@trigger.dev/core/utils";
48+
import { Worker, type WorkerConcurrencyOptions } from "@trigger.dev/redis-worker";
49+
import z from "zod";
50+
import { Logger } from "@trigger.dev/core/logger";
4751

4852
const KEY_PREFIX = "marqs:";
4953

@@ -74,6 +78,25 @@ export type MarQSOptions = {
7478
subscriber?: MessageQueueSubscriber;
7579
sharedWorkerQueueConsumerIntervalMs?: number;
7680
sharedWorkerQueueMaxMessageCount?: number;
81+
eagerDequeuingEnabled?: boolean;
82+
workerOptions: {
83+
pollIntervalMs?: number;
84+
immediatePollIntervalMs?: number;
85+
shutdownTimeoutMs?: number;
86+
concurrency?: WorkerConcurrencyOptions;
87+
enabled?: boolean;
88+
redisOptions: RedisOptions;
89+
};
90+
};
91+
92+
const workerCatalog = {
93+
processQueueForWorkerQueue: {
94+
schema: z.object({
95+
queueKey: z.string(),
96+
parentQueueKey: z.string(),
97+
}),
98+
visibilityTimeoutMs: 30_000,
99+
},
77100
};
78101

79102
/**
@@ -83,6 +106,7 @@ export class MarQS {
83106
private redis: Redis;
84107
public keys: MarQSKeyProducer;
85108
#rebalanceWorkers: Array<AsyncWorker> = [];
109+
private worker: Worker<typeof workerCatalog>;
86110

87111
constructor(private readonly options: MarQSOptions) {
88112
this.redis = options.redis;
@@ -91,6 +115,26 @@ export class MarQS {
91115

92116
this.#startRebalanceWorkers();
93117
this.#registerCommands();
118+
119+
this.worker = new Worker({
120+
name: "marqs-worker",
121+
redisOptions: options.workerOptions.redisOptions,
122+
catalog: workerCatalog,
123+
concurrency: options.workerOptions?.concurrency,
124+
pollIntervalMs: options.workerOptions?.pollIntervalMs ?? 1000,
125+
immediatePollIntervalMs: options.workerOptions?.immediatePollIntervalMs ?? 100,
126+
shutdownTimeoutMs: options.workerOptions?.shutdownTimeoutMs ?? 10_000,
127+
logger: new Logger("MarQSWorker", "info"),
128+
jobs: {
129+
processQueueForWorkerQueue: async (job) => {
130+
await this.#processQueueForWorkerQueue(job.payload.queueKey, job.payload.parentQueueKey);
131+
},
132+
},
133+
});
134+
135+
if (options.workerOptions?.enabled) {
136+
this.worker.start();
137+
}
94138
}
95139

96140
get name() {
@@ -280,6 +324,21 @@ export class MarQS {
280324
span.setAttribute("reserve_recursive_queue", reserve.recursiveQueue);
281325
}
282326

327+
if (env.type !== "DEVELOPMENT" && this.options.eagerDequeuingEnabled) {
328+
// This will move the message to the worker queue so it can be dequeued
329+
await this.worker.enqueueOnce({
330+
id: messageQueue, // dedupe by environment, queue, and concurrency key
331+
job: "processQueueForWorkerQueue",
332+
payload: {
333+
queueKey: messageQueue,
334+
parentQueueKey: parentQueue,
335+
},
336+
// Add a small delay to dedupe messages so at most one of these will processed,
337+
// every 500ms per queue, concurrency key, and environment
338+
availableAt: new Date(Date.now() + 500), // 500ms from now
339+
});
340+
}
341+
283342
const result = await this.#callEnqueueMessage(messagePayload, reserve);
284343

285344
if (result) {
@@ -870,6 +929,64 @@ export class MarQS {
870929
);
871930
}
872931

932+
async #processQueueForWorkerQueue(queueKey: string, parentQueueKey: string) {
933+
return this.#trace("processQueueForWorkerQueue", async (span) => {
934+
span.setAttributes({
935+
[SemanticAttributes.QUEUE]: queueKey,
936+
[SemanticAttributes.PARENT_QUEUE]: parentQueueKey,
937+
});
938+
939+
const maxCount = this.options.sharedWorkerQueueMaxMessageCount ?? 10;
940+
941+
const dequeuedMessages = await this.#callDequeueMessages({
942+
messageQueue: queueKey,
943+
parentQueue: parentQueueKey,
944+
maxCount,
945+
});
946+
947+
if (!dequeuedMessages || dequeuedMessages.length === 0) {
948+
return;
949+
}
950+
951+
await this.#trace(
952+
"addToWorkerQueue",
953+
async (addToWorkerQueueSpan) => {
954+
const workerQueueKey = this.keys.sharedWorkerQueueKey();
955+
956+
addToWorkerQueueSpan.setAttributes({
957+
message_count: dequeuedMessages.length,
958+
[SemanticAttributes.PARENT_QUEUE]: workerQueueKey,
959+
});
960+
961+
await this.redis.rpush(
962+
workerQueueKey,
963+
...dequeuedMessages.map((message) => message.messageId)
964+
);
965+
},
966+
{
967+
kind: SpanKind.INTERNAL,
968+
attributes: {
969+
[SEMATTRS_MESSAGING_OPERATION]: "receive",
970+
[SEMATTRS_MESSAGING_SYSTEM]: "marqs",
971+
},
972+
}
973+
);
974+
975+
// If we dequeued the max count, we need to enqueue another job to dequeue the next batch
976+
if (dequeuedMessages.length === maxCount) {
977+
await this.worker.enqueueOnce({
978+
id: queueKey,
979+
job: "processQueueForWorkerQueue",
980+
payload: {
981+
queueKey,
982+
parentQueueKey,
983+
},
984+
availableAt: new Date(Date.now() + 500), // 500ms from now
985+
});
986+
}
987+
});
988+
}
989+
873990
public async acknowledgeMessage(messageId: string, reason: string = "unknown") {
874991
return this.#trace(
875992
"acknowledgeMessage",
@@ -901,6 +1018,20 @@ export class MarQS {
9011018
messageId,
9021019
});
9031020

1021+
const sharedQueueKey = this.keys.sharedQueueKey();
1022+
1023+
if (this.options.eagerDequeuingEnabled && message.parentQueue === sharedQueueKey) {
1024+
await this.worker.enqueueOnce({
1025+
id: message.queue,
1026+
job: "processQueueForWorkerQueue",
1027+
payload: {
1028+
queueKey: message.queue,
1029+
parentQueueKey: message.parentQueue,
1030+
},
1031+
availableAt: new Date(Date.now() + 500), // 500ms from now
1032+
});
1033+
}
1034+
9041035
await this.options.subscriber?.messageAcked(message);
9051036
},
9061037
{
@@ -2482,5 +2613,26 @@ function getMarQSClient() {
24822613
subscriber: concurrencyTracker,
24832614
sharedWorkerQueueConsumerIntervalMs: env.MARQS_SHARED_WORKER_QUEUE_CONSUMER_INTERVAL_MS,
24842615
sharedWorkerQueueMaxMessageCount: env.MARQS_SHARED_WORKER_QUEUE_MAX_MESSAGE_COUNT,
2616+
eagerDequeuingEnabled: env.MARQS_SHARED_WORKER_QUEUE_EAGER_DEQUEUE_ENABLED === "1",
2617+
workerOptions: {
2618+
enabled: env.MARQS_WORKER_ENABLED === "1",
2619+
pollIntervalMs: env.MARQS_WORKER_POLL_INTERVAL_MS,
2620+
immediatePollIntervalMs: env.MARQS_WORKER_IMMEDIATE_POLL_INTERVAL_MS,
2621+
shutdownTimeoutMs: env.MARQS_WORKER_SHUTDOWN_TIMEOUT_MS,
2622+
concurrency: {
2623+
workers: env.MARQS_WORKER_COUNT,
2624+
tasksPerWorker: env.MARQS_WORKER_CONCURRENCY_TASKS_PER_WORKER,
2625+
limit: env.MARQS_WORKER_CONCURRENCY_LIMIT,
2626+
},
2627+
redisOptions: {
2628+
keyPrefix: KEY_PREFIX,
2629+
port: env.REDIS_PORT ?? undefined,
2630+
host: env.REDIS_HOST ?? undefined,
2631+
username: env.REDIS_USERNAME ?? undefined,
2632+
password: env.REDIS_PASSWORD ?? undefined,
2633+
enableAutoPipelining: true,
2634+
...(env.REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }),
2635+
},
2636+
},
24852637
});
24862638
}

0 commit comments

Comments
 (0)