Delete old messages with SID when new messages arrive

This commit is contained in:
binwiederhier
2026-01-08 10:28:02 -05:00
parent bfbe73aea3
commit 75abf2e245
4 changed files with 43 additions and 35 deletions

View File

@@ -27,6 +27,11 @@ const addNotification = async ({ subscriptionId, message }) => {
// Note: SubscriptionManager duplicates this logic, so if you change it here, change it there too // Note: SubscriptionManager duplicates this logic, so if you change it here, change it there too
// Delete existing notification with same SID (if any)
if (message.sid) {
await db.notifications.where({ subscriptionId, sid: message.sid }).delete();
}
// Add notification to database // Add notification to database
await db.notifications.add({ await db.notifications.add({
...messageWithSID(message), ...messageWithSID(message),

View File

@@ -42,19 +42,22 @@ class Poller {
const since = subscription.last; const since = subscription.last;
const notifications = await api.poll(subscription.baseUrl, subscription.topic, since); const notifications = await api.poll(subscription.baseUrl, subscription.topic, since);
const deletedSids = this.deletedSids(notifications); const latestBySid = this.latestNotificationsBySid(notifications);
const newOrUpdatedNotifications = this.newOrUpdatedNotifications(notifications, deletedSids);
// Delete all existing notifications with a deleted sequence ID // Delete all existing notifications with a deleted sequence ID
const deletedSids = Object.entries(latestBySid)
.filter(([, notification]) => notification.deleted)
.map(([sid]) => sid);
if (deletedSids.length > 0) { if (deletedSids.length > 0) {
console.log(`[Poller] Deleting notifications with deleted sequence IDs for ${subscription.id}`); console.log(`[Poller] Deleting notifications with deleted sequence IDs for ${subscription.id}`);
await Promise.all(deletedSids.map((sid) => subscriptionManager.deleteNotificationBySid(subscription.id, sid))); await Promise.all(deletedSids.map((sid) => subscriptionManager.deleteNotificationBySid(subscription.id, sid)));
} }
// Add new or updated notifications // Add only the latest notification for each non-deleted sequence
if (newOrUpdatedNotifications.length > 0) { const notificationsToAdd = Object.values(latestBySid).filter((n) => !n.deleted);
console.log(`[Poller] Adding ${notifications.length} notification(s) for ${subscription.id}`); if (notificationsToAdd.length > 0) {
await subscriptionManager.addNotifications(subscription.id, notifications); console.log(`[Poller] Adding ${notificationsToAdd.length} notification(s) for ${subscription.id}`);
await subscriptionManager.addNotifications(subscription.id, notificationsToAdd);
} else { } else {
console.log(`[Poller] No new notifications found for ${subscription.id}`); console.log(`[Poller] No new notifications found for ${subscription.id}`);
} }
@@ -70,20 +73,19 @@ class Poller {
})(); })();
} }
deletedSids(notifications) { /**
return new Set( * Groups notifications by sid and returns only the latest (highest time) for each sequence.
notifications * Returns an object mapping sid -> latest notification.
.filter(n => n.sid && n.deleted) */
.map(n => n.sid) latestNotificationsBySid(notifications) {
); const latestBySid = {};
} notifications.forEach((notification) => {
const sid = notification.sid || notification.id;
newOrUpdatedNotifications(notifications, deletedSids) { if (!(sid in latestBySid) || notification.time >= latestBySid[sid].time) {
return notifications latestBySid[sid] = notification;
.filter((notification) => { }
const sid = notification.sid || notification.id; });
return !deletedSids.has(notification.id) && !deletedSids.has(sid) && !notification.deleted; return latestBySid;
});
} }
} }

View File

@@ -15,7 +15,7 @@ class SubscriptionManager {
return Promise.all( return Promise.all(
subscriptions.map(async (s) => ({ subscriptions.map(async (s) => ({
...s, ...s,
new: await this.db.notifications.where({ subscriptionId: s.id, new: 1 }).count() new: await this.db.notifications.where({ subscriptionId: s.id, new: 1 }).count(),
})) }))
); );
} }
@@ -83,7 +83,7 @@ class SubscriptionManager {
baseUrl, baseUrl,
topic, topic,
mutedUntil: 0, mutedUntil: 0,
last: null last: null,
}; };
await this.db.subscriptions.put(subscription); await this.db.subscriptions.put(subscription);
@@ -101,7 +101,7 @@ class SubscriptionManager {
const local = await this.add(remote.base_url, remote.topic, { const local = await this.add(remote.base_url, remote.topic, {
displayName: remote.display_name, // May be undefined displayName: remote.display_name, // May be undefined
reservation // May be null! reservation, // May be null!
}); });
return local.id; return local.id;
@@ -157,22 +157,18 @@ class SubscriptionManager {
// It's actually fine, because the reading and filtering is quite fast. The rendering is what's // It's actually fine, because the reading and filtering is quite fast. The rendering is what's
// killing performance. See https://dexie.org/docs/Collection/Collection.offset()#a-better-paging-approach // killing performance. See https://dexie.org/docs/Collection/Collection.offset()#a-better-paging-approach
const notifications = await this.db.notifications return this.db.notifications
.orderBy("time") // Sort by time .orderBy("time") // Sort by time
.filter((n) => n.subscriptionId === subscriptionId) .filter((n) => n.subscriptionId === subscriptionId)
.reverse() .reverse()
.toArray(); .toArray();
return this.groupNotificationsBySID(notifications);
} }
async getAllNotifications() { async getAllNotifications() {
const notifications = await this.db.notifications return this.db.notifications
.orderBy("time") // Efficient, see docs .orderBy("time") // Efficient, see docs
.reverse() .reverse()
.toArray(); .toArray();
return this.groupNotificationsBySID(notifications);
} }
// Collapse notification updates based on sids, keeping only the latest version // Collapse notification updates based on sids, keeping only the latest version
@@ -204,13 +200,13 @@ class SubscriptionManager {
await this.db.notifications.add({ await this.db.notifications.add({
...messageWithSID(notification), ...messageWithSID(notification),
subscriptionId, subscriptionId,
new: 1 // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation new: 1, // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation
}); });
// FIXME consider put() for double tab // FIXME consider put() for double tab
// Update subscription last message id (for ?since=... queries) // Update subscription last message id (for ?since=... queries)
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
last: notification.id last: notification.id,
}); });
} catch (e) { } catch (e) {
console.error(`[SubscriptionManager] Error adding notification`, e); console.error(`[SubscriptionManager] Error adding notification`, e);
@@ -226,7 +222,7 @@ class SubscriptionManager {
const lastNotificationId = notifications.at(-1).id; const lastNotificationId = notifications.at(-1).id;
await this.db.notifications.bulkPut(notificationsWithSubscriptionId); await this.db.notifications.bulkPut(notificationsWithSubscriptionId);
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
last: lastNotificationId last: lastNotificationId,
}); });
} }
@@ -269,19 +265,19 @@ class SubscriptionManager {
async setMutedUntil(subscriptionId, mutedUntil) { async setMutedUntil(subscriptionId, mutedUntil) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
mutedUntil mutedUntil,
}); });
} }
async setDisplayName(subscriptionId, displayName) { async setDisplayName(subscriptionId, displayName) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
displayName displayName,
}); });
} }
async setReservation(subscriptionId, reservation) { async setReservation(subscriptionId, reservation) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
reservation reservation,
}); });
} }

View File

@@ -57,6 +57,11 @@ export const useConnectionListeners = (account, subscriptions, users, webPushTop
}; };
const handleNewOrUpdatedNotification = async (subscriptionId, notification) => { const handleNewOrUpdatedNotification = async (subscriptionId, notification) => {
// Delete existing notification with same sid, if any
if (notification.sid) {
await subscriptionManager.deleteNotificationBySid(subscriptionId, notification.sid);
}
// Add notification to database
const added = await subscriptionManager.addNotification(subscriptionId, notification); const added = await subscriptionManager.addNotification(subscriptionId, notification);
if (added) { if (added) {
await subscriptionManager.notify(subscriptionId, notification); await subscriptionManager.notify(subscriptionId, notification);