From eab6692985e446ef6e6fc31722d33d25fc41927e Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Fri, 13 Dec 2024 12:30:58 +0100 Subject: [PATCH 1/6] Initial commit --- src/concurrency.ts | 117 ++++++++++++++ src/live_location_manager.ts | 294 +++++++++++++++++++++++++++++++++++ src/types.ts | 8 + 3 files changed, 419 insertions(+) create mode 100644 src/concurrency.ts create mode 100644 src/live_location_manager.ts diff --git a/src/concurrency.ts b/src/concurrency.ts new file mode 100644 index 0000000000..9c3c82689f --- /dev/null +++ b/src/concurrency.ts @@ -0,0 +1,117 @@ +interface PendingPromise { + onContinued: () => void; + promise: Promise; +} + +type AsyncWrapper

= ( + tag: string | symbol, + cb: (...args: P) => Promise, +) => { + cb: () => Promise; + onContinued: () => void; +}; + +/** + * Runs async functions serially. Useful for wrapping async actions that + * should never run simultaneously: if marked with the same tag, functions + * will run one after another. + * + * @param tag Async functions with the same tag will run serially. Async functions + * with different tags can run in parallel. + * @param cb Async function to run. + * @returns Promise that resolves when async functions returns. + */ +export const withoutConcurrency = createRunner(wrapWithContinuationTracking); + +/** + * Runs async functions serially, and cancels all other actions with the same tag + * when a new action is scheduled. Useful for wrapping async actions that override + * each other (e.g. enabling and disabling camera). + * + * If an async function hasn't started yet and was canceled, it will never run. + * If an async function is already running and was canceled, it will be notified + * via an abort signal passed as an argument. + * + * @param tag Async functions with the same tag will run serially and are canceled + * when a new action with the same tag is scheduled. + * @param cb Async function to run. Receives AbortSignal as the only argument. + * @returns Promise that resolves when async functions returns. If the function didn't + * start and was canceled, will resolve with 'canceled'. If the function started to run, + * it's up to the function to decide how to react to cancelation. + */ +export const withCancellation = createRunner(wrapWithCancellation); + +const pendingPromises = new Map(); + +export function hasPending(tag: string | symbol) { + return pendingPromises.has(tag); +} + +export async function settled(tag: string | symbol) { + await pendingPromises.get(tag)?.promise; +} + +/** + * Implements common functionality of running async functions serially, by chaining + * their promises one after another. + * + * Before running, async function is "wrapped" using the provided wrapper. This wrapper + * can add additional steps to run before or after the function. + * + * When async function is scheduled to run, the previous function is notified + * by calling the associated onContinued callback. This behavior of this callback + * is defined by the wrapper. + */ +function createRunner

(wrapper: AsyncWrapper) { + return function run(tag: string | symbol, cb: (...args: P) => Promise) { + const { cb: wrapped, onContinued } = wrapper(tag, cb); + const pending = pendingPromises.get(tag); + pending?.onContinued(); + const promise = pending ? pending.promise.then(wrapped, wrapped) : wrapped(); + pendingPromises.set(tag, { promise, onContinued }); + return promise; + }; +} + +/** + * Wraps an async function with an additional step run after the function: + * if the function is the last in the queue, it cleans up the whole chain + * of promises after finishing. + */ +function wrapWithContinuationTracking(tag: string | symbol, cb: () => Promise) { + let hasContinuation = false; + const wrapped = () => + cb().finally(() => { + if (!hasContinuation) { + pendingPromises.delete(tag); + } + }); + const onContinued = () => (hasContinuation = true); + return { cb: wrapped, onContinued }; +} + +/** + * Wraps an async function with additional functionalilty: + * 1. Associates an abort signal with every function, that is passed to it + * as an argument. When a new function is scheduled to run after the current + * one, current signal is aborted. + * 2. If current function didn't start and was aborted, in will never start. + * 3. If the function is the last in the queue, it cleans up the whole chain + * of promises after finishing. + */ +function wrapWithCancellation(tag: string | symbol, cb: (signal: AbortSignal) => Promise) { + const ac = new AbortController(); + const wrapped = () => { + if (ac.signal.aborted) { + return Promise.resolve('canceled' as const); + } + + return cb(ac.signal).finally(() => { + if (!ac.signal.aborted) { + pendingPromises.delete(tag); + } + }); + }; + const onContinued = () => ac.abort(); + return { cb: wrapped, onContinued }; +} diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts new file mode 100644 index 0000000000..c7f284ee1c --- /dev/null +++ b/src/live_location_manager.ts @@ -0,0 +1,294 @@ +/** + * RULES: + * + * 1. one loc-sharing message per channel per user + * 2. mandatory geolocation_eol (maxnow + 24h max), which should be unchangeable by anyone (set once) + * 3. serialized object must be stored + * 4. live location is per-device, no other device which did not store the message locally, should be updating the live location attachment + */ + +import { withCancellation } from './concurrency'; +import { StateStore } from './store'; +import type { MessageResponse, Attachment, EventTypes } from './types'; +import type { StreamChat } from './client'; +import type { Unsubscribe } from './store'; + +// type Unsubscribe = () => void; +type WatchLocation = (handler: (value: { latitude: number; longitude: number }) => void) => Unsubscribe; +type LiveLocationManagerState = { + ready: boolean; + targetMessages: MessageResponse[]; +}; + +// LLS - live location sharing +function isAttachmentValidLLSEntity(attachment?: Attachment) { + if (!attachment || typeof attachment.end_time !== 'string' || attachment.stopped_sharing) return false; + + const endTimeTimestamp = new Date(attachment.end_time).getTime(); + + if (Number.isNaN(endTimeTimestamp)) return false; + + const nowTimestamp = Date.now(); + + return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp; +} + +class LiveLocationManager { + private client: StreamChat; + private unsubscribeFunctions: Set<() => void> = new Set(); + private serializeAndStore: (state: MessageResponse[]) => void; + private watchLocation: WatchLocation; + public state: StateStore; + private messagesByChannelConfIdGetterCache: { + calculated: { [key: string]: [MessageResponse, number] }; + targetMessages: LiveLocationManagerState['targetMessages']; + }; + private messagesByIdGetterCache: { + calculated: { [key: string]: [MessageResponse, number] }; + targetMessages: LiveLocationManagerState['targetMessages']; + }; + + static symbol = Symbol(LiveLocationManager.name); + + constructor({ + client, + retrieveAndDeserialize, + watchLocation, + serializeAndStore, + }: { + client: StreamChat; + retrieveAndDeserialize: (userId: string) => MessageResponse[]; + serializeAndStore: (state: MessageResponse[]) => void; + watchLocation: WatchLocation; + }) { + this.client = client; + this.state = new StateStore({ + targetMessages: retrieveAndDeserialize(client.userID!), + ready: false, + }); + this.watchLocation = watchLocation; + this.serializeAndStore = serializeAndStore; + + this.messagesByIdGetterCache = { + targetMessages: this.state.getLatestValue().targetMessages, + calculated: {}, + }; + + this.messagesByChannelConfIdGetterCache = { + targetMessages: this.state.getLatestValue().targetMessages, + calculated: {}, + }; + } + + public get messagesById() { + const { targetMessages } = this.state.getLatestValue(); + + if (this.messagesByIdGetterCache.targetMessages !== targetMessages) { + this.messagesByIdGetterCache.targetMessages = targetMessages; + + this.messagesByIdGetterCache.calculated = targetMessages.reduce<{ [key: string]: [MessageResponse, number] }>( + (messagesById, message, index) => { + messagesById[message.id] = [message, index]; + return messagesById; + }, + {}, + ); + } + + return this.messagesByIdGetterCache.calculated; + } + + public get messagesByChannelConfId() { + const { targetMessages } = this.state.getLatestValue(); + + if (this.messagesByChannelConfIdGetterCache.targetMessages !== targetMessages) { + this.messagesByChannelConfIdGetterCache.targetMessages = targetMessages; + + this.messagesByChannelConfIdGetterCache.calculated = targetMessages.reduce<{ + [key: string]: [MessageResponse, number]; + }>((messagesByChannelConfIds, message, index) => { + if (!message.cid) return messagesByChannelConfIds; + + messagesByChannelConfIds[message.cid] = [message, index]; + return messagesByChannelConfIds; + }, {}); + } + + return this.messagesByChannelConfIdGetterCache.calculated; + } + + // private async getCompleteMessage(messageId: string) { + // const [cachedMessage, cachedMessageIndex] = this.messagesById[messageId] ?? []; + + // const [cachedMessageAttachment] = cachedMessage?.attachments ?? []; + + // if (isAttachmentValidLLSEntity(cachedMessageAttachment)) { + // return cachedMessage; + // } + + // const queriedMessage = (await this.client.getMessage(messageId)).message; + + // const [queriedMessageAttachment] = queriedMessage.attachments ?? []; + + // if (isAttachmentValidLLSEntity(queriedMessageAttachment)) { + // this.state.next((currentValue) => { + // const newTargetMessages = [...currentValue.targetMessages]; + + // if (typeof cachedMessageIndex === 'number') { + // newTargetMessages[cachedMessageIndex] = queriedMessage; + // } else { + // newTargetMessages.push(queriedMessage); + // } + + // return { + // ...currentValue, + // targetMessages: newTargetMessages, + // }; + // }); + + // return queriedMessage; + // } + + // return null; + // } + + public subscribeWatchLocation() { + const unsubscribe = this.watchLocation(({ latitude, longitude }) => { + withCancellation(LiveLocationManager.symbol, async () => { + const promises: Promise[] = []; + + await this.recoverAndValidateMessages(); + + const { targetMessages } = this.state.getLatestValue(); + + for (const message of targetMessages) { + const [attachment] = message.attachments!; + + const promise = this.client + .partialUpdateMessage(message.id, { + set: { attachments: [{ ...attachment, latitude, longitude }] }, + }) + // TODO: change this this + .then((v) => console.log(v)); + + promises.push(promise); + } + + const values = await Promise.allSettled(promises); + + // TODO: handle values (remove failed - based on specific error code), keep re-trying others + }); + }); + + console.log(unsubscribe); + + return unsubscribe; + } + + private async recoverAndValidateMessages() { + const { targetMessages } = this.state.getLatestValue(); + + if (!this.client.userID) return; + + const messages = await this.client.search( + { members: { $in: [this.client.userID] } }, + { id: { $in: targetMessages.map((m) => m.id) } }, + ); + + this.state.partialNext({ ready: true }); + console.log(messages); + + console.log('to consider...'); + } + + private registerMessage(message: MessageResponse) { + if (!this.client.userID || message?.user?.id !== this.client.userID) return; + + const [attachment] = message.attachments ?? []; + + const messagesById = this.messagesById; + + // FIXME: get associatedChannelConfIds.indexOf(message.cid) + if (message.cid && this.messagesByChannelConfId[message.cid]) { + const [m] = messagesById[message.id]; + throw new Error( + `[LocationUpdater.registerMessage]: one live location sharing message per channel limit has been reached, unregister message "${m.id}" first`, + ); + } + + if (!attachment || attachment.type !== 'geolocation' || !attachment.geolocation_eol) { + throw new Error( + '[LocationUpdater.registerMessage]: Message has either no attachment, the attachment is not of type "geolocation" or the attachment is missing `geolocation_eol` property', + ); + } + + if (typeof attachment.geolocation_eol !== 'string') { + throw new Error( + '[LocationUpdater.registerMessage]: `geolocation_eol` property is of incorrect type, should be date and time ISO 8601 string', + ); + } + + const nowTimestamp = Date.now(); + const eolTimestamp = new Date(attachment.geolocation_eol).getTime(); + + if (Number.isNaN(eolTimestamp) || eolTimestamp < nowTimestamp) { + throw new Error( + '[LocationUpdater.registerMessage]: `geolocation_eol` has either improper format or has not been set to some time in the future (is lesser than now)', + ); + } + + this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); + } + + private unregisterMessage(message: MessageResponse) { + this.state.next((currentValue) => { + const [, messageIndex] = this.messagesById[message.id]; + + if (typeof messageIndex !== 'number') return currentValue; + + const newTargetMessages = [...currentValue.targetMessages]; + + newTargetMessages.splice(messageIndex, 1); + + return { + ...currentValue, + targetMessages: newTargetMessages, + }; + }); + } + + public unregisterSubscriptions = () => { + this.unsubscribeFunctions.forEach((cleanupFunction) => cleanupFunction()); + this.unsubscribeFunctions.clear(); + }; + + private subscribeNewMessages() { + const subscriptions = (['notification.message_new', 'message.new'] as EventTypes[]).map((eventType) => + this.client.on(eventType, (event) => { + // TODO: switch to targeted event based on userId + if (!event.message) return; + + try { + this.registerMessage(event.message); + } catch { + // do nothing + } + }), + ); + + return () => subscriptions.forEach((subscription) => subscription.unsubscribe()); + } + + public registerSubscriptions = () => { + if (this.unsubscribeFunctions.size) { + // LocationUpdater is already listening for events and changes + return; + } + + this.unsubscribeFunctions.add(this.subscribeNewMessages()); + // this.unsubscribeFunctions.add() + // TODO - handle message registration during message updates too, message updated eol added + // TODO - handle message unregistration during message updates - message updated, eol removed + // this.unsubscribeFunctions.add(this.subscribeMessagesUpdated()); + }; +} diff --git a/src/types.ts b/src/types.ts index 0a7b7e3841..95ad267b0c 100644 --- a/src/types.ts +++ b/src/types.ts @@ -2145,6 +2145,10 @@ export type Attachment< author_name?: string; color?: string; duration?: number; + /** + * Location-related, should be an ISO timestamp if type of the attachment is `live_location` + */ + end_time?: string; fallback?: string; fields?: Field[]; file_size?: number | string; @@ -2159,6 +2163,10 @@ export type Attachment< original_height?: number; original_width?: number; pretext?: string; + /** + * Location-related, true when user forcibly stops live location sharing + */ + stopped_sharing?: boolean; text?: string; thumb_url?: string; title?: string; From 2b6b412219dfb5fffe70413346068b90a21741e6 Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Fri, 13 Dec 2024 17:20:18 +0100 Subject: [PATCH 2/6] LiveLocationManager updates --- src/live_location_manager.ts | 193 +++++++++++++++++++++-------------- 1 file changed, 114 insertions(+), 79 deletions(-) diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts index c7f284ee1c..62b298ece4 100644 --- a/src/live_location_manager.ts +++ b/src/live_location_manager.ts @@ -15,13 +15,78 @@ import type { Unsubscribe } from './store'; // type Unsubscribe = () => void; type WatchLocation = (handler: (value: { latitude: number; longitude: number }) => void) => Unsubscribe; +type SerializeAndStore = (state: MessageResponse[], userId: string) => void; +type RetrieveAndDeserialize = (userId: string) => MessageResponse[]; + type LiveLocationManagerState = { ready: boolean; targetMessages: MessageResponse[]; }; -// LLS - live location sharing -function isAttachmentValidLLSEntity(attachment?: Attachment) { +// if (message.cid && this.messagesByChannelConfId[message.cid]) { +// const [m] = this.messagesByChannelConfId[message.cid]; +// throw new Error( +// `[LocationUpdater.registerMessage]: one live location sharing message per channel limit has been reached, unregister message "${m.id}" first`, +// ); +// } + +// if (!attachment || attachment.type !== 'geolocation' || !attachment.geolocation_eol) { +// throw new Error( +// '[LocationUpdater.registerMessage]: Message has either no attachment, the attachment is not of type "geolocation" or the attachment is missing `geolocation_eol` property', +// ); +// } + +// if (typeof attachment.geolocation_eol !== 'string') { +// throw new Error( +// '[LocationUpdater.registerMessage]: `geolocation_eol` property is of incorrect type, should be date and time ISO 8601 string', +// ); +// } + +// const nowTimestamp = Date.now(); +// const eolTimestamp = new Date(attachment.geolocation_eol).getTime(); + +// if (Number.isNaN(eolTimestamp) || eolTimestamp < nowTimestamp) { +// throw new Error( +// '[LocationUpdater.registerMessage]: `geolocation_eol` has either improper format or has not been set to some time in the future (is lesser than now)', +// ); +// } + +// private async getCompleteMessage(messageId: string) { +// const [cachedMessage, cachedMessageIndex] = this.messagesById[messageId] ?? []; + +// const [cachedMessageAttachment] = cachedMessage?.attachments ?? []; + +// if (isAttachmentValidLLSEntity(cachedMessageAttachment)) { +// return cachedMessage; +// } + +// const queriedMessage = (await this.client.getMessage(messageId)).message; + +// const [queriedMessageAttachment] = queriedMessage.attachments ?? []; + +// if (isAttachmentValidLLSEntity(queriedMessageAttachment)) { +// this.state.next((currentValue) => { +// const newTargetMessages = [...currentValue.targetMessages]; + +// if (typeof cachedMessageIndex === 'number') { +// newTargetMessages[cachedMessageIndex] = queriedMessage; +// } else { +// newTargetMessages.push(queriedMessage); +// } + +// return { +// ...currentValue, +// targetMessages: newTargetMessages, +// }; +// }); + +// return queriedMessage; +// } + +// return null; +// } + +function isValidLiveLocationAttachment(attachment?: Attachment) { if (!attachment || typeof attachment.end_time !== 'string' || attachment.stopped_sharing) return false; const endTimeTimestamp = new Date(attachment.end_time).getTime(); @@ -33,10 +98,10 @@ function isAttachmentValidLLSEntity(attachment?: Attachment) { return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp; } -class LiveLocationManager { +export class LiveLocationManager { private client: StreamChat; private unsubscribeFunctions: Set<() => void> = new Set(); - private serializeAndStore: (state: MessageResponse[]) => void; + private serializeAndStore: SerializeAndStore; private watchLocation: WatchLocation; public state: StateStore; private messagesByChannelConfIdGetterCache: { @@ -52,14 +117,20 @@ class LiveLocationManager { constructor({ client, - retrieveAndDeserialize, watchLocation, - serializeAndStore, + retrieveAndDeserialize = (userId) => { + const targetMessagesString = localStorage.getItem(`${userId}-${LiveLocationManager.name}`); + if (!targetMessagesString) return []; + return JSON.parse(targetMessagesString); + }, + serializeAndStore = (messages, userId) => { + localStorage.setItem(`${userId}-${LiveLocationManager.name}`, JSON.stringify(messages)); + }, }: { client: StreamChat; - retrieveAndDeserialize: (userId: string) => MessageResponse[]; - serializeAndStore: (state: MessageResponse[]) => void; watchLocation: WatchLocation; + retrieveAndDeserialize?: RetrieveAndDeserialize; + serializeAndStore?: SerializeAndStore; }) { this.client = client; this.state = new StateStore({ @@ -117,53 +188,26 @@ class LiveLocationManager { return this.messagesByChannelConfIdGetterCache.calculated; } - // private async getCompleteMessage(messageId: string) { - // const [cachedMessage, cachedMessageIndex] = this.messagesById[messageId] ?? []; - - // const [cachedMessageAttachment] = cachedMessage?.attachments ?? []; - - // if (isAttachmentValidLLSEntity(cachedMessageAttachment)) { - // return cachedMessage; - // } - - // const queriedMessage = (await this.client.getMessage(messageId)).message; - - // const [queriedMessageAttachment] = queriedMessage.attachments ?? []; - - // if (isAttachmentValidLLSEntity(queriedMessageAttachment)) { - // this.state.next((currentValue) => { - // const newTargetMessages = [...currentValue.targetMessages]; - - // if (typeof cachedMessageIndex === 'number') { - // newTargetMessages[cachedMessageIndex] = queriedMessage; - // } else { - // newTargetMessages.push(queriedMessage); - // } - - // return { - // ...currentValue, - // targetMessages: newTargetMessages, - // }; - // }); - - // return queriedMessage; - // } - - // return null; - // } - public subscribeWatchLocation() { const unsubscribe = this.watchLocation(({ latitude, longitude }) => { withCancellation(LiveLocationManager.symbol, async () => { const promises: Promise[] = []; - await this.recoverAndValidateMessages(); + if (!this.state.getLatestValue().ready) { + await this.recoverAndValidateMessages(); + } const { targetMessages } = this.state.getLatestValue(); for (const message of targetMessages) { - const [attachment] = message.attachments!; + const [attachment] = message.attachments ?? []; + if (!isValidLiveLocationAttachment(attachment)) { + this.unregisterMessage(message); + continue; + } + + // TODO: revisit const promise = this.client .partialUpdateMessage(message.id, { set: { attachments: [{ ...attachment, latitude, longitude }] }, @@ -175,7 +219,7 @@ class LiveLocationManager { } const values = await Promise.allSettled(promises); - + console.log(values); // TODO: handle values (remove failed - based on specific error code), keep re-trying others }); }); @@ -190,15 +234,24 @@ class LiveLocationManager { if (!this.client.userID) return; - const messages = await this.client.search( + const response = await this.client.search( { members: { $in: [this.client.userID] } }, - { id: { $in: targetMessages.map((m) => m.id) } }, + { id: { $in: targetMessages.map(({ id }) => id) } }, ); - this.state.partialNext({ ready: true }); - console.log(messages); + const newTargetMessages = []; + + for (const result of response.results) { + const { message } = result; - console.log('to consider...'); + const [attachment] = message.attachments ?? []; + + if (isValidLiveLocationAttachment(attachment)) { + newTargetMessages.push(message); + } + } + + this.state.partialNext({ ready: true, targetMessages: newTargetMessages }); } private registerMessage(message: MessageResponse) { @@ -206,38 +259,15 @@ class LiveLocationManager { const [attachment] = message.attachments ?? []; - const messagesById = this.messagesById; - - // FIXME: get associatedChannelConfIds.indexOf(message.cid) - if (message.cid && this.messagesByChannelConfId[message.cid]) { - const [m] = messagesById[message.id]; - throw new Error( - `[LocationUpdater.registerMessage]: one live location sharing message per channel limit has been reached, unregister message "${m.id}" first`, - ); - } - - if (!attachment || attachment.type !== 'geolocation' || !attachment.geolocation_eol) { - throw new Error( - '[LocationUpdater.registerMessage]: Message has either no attachment, the attachment is not of type "geolocation" or the attachment is missing `geolocation_eol` property', - ); - } - - if (typeof attachment.geolocation_eol !== 'string') { - throw new Error( - '[LocationUpdater.registerMessage]: `geolocation_eol` property is of incorrect type, should be date and time ISO 8601 string', - ); + if (!isValidLiveLocationAttachment(attachment)) { + return; } - const nowTimestamp = Date.now(); - const eolTimestamp = new Date(attachment.geolocation_eol).getTime(); + this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); - if (Number.isNaN(eolTimestamp) || eolTimestamp < nowTimestamp) { - throw new Error( - '[LocationUpdater.registerMessage]: `geolocation_eol` has either improper format or has not been set to some time in the future (is lesser than now)', - ); + if (this.client.userID) { + this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); } - - this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); } private unregisterMessage(message: MessageResponse) { @@ -255,6 +285,10 @@ class LiveLocationManager { targetMessages: newTargetMessages, }; }); + + if (this.client.userID) { + this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); + } } public unregisterSubscriptions = () => { @@ -286,6 +320,7 @@ class LiveLocationManager { } this.unsubscribeFunctions.add(this.subscribeNewMessages()); + this.unsubscribeFunctions.add(this.subscribeWatchLocation()); // this.unsubscribeFunctions.add() // TODO - handle message registration during message updates too, message updated eol added // TODO - handle message unregistration during message updates - message updated, eol removed From 1fa5f14d54d72f37ef03d091dda416eb7d46ce15 Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Tue, 17 Dec 2024 20:15:08 +0100 Subject: [PATCH 3/6] Adjust LLM methods and subscriptions --- src/channel.ts | 30 +++++++++++++++++ src/events.ts | 2 ++ src/index.ts | 1 + src/live_location_manager.ts | 63 +++++++++++++++++++++++++----------- 4 files changed, 77 insertions(+), 19 deletions(-) diff --git a/src/channel.ts b/src/channel.ts index 538c6c42aa..eee8d13e55 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -61,6 +61,7 @@ import { PartialUpdateMemberAPIResponse, AIState, MessageOptions, + Attachment, } from './types'; import { Role } from './permissions'; import { DEFAULT_QUERY_CHANNEL_MESSAGE_LIST_PAGE_SIZE } from './constants'; @@ -471,6 +472,35 @@ export class Channel, + ) { + const { latitude, longitude, end_time } = attachmentMetadata; + + const message: Message = { + attachments: [ + { + ...attachmentMetadata, + type: 'live_location', + latitude, + longitude, + end_time, + }, + ], + }; + + // TODO: find existing, cancel and send new one + // const existing = this.search({ user_id: attachments: { type: { $eq: 'live_location' } } }); + + const response = await this.sendMessage(message); + + this.getClient().dispatchEvent({ message: response.message, type: 'live_location_sharing.started' }); + } + + public stopLiveLocationSharing(message: MessageResponse) { + this.getClient().dispatchEvent({ message, type: 'live_location_sharing.stopped' }); + } + /** * delete - Delete the channel. Messages are permanently removed. * diff --git a/src/events.ts b/src/events.ts index e145074a3d..f4e74194a7 100644 --- a/src/events.ts +++ b/src/events.ts @@ -59,4 +59,6 @@ export const EVENT_MAP = { 'connection.recovered': true, 'transport.changed': true, 'capabilities.changed': true, + 'live_location_sharing.started': true, + 'live_location_sharing.stopped': true, }; diff --git a/src/index.ts b/src/index.ts index c0d0901f6d..4bbf28d8c2 100644 --- a/src/index.ts +++ b/src/index.ts @@ -18,4 +18,5 @@ export * from './thread'; export * from './thread_manager'; export * from './token_manager'; export * from './types'; +export * from './live_location_manager'; export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils'; diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts index 62b298ece4..2f8444914d 100644 --- a/src/live_location_manager.ts +++ b/src/live_location_manager.ts @@ -98,12 +98,22 @@ function isValidLiveLocationAttachment(attachment?: Attachment) { return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp; } +export type LiveLocationManagerConstructorParameters = { + client: StreamChat; + watchLocation: WatchLocation; + retrieveAndDeserialize?: RetrieveAndDeserialize; + serializeAndStore?: SerializeAndStore; + // watchThrottleTimeout?: number; +}; + +const MIN_THROTTLE_TIMEOUT = 1000; + export class LiveLocationManager { + public state: StateStore; private client: StreamChat; private unsubscribeFunctions: Set<() => void> = new Set(); private serializeAndStore: SerializeAndStore; private watchLocation: WatchLocation; - public state: StateStore; private messagesByChannelConfIdGetterCache: { calculated: { [key: string]: [MessageResponse, number] }; targetMessages: LiveLocationManagerState['targetMessages']; @@ -126,27 +136,26 @@ export class LiveLocationManager { serializeAndStore = (messages, userId) => { localStorage.setItem(`${userId}-${LiveLocationManager.name}`, JSON.stringify(messages)); }, - }: { - client: StreamChat; - watchLocation: WatchLocation; - retrieveAndDeserialize?: RetrieveAndDeserialize; - serializeAndStore?: SerializeAndStore; - }) { + }: LiveLocationManagerConstructorParameters) { this.client = client; + + const retreivedTargetMessages = retrieveAndDeserialize(client.userID!); + this.state = new StateStore({ - targetMessages: retrieveAndDeserialize(client.userID!), - ready: false, + targetMessages: retreivedTargetMessages, + // If there are no messages to validate, the manager is considered "ready" + ready: retreivedTargetMessages.length === 0, }); this.watchLocation = watchLocation; this.serializeAndStore = serializeAndStore; this.messagesByIdGetterCache = { - targetMessages: this.state.getLatestValue().targetMessages, + targetMessages: retreivedTargetMessages, calculated: {}, }; this.messagesByChannelConfIdGetterCache = { - targetMessages: this.state.getLatestValue().targetMessages, + targetMessages: retreivedTargetMessages, calculated: {}, }; } @@ -189,7 +198,15 @@ export class LiveLocationManager { } public subscribeWatchLocation() { + let nextWatcherCallTimestamp = Date.now(); + const unsubscribe = this.watchLocation(({ latitude, longitude }) => { + // Integrators can adjust the update interval by supplying custom watchLocation subscription, + // but the minimal timeout still has to be set as a failsafe (to prevent rate-limitting) + if (Date.now() < nextWatcherCallTimestamp) return; + + nextWatcherCallTimestamp = Date.now() + MIN_THROTTLE_TIMEOUT; + withCancellation(LiveLocationManager.symbol, async () => { const promises: Promise[] = []; @@ -207,7 +224,7 @@ export class LiveLocationManager { continue; } - // TODO: revisit + // TODO: client.updateLiveLocation instead const promise = this.client .partialUpdateMessage(message.id, { set: { attachments: [{ ...attachment, latitude, longitude }] }, @@ -229,10 +246,13 @@ export class LiveLocationManager { return unsubscribe; } + /** + * Messages stored locally might've been updated while the device which registered message for updates has been offline. + */ private async recoverAndValidateMessages() { const { targetMessages } = this.state.getLatestValue(); - if (!this.client.userID) return; + if (!this.client.userID || !targetMessages.length) return; const response = await this.client.search( { members: { $in: [this.client.userID] } }, @@ -296,16 +316,20 @@ export class LiveLocationManager { this.unsubscribeFunctions.clear(); }; - private subscribeNewMessages() { - const subscriptions = (['notification.message_new', 'message.new'] as EventTypes[]).map((eventType) => + private subscribeLiveLocationSharingUpdates() { + const subscriptions = ([ + 'live_location_sharing.started', + 'live_location_sharing.stopped', + 'message.deleted', + ] as EventTypes[]).map((eventType) => this.client.on(eventType, (event) => { // TODO: switch to targeted event based on userId if (!event.message) return; - try { + if (event.type === 'live_location_sharing.started') { this.registerMessage(event.message); - } catch { - // do nothing + } else { + this.unregisterMessage(event.message); } }), ); @@ -319,7 +343,8 @@ export class LiveLocationManager { return; } - this.unsubscribeFunctions.add(this.subscribeNewMessages()); + // FIXME: maybe not do this? (find out whether connection-id check would work) + this.unsubscribeFunctions.add(this.subscribeLiveLocationSharingUpdates()); this.unsubscribeFunctions.add(this.subscribeWatchLocation()); // this.unsubscribeFunctions.add() // TODO - handle message registration during message updates too, message updated eol added From bb8afdca0bb70e0d6565dea1ee6c2e224fb23bea Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Thu, 19 Dec 2024 23:53:51 +0100 Subject: [PATCH 4/6] Updates to LLM and channel --- src/channel.ts | 78 ++++++++++++++++++++++++++++++++++-- src/live_location_manager.ts | 44 ++++++++++++++++++-- 2 files changed, 114 insertions(+), 8 deletions(-) diff --git a/src/channel.ts b/src/channel.ts index eee8d13e55..887a8b8599 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -472,9 +472,31 @@ export class Channel, + ) { + const { latitude, longitude } = attachmentMetadata; + + const message: Message = { + attachments: [ + { + ...attachmentMetadata, + type: 'static_location', + latitude, + longitude, + }, + ], + }; + + return await this.sendMessage(message); + } + public async startLiveLocationSharing( attachmentMetadata: { end_time: string; latitude: number; longitude: number } & Attachment, ) { + const client = this.getClient(); + if (!client.userID) return; + const { latitude, longitude, end_time } = attachmentMetadata; const message: Message = { @@ -489,12 +511,60 @@ export class Channel[] = []; + + for (const result of existing.results) { + const [attachment] = result.message.attachments ?? []; + + promises.push( + client.partialUpdateMessage(result.message.id, { + // @ts-expect-error + set: { + attachments: [ + { + ...attachment, + stopped_sharing: true, + }, + ], + }, + }), + ); + } + + // FIXME: sending message if the previous part failed/did not happen + // should result in BE error + promises.unshift(this.sendMessage(message)); - const response = await this.sendMessage(message); + const [response] = await Promise.allSettled(promises); - this.getClient().dispatchEvent({ message: response.message, type: 'live_location_sharing.started' }); + if (response.status === 'fulfilled') { + this.getClient().dispatchEvent({ message: response.value.message, type: 'live_location_sharing.started' }); + } } public stopLiveLocationSharing(message: MessageResponse) { diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts index 2f8444914d..d4021b5f71 100644 --- a/src/live_location_manager.ts +++ b/src/live_location_manager.ts @@ -200,6 +200,7 @@ export class LiveLocationManager { public subscribeWatchLocation() { let nextWatcherCallTimestamp = Date.now(); + // eslint-disable-next-line sonarjs/prefer-immediate-return const unsubscribe = this.watchLocation(({ latitude, longitude }) => { // Integrators can adjust the update interval by supplying custom watchLocation subscription, // but the minimal timeout still has to be set as a failsafe (to prevent rate-limitting) @@ -209,12 +210,15 @@ export class LiveLocationManager { withCancellation(LiveLocationManager.symbol, async () => { const promises: Promise[] = []; + const { ready } = this.state.getLatestValue(); - if (!this.state.getLatestValue().ready) { + if (!ready) { await this.recoverAndValidateMessages(); } const { targetMessages } = this.state.getLatestValue(); + // if validator removes messages, we need to check + if (!targetMessages.length) return; for (const message of targetMessages) { const [attachment] = message.attachments ?? []; @@ -241,8 +245,6 @@ export class LiveLocationManager { }); }); - console.log(unsubscribe); - return unsubscribe; } @@ -290,9 +292,30 @@ export class LiveLocationManager { } } + private updateRegisteredMessage(message: MessageResponse) { + if (!this.client.userID || message?.user?.id !== this.client.userID) return; + + const [, targetMessageIndex] = this.messagesById[message.id]; + + this.state.next((currentValue) => { + const newTargetMessages = [...currentValue.targetMessages]; + + newTargetMessages[targetMessageIndex] = message; + + return { + ...currentValue, + targetMessages: newTargetMessages, + }; + }); + + if (this.client.userID) { + this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); + } + } + private unregisterMessage(message: MessageResponse) { this.state.next((currentValue) => { - const [, messageIndex] = this.messagesById[message.id]; + const [, messageIndex] = this.messagesById[message.id] ?? []; if (typeof messageIndex !== 'number') return currentValue; @@ -321,6 +344,7 @@ export class LiveLocationManager { 'live_location_sharing.started', 'live_location_sharing.stopped', 'message.deleted', + 'message.updated', ] as EventTypes[]).map((eventType) => this.client.on(eventType, (event) => { // TODO: switch to targeted event based on userId @@ -328,6 +352,18 @@ export class LiveLocationManager { if (event.type === 'live_location_sharing.started') { this.registerMessage(event.message); + } else if (event.type === 'message.updated') { + const localMessage = this.messagesById[event.message.id]; + + if (!localMessage) return; + + const [attachment] = event.message.attachments ?? []; + + if (!isValidLiveLocationAttachment(attachment)) { + this.unregisterMessage(event.message); + } else { + this.updateRegisteredMessage(event.message); + } } else { this.unregisterMessage(event.message); } From c2a418566737b05b189dfc0a3f56a7a34715d62a Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Tue, 7 Jan 2025 20:15:53 +0100 Subject: [PATCH 5/6] LLM adjustments --- src/channel.ts | 10 ++++- src/live_location_manager.ts | 86 ++++++++++++++++++++++-------------- 2 files changed, 61 insertions(+), 35 deletions(-) diff --git a/src/channel.ts b/src/channel.ts index 887a8b8599..653a41005c 100644 --- a/src/channel.ts +++ b/src/channel.ts @@ -567,8 +567,14 @@ export class Channel) { - this.getClient().dispatchEvent({ message, type: 'live_location_sharing.stopped' }); + public async stopLiveLocationSharing(message: MessageResponse) { + const [attachment] = message.attachments ?? []; + const response = await this.getClient().partialUpdateMessage(message.id, { + // @ts-expect-error this is a valid update + set: { attachments: [{ ...attachment, stopped_sharing: true }] }, + }); + + this.getClient().dispatchEvent({ message: response.message, type: 'live_location_sharing.stopped' }); } /** diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts index d4021b5f71..e1f17bd47e 100644 --- a/src/live_location_manager.ts +++ b/src/live_location_manager.ts @@ -9,7 +9,7 @@ import { withCancellation } from './concurrency'; import { StateStore } from './store'; -import type { MessageResponse, Attachment, EventTypes } from './types'; +import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics } from './types'; import type { StreamChat } from './client'; import type { Unsubscribe } from './store'; @@ -18,7 +18,7 @@ type WatchLocation = (handler: (value: { latitude: number; longitude: number }) type SerializeAndStore = (state: MessageResponse[], userId: string) => void; type RetrieveAndDeserialize = (userId: string) => MessageResponse[]; -type LiveLocationManagerState = { +export type LiveLocationManagerState = { ready: boolean; targetMessages: MessageResponse[]; }; @@ -98,19 +98,18 @@ function isValidLiveLocationAttachment(attachment?: Attachment) { return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp; } -export type LiveLocationManagerConstructorParameters = { - client: StreamChat; +export type LiveLocationManagerConstructorParameters = { + client: StreamChat; watchLocation: WatchLocation; retrieveAndDeserialize?: RetrieveAndDeserialize; serializeAndStore?: SerializeAndStore; - // watchThrottleTimeout?: number; }; const MIN_THROTTLE_TIMEOUT = 1000; -export class LiveLocationManager { +export class LiveLocationManager { public state: StateStore; - private client: StreamChat; + private client: StreamChat; private unsubscribeFunctions: Set<() => void> = new Set(); private serializeAndStore: SerializeAndStore; private watchLocation: WatchLocation; @@ -134,9 +133,13 @@ export class LiveLocationManager { return JSON.parse(targetMessagesString); }, serializeAndStore = (messages, userId) => { - localStorage.setItem(`${userId}-${LiveLocationManager.name}`, JSON.stringify(messages)); + localStorage.setItem( + `${userId}-${LiveLocationManager.name}`, + // Strip sensitive data (these will be recovered at on first location watch call) + JSON.stringify(messages.map((message) => ({ id: message.id }))), + ); }, - }: LiveLocationManagerConstructorParameters) { + }: LiveLocationManagerConstructorParameters) { this.client = client; const retreivedTargetMessages = retrieveAndDeserialize(client.userID!); @@ -197,7 +200,34 @@ export class LiveLocationManager { return this.messagesByChannelConfIdGetterCache.calculated; } - public subscribeWatchLocation() { + private subscribeTargetMessagesChange() { + let unsubscribeWatchLocation: null | (() => void) = null; + + // Subscribe to location updates only if there are relevant messages to + // update, no need for the location watcher to active/instantiated otherwise + const unsubscribe = this.state.subscribeWithSelector( + ({ targetMessages }) => ({ targetMessages }), + ({ targetMessages }) => { + if (!targetMessages.length) { + unsubscribeWatchLocation?.(); + unsubscribeWatchLocation = null; + } else if (targetMessages.length && !unsubscribeWatchLocation) { + unsubscribeWatchLocation = this.subscribeWatchLocation(); + } + + if (this.client.userID) { + this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); + } + }, + ); + + return () => { + unsubscribe(); + unsubscribeWatchLocation?.(); + }; + } + + private subscribeWatchLocation() { let nextWatcherCallTimestamp = Date.now(); // eslint-disable-next-line sonarjs/prefer-immediate-return @@ -231,6 +261,7 @@ export class LiveLocationManager { // TODO: client.updateLiveLocation instead const promise = this.client .partialUpdateMessage(message.id, { + // @ts-expect-error valid update set: { attachments: [{ ...attachment, latitude, longitude }] }, }) // TODO: change this this @@ -257,6 +288,7 @@ export class LiveLocationManager { if (!this.client.userID || !targetMessages.length) return; const response = await this.client.search( + // @ts-expect-error valid filter { members: { $in: [this.client.userID] } }, { id: { $in: targetMessages.map(({ id }) => id) } }, ); @@ -286,10 +318,6 @@ export class LiveLocationManager { } this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); - - if (this.client.userID) { - this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); - } } private updateRegisteredMessage(message: MessageResponse) { @@ -307,18 +335,14 @@ export class LiveLocationManager { targetMessages: newTargetMessages, }; }); - - if (this.client.userID) { - this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); - } } private unregisterMessage(message: MessageResponse) { - this.state.next((currentValue) => { - const [, messageIndex] = this.messagesById[message.id] ?? []; + const [, messageIndex] = this.messagesById[message.id] ?? []; - if (typeof messageIndex !== 'number') return currentValue; + if (typeof messageIndex !== 'number') return; + this.state.next((currentValue) => { const newTargetMessages = [...currentValue.targetMessages]; newTargetMessages.splice(messageIndex, 1); @@ -328,10 +352,6 @@ export class LiveLocationManager { targetMessages: newTargetMessages, }; }); - - if (this.client.userID) { - this.serializeAndStore(this.state.getLatestValue().targetMessages, this.client.userID); - } } public unregisterSubscriptions = () => { @@ -342,12 +362,16 @@ export class LiveLocationManager { private subscribeLiveLocationSharingUpdates() { const subscriptions = ([ 'live_location_sharing.started', + /** + * Both message.updated & live_location_sharing.stopped get emitted when message attachment gets an + * update, live_location_sharing.stopped gets emitted only locally and only if the update goes + * through, it's a failsafe for when channel is no longer being watched for whatever reason + */ + 'message.updated', 'live_location_sharing.stopped', 'message.deleted', - 'message.updated', ] as EventTypes[]).map((eventType) => this.client.on(eventType, (event) => { - // TODO: switch to targeted event based on userId if (!event.message) return; if (event.type === 'live_location_sharing.started') { @@ -379,12 +403,8 @@ export class LiveLocationManager { return; } - // FIXME: maybe not do this? (find out whether connection-id check would work) this.unsubscribeFunctions.add(this.subscribeLiveLocationSharingUpdates()); - this.unsubscribeFunctions.add(this.subscribeWatchLocation()); - // this.unsubscribeFunctions.add() - // TODO - handle message registration during message updates too, message updated eol added - // TODO - handle message unregistration during message updates - message updated, eol removed - // this.unsubscribeFunctions.add(this.subscribeMessagesUpdated()); + this.unsubscribeFunctions.add(this.subscribeTargetMessagesChange()); + // TODO? - handle message registration during message updates too, message updated eol added (I hope not) }; } From 36d56e131f1d6c3b186c13f4c59f7f085874a234 Mon Sep 17 00:00:00 2001 From: Anton Arnautov Date: Thu, 9 Jan 2025 15:36:12 +0100 Subject: [PATCH 6/6] Validate messages instead (check deleted) --- src/client.ts | 18 ++++++++ src/live_location_manager.ts | 90 ++++++++++++++++++------------------ 2 files changed, 63 insertions(+), 45 deletions(-) diff --git a/src/client.ts b/src/client.ts index 4f8ee4dec2..562f39f8ed 100644 --- a/src/client.ts +++ b/src/client.ts @@ -2552,6 +2552,24 @@ export class StreamChat, + { latitude, longitude }: { latitude: number; longitude: number }, + ) { + const [attachment] = message.attachments ?? []; + + if (!attachment || attachment.type !== 'live_location') { + throw new Error( + 'Supplied message either has no attachments to update or attachment is not of type "live_location"', + ); + } + + return this.partialUpdateMessage(message.id, { + // @ts-expect-error valid update + set: { attachments: [{ ...attachment, latitude, longitude }] }, + }); + } + /** * pinMessage - pins the message * @param {string | { id: string }} messageOrMessageId message object or message id diff --git a/src/live_location_manager.ts b/src/live_location_manager.ts index e1f17bd47e..7aaa947f2c 100644 --- a/src/live_location_manager.ts +++ b/src/live_location_manager.ts @@ -9,18 +9,18 @@ import { withCancellation } from './concurrency'; import { StateStore } from './store'; -import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics } from './types'; +import type { MessageResponse, Attachment, EventTypes, ExtendableGenerics, UpdateMessageAPIResponse } from './types'; import type { StreamChat } from './client'; import type { Unsubscribe } from './store'; // type Unsubscribe = () => void; type WatchLocation = (handler: (value: { latitude: number; longitude: number }) => void) => Unsubscribe; -type SerializeAndStore = (state: MessageResponse[], userId: string) => void; -type RetrieveAndDeserialize = (userId: string) => MessageResponse[]; +type SerializeAndStore = (state: MessageResponse[], userId: string) => void; +type RetrieveAndDeserialize = (userId: string) => MessageResponse[]; -export type LiveLocationManagerState = { +export type LiveLocationManagerState = { ready: boolean; - targetMessages: MessageResponse[]; + targetMessages: MessageResponse[]; }; // if (message.cid && this.messagesByChannelConfId[message.cid]) { @@ -87,39 +87,55 @@ export type LiveLocationManagerState = { // } function isValidLiveLocationAttachment(attachment?: Attachment) { - if (!attachment || typeof attachment.end_time !== 'string' || attachment.stopped_sharing) return false; + if (!attachment || attachment.type !== 'live_location' || attachment.stopped_sharing) { + return false; + } + + // If end_time has been defined, consider it + if (typeof attachment.end_time === 'string') { + const endTimeTimestamp = new Date(attachment.end_time).getTime(); - const endTimeTimestamp = new Date(attachment.end_time).getTime(); + if (Number.isNaN(endTimeTimestamp)) return false; - if (Number.isNaN(endTimeTimestamp)) return false; + const nowTimestamp = Date.now(); - const nowTimestamp = Date.now(); + return nowTimestamp < endTimeTimestamp; + } - return attachment && attachment.type === 'live_location' && endTimeTimestamp > nowTimestamp; + return true; +} + +function isValidLiveLocationMessage(message?: MessageResponse) { + if (!message || message.type === 'deleted') return false; + + const [attachment] = message.attachments ?? []; + + return isValidLiveLocationAttachment(attachment); } export type LiveLocationManagerConstructorParameters = { client: StreamChat; watchLocation: WatchLocation; - retrieveAndDeserialize?: RetrieveAndDeserialize; - serializeAndStore?: SerializeAndStore; + retrieveAndDeserialize?: RetrieveAndDeserialize; + serializeAndStore?: SerializeAndStore; }; -const MIN_THROTTLE_TIMEOUT = 1000; +// Hard-coded minimal throttle timeout +const MIN_THROTTLE_TIMEOUT = 3000; export class LiveLocationManager { - public state: StateStore; + public state: StateStore>; private client: StreamChat; private unsubscribeFunctions: Set<() => void> = new Set(); - private serializeAndStore: SerializeAndStore; + private serializeAndStore: SerializeAndStore; private watchLocation: WatchLocation; private messagesByChannelConfIdGetterCache: { calculated: { [key: string]: [MessageResponse, number] }; - targetMessages: LiveLocationManagerState['targetMessages']; + targetMessages: LiveLocationManagerState['targetMessages']; }; private messagesByIdGetterCache: { calculated: { [key: string]: [MessageResponse, number] }; - targetMessages: LiveLocationManagerState['targetMessages']; + targetMessages: LiveLocationManagerState['targetMessages']; }; static symbol = Symbol(LiveLocationManager.name); @@ -144,7 +160,7 @@ export class LiveLocationManager { const retreivedTargetMessages = retrieveAndDeserialize(client.userID!); - this.state = new StateStore({ + this.state = new StateStore>({ targetMessages: retreivedTargetMessages, // If there are no messages to validate, the manager is considered "ready" ready: retreivedTargetMessages.length === 0, @@ -204,7 +220,7 @@ export class LiveLocationManager { let unsubscribeWatchLocation: null | (() => void) = null; // Subscribe to location updates only if there are relevant messages to - // update, no need for the location watcher to active/instantiated otherwise + // update, no need for the location watcher to be active/instantiated otherwise const unsubscribe = this.state.subscribeWithSelector( ({ targetMessages }) => ({ targetMessages }), ({ targetMessages }) => { @@ -239,7 +255,7 @@ export class LiveLocationManager { nextWatcherCallTimestamp = Date.now() + MIN_THROTTLE_TIMEOUT; withCancellation(LiveLocationManager.symbol, async () => { - const promises: Promise[] = []; + const promises: Promise>[] = []; const { ready } = this.state.getLatestValue(); if (!ready) { @@ -247,31 +263,21 @@ export class LiveLocationManager { } const { targetMessages } = this.state.getLatestValue(); - // if validator removes messages, we need to check + // If validator removes messages, we need to check if (!targetMessages.length) return; for (const message of targetMessages) { - const [attachment] = message.attachments ?? []; - - if (!isValidLiveLocationAttachment(attachment)) { + if (!isValidLiveLocationMessage(message)) { this.unregisterMessage(message); continue; } - // TODO: client.updateLiveLocation instead - const promise = this.client - .partialUpdateMessage(message.id, { - // @ts-expect-error valid update - set: { attachments: [{ ...attachment, latitude, longitude }] }, - }) - // TODO: change this this - .then((v) => console.log(v)); + const promise = this.client.updateLiveLocation(message, { latitude, longitude }); promises.push(promise); } - const values = await Promise.allSettled(promises); - console.log(values); + await Promise.allSettled(promises); // TODO: handle values (remove failed - based on specific error code), keep re-trying others }); }); @@ -298,9 +304,7 @@ export class LiveLocationManager { for (const result of response.results) { const { message } = result; - const [attachment] = message.attachments ?? []; - - if (isValidLiveLocationAttachment(attachment)) { + if (isValidLiveLocationMessage(message)) { newTargetMessages.push(message); } } @@ -308,19 +312,17 @@ export class LiveLocationManager { this.state.partialNext({ ready: true, targetMessages: newTargetMessages }); } - private registerMessage(message: MessageResponse) { + private registerMessage(message: MessageResponse) { if (!this.client.userID || message?.user?.id !== this.client.userID) return; - const [attachment] = message.attachments ?? []; - - if (!isValidLiveLocationAttachment(attachment)) { + if (!isValidLiveLocationMessage(message)) { return; } this.state.next((currentValue) => ({ ...currentValue, targetMessages: [...currentValue.targetMessages, message] })); } - private updateRegisteredMessage(message: MessageResponse) { + private updateRegisteredMessage(message: MessageResponse) { if (!this.client.userID || message?.user?.id !== this.client.userID) return; const [, targetMessageIndex] = this.messagesById[message.id]; @@ -381,9 +383,7 @@ export class LiveLocationManager { if (!localMessage) return; - const [attachment] = event.message.attachments ?? []; - - if (!isValidLiveLocationAttachment(attachment)) { + if (!isValidLiveLocationMessage(event.message)) { this.unregisterMessage(event.message); } else { this.updateRegisteredMessage(event.message);