diff --git a/server/message_cache.go b/server/message_cache.go index 03cb4969..64561d02 100644 --- a/server/message_cache.go +++ b/server/message_cache.go @@ -9,6 +9,7 @@ import ( "path/filepath" "strings" "time" + "sync" _ "github.com/mattn/go-sqlite3" // SQLite driver "heckel.io/ntfy/v2/log" @@ -36,7 +37,7 @@ const ( priority INT NOT NULL, tags TEXT NOT NULL, click TEXT NOT NULL, - icon TEXT NOT NULL, + icon TEXT NOT NULL, actions TEXT NOT NULL, attachment_name TEXT NOT NULL, attachment_type TEXT NOT NULL, @@ -73,30 +74,30 @@ const ( selectRowIDFromMessageID = `SELECT id FROM messages WHERE mid = ?` // Do not include topic, see #336 and TestServer_PollSinceID_MultipleTopics selectMessagesByIDQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE mid = ? ` selectMessagesSinceTimeQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE topic = ? AND time >= ? AND published = 1 ORDER BY time, id ` selectMessagesSinceTimeIncludeScheduledQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE topic = ? AND time >= ? ORDER BY time, id ` selectMessagesSinceIDQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE topic = ? AND id > ? AND published = 1 ORDER BY time, id ` selectMessagesSinceIDIncludeScheduledQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE topic = ? AND (id > ? OR published = 0) ORDER BY time, id ` @@ -106,10 +107,10 @@ const ( WHERE topic = ? AND published = 1 ORDER BY time DESC, id DESC LIMIT 1 - ` + ` selectMessagesDueQuery = ` SELECT mid, time, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding - FROM messages + FROM messages WHERE time <= ? AND published = 0 ORDER BY time, id ` @@ -282,6 +283,7 @@ var ( type messageCache struct { db *sql.DB queue *util.BatchingQueue[*message] + mu sync.Mutex nop bool } @@ -347,6 +349,8 @@ func (c *messageCache) AddMessage(m *message) error { // addMessages synchronously stores a match of messages. If the database is locked, the transaction waits until // SQLite's busy_timeout is exceeded before erroring out. func (c *messageCache) addMessages(ms []*message) error { + c.mu.Lock() + defer c.mu.Unlock() if c.nop { return nil } @@ -528,6 +532,8 @@ func (c *messageCache) Message(id string) (*message, error) { } func (c *messageCache) MarkPublished(m *message) error { + c.mu.Lock() + defer c.mu.Unlock() _, err := c.db.Exec(updateMessagePublishedQuery, m.ID) return err } @@ -573,6 +579,8 @@ func (c *messageCache) Topics() (map[string]*topic, error) { } func (c *messageCache) DeleteMessages(ids ...string) error { + c.mu.Lock() + defer c.mu.Unlock() tx, err := c.db.Begin() if err != nil { return err @@ -587,6 +595,8 @@ func (c *messageCache) DeleteMessages(ids ...string) error { } func (c *messageCache) ExpireMessages(topics ...string) error { + c.mu.Lock() + defer c.mu.Unlock() tx, err := c.db.Begin() if err != nil { return err @@ -621,6 +631,8 @@ func (c *messageCache) AttachmentsExpired() ([]string, error) { } func (c *messageCache) MarkAttachmentsDeleted(ids ...string) error { + c.mu.Lock() + defer c.mu.Unlock() tx, err := c.db.Begin() if err != nil { return err @@ -766,6 +778,8 @@ func readMessage(rows *sql.Rows) (*message, error) { } func (c *messageCache) UpdateStats(messages int64) error { + c.mu.Lock() + defer c.mu.Unlock() _, err := c.db.Exec(updateStatsQuery, messages) return err }