Delete attachments
This commit is contained in:
@@ -73,6 +73,7 @@ const (
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`
|
||||
deleteMessageQuery = `DELETE FROM messages WHERE mid = ?`
|
||||
selectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0`
|
||||
deleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0`
|
||||
updateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?`
|
||||
selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics
|
||||
@@ -608,12 +609,42 @@ func (c *messageCache) DeleteMessages(ids ...string) error {
|
||||
return tx.Commit()
|
||||
}
|
||||
|
||||
// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID
|
||||
func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) error {
|
||||
// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID.
|
||||
// It returns the message IDs of the deleted messages, which can be used to clean up attachment files.
|
||||
func (c *messageCache) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
_, err := c.db.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID)
|
||||
return err
|
||||
tx, err := c.db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer tx.Rollback()
|
||||
// First, get the message IDs of scheduled messages to be deleted
|
||||
rows, err := tx.Query(selectScheduledMessageIDsBySeqIDQuery, topic, sequenceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
ids := make([]string, 0)
|
||||
for rows.Next() {
|
||||
var id string
|
||||
if err := rows.Scan(&id); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
ids = append(ids, id)
|
||||
}
|
||||
if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
rows.Close() // Close rows before executing delete in same transaction
|
||||
// Then delete the messages
|
||||
if _, err := tx.Exec(deleteScheduledBySequenceIDQuery, topic, sequenceID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return ids, nil
|
||||
}
|
||||
|
||||
func (c *messageCache) ExpireMessages(topics ...string) error {
|
||||
|
||||
@@ -742,9 +742,11 @@ func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) {
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 1, len(messages))
|
||||
|
||||
// Delete scheduled message by sequence ID
|
||||
err = c.DeleteScheduledBySequenceID("mytopic", "seq123")
|
||||
// Delete scheduled message by sequence ID and verify returned IDs
|
||||
deletedIDs, err := c.DeleteScheduledBySequenceID("mytopic", "seq123")
|
||||
require.Nil(t, err)
|
||||
require.Equal(t, 1, len(deletedIDs))
|
||||
require.Equal(t, "scheduled1", deletedIDs[0])
|
||||
|
||||
// Verify scheduled message is deleted
|
||||
messages, err = c.Messages("mytopic", sinceAllMessages, true)
|
||||
@@ -758,13 +760,15 @@ func testDeleteScheduledBySequenceID(t *testing.T, c *messageCache) {
|
||||
require.Equal(t, 1, len(messages))
|
||||
require.Equal(t, "other scheduled", messages[0].Message)
|
||||
|
||||
// Deleting non-existent sequence ID should not error
|
||||
err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent")
|
||||
// Deleting non-existent sequence ID should return empty list
|
||||
deletedIDs, err = c.DeleteScheduledBySequenceID("mytopic", "nonexistent")
|
||||
require.Nil(t, err)
|
||||
require.Empty(t, deletedIDs)
|
||||
|
||||
// Deleting published message should not affect it (only deletes unpublished)
|
||||
err = c.DeleteScheduledBySequenceID("mytopic", "seq456")
|
||||
deletedIDs, err = c.DeleteScheduledBySequenceID("mytopic", "seq456")
|
||||
require.Nil(t, err)
|
||||
require.Empty(t, deletedIDs)
|
||||
|
||||
messages, err = c.Messages("mytopic", sinceAllMessages, true)
|
||||
require.Nil(t, err)
|
||||
|
||||
@@ -864,9 +864,16 @@ func (s *Server) handlePublishInternal(r *http.Request, v *visitor) (*message, e
|
||||
}
|
||||
if cache {
|
||||
// Delete any existing scheduled message with the same sequence ID
|
||||
if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID); err != nil {
|
||||
deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, m.SequenceID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Delete attachment files for deleted scheduled messages
|
||||
if s.fileCache != nil && len(deletedIDs) > 0 {
|
||||
if err := s.fileCache.Remove(deletedIDs...); err != nil {
|
||||
logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages")
|
||||
}
|
||||
}
|
||||
logvrm(v, r, m).Tag(tagPublish).Debug("Adding message to cache")
|
||||
if err := s.messageCache.AddMessage(m); err != nil {
|
||||
return nil, err
|
||||
@@ -963,9 +970,16 @@ func (s *Server) handleActionMessage(w http.ResponseWriter, r *http.Request, v *
|
||||
go s.publishToWebPushEndpoints(v, m)
|
||||
}
|
||||
// Delete any existing scheduled message with the same sequence ID
|
||||
if err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID); err != nil {
|
||||
deletedIDs, err := s.messageCache.DeleteScheduledBySequenceID(t.ID, sequenceID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Delete attachment files for deleted scheduled messages
|
||||
if s.fileCache != nil && len(deletedIDs) > 0 {
|
||||
if err := s.fileCache.Remove(deletedIDs...); err != nil {
|
||||
logvrm(v, r, m).Tag(tagPublish).Err(err).Warn("Error removing attachments for deleted scheduled messages")
|
||||
}
|
||||
}
|
||||
// Add to message cache
|
||||
if err := s.messageCache.AddMessage(m); err != nil {
|
||||
return err
|
||||
|
||||
@@ -3593,6 +3593,64 @@ func TestServer_UpdateScheduledMessage_TopicScoped(t *testing.T) {
|
||||
require.Equal(t, "topic2 scheduled", messages[0].Message)
|
||||
}
|
||||
|
||||
func TestServer_UpdateScheduledMessage_WithAttachment(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := newTestServer(t, newTestConfig(t))
|
||||
|
||||
// Publish a scheduled message with an attachment
|
||||
content := util.RandomString(5000) // > 4096 to trigger attachment
|
||||
response := request(t, s, "PUT", "/mytopic/attach-seq?delay=1h", content, nil)
|
||||
require.Equal(t, 200, response.Code)
|
||||
msg1 := toMessage(t, response.Body.String())
|
||||
require.Equal(t, "attach-seq", msg1.SequenceID)
|
||||
require.NotNil(t, msg1.Attachment)
|
||||
|
||||
// Verify attachment file exists
|
||||
attachmentFile1 := filepath.Join(s.config.AttachmentCacheDir, msg1.ID)
|
||||
require.FileExists(t, attachmentFile1)
|
||||
|
||||
// Update the scheduled message with a new attachment
|
||||
newContent := util.RandomString(5000)
|
||||
response = request(t, s, "PUT", "/mytopic/attach-seq?delay=2h", newContent, nil)
|
||||
require.Equal(t, 200, response.Code)
|
||||
msg2 := toMessage(t, response.Body.String())
|
||||
require.Equal(t, "attach-seq", msg2.SequenceID)
|
||||
require.NotEqual(t, msg1.ID, msg2.ID)
|
||||
|
||||
// Verify old attachment file was deleted
|
||||
require.NoFileExists(t, attachmentFile1)
|
||||
|
||||
// Verify new attachment file exists
|
||||
attachmentFile2 := filepath.Join(s.config.AttachmentCacheDir, msg2.ID)
|
||||
require.FileExists(t, attachmentFile2)
|
||||
}
|
||||
|
||||
func TestServer_DeleteScheduledMessage_WithAttachment(t *testing.T) {
|
||||
t.Parallel()
|
||||
s := newTestServer(t, newTestConfig(t))
|
||||
|
||||
// Publish a scheduled message with an attachment
|
||||
content := util.RandomString(5000) // > 4096 to trigger attachment
|
||||
response := request(t, s, "PUT", "/mytopic/delete-attach-seq?delay=1h", content, nil)
|
||||
require.Equal(t, 200, response.Code)
|
||||
msg := toMessage(t, response.Body.String())
|
||||
require.Equal(t, "delete-attach-seq", msg.SequenceID)
|
||||
require.NotNil(t, msg.Attachment)
|
||||
|
||||
// Verify attachment file exists
|
||||
attachmentFile := filepath.Join(s.config.AttachmentCacheDir, msg.ID)
|
||||
require.FileExists(t, attachmentFile)
|
||||
|
||||
// Delete the scheduled message
|
||||
response = request(t, s, "DELETE", "/mytopic/delete-attach-seq", "", nil)
|
||||
require.Equal(t, 200, response.Code)
|
||||
deleteMsg := toMessage(t, response.Body.String())
|
||||
require.Equal(t, "message_delete", deleteMsg.Event)
|
||||
|
||||
// Verify attachment file was deleted
|
||||
require.NoFileExists(t, attachmentFile)
|
||||
}
|
||||
|
||||
func newTestConfig(t *testing.T) *Config {
|
||||
conf := NewConfig()
|
||||
conf.BaseURL = "http://127.0.0.1:12345"
|
||||
|
||||
Reference in New Issue
Block a user