From c23d201186813bfb376321af5ec9683ef5ada1d4 Mon Sep 17 00:00:00 2001 From: binwiederhier Date: Sun, 18 Jan 2026 15:50:40 -0500 Subject: [PATCH] Updated/cancel scheduled messages --- server/message_cache.go | 9 ++++ server/message_cache_test.go | 69 +++++++++++++++++++++++++ server/server.go | 8 +++ server/server_test.go | 98 ++++++++++++++++++++++++++++++++++++ 4 files changed, 184 insertions(+) diff --git a/server/message_cache.go b/server/message_cache.go index 342f9687..df6b4e54 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -73,6 +73,7 @@ const ( VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` deleteMessageQuery = `DELETE FROM messages WHERE mid = ?` + deleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics selectMessagesByIDQuery = ` @@ -607,6 +608,14 @@ func (c *messageCache) DeleteMessages(ids ...string) error { return tx.Commit() } +// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID +func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) error { + c.mu.Lock() + defer c.mu.Unlock() + _, err := c.db.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID) + return err +} + func (c *messageCache) ExpireMessages(topics ...string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 1e285605..25f2a91b 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -703,6 +703,75 @@ func testSender(t *testing.T, c *messageCache) { require.Equal(t, messages[1].Sender, netip.Addr{}) } +func TestSqliteCache_DeleteScheduledBySequenceID(t *testing.T) { + testDeleteScheduledBySequenceID(t, newSqliteTestCache(t)) +} + +func TestMemCache_DeleteScheduledBySequenceID(t *testing.T) { + testDeleteScheduledBySequenceID(t, newMemTestCache(t)) +} + +func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) { + // Create a scheduled (unpublished) message + scheduledMsg := newDefaultMessage("mytopic", "scheduled message") + scheduledMsg.ID = "scheduled1" + scheduledMsg.SequenceID = "seq123" + scheduledMsg.Time = time.Now().Add(time.Hour).Unix() // Future time makes it scheduled + require.Nil(t, c.AddMessage(scheduledMsg)) + + // Create a published message with different sequence ID + publishedMsg := newDefaultMessage("mytopic", "published message") + publishedMsg.ID = "published1" + publishedMsg.SequenceID = "seq456" + publishedMsg.Time = time.Now().Add(-time.Hour).Unix() // Past time makes it published + require.Nil(t, c.AddMessage(publishedMsg)) + + // Create a scheduled message in a different topic + otherTopicMsg := newDefaultMessage("othertopic", "other scheduled") + otherTopicMsg.ID = "other1" + otherTopicMsg.SequenceID = "seq123" // Same sequence ID as scheduledMsg + otherTopicMsg.Time = time.Now().Add(time.Hour).Unix() + require.Nil(t, c.AddMessage(otherTopicMsg)) + + // Verify all messages exist (including scheduled) + messages, err := c.Messages("mytopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 2, len(messages)) + + messages, err = c.Messages("othertopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 1, len(messages)) + + // Delete scheduled message by sequence ID + err = c.DeleteScheduledBySequenceID("mytopic", "seq123") + require.Nil(t, err) + + // Verify scheduled message is deleted + messages, err = c.Messages("mytopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 1, len(messages)) + require.Equal(t, "published message", messages[0].Message) + + // Verify other topic's message still exists (topic-scoped deletion) + messages, err = c.Messages("othertopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 1, len(messages)) + require.Equal(t, "other scheduled", messages[0].Message) + + // Deleting non-existent sequence ID should not error + err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent") + require.Nil(t, err) + + // Deleting published message should not affect it (only deletes unpublished) + err = c.DeleteScheduledBySequenceID("mytopic", "seq456") + require.Nil(t, err) + + messages, err = c.Messages("mytopic", sinceAllMessages, true) + require.Nil(t, err) + require.Equal(t, 1, len(messages)) + require.Equal(t, "published message", messages[0].Message) +} + func checkSchemaVersion(t *testing.T, db *sql.DB) { rows, err := db.Query(`SELECT version FROM schemaVersion`) require.Nil(t, err) diff --git a/server/server.go b/server/server.go index d3c72029..5f775ee1 100644 --- a/server/server.go +++ b/server/server.go @@ -863,6 +863,10 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e logvrm(v, r, m).Tag(tagPublish).Debug("Message delayed, will process later") } if cache { + // Delete any existing scheduled message with the same sequence ID + if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID); err != nil { + return nil, err + } logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache") if err := s.messageCache.AddMessage(m); err != nil { return nil, err @@ -958,6 +962,10 @@ func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v * if s.config.WebPushPublicKey != "" { go s.publishToWebPushEndpoints(v, m) } + // Delete any existing scheduled message with the same sequence ID + if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID); err != nil { + return err + } // Add to message cache if err := s.messageCache.AddMessage(m); err != nil { return err diff --git a/server/server_test.go b/server/server_test.go index 530d9458..3af029b4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -3495,6 +3495,104 @@ func TestServer_ClearMessage_WithFirebase(t *testing.T) { require.Equal(t, "firebase-clear-seq", sender.Messages()[1].Data["sequence_id"]) } +func TestServer_UpdateScheduledMessage(t *testing.T) { + t.Parallel() + s := newTestServer(t, newTestConfig(t)) + + // Publish a scheduled message (future delivery) + response := request(t, s, "PUT", "/mytopic/sched-seq?delay=1h", "original scheduled message", nil) + require.Equal(t, 200, response.Code) + msg1 := toMessage(t, response.Body.String()) + require.Equal(t, "sched-seq", msg1.SequenceID) + require.Equal(t, "original scheduled message", msg1.Message) + + // Verify scheduled message exists + response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "original scheduled message", messages[0].Message) + + // Update the scheduled message (same sequence ID, new content) + response = request(t, s, "PUT", "/mytopic/sched-seq?delay=2h", "updated scheduled message", nil) + require.Equal(t, 200, response.Code) + msg2 := toMessage(t, response.Body.String()) + require.Equal(t, "sched-seq", msg2.SequenceID) + require.Equal(t, "updated scheduled message", msg2.Message) + require.NotEqual(t, msg1.ID, msg2.ID) + + // Verify only the updated message exists (old scheduled was deleted) + response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages = toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "updated scheduled message", messages[0].Message) + require.Equal(t, msg2.ID, messages[0].ID) +} + +func TestServer_DeleteScheduledMessage(t *testing.T) { + t.Parallel() + s := newTestServer(t, newTestConfig(t)) + + // Publish a scheduled message (future delivery) + response := request(t, s, "PUT", "/mytopic/delete-sched-seq?delay=1h", "scheduled message to delete", nil) + require.Equal(t, 200, response.Code) + msg := toMessage(t, response.Body.String()) + require.Equal(t, "delete-sched-seq", msg.SequenceID) + + // Verify scheduled message exists + response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "scheduled message to delete", messages[0].Message) + + // Delete the scheduled message + response = request(t, s, "DELETE", "/mytopic/delete-sched-seq", "", nil) + require.Equal(t, 200, response.Code) + deleteMsg := toMessage(t, response.Body.String()) + require.Equal(t, "delete-sched-seq", deleteMsg.SequenceID) + require.Equal(t, "message_delete", deleteMsg.Event) + + // Verify scheduled message was deleted, only delete event remains + response = request(t, s, "GET", "/mytopic/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages = toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "message_delete", messages[0].Event) + require.Equal(t, "delete-sched-seq", messages[0].SequenceID) +} + +func TestServer_UpdateScheduledMessage_TopicScoped(t *testing.T) { + t.Parallel() + s := newTestServer(t, newTestConfig(t)) + + // Publish scheduled messages with same sequence ID in different topics + response := request(t, s, "PUT", "/topic1/shared-seq?delay=1h", "topic1 scheduled", nil) + require.Equal(t, 200, response.Code) + + response = request(t, s, "PUT", "/topic2/shared-seq?delay=1h", "topic2 scheduled", nil) + require.Equal(t, 200, response.Code) + + // Update scheduled message in topic1 only + response = request(t, s, "PUT", "/topic1/shared-seq?delay=2h", "topic1 updated", nil) + require.Equal(t, 200, response.Code) + + // Verify topic1 has only the updated message + response = request(t, s, "GET", "/topic1/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages := toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "topic1 updated", messages[0].Message) + + // Verify topic2 still has its original scheduled message (not affected) + response = request(t, s, "GET", "/topic2/json?poll=1&scheduled=1", "", nil) + require.Equal(t, 200, response.Code) + messages = toMessages(t, response.Body.String()) + require.Equal(t, 1, len(messages)) + require.Equal(t, "topic2 scheduled", messages[0].Message) +} + func newTestConfig(t *testing.T) *Config { conf := NewConfig() conf.BaseURL = "http://127.0.0.1:12345"