diff --git a/message/store.go b/message/store.go index ed6168fd..ba3ddc81 100644 --- a/message/store.go +++ b/message/store.go @@ -25,7 +25,7 @@ type Store interface { AddMessage(m *model.Message) error AddMessages(ms []*model.Message) error Message(id string) (*model.Message, error) - MessageCounts() (map[string]int, error) + MessagesCount() (int, error) Messages(topic string, since model.SinceMarker, scheduled bool) ([]*model.Message, error) MessagesDue() ([]*model.Message, error) MessagesExpired() ([]string, error) @@ -51,7 +51,6 @@ type storeQueries struct { selectScheduledMessageIDsBySeqID string deleteScheduledBySequenceID string updateMessagesForTopicExpiry string - selectRowIDFromMessageID string selectMessagesByID string selectMessagesSinceTime string selectMessagesSinceTimeScheduled string @@ -62,7 +61,6 @@ type storeQueries struct { selectMessagesExpired string updateMessagePublished string selectMessagesCount string - selectMessageCountPerTopic string selectTopics string updateAttachmentDeleted string selectAttachmentsExpired string @@ -78,11 +76,11 @@ type commonStore struct { db *sql.DB queue *util.BatchingQueue[*model.Message] nop bool - mu sync.Mutex + mu *sync.Mutex // nil for PostgreSQL (concurrent writes supported), set for SQLite (single writer) queries storeQueries } -func newCommonStore(db *sql.DB, queries storeQueries, batchSize int, batchTimeout time.Duration, nop bool) *commonStore { +func newCommonStore(db *sql.DB, queries storeQueries, mu *sync.Mutex, batchSize int, batchTimeout time.Duration, nop bool) *commonStore { var queue *util.BatchingQueue[*model.Message] if batchSize > 0 || batchTimeout > 0 { queue = util.NewBatchingQueue[*model.Message](batchSize, batchTimeout) @@ -91,12 +89,25 @@ func newCommonStore(db *sql.DB, queries storeQueries, batchSize int, batchTimeou db: db, queue: queue, nop: nop, + mu: mu, queries: queries, } go c.processMessageBatches() return c } +func (c *commonStore) maybeLock() { + if c.mu != nil { + c.mu.Lock() + } +} + +func (c *commonStore) maybeUnlock() { + if c.mu != nil { + c.mu.Unlock() + } +} + // AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asynchronously. // The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor. func (c *commonStore) AddMessage(m *model.Message) error { @@ -113,8 +124,8 @@ func (c *commonStore) AddMessages(ms []*model.Message) error { } func (c *commonStore) addMessages(ms []*model.Message) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() if c.nop { return nil } @@ -224,24 +235,12 @@ func (c *commonStore) messagesSinceTime(topic string, since model.SinceMarker, s } func (c *commonStore) messagesSinceID(topic string, since model.SinceMarker, scheduled bool) ([]*model.Message, error) { - idrows, err := c.db.Query(c.queries.selectRowIDFromMessageID, since.ID()) - if err != nil { - return nil, err - } - defer idrows.Close() - if !idrows.Next() { - return c.messagesSinceTime(topic, model.SinceAllMessages, scheduled) - } - var rowID int64 - if err := idrows.Scan(&rowID); err != nil { - return nil, err - } - idrows.Close() var rows *sql.Rows + var err error if scheduled { - rows, err = c.db.Query(c.queries.selectMessagesSinceIDScheduled, topic, rowID) + rows, err = c.db.Query(c.queries.selectMessagesSinceIDScheduled, topic, since.ID()) } else { - rows, err = c.db.Query(c.queries.selectMessagesSinceID, topic, rowID) + rows, err = c.db.Query(c.queries.selectMessagesSinceID, topic, since.ID()) } if err != nil { return nil, err @@ -305,30 +304,26 @@ func (c *commonStore) UpdateMessageTime(messageID string, timestamp int64) error } func (c *commonStore) MarkPublished(m *model.Message) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() _, err := c.db.Exec(c.queries.updateMessagePublished, m.ID) return err } -func (c *commonStore) MessageCounts() (map[string]int, error) { - rows, err := c.db.Query(c.queries.selectMessageCountPerTopic) +func (c *commonStore) MessagesCount() (int, error) { + rows, err := c.db.Query(c.queries.selectMessagesCount) if err != nil { - return nil, err + return 0, err } defer rows.Close() - var topic string - var count int - counts := make(map[string]int) - for rows.Next() { - if err := rows.Scan(&topic, &count); err != nil { - return nil, err - } else if err := rows.Err(); err != nil { - return nil, err - } - counts[topic] = count + if !rows.Next() { + return 0, errNoRows } - return counts, nil + var count int + if err := rows.Scan(&count); err != nil { + return 0, err + } + return count, nil } func (c *commonStore) Topics() ([]string, error) { @@ -352,8 +347,8 @@ func (c *commonStore) Topics() ([]string, error) { } func (c *commonStore) DeleteMessages(ids ...string) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() tx, err := c.db.Begin() if err != nil { return err @@ -370,8 +365,8 @@ func (c *commonStore) DeleteMessages(ids ...string) error { // DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID. // It returns the message IDs of the deleted messages, which can be used to clean up attachment files. func (c *commonStore) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() tx, err := c.db.Begin() if err != nil { return nil, err @@ -406,8 +401,8 @@ func (c *commonStore) DeleteScheduledBySequenceID(topic, sequenceID string) ([]s } func (c *commonStore) ExpireMessages(topics ...string) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() tx, err := c.db.Begin() if err != nil { return err @@ -442,8 +437,8 @@ func (c *commonStore) AttachmentsExpired() ([]string, error) { } func (c *commonStore) MarkAttachmentsDeleted(ids ...string) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() tx, err := c.db.Begin() if err != nil { return err @@ -488,8 +483,8 @@ func (c *commonStore) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) { } func (c *commonStore) UpdateStats(messages int64) error { - c.mu.Lock() - defer c.mu.Unlock() + c.maybeLock() + defer c.maybeUnlock() _, err := c.db.Exec(c.queries.updateStats, messages) return err } diff --git a/message/store_postgres.go b/message/store_postgres.go index 4fb052c8..a21b8f62 100644 --- a/message/store_postgres.go +++ b/message/store_postgres.go @@ -15,7 +15,6 @@ const ( postgresSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresDeleteScheduledBySequenceIDQuery = `DELETE FROM message WHERE topic = $1 AND sequence_id = $2 AND published = FALSE` postgresUpdateMessagesForTopicExpiryQuery = `UPDATE message SET expires = $1 WHERE topic = $2` - postgresSelectRowIDFromMessageIDQuery = `SELECT id FROM message WHERE mid = $1` postgresSelectMessagesByIDQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user_id, content_type, encoding FROM message @@ -36,13 +35,16 @@ const ( postgresSelectMessagesSinceIDQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user_id, content_type, encoding FROM message - WHERE topic = $1 AND id > $2 AND published = TRUE + WHERE topic = $1 + AND id > COALESCE((SELECT id FROM message WHERE mid = $2), 0) + AND published = TRUE ORDER BY time, id ` postgresSelectMessagesSinceIDIncludeScheduledQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user_id, content_type, encoding FROM message - WHERE topic = $1 AND (id > $2 OR published = FALSE) + WHERE topic = $1 + AND (id > COALESCE((SELECT id FROM message WHERE mid = $2), 0) OR published = FALSE) ORDER BY time, id ` postgresSelectMessagesLatestQuery = ` @@ -58,11 +60,10 @@ const ( WHERE time <= $1 AND published = FALSE ORDER BY time, id ` - postgresSelectMessagesExpiredQuery = `SELECT mid FROM message WHERE expires <= $1 AND published = TRUE` - postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1` - postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message` - postgresSelectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM message GROUP BY topic` - postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic` + postgresSelectMessagesExpiredQuery = `SELECT mid FROM message WHERE expires <= $1 AND published = TRUE` + postgresUpdateMessagePublishedQuery = `UPDATE message SET published = TRUE WHERE mid = $1` + postgresSelectMessagesCountQuery = `SELECT COUNT(*) FROM message` + postgresSelectTopicsQuery = `SELECT topic FROM message GROUP BY topic` postgresUpdateAttachmentDeletedQuery = `UPDATE message SET attachment_deleted = TRUE WHERE mid = $1` postgresSelectAttachmentsExpiredQuery = `SELECT mid FROM message WHERE attachment_expires > 0 AND attachment_expires <= $1 AND attachment_deleted = FALSE` @@ -80,7 +81,6 @@ var pgQueries = storeQueries{ selectScheduledMessageIDsBySeqID: postgresSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: postgresDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: postgresUpdateMessagesForTopicExpiryQuery, - selectRowIDFromMessageID: postgresSelectRowIDFromMessageIDQuery, selectMessagesByID: postgresSelectMessagesByIDQuery, selectMessagesSinceTime: postgresSelectMessagesSinceTimeQuery, selectMessagesSinceTimeScheduled: postgresSelectMessagesSinceTimeIncludeScheduledQuery, @@ -91,7 +91,6 @@ var pgQueries = storeQueries{ selectMessagesExpired: postgresSelectMessagesExpiredQuery, updateMessagePublished: postgresUpdateMessagePublishedQuery, selectMessagesCount: postgresSelectMessagesCountQuery, - selectMessageCountPerTopic: postgresSelectMessageCountPerTopicQuery, selectTopics: postgresSelectTopicsQuery, updateAttachmentDeleted: postgresUpdateAttachmentDeletedQuery, selectAttachmentsExpired: postgresSelectAttachmentsExpiredQuery, @@ -107,5 +106,5 @@ func NewPostgresStore(db *sql.DB, batchSize int, batchTimeout time.Duration) (St if err := setupPostgresDB(db); err != nil { return nil, err } - return newCommonStore(db, pgQueries, batchSize, batchTimeout, false), nil + return newCommonStore(db, pgQueries, nil, batchSize, batchTimeout, false), nil } diff --git a/message/store_postgres_schema.go b/message/store_postgres_schema.go index 6e4d654c..a647cdf9 100644 --- a/message/store_postgres_schema.go +++ b/message/store_postgres_schema.go @@ -37,12 +37,10 @@ const ( ); CREATE INDEX IF NOT EXISTS idx_message_mid ON message (mid); CREATE INDEX IF NOT EXISTS idx_message_sequence_id ON message (sequence_id); - CREATE INDEX IF NOT EXISTS idx_message_time ON message (time); - CREATE INDEX IF NOT EXISTS idx_message_topic ON message (topic); - CREATE INDEX IF NOT EXISTS idx_message_expires ON message (expires); - CREATE INDEX IF NOT EXISTS idx_message_sender ON message (sender); - CREATE INDEX IF NOT EXISTS idx_message_user_id ON message (user_id); - CREATE INDEX IF NOT EXISTS idx_message_attachment_expires ON message (attachment_expires); + CREATE INDEX IF NOT EXISTS idx_message_topic_published_time ON message (topic, published, time, id); + CREATE INDEX IF NOT EXISTS idx_message_published_expires ON message (published, expires); + CREATE INDEX IF NOT EXISTS idx_message_sender_attachment_expires ON message (sender, attachment_expires) WHERE user_id = ''; + CREATE INDEX IF NOT EXISTS idx_message_user_id_attachment_expires ON message (user_id, attachment_expires); CREATE TABLE IF NOT EXISTS message_stats ( key TEXT PRIMARY KEY, value BIGINT diff --git a/message/store_sqlite.go b/message/store_sqlite.go index 646e3975..923f3480 100644 --- a/message/store_sqlite.go +++ b/message/store_sqlite.go @@ -4,6 +4,7 @@ import ( "database/sql" "fmt" "path/filepath" + "sync" "time" _ "github.com/mattn/go-sqlite3" // SQLite driver @@ -20,7 +21,6 @@ const ( sqliteSelectScheduledMessageIDsBySeqIDQuery = `SELECT mid FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteDeleteScheduledBySequenceIDQuery = `DELETE FROM messages WHERE topic = ? AND sequence_id = ? AND published = 0` sqliteUpdateMessagesForTopicExpiryQuery = `UPDATE messages SET expires = ? WHERE topic = ?` - sqliteSelectRowIDFromMessageIDQuery = `SELECT id FROM messages WHERE mid = ?` sqliteSelectMessagesByIDQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding FROM messages @@ -41,13 +41,13 @@ const ( sqliteSelectMessagesSinceIDQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding FROM messages - WHERE topic = ? AND id > ? AND published = 1 + WHERE topic = ? AND id > COALESCE((SELECT id FROM messages WHERE mid = ?), 0) AND published = 1 ORDER BY time, id ` sqliteSelectMessagesSinceIDIncludeScheduledQuery = ` SELECT mid, sequence_id, time, event, expires, topic, message, title, priority, tags, click, icon, actions, attachment_name, attachment_type, attachment_size, attachment_expires, attachment_url, sender, user, content_type, encoding FROM messages - WHERE topic = ? AND (id > ? OR published = 0) + WHERE topic = ? AND (id > COALESCE((SELECT id FROM messages WHERE mid = ?), 0) OR published = 0) ORDER BY time, id ` sqliteSelectMessagesLatestQuery = ` @@ -63,11 +63,10 @@ const ( WHERE time <= ? AND published = 0 ORDER BY time, id ` - sqliteSelectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` - sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` - sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages` - sqliteSelectMessageCountPerTopicQuery = `SELECT topic, COUNT(*) FROM messages GROUP BY topic` - sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` + sqliteSelectMessagesExpiredQuery = `SELECT mid FROM messages WHERE expires <= ? AND published = 1` + sqliteUpdateMessagePublishedQuery = `UPDATE messages SET published = 1 WHERE mid = ?` + sqliteSelectMessagesCountQuery = `SELECT COUNT(*) FROM messages` + sqliteSelectTopicsQuery = `SELECT topic FROM messages GROUP BY topic` sqliteUpdateAttachmentDeletedQuery = `UPDATE messages SET attachment_deleted = 1 WHERE mid = ?` sqliteSelectAttachmentsExpiredQuery = `SELECT mid FROM messages WHERE attachment_expires > 0 AND attachment_expires <= ? AND attachment_deleted = 0` @@ -85,7 +84,6 @@ var sqliteQueries = storeQueries{ selectScheduledMessageIDsBySeqID: sqliteSelectScheduledMessageIDsBySeqIDQuery, deleteScheduledBySequenceID: sqliteDeleteScheduledBySequenceIDQuery, updateMessagesForTopicExpiry: sqliteUpdateMessagesForTopicExpiryQuery, - selectRowIDFromMessageID: sqliteSelectRowIDFromMessageIDQuery, selectMessagesByID: sqliteSelectMessagesByIDQuery, selectMessagesSinceTime: sqliteSelectMessagesSinceTimeQuery, selectMessagesSinceTimeScheduled: sqliteSelectMessagesSinceTimeIncludeScheduledQuery, @@ -96,7 +94,6 @@ var sqliteQueries = storeQueries{ selectMessagesExpired: sqliteSelectMessagesExpiredQuery, updateMessagePublished: sqliteUpdateMessagePublishedQuery, selectMessagesCount: sqliteSelectMessagesCountQuery, - selectMessageCountPerTopic: sqliteSelectMessageCountPerTopicQuery, selectTopics: sqliteSelectTopicsQuery, updateAttachmentDeleted: sqliteUpdateAttachmentDeletedQuery, selectAttachmentsExpired: sqliteSelectAttachmentsExpiredQuery, @@ -120,7 +117,7 @@ func NewSQLiteStore(filename, startupQueries string, cacheDuration time.Duration if err := setupSQLite(db, startupQueries, cacheDuration); err != nil { return nil, err } - return newCommonStore(db, sqliteQueries, batchSize, batchTimeout, nop), nil + return newCommonStore(db, sqliteQueries, &sync.Mutex{}, batchSize, batchTimeout, nop), nil } // NewMemStore creates an in-memory cache diff --git a/message/store_test.go b/message/store_test.go index 50bf84f3..e53c11ec 100644 --- a/message/store_test.go +++ b/message/store_test.go @@ -65,10 +65,10 @@ func TestStore_Messages(t *testing.T) { require.Equal(t, model.ErrUnexpectedMessageType, s.AddMessage(model.NewKeepaliveMessage("mytopic"))) // These should not be added! require.Equal(t, model.ErrUnexpectedMessageType, s.AddMessage(model.NewOpenMessage("example"))) // These should not be added! - // mytopic: count - counts, err := s.MessageCounts() + // count + count, err := s.MessagesCount() require.Nil(t, err) - require.Equal(t, 2, counts["mytopic"]) + require.Equal(t, 3, count) // mytopic: since all messages, _ := s.Messages("mytopic", model.SinceAllMessages, false) @@ -102,20 +102,10 @@ func TestStore_Messages(t *testing.T) { require.Equal(t, 1, len(messages)) require.Equal(t, "my other message", messages[0].Message) - // example: count - counts, err = s.MessageCounts() - require.Nil(t, err) - require.Equal(t, 1, counts["example"]) - // example: since all messages, _ = s.Messages("example", model.SinceAllMessages, false) require.Equal(t, "my example message", messages[0].Message) - // non-existing: count - counts, err = s.MessageCounts() - require.Nil(t, err) - require.Equal(t, 0, counts["doesnotexist"]) - // non-existing: since all messages, _ = s.Messages("doesnotexist", model.SinceAllMessages, false) require.Empty(t, messages) @@ -280,19 +270,17 @@ func TestStore_Prune(t *testing.T) { require.Nil(t, s.AddMessage(m2)) require.Nil(t, s.AddMessage(m3)) - counts, err := s.MessageCounts() + count, err := s.MessagesCount() require.Nil(t, err) - require.Equal(t, 2, counts["mytopic"]) - require.Equal(t, 1, counts["another_topic"]) + require.Equal(t, 3, count) expiredMessageIDs, err := s.MessagesExpired() require.Nil(t, err) require.Nil(t, s.DeleteMessages(expiredMessageIDs...)) - counts, err = s.MessageCounts() + count, err = s.MessagesCount() require.Nil(t, err) - require.Equal(t, 1, counts["mytopic"]) - require.Equal(t, 0, counts["another_topic"]) + require.Equal(t, 1, count) messages, err := s.Messages("mytopic", model.SinceAllMessages, false) require.Nil(t, err) diff --git a/model/model.go b/model/model.go index b0e2d259..a8ecdf78 100644 --- a/model/model.go +++ b/model/model.go @@ -191,7 +191,7 @@ func (t SinceMarker) IsLatest() bool { // IsID returns true if this marker references a specific message ID func (t SinceMarker) IsID() bool { - return t.id != "" && t.id != "latest" + return t.id != "" && t.id != SinceLatestMessage.id } // Time returns the time component of the marker diff --git a/server/server_manager.go b/server/server_manager.go index 9f5fe888..afed7b33 100644 --- a/server/server_manager.go +++ b/server/server_manager.go @@ -17,15 +17,10 @@ func (s *Server) execManager() { s.pruneMessages() s.pruneAndNotifyWebPushSubscriptions() - // Message count per topic - var messagesCached int - messageCounts, err := s.messageCache.MessageCounts() + // Message count + messagesCached, err := s.messageCache.MessagesCount() if err != nil { - log.Tag(tagManager).Err(err).Warn("Cannot get message counts") - messageCounts = make(map[string]int) // Empty, so we can continue - } - for _, count := range messageCounts { - messagesCached += count + log.Tag(tagManager).Err(err).Warn("Cannot get messages count") } // Remove subscriptions without subscribers diff --git a/webpush/store_postgres.go b/webpush/store_postgres.go index 9fb7f9f1..5dd72e70 100644 --- a/webpush/store_postgres.go +++ b/webpush/store_postgres.go @@ -18,6 +18,8 @@ const ( warned_at BIGINT NOT NULL DEFAULT 0 ); CREATE INDEX IF NOT EXISTS idx_webpush_subscriber_ip ON webpush_subscription (subscriber_ip); + CREATE INDEX IF NOT EXISTS idx_webpush_updated_at ON webpush_subscription (updated_at); + CREATE INDEX IF NOT EXISTS idx_webpush_user_id ON webpush_subscription (user_id); CREATE TABLE IF NOT EXISTS webpush_subscription_topic ( subscription_id TEXT NOT NULL REFERENCES webpush_subscription (id) ON DELETE CASCADE, topic TEXT NOT NULL,