Skip to content

Commit 09bed00

Browse files
authored
Prevent error on concurrent queue processing (#453)
1 parent 72cee91 commit 09bed00

File tree

2 files changed

+47
-1
lines changed

2 files changed

+47
-1
lines changed

src/channel/queuedChannel.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ export class QueuedChannel<T> implements OutputChannel<T> {
5858
this.logger.debug('Dequeuing message...');
5959
this.logger.debug(`Queue length: ${Math.max(0, this.queue.length() - 1)}`);
6060

61-
this.queue.shift();
61+
if (this.queue.length() === 0) {
62+
this.logger.debug('Queue unexpectedly empty, possibly due to concurrent modification.');
63+
} else {
64+
this.queue.shift();
65+
}
6266
}
6367

6468
private requeue(): Promise<void> {

test/channel/queuedChannel.test.ts

+42
Original file line numberDiff line numberDiff line change
@@ -341,4 +341,46 @@ describe('A queued channel', () => {
341341

342342
expect(outputChannel.close).toHaveBeenCalled();
343343
});
344+
345+
it('should not dequeue if the query is already empty', async () => {
346+
const outputChannel: OutputChannel<string> = {
347+
close: jest.fn().mockResolvedValue(undefined),
348+
publish: jest.fn().mockResolvedValue(undefined),
349+
};
350+
351+
const logger: Logger = {
352+
debug: jest.fn(),
353+
info: jest.fn(),
354+
warn: jest.fn(),
355+
error: jest.fn(),
356+
};
357+
358+
const queue = new class extends InMemoryQueue<any> {
359+
public push(): void {
360+
}
361+
}();
362+
363+
const dequeue = jest.spyOn(queue, 'shift').mockReturnValue(undefined);
364+
const push = jest.spyOn(queue, 'push').mockReturnValue(undefined);
365+
366+
const channel = new QueuedChannel(outputChannel, queue, logger);
367+
368+
await channel.publish('foo');
369+
370+
expect(outputChannel.publish).toHaveBeenCalledTimes(1);
371+
expect(push).toHaveBeenCalledTimes(1);
372+
373+
expect(dequeue).toHaveBeenCalledTimes(0);
374+
375+
expect(logger.debug).toHaveBeenCalledTimes(5);
376+
377+
expect(logger.debug).toHaveBeenNthCalledWith(1, 'Enqueueing message...');
378+
expect(logger.debug).toHaveBeenNthCalledWith(2, 'Queue length: 1');
379+
expect(logger.debug).toHaveBeenNthCalledWith(3, 'Dequeuing message...');
380+
expect(logger.debug).toHaveBeenNthCalledWith(4, 'Queue length: 0');
381+
expect(logger.debug).toHaveBeenNthCalledWith(
382+
5,
383+
'Queue unexpectedly empty, possibly due to concurrent modification.',
384+
);
385+
});
344386
});

0 commit comments

Comments
 (0)