diff --git a/server/message_cache.go b/server/message_cache.go index df6b4e54..84083aee 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -72,11 +72,12 @@ const ( INSERT INTO messages (mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, attachment_deleted, sender, user, content_type, encoding, published) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ` - deleteMessageQuery = `DELETE FROM messages WHERE mid = ?` - deleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` - updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` - selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics - selectMessagesByIDQuery = ` + deleteMessageQuery = `DELETE FROM messages WHERE mid = ?` + selectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` + deleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` + updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` + selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics + selectMessagesByIDQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding FROM messages WHERE mid = ? @@ -608,12 +609,42 @@ func (c *messageCache) DeleteMessages(ids ...string) error { return tx.Commit() } -// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID -func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) error { +// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID. +// It returns the message IDs of the deleted messages, which can be used to clean up attachment files. +func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) { c.mu.Lock() defer c.mu.Unlock() - _, err := c.db.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID) - return err + tx, err := c.db.Begin() + if err != nil { + return nil, err + } + defer tx.Rollback() + // First, get the message IDs of scheduled messages to be deleted + rows, err := tx.Query(selectScheduledMessageIDsBySeqIDQuery, topic, sequenceID) + if err != nil { + return nil, err + } + defer rows.Close() + ids := make([]string, 0) + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + ids = append(ids, id) + } + if err := rows.Err(); err != nil { + return nil, err + } + rows.Close() // Close rows before executing delete in same transaction + // Then delete the messages + if _, err := tx.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID); err != nil { + return nil, err + } + if err := tx.Commit(); err != nil { + return nil, err + } + return ids, nil } func (c *messageCache) ExpireMessages(topics ...string) error { diff --git a/server/message_cache_test.go b/server/message_cache_test.go index 25f2a91b..672f91b0 100644 --- a/server/message_cache_test.go +++ b/server/message_cache_test.go @@ -742,9 +742,11 @@ func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) { require.Nil(t, err) require.Equal(t, 1, len(messages)) - // Delete scheduled message by sequence ID - err = c.DeleteScheduledBySequenceID("mytopic", "seq123") + // Delete scheduled message by sequence ID and verify returned IDs + deletedIDs, err := c.DeleteScheduledBySequenceID("mytopic", "seq123") require.Nil(t, err) + require.Equal(t, 1, len(deletedIDs)) + require.Equal(t, "scheduled1", deletedIDs[0]) // Verify scheduled message is deleted messages, err = c.Messages("mytopic", sinceAllMessages, true) @@ -758,13 +760,15 @@ func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) { require.Equal(t, 1, len(messages)) require.Equal(t, "other scheduled", messages[0].Message) - // Deleting non-existent sequence ID should not error - err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent") + // Deleting non-existent sequence ID should return empty list + deletedIDs, err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent") require.Nil(t, err) + require.Empty(t, deletedIDs) // Deleting published message should not affect it (only deletes unpublished) - err = c.DeleteScheduledBySequenceID("mytopic", "seq456") + deletedIDs, err = c.DeleteScheduledBySequenceID("mytopic", "seq456") require.Nil(t, err) + require.Empty(t, deletedIDs) messages, err = c.Messages("mytopic", sinceAllMessages, true) require.Nil(t, err) diff --git a/server/server.go b/server/server.go index 5f775ee1..7274e686 100644 --- a/server/server.go +++ b/server/server.go @@ -864,9 +864,16 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e } if cache { // Delete any existing scheduled message with the same sequence ID - if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID); err != nil { + deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID) + if err != nil { return nil, err } + // Delete attachment files for deleted scheduled messages + if s.fileCache != nil && len(deletedIDs) > 0 { + if err := s.fileCache.Remove(deletedIDs...); err != nil { + logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages") + } + } logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache") if err := s.messageCache.AddMessage(m); err != nil { return nil, err @@ -963,9 +970,16 @@ func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v * go s.publishToWebPushEndpoints(v, m) } // Delete any existing scheduled message with the same sequence ID - if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID); err != nil { + deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID) + if err != nil { return err } + // Delete attachment files for deleted scheduled messages + if s.fileCache != nil && len(deletedIDs) > 0 { + if err := s.fileCache.Remove(deletedIDs...); err != nil { + logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages") + } + } // Add to message cache if err := s.messageCache.AddMessage(m); err != nil { return err diff --git a/server/server_test.go b/server/server_test.go index 3af029b4..0b125638 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -3593,6 +3593,64 @@ func TestServer_UpdateScheduledMessage_TopicScoped(t *testing.T) { require.Equal(t, "topic2 scheduled", messages[0].Message) } +func TestServer_UpdateScheduledMessage_WithAttachment(t *testing.T) { + t.Parallel() + s := newTestServer(t, newTestConfig(t)) + + // Publish a scheduled message with an attachment + content := util.RandomString(5000) // > 4096 to trigger attachment + response := request(t, s, "PUT", "/mytopic/attach-seq?delay=1h", content, nil) + require.Equal(t, 200, response.Code) + msg1 := toMessage(t, response.Body.String()) + require.Equal(t, "attach-seq", msg1.SequenceID) + require.NotNil(t, msg1.Attachment) + + // Verify attachment file exists + attachmentFile1 := filepath.Join(s.config.AttachmentCacheDir, msg1.ID) + require.FileExists(t, attachmentFile1) + + // Update the scheduled message with a new attachment + newContent := util.RandomString(5000) + response = request(t, s, "PUT", "/mytopic/attach-seq?delay=2h", newContent, nil) + require.Equal(t, 200, response.Code) + msg2 := toMessage(t, response.Body.String()) + require.Equal(t, "attach-seq", msg2.SequenceID) + require.NotEqual(t, msg1.ID, msg2.ID) + + // Verify old attachment file was deleted + require.NoFileExists(t, attachmentFile1) + + // Verify new attachment file exists + attachmentFile2 := filepath.Join(s.config.AttachmentCacheDir, msg2.ID) + require.FileExists(t, attachmentFile2) +} + +func TestServer_DeleteScheduledMessage_WithAttachment(t *testing.T) { + t.Parallel() + s := newTestServer(t, newTestConfig(t)) + + // Publish a scheduled message with an attachment + content := util.RandomString(5000) // > 4096 to trigger attachment + response := request(t, s, "PUT", "/mytopic/delete-attach-seq?delay=1h", content, nil) + require.Equal(t, 200, response.Code) + msg := toMessage(t, response.Body.String()) + require.Equal(t, "delete-attach-seq", msg.SequenceID) + require.NotNil(t, msg.Attachment) + + // Verify attachment file exists + attachmentFile := filepath.Join(s.config.AttachmentCacheDir, msg.ID) + require.FileExists(t, attachmentFile) + + // Delete the scheduled message + response = request(t, s, "DELETE", "/mytopic/delete-attach-seq", "", nil) + require.Equal(t, 200, response.Code) + deleteMsg := toMessage(t, response.Body.String()) + require.Equal(t, "message_delete", deleteMsg.Event) + + // Verify attachment file was deleted + require.NoFileExists(t, attachmentFile) +} + func newTestConfig(t *testing.T) *Config { conf := NewConfig() conf.BaseURL = "http://127.0.0.1:12345"