Skip to content

Commit ccb2359

Browse files
committed
Give access to send errors for to-device messages (sendQueue)
1 parent 6346518 commit ccb2359

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed

src/ToDeviceMessageQueue.ts

+17-4
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,14 @@ export class ToDeviceMessageQueue {
5555
this.client.removeListener(ClientEvent.Sync, this.onResumedSync);
5656
}
5757

58-
public async queueBatch(batch: ToDeviceBatch): Promise<void> {
58+
/**
59+
* queues a batch of to-device messages for sending. The batch is split into
60+
* smaller batches of size MAX_BATCH_SIZE, and each batch is given a unique
61+
* transaction ID.
62+
* @param batch the total (not split) batch of to-device messages.
63+
* @param sendCallback a callback that is called once all batches are sent.
64+
*/
65+
public async queueBatch(batch: ToDeviceBatch, sendCallback?: (result: Error | undefined) => void): Promise<void> {
5966
const batches: ToDeviceBatchWithTxnId[] = [];
6067
for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) {
6168
const batchWithTxnId = {
@@ -74,10 +81,15 @@ export class ToDeviceMessageQueue {
7481
}
7582

7683
await this.client.store.saveToDeviceBatches(batches);
77-
this.sendQueue();
84+
this.sendQueue().then(sendCallback);
7885
}
7986

80-
public sendQueue = async (): Promise<void> => {
87+
/**
88+
* sends the queues to device messages currently saved in client.store.
89+
* @returns resolves to undefined if the queue was sent successfully, or an error if
90+
* the queue could not be sent.
91+
*/
92+
public sendQueue = async (): Promise<Error | undefined> => {
8193
if (this.retryTimeout !== null) clearTimeout(this.retryTimeout);
8294
this.retryTimeout = null;
8395

@@ -114,13 +126,14 @@ export class ToDeviceMessageQueue {
114126
} else {
115127
logger.info("Automatic retry limit reached for to-device messages.");
116128
}
117-
return;
129+
return Error("max to devices retries reached");
118130
}
119131

120132
logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e);
121133
this.retryTimeout = setTimeout(this.sendQueue, retryDelay);
122134
} finally {
123135
this.sending = false;
136+
return undefined;
124137
}
125138
};
126139

src/client.ts

+30-7
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,16 @@ import {
5252
type GroupCallEventHandlerEventHandlerMap,
5353
} from "./webrtc/groupCallEventHandler.ts";
5454
import * as utils from "./utils.ts";
55-
import { deepCompare, defer, noUnsafeEventProps, type QueryDict, replaceParam, safeSet, sleep } from "./utils.ts";
55+
import {
56+
deepCompare,
57+
defer,
58+
MapWithDefault,
59+
noUnsafeEventProps,
60+
type QueryDict,
61+
replaceParam,
62+
safeSet,
63+
sleep,
64+
} from "./utils.ts";
5665
import { Direction, EventTimeline } from "./models/event-timeline.ts";
5766
import { type IActionsObject, PushProcessor } from "./pushprocessor.ts";
5867
import { AutoDiscovery, type AutoDiscoveryAction } from "./autodiscovery.ts";
@@ -7948,7 +7957,8 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
79487957
* @param eventType - The type of event to send
79497958
* @param devices - The list of devices to send the event to.
79507959
* @param payload - The payload to send. This will be encrypted.
7951-
* @returns Promise which resolves once queued there is no error feedback when sending fails.
7960+
* @returns Promise which resolves once send there. Can be rejected with an error if sending fails
7961+
* Sending will retry automatically but there is a Max retries limit.
79527962
*/
79537963
public async encryptAndSendToDevice(
79547964
eventType: string,
@@ -7960,9 +7970,20 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
79607970
}
79617971
const batch = await this.cryptoBackend.encryptToDeviceMessages(eventType, devices, payload);
79627972

7963-
// TODO The batch mechanism removes all possibility to get error feedbacks..
7964-
// We might want instead to do the API call directly and pass the errors back.
7965-
await this.queueToDevice(batch);
7973+
const contentMap: MapWithDefault<string, Map<string, ToDevicePayload>> = new MapWithDefault(() => new Map());
7974+
for (const item of batch.batch) {
7975+
contentMap.getOrCreate(item.userId).set(item.deviceId, item.payload);
7976+
}
7977+
7978+
return new Promise<void>((resolve, reject) => {
7979+
this.queueToDevice(batch, (result) => {
7980+
if (result === undefined) {
7981+
resolve;
7982+
} else if (result) {
7983+
reject(result);
7984+
}
7985+
});
7986+
});
79667987
}
79677988

79687989
/**
@@ -7971,9 +7992,11 @@ export class MatrixClient extends TypedEventEmitter<EmittedEvents, ClientEventHa
79717992
* batches for sending and stored in the store so they can be retried
79727993
* later if they fail to send. Retries will happen automatically.
79737994
* @param batch - The to-device messages to send
7995+
* @param sendCallback - Optional callback to call when the batch is sent
7996+
* @returns Promise which resolves once the batch is queued.
79747997
*/
7975-
public queueToDevice(batch: ToDeviceBatch): Promise<void> {
7976-
return this.toDeviceMessageQueue.queueBatch(batch);
7998+
public queueToDevice(batch: ToDeviceBatch, sendCallback?: (result: Error | undefined) => void): Promise<void> {
7999+
return this.toDeviceMessageQueue.queueBatch(batch, sendCallback);
79778000
}
79788001

79798002
/**

0 commit comments

Comments
 (0)