Switch to event type

This commit is contained in:
binwiederhier
2026-01-08 20:50:23 -05:00
parent 66ea25c18b
commit 5ad3de2904
11 changed files with 187 additions and 67 deletions

View File

@@ -125,7 +125,7 @@ var (
errHTTPBadRequestInvalidUsername = &errHTTP{40046, http.StatusBadRequest, "invalid request: invalid username", "", nil} errHTTPBadRequestInvalidUsername = &errHTTP{40046, http.StatusBadRequest, "invalid request: invalid username", "", nil}
errHTTPBadRequestTemplateFileNotFound = &errHTTP{40047, http.StatusBadRequest, "invalid request: template file not found", "https://ntfy.sh/docs/publish/#message-templating", nil} errHTTPBadRequestTemplateFileNotFound = &errHTTP{40047, http.StatusBadRequest, "invalid request: template file not found", "https://ntfy.sh/docs/publish/#message-templating", nil}
errHTTPBadRequestTemplateFileInvalid = &errHTTP{40048, http.StatusBadRequest, "invalid request: template file invalid", "https://ntfy.sh/docs/publish/#message-templating", nil} errHTTPBadRequestTemplateFileInvalid = &errHTTP{40048, http.StatusBadRequest, "invalid request: template file invalid", "https://ntfy.sh/docs/publish/#message-templating", nil}
errHTTPBadRequestSIDInvalid = &errHTTP{40049, http.StatusBadRequest, "invalid request: sequence ID invalid", "https://ntfy.sh/docs/publish/#TODO", nil} errHTTPBadRequestSequenceIDInvalid = &errHTTP{40049, http.StatusBadRequest, "invalid request: sequence ID invalid", "https://ntfy.sh/docs/publish/#TODO", nil}
errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil} errHTTPNotFound = &errHTTP{40401, http.StatusNotFound, "page not found", "", nil}
errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil} errHTTPUnauthorized = &errHTTP{40101, http.StatusUnauthorized, "unauthorized", "https://ntfy.sh/docs/publish/#authentication", nil}
errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil} errHTTPForbidden = &errHTTP{40301, http.StatusForbidden, "forbidden", "https://ntfy.sh/docs/publish/#authentication", nil}

View File

@@ -51,7 +51,7 @@ const (
content_type TEXT NOT NULL, content_type TEXT NOT NULL,
encoding TEXT NOT NULL, encoding TEXT NOT NULL,
published INT NOT NULL, published INT NOT NULL,
deleted INT NOT NULL event TEXT NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid); CREATE INDEX IF NOT EXISTS idx_mid ON messages (mid);
CREATE INDEX IF NOT EXISTS idx_sequence_id ON messages (sequence_id); CREATE INDEX IF NOT EXISTS idx_sequence_id ON messages (sequence_id);
@@ -69,58 +69,57 @@ const (
COMMIT; COMMIT;
` `
insertMessageQuery = ` insertMessageQuery = `
INSERT INTO messages (mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published, deleted) INSERT INTO messages (mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published, event)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
` `
deleteMessageQuery = `DELETE FROM messages WHERE mid = ?` deleteMessageQuery = `DELETE FROM messages WHERE mid = ?`
updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
selectMessagesByIDQuery = ` selectMessagesByIDQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE mid = ? WHERE mid = ?
` `
selectMessagesSinceTimeQuery = ` selectMessagesSinceTimeQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE topic = ? AND time >= ? AND published = 1 WHERE topic = ? AND time >= ? AND published = 1
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceTimeIncludeScheduledQuery = ` selectMessagesSinceTimeIncludeScheduledQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE topic = ? AND time >= ? WHERE topic = ? AND time >= ?
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceIDQuery = ` selectMessagesSinceIDQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE topic = ? AND id > ? AND published = 1 WHERE topic = ? AND id > ? AND published = 1
ORDER BY time, id ORDER BY time, id
` `
selectMessagesSinceIDIncludeScheduledQuery = ` selectMessagesSinceIDIncludeScheduledQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE topic = ? AND (id > ? OR published = 0) WHERE topic = ? AND (id > ? OR published = 0)
ORDER BY time, id ORDER BY time, id
` `
selectMessagesLatestQuery = ` selectMessagesLatestQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE topic = ? AND published = 1 WHERE topic = ? AND published = 1
ORDER BY time DESC, id DESC ORDER BY time DESC, id DESC
LIMIT 1 LIMIT 1
` `
selectMessagesDueQuery = ` selectMessagesDueQuery = `
SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, deleted SELECT mid, sequence_id, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding, event
FROM messages FROM messages
WHERE time <= ? AND published = 0 WHERE time <= ? AND published = 0
ORDER BY time, id ORDER BY time, id
` `
selectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` selectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1`
updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` updateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?`
updateMessageDeletedQuery = `UPDATE messages SET deleted = 1 WHERE mid = ?` selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessagesCountQuery = `SELECT COUNT(*) FROM messages`
selectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic` selectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic`
selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` selectTopicsQuery = `SELECT topic FROM messages GROUP BY topic`
@@ -268,7 +267,7 @@ const (
//13 -> 14 //13 -> 14
migrate13To14AlterMessagesTableQuery = ` migrate13To14AlterMessagesTableQuery = `
ALTER TABLE messages ADD COLUMN sequence_id TEXT NOT NULL DEFAULT(''); ALTER TABLE messages ADD COLUMN sequence_id TEXT NOT NULL DEFAULT('');
ALTER TABLE messages ADD COLUMN deleted INT NOT NULL DEFAULT('0'); ALTER TABLE messages ADD COLUMN event TEXT NOT NULL DEFAULT('message');
CREATE INDEX IF NOT EXISTS idx_sequence_id ON messages (sequence_id); CREATE INDEX IF NOT EXISTS idx_sequence_id ON messages (sequence_id);
` `
) )
@@ -381,7 +380,7 @@ func (c *messageCache) addMessages(ms []*message) error {
} }
defer stmt.Close() defer stmt.Close()
for _, m := range ms { for _, m := range ms {
if m.Event != messageEvent { if m.Event != messageEvent && m.Event != messageDeleteEvent && m.Event != messageReadEvent {
return errUnexpectedMessageType return errUnexpectedMessageType
} }
published := m.Time <= time.Now().Unix() published := m.Time <= time.Now().Unix()
@@ -431,7 +430,7 @@ func (c *messageCache) addMessages(ms []*message) error {
m.ContentType, m.ContentType,
m.Encoding, m.Encoding,
published, published,
m.Deleted, m.Event,
) )
if err != nil { if err != nil {
return err return err
@@ -720,8 +719,7 @@ func readMessages(rows *sql.Rows) ([]*message, error) {
func readMessage(rows *sql.Rows) (*message, error) { func readMessage(rows *sql.Rows) (*message, error) {
var timestamp, expires, attachmentSize, attachmentExpires int64 var timestamp, expires, attachmentSize, attachmentExpires int64
var priority int var priority int
var id, sequenceID, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding string var id, sequenceID, topic, msg, title, tagsStr, click, icon, actionsStr, attachmentName, attachmentType, attachmentURL, sender, user, contentType, encoding, event string
var deleted bool
err := rows.Scan( err := rows.Scan(
&id, &id,
&sequenceID, &sequenceID,
@@ -744,7 +742,7 @@ func readMessage(rows *sql.Rows) (*message, error) {
&user, &user,
&contentType, &contentType,
&encoding, &encoding,
&deleted, &event,
) )
if err != nil { if err != nil {
return nil, err return nil, err
@@ -782,7 +780,7 @@ func readMessage(rows *sql.Rows) (*message, error) {
SequenceID: sequenceID, SequenceID: sequenceID,
Time: timestamp, Time: timestamp,
Expires: expires, Expires: expires,
Event: messageEvent, Event: event,
Topic: topic, Topic: topic,
Message: msg, Message: msg,
Title: title, Title: title,
@@ -796,7 +794,6 @@ func readMessage(rows *sql.Rows) (*message, error) {
User: user, User: user,
ContentType: contentType, ContentType: contentType,
Encoding: encoding, Encoding: encoding,
Deleted: deleted,
}, nil }, nil
} }

View File

@@ -80,8 +80,9 @@ var (
wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`) wsPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/ws$`)
authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`) authPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}(,[-_A-Za-z0-9]{1,64})*/auth$`)
publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`) publishPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/(publish|send|trigger)$`)
sidRegex = topicRegex
updatePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}$`) updatePathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}$`)
markReadPathRegex = regexp.MustCompile(`^/[-_A-Za-z0-9]{1,64}/[-_A-Za-z0-9]{1,64}/read$`)
sequenceIDRegex = topicRegex
webConfigPath = "/config.js" webConfigPath = "/config.js"
webManifestPath = "/manifest.webmanifest" webManifestPath = "/manifest.webmanifest"
@@ -140,7 +141,6 @@ const (
firebaseControlTopic = "~control" // See Android if changed firebaseControlTopic = "~control" // See Android if changed
firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now) firebasePollTopic = "~poll" // See iOS if changed (DISABLED for now)
emptyMessageBody = "triggered" // Used when a message body is empty emptyMessageBody = "triggered" // Used when a message body is empty
deletedMessageBody = "deleted" // Used when a message is deleted
newMessageBody = "New message" // Used in poll requests as generic message newMessageBody = "New message" // Used in poll requests as generic message
defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment defaultAttachmentMessage = "You received a file: %s" // Used if message body is empty, and there is an attachment
encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages encodingBase64 = "base64" // Used mainly for binary UnifiedPush messages
@@ -550,6 +550,8 @@ func (s *Server) handleInternal(w http.ResponseWriter, r *http.Request, v *visit
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v) return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
} else if r.Method == http.MethodDelete && updatePathRegex.MatchString(r.URL.Path) { } else if r.Method == http.MethodDelete && updatePathRegex.MatchString(r.URL.Path) {
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleDelete))(w, r, v) return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleDelete))(w, r, v)
} else if r.Method == http.MethodPut && markReadPathRegex.MatchString(r.URL.Path) {
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handleMarkRead))(w, r, v)
} else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) { } else if r.Method == http.MethodGet && publishPathRegex.MatchString(r.URL.Path) {
return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v) return s.limitRequestsWithTopic(s.authorizeTopicWrite(s.handlePublish))(w, r, v)
} else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) { } else if r.Method == http.MethodGet && jsonPathRegex.MatchString(r.URL.Path) {
@@ -921,10 +923,8 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request, v *visitor
if e != nil { if e != nil {
return e.With(t) return e.With(t)
} }
// Create a delete message: empty body, same SequenceID, deleted flag set // Create a delete message with event type message_delete
m := newDefaultMessage(t.ID, deletedMessageBody) m := newActionMessage(messageDeleteEvent, t.ID, sequenceID)
m.SequenceID = sequenceID
m.Deleted = true
m.Sender = v.IP() m.Sender = v.IP()
m.User = v.MaybeUserID() m.User = v.MaybeUserID()
m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix() m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
@@ -951,6 +951,50 @@ func (s *Server) handleDelete(w http.ResponseWriter, r *http.Request, v *visitor
return s.writeJSON(w, m) return s.writeJSON(w, m)
} }
func (s *Server) handleMarkRead(w http.ResponseWriter, r *http.Request, v *visitor) error {
t, err := fromContext[*topic](r, contextTopic)
if err != nil {
return err
}
vrate, err := fromContext[*visitor](r, contextRateVisitor)
if err != nil {
return err
}
if !util.ContainsIP(s.config.VisitorRequestExemptPrefixes, v.ip) && !vrate.MessageAllowed() {
return errHTTPTooManyRequestsLimitMessages.With(t)
}
sequenceID, e := s.sequenceIDFromPath(r.URL.Path)
if e != nil {
return e.With(t)
}
// Create a read message with event type message_read
m := newActionMessage(messageReadEvent, t.ID, sequenceID)
m.Sender = v.IP()
m.User = v.MaybeUserID()
m.Expires = time.Unix(m.Time, 0).Add(v.Limits().MessageExpiryDuration).Unix()
// Publish to subscribers
if err := t.Publish(v, m); err != nil {
return err
}
// Send to Firebase for Android clients
if s.firebaseClient != nil {
go s.sendToFirebase(v, m)
}
// Send to web push endpoints
if s.config.WebPushPublicKey != "" {
go s.publishToWebPushEndpoints(v, m)
}
// Add to message cache
if err := s.messageCache.AddMessage(m); err != nil {
return err
}
logvrm(v, r, m).Tag(tagPublish).Debug("Marked message as read with sequence ID %s", sequenceID)
s.mu.Lock()
s.messages++
s.mu.Unlock()
return s.writeJSON(w, m)
}
func (s *Server) sendToFirebase(v *visitor, m *message) { func (s *Server) sendToFirebase(v *visitor, m *message) {
logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase") logvm(v, m).Tag(tagFirebase).Debug("Publishing to Firebase")
if err := s.firebaseClient.Send(v, m); err != nil { if err := s.firebaseClient.Send(v, m); err != nil {
@@ -1017,10 +1061,10 @@ func (s *Server) parsePublishParams(r *http.Request, m *message) (cache bool, fi
} else { } else {
sequenceID := readParam(r, "x-sequence-id", "sequence-id", "sid") sequenceID := readParam(r, "x-sequence-id", "sequence-id", "sid")
if sequenceID != "" { if sequenceID != "" {
if sidRegex.MatchString(sequenceID) { if sequenceIDRegex.MatchString(sequenceID) {
m.SequenceID = sequenceID m.SequenceID = sequenceID
} else { } else {
return false, false, "", "", "", false, errHTTPBadRequestSIDInvalid return false, false, "", "", "", false, errHTTPBadRequestSequenceIDInvalid
} }
} else { } else {
m.SequenceID = m.ID m.SequenceID = m.ID
@@ -1767,8 +1811,8 @@ func (s *Server) topicsFromPath(path string) ([]*topic, string, error) {
// sequenceIDFromPath returns the sequence ID from a POST path like /mytopic/sequenceIdHere // sequenceIDFromPath returns the sequence ID from a POST path like /mytopic/sequenceIdHere
func (s *Server) sequenceIDFromPath(path string) (string, *errHTTP) { func (s *Server) sequenceIDFromPath(path string) (string, *errHTTP) {
parts := strings.Split(path, "/") parts := strings.Split(path, "/")
if len(parts) != 3 { if len(parts) < 3 {
return "", errHTTPBadRequestSIDInvalid return "", errHTTPBadRequestSequenceIDInvalid
} }
return parts[2], nil return parts[2], nil
} }

View File

@@ -12,10 +12,12 @@ import (
// List of possible events // List of possible events
const ( const (
openEvent = "open" openEvent = "open"
keepaliveEvent = "keepalive" keepaliveEvent = "keepalive"
messageEvent = "message" messageEvent = "message"
pollRequestEvent = "poll_request" messageDeleteEvent = "message_delete"
messageReadEvent = "message_read"
pollRequestEvent = "poll_request"
) )
const ( const (
@@ -41,7 +43,6 @@ type message struct {
PollID string `json:"poll_id,omitempty"` PollID string `json:"poll_id,omitempty"`
ContentType string `json:"content_type,omitempty"` // text/plain by default (if empty), or text/markdown ContentType string `json:"content_type,omitempty"` // text/plain by default (if empty), or text/markdown
Encoding string `json:"encoding,omitempty"` // Empty for raw UTF-8, or "base64" for encoded bytes Encoding string `json:"encoding,omitempty"` // Empty for raw UTF-8, or "base64" for encoded bytes
Deleted bool `json:"deleted,omitempty"` // True if message is marked as deleted
Sender netip.Addr `json:"-"` // IP address of uploader, used for rate limiting Sender netip.Addr `json:"-"` // IP address of uploader, used for rate limiting
User string `json:"-"` // UserID of the uploader, used to associated attachments User string `json:"-"` // UserID of the uploader, used to associated attachments
} }
@@ -149,6 +150,13 @@ func newPollRequestMessage(topic, pollID string) *message {
return m return m
} }
// newActionMessage creates a new action message (message_delete or message_read)
func newActionMessage(event, topic, sequenceID string) *message {
m := newMessage(event, topic, "")
m.SequenceID = sequenceID
return m
}
func validMessageID(s string) bool { func validMessageID(s string) bool {
return util.ValidRandomString(s, messageIDLength) return util.ValidRandomString(s, messageIDLength)
} }
@@ -227,7 +235,7 @@ func parseQueryFilters(r *http.Request) (*queryFilter, error) {
} }
func (q *queryFilter) Pass(msg *message) bool { func (q *queryFilter) Pass(msg *message) bool {
if msg.Event != messageEvent { if msg.Event != messageEvent && msg.Event != messageDeleteEvent && msg.Event != messageReadEvent {
return true // filters only apply to messages return true // filters only apply to messages
} else if q.ID != "" && msg.ID != q.ID { } else if q.ID != "" && msg.ID != q.ID {
return false return false

View File

@@ -9,6 +9,7 @@ import { dbAsync } from "../src/app/db";
import { toNotificationParams, icon, badge } from "../src/app/notificationUtils"; import { toNotificationParams, icon, badge } from "../src/app/notificationUtils";
import initI18n from "../src/app/i18n"; import initI18n from "../src/app/i18n";
import { messageWithSequenceId } from "../src/app/utils"; import { messageWithSequenceId } from "../src/app/utils";
import { EVENT_MESSAGE, EVENT_MESSAGE_DELETE, EVENT_MESSAGE_READ } from "../src/app/events";
/** /**
* General docs for service workers and PWAs: * General docs for service workers and PWAs:
@@ -62,11 +63,6 @@ const handlePushMessage = async (data) => {
// Add notification to database // Add notification to database
await addNotification({ subscriptionId, message }); await addNotification({ subscriptionId, message });
// Don't show a notification for deleted messages
if (message.deleted) {
return;
}
// Broadcast the message to potentially play a sound // Broadcast the message to potentially play a sound
broadcastChannel.postMessage(message); broadcastChannel.postMessage(message);
@@ -80,6 +76,51 @@ const handlePushMessage = async (data) => {
); );
}; };
/**
* Handle a message_delete event: delete the notification from the database.
*/
const handlePushMessageDelete = async (data) => {
const { subscription_id: subscriptionId, message } = data;
const db = await dbAsync();
// Delete notification with the same sequence_id
const sequenceId = message.sequence_id;
if (sequenceId) {
console.log("[ServiceWorker] Deleting notification with sequenceId", { subscriptionId, sequenceId });
await db.notifications.where({ subscriptionId, sequenceId }).delete();
}
// Update subscription last message id (for ?since=... queries)
await db.subscriptions.update(subscriptionId, {
last: message.id,
});
};
/**
* Handle a message_read event: mark the notification as read.
*/
const handlePushMessageRead = async (data) => {
const { subscription_id: subscriptionId, message } = data;
const db = await dbAsync();
// Mark notification as read (set new = 0)
const sequenceId = message.sequence_id;
if (sequenceId) {
console.log("[ServiceWorker] Marking notification as read", { subscriptionId, sequenceId });
await db.notifications.where({ subscriptionId, sequenceId }).modify({ new: 0 });
}
// Update subscription last message id (for ?since=... queries)
await db.subscriptions.update(subscriptionId, {
last: message.id,
});
// Update badge count
const badgeCount = await db.notifications.where({ new: 1 }).count();
console.log("[ServiceWorker] Setting new app badge count", { badgeCount });
self.navigator.setAppBadge?.(badgeCount);
};
/** /**
* Handle a received web push subscription expiring. * Handle a received web push subscription expiring.
*/ */
@@ -114,8 +155,12 @@ const handlePushUnknown = async (data) => {
* @param {object} data see server/types.go, type webPushPayload * @param {object} data see server/types.go, type webPushPayload
*/ */
const handlePush = async (data) => { const handlePush = async (data) => {
if (data.event === "message") { if (data.event === EVENT_MESSAGE) {
await handlePushMessage(data); await handlePushMessage(data);
} else if (data.event === EVENT_MESSAGE_DELETE) {
await handlePushMessageDelete(data);
} else if (data.event === EVENT_MESSAGE_READ) {
await handlePushMessageRead(data);
} else if (data.event === "subscription_expiring") { } else if (data.event === "subscription_expiring") {
await handlePushSubscriptionExpiring(data); await handlePushSubscriptionExpiring(data);
} else { } else {

View File

@@ -1,5 +1,6 @@
/* eslint-disable max-classes-per-file */ /* eslint-disable max-classes-per-file */
import { basicAuth, bearerAuth, encodeBase64Url, topicShortUrl, topicUrlWs } from "./utils"; import { basicAuth, bearerAuth, encodeBase64Url, topicShortUrl, topicUrlWs } from "./utils";
import { EVENT_OPEN, isNotificationEvent } from "./events";
const retryBackoffSeconds = [5, 10, 20, 30, 60, 120]; const retryBackoffSeconds = [5, 10, 20, 30, 60, 120];
@@ -48,10 +49,11 @@ class Connection {
console.log(`[Connection, ${this.shortUrl}, ${this.connectionId}] Message received from server: ${event.data}`); console.log(`[Connection, ${this.shortUrl}, ${this.connectionId}] Message received from server: ${event.data}`);
try { try {
const data = JSON.parse(event.data); const data = JSON.parse(event.data);
if (data.event === "open") { if (data.event === EVENT_OPEN) {
return; return;
} }
const relevantAndValid = data.event === "message" && "id" in data && "time" in data && "message" in data; // Accept message, message_delete, and message_read events
const relevantAndValid = isNotificationEvent(data.event) && "id" in data && "time" in data;
if (!relevantAndValid) { if (!relevantAndValid) {
console.log(`[Connection, ${this.shortUrl}, ${this.connectionId}] Unexpected message. Ignoring.`); console.log(`[Connection, ${this.shortUrl}, ${this.connectionId}] Unexpected message. Ignoring.`);
return; return;

View File

@@ -1,6 +1,7 @@
import api from "./Api"; import api from "./Api";
import prefs from "./Prefs"; import prefs from "./Prefs";
import subscriptionManager from "./SubscriptionManager"; import subscriptionManager from "./SubscriptionManager";
import { EVENT_MESSAGE, EVENT_MESSAGE_DELETE } from "./events";
const delayMillis = 2000; // 2 seconds const delayMillis = 2000; // 2 seconds
const intervalMillis = 300000; // 5 minutes const intervalMillis = 300000; // 5 minutes
@@ -55,7 +56,7 @@ class Poller {
// Delete all existing notifications for which the latest notification is marked as deleted // Delete all existing notifications for which the latest notification is marked as deleted
const deletedSequenceIds = Object.entries(latestBySequenceId) const deletedSequenceIds = Object.entries(latestBySequenceId)
.filter(([, notification]) => notification.deleted) .filter(([, notification]) => notification.event === EVENT_MESSAGE_DELETE)
.map(([sequenceId]) => sequenceId); .map(([sequenceId]) => sequenceId);
if (deletedSequenceIds.length > 0) { if (deletedSequenceIds.length > 0) {
console.log(`[Poller] Deleting notifications with deleted sequence IDs for ${subscription.id}`, deletedSequenceIds); console.log(`[Poller] Deleting notifications with deleted sequence IDs for ${subscription.id}`, deletedSequenceIds);
@@ -65,7 +66,9 @@ class Poller {
} }
// Add only the latest notification for each non-deleted sequence // Add only the latest notification for each non-deleted sequence
const notificationsToAdd = Object.values(latestBySequenceId).filter((n) => !n.deleted); const notificationsToAdd = Object
.values(latestBySequenceId)
.filter(n => n.event === EVENT_MESSAGE);
if (notificationsToAdd.length > 0) { if (notificationsToAdd.length > 0) {
console.log(`[Poller] Adding ${notificationsToAdd.length} notification(s) for ${subscription.id}`); console.log(`[Poller] Adding ${notificationsToAdd.length} notification(s) for ${subscription.id}`);
await subscriptionManager.addNotifications(subscription.id, notificationsToAdd); await subscriptionManager.addNotifications(subscription.id, notificationsToAdd);

View File

@@ -3,6 +3,7 @@ import notifier from "./Notifier";
import prefs from "./Prefs"; import prefs from "./Prefs";
import db from "./db"; import db from "./db";
import { messageWithSequenceId, topicUrl } from "./utils"; import { messageWithSequenceId, topicUrl } from "./utils";
import { EVENT_MESSAGE, EVENT_MESSAGE_DELETE, EVENT_MESSAGE_READ } from "./events";
class SubscriptionManager { class SubscriptionManager {
constructor(dbImpl) { constructor(dbImpl) {
@@ -15,7 +16,7 @@ class SubscriptionManager {
return Promise.all( return Promise.all(
subscriptions.map(async (s) => ({ subscriptions.map(async (s) => ({
...s, ...s,
new: await this.db.notifications.where({ subscriptionId: s.id, new: 1 }).count(), new: await this.db.notifications.where({ subscriptionId: s.id, new: 1 }).count()
})) }))
); );
} }
@@ -48,7 +49,7 @@ class SubscriptionManager {
} }
async notify(subscriptionId, notification) { async notify(subscriptionId, notification) {
if (notification.deleted) { if (notification.event !== EVENT_MESSAGE) {
return; return;
} }
const subscription = await this.get(subscriptionId); const subscription = await this.get(subscriptionId);
@@ -83,7 +84,7 @@ class SubscriptionManager {
baseUrl, baseUrl,
topic, topic,
mutedUntil: 0, mutedUntil: 0,
last: null, last: null
}; };
await this.db.subscriptions.put(subscription); await this.db.subscriptions.put(subscription);
@@ -101,7 +102,7 @@ class SubscriptionManager {
const local = await this.add(remote.base_url, remote.topic, { const local = await this.add(remote.base_url, remote.topic, {
displayName: remote.display_name, // May be undefined displayName: remote.display_name, // May be undefined
reservation, // May be null! reservation // May be null!
}); });
return local.id; return local.id;
@@ -174,7 +175,7 @@ class SubscriptionManager {
/** Adds notification, or returns false if it already exists */ /** Adds notification, or returns false if it already exists */
async addNotification(subscriptionId, notification) { async addNotification(subscriptionId, notification) {
const exists = await this.db.notifications.get(notification.id); const exists = await this.db.notifications.get(notification.id);
if (exists || notification.deleted) { if (exists || notification.event === EVENT_MESSAGE_DELETE || notification.event === EVENT_MESSAGE_READ) {
return false; return false;
} }
try { try {
@@ -185,13 +186,13 @@ class SubscriptionManager {
await this.db.notifications.add({ await this.db.notifications.add({
...messageWithSequenceId(notification), ...messageWithSequenceId(notification),
subscriptionId, subscriptionId,
new: 1, // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation new: 1 // New marker (used for bubble indicator); cannot be boolean; Dexie index limitation
}); });
// FIXME consider put() for double tab // FIXME consider put() for double tab
// Update subscription last message id (for ?since=... queries) // Update subscription last message id (for ?since=... queries)
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
last: notification.id, last: notification.id
}); });
} catch (e) { } catch (e) {
console.error(`[SubscriptionManager] Error adding notification`, e); console.error(`[SubscriptionManager] Error adding notification`, e);
@@ -207,7 +208,7 @@ class SubscriptionManager {
const lastNotificationId = notifications.at(-1).id; const lastNotificationId = notifications.at(-1).id;
await this.db.notifications.bulkPut(notificationsWithSubscriptionId); await this.db.notifications.bulkPut(notificationsWithSubscriptionId);
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
last: lastNotificationId, last: lastNotificationId
}); });
} }
@@ -250,19 +251,19 @@ class SubscriptionManager {
async setMutedUntil(subscriptionId, mutedUntil) { async setMutedUntil(subscriptionId, mutedUntil) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
mutedUntil, mutedUntil
}); });
} }
async setDisplayName(subscriptionId, displayName) { async setDisplayName(subscriptionId, displayName) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
displayName, displayName
}); });
} }
async setReservation(subscriptionId, reservation) { async setReservation(subscriptionId, reservation) {
await this.db.subscriptions.update(subscriptionId, { await this.db.subscriptions.update(subscriptionId, {
reservation, reservation
}); });
} }

View File

@@ -13,7 +13,7 @@ const createDatabase = (username) => {
db.version(3).stores({ db.version(3).stores({
subscriptions: "&id,baseUrl,[baseUrl+mutedUntil]", subscriptions: "&id,baseUrl,[baseUrl+mutedUntil]",
notifications: "&id,sequenceId,subscriptionId,time,new,deleted,[subscriptionId+new],[subscriptionId+sequenceId]", notifications: "&id,sequenceId,subscriptionId,time,new,[subscriptionId+new],[subscriptionId+sequenceId]",
users: "&baseUrl,username", users: "&baseUrl,username",
prefs: "&key" prefs: "&key"
}); });

14
web/src/app/events.js Normal file
View File

@@ -0,0 +1,14 @@
// Event types for ntfy messages
// These correspond to the server event types in server/types.go
export const EVENT_OPEN = "open";
export const EVENT_KEEPALIVE = "keepalive";
export const EVENT_MESSAGE = "message";
export const EVENT_MESSAGE_DELETE = "message_delete";
export const EVENT_MESSAGE_READ = "message_read";
export const EVENT_POLL_REQUEST = "poll_request";
// Check if an event is a notification event (message, delete, or read)
export const isNotificationEvent = (event) =>
event === EVENT_MESSAGE || event === EVENT_MESSAGE_DELETE || event === EVENT_MESSAGE_READ;

View File

@@ -12,6 +12,7 @@ import accountApi from "../app/AccountApi";
import { UnauthorizedError } from "../app/errors"; import { UnauthorizedError } from "../app/errors";
import notifier from "../app/Notifier"; import notifier from "../app/Notifier";
import prefs from "../app/Prefs"; import prefs from "../app/Prefs";
import { EVENT_MESSAGE_DELETE, EVENT_MESSAGE_READ } from "../app/events";
/** /**
* Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection * Wire connectionManager and subscriptionManager so that subscriptions are updated when the connection
@@ -53,13 +54,18 @@ export const useConnectionListeners = (account, subscriptions, users, webPushTop
// Note: This logic is duplicated in the Android app in SubscriberService::onNotificationReceived() // Note: This logic is duplicated in the Android app in SubscriberService::onNotificationReceived()
// and FirebaseService::handleMessage(). // and FirebaseService::handleMessage().
// Delete existing notification with same sequenceId, if any if (notification.event === EVENT_MESSAGE_DELETE && notification.sequence_id) {
const sequenceId = notification.sequence_id || notification.id; // Handle delete: remove notification from database
if (sequenceId) { await subscriptionManager.deleteNotificationBySequenceId(subscriptionId, notification.sequence_id);
await subscriptionManager.deleteNotificationBySequenceId(subscriptionId, sequenceId); } else if (notification.event === EVENT_MESSAGE_READ && notification.sequence_id) {
} // Handle read: mark notification as read
// Add notification to database await subscriptionManager.markNotificationReadBySequenceId(subscriptionId, notification.sequence_id);
if (!notification.deleted) { } else {
// Regular message: delete existing and add new
const sequenceId = notification.sequence_id || notification.id;
if (sequenceId) {
await subscriptionManager.deleteNotificationBySequenceId(subscriptionId, sequenceId);
}
const added = await subscriptionManager.addNotification(subscriptionId, notification); const added = await subscriptionManager.addNotification(subscriptionId, notification);
if (added) { if (added) {
await subscriptionManager.notify(subscriptionId, notification); await subscriptionManager.notify(subscriptionId, notification);