Performance improvements in pg
This commit is contained in:
@@ -25,7 +25,7 @@ type Store interface {
|
||||
AddMessage(m *model.Message) error
|
||||
AddMessages(ms []*model.Message) error
|
||||
Message(id string) (*model.Message, error)
|
||||
MessageCounts() (map[string]int, error)
|
||||
MessagesCount() (int, error)
|
||||
Messages(topic string, since model.SinceMarker, scheduled bool) ([]*model.Message, error)
|
||||
MessagesDue() ([]*model.Message, error)
|
||||
MessagesExpired() ([]string, error)
|
||||
@@ -51,7 +51,6 @@ type storeQueries struct {
|
||||
selectScheduledMessageIDsBySeqID string
|
||||
deleteScheduledBySequenceID string
|
||||
updateMessagesForTopicExpiry string
|
||||
selectRowIDFromMessageID string
|
||||
selectMessagesByID string
|
||||
selectMessagesSinceTime string
|
||||
selectMessagesSinceTimeScheduled string
|
||||
@@ -62,7 +61,6 @@ type storeQueries struct {
|
||||
selectMessagesExpired string
|
||||
updateMessagePublished string
|
||||
selectMessagesCount string
|
||||
selectMessageCountPerTopic string
|
||||
selectTopics string
|
||||
updateAttachmentDeleted string
|
||||
selectAttachmentsExpired string
|
||||
@@ -78,11 +76,11 @@ type commonStore struct {
|
||||
db *sql.DB
|
||||
queue *util.BatchingQueue[*model.Message]
|
||||
nop bool
|
||||
mu sync.Mutex
|
||||
mu *sync.Mutex // nil for PostgreSQL (concurrent writes supported), set for SQLite (single writer)
|
||||
queries storeQueries
|
||||
}
|
||||
|
||||
func newCommonStore(db *sql.DB, queries storeQueries, batchSize int, batchTimeout time.Duration, nop bool) *commonStore {
|
||||
func newCommonStore(db *sql.DB, queries storeQueries, mu *sync.Mutex, batchSize int, batchTimeout time.Duration, nop bool) *commonStore {
|
||||
var queue *util.BatchingQueue[*model.Message]
|
||||
if batchSize > 0 || batchTimeout > 0 {
|
||||
queue = util.NewBatchingQueue[*model.Message](batchSize, batchTimeout)
|
||||
@@ -91,12 +89,25 @@ func newCommonStore(db *sql.DB, queries storeQueries, batchSize int, batchTimeou
|
||||
db: db,
|
||||
queue: queue,
|
||||
nop: nop,
|
||||
mu: mu,
|
||||
queries: queries,
|
||||
}
|
||||
go c.processMessageBatches()
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *commonStore) maybeLock() {
|
||||
if c.mu != nil {
|
||||
c.mu.Lock()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *commonStore) maybeUnlock() {
|
||||
if c.mu != nil {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// AddMessage stores a message to the message cache synchronously, or queues it to be stored at a later date asynchronously.
|
||||
// The message is queued only if "batchSize" or "batchTimeout" are passed to the constructor.
|
||||
func (c *commonStore) AddMessage(m *model.Message) error {
|
||||
@@ -113,8 +124,8 @@ func (c *commonStore) AddMessages(ms []*model.Message) error {
|
||||
}
|
||||
|
||||
func (c *commonStore) addMessages(ms []*model.Message) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
if c.nop {
|
||||
return nil
|
||||
}
|
||||
@@ -224,24 +235,12 @@ func (c *commonStore) messagesSinceTime(topic string, since model.SinceMarker, s
|
||||
}
|
||||
|
||||
func (c *commonStore) messagesSinceID(topic string, since model.SinceMarker, scheduled bool) ([]*model.Message, error) {
|
||||
idrows, err := c.db.Query(c.queries.selectRowIDFromMessageID, since.ID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer idrows.Close()
|
||||
if !idrows.Next() {
|
||||
return c.messagesSinceTime(topic, model.SinceAllMessages, scheduled)
|
||||
}
|
||||
var rowID int64
|
||||
if err := idrows.Scan(&rowID); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
idrows.Close()
|
||||
var rows *sql.Rows
|
||||
var err error
|
||||
if scheduled {
|
||||
rows, err = c.db.Query(c.queries.selectMessagesSinceIDScheduled, topic, rowID)
|
||||
rows, err = c.db.Query(c.queries.selectMessagesSinceIDScheduled, topic, since.ID())
|
||||
} else {
|
||||
rows, err = c.db.Query(c.queries.selectMessagesSinceID, topic, rowID)
|
||||
rows, err = c.db.Query(c.queries.selectMessagesSinceID, topic, since.ID())
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -305,30 +304,26 @@ func (c *commonStore) UpdateMessageTime(messageID string, timestamp int64) error
|
||||
}
|
||||
|
||||
func (c *commonStore) MarkPublished(m *model.Message) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
_, err := c.db.Exec(c.queries.updateMessagePublished, m.ID)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *commonStore) MessageCounts() (map[string]int, error) {
|
||||
rows, err := c.db.Query(c.queries.selectMessageCountPerTopic)
|
||||
func (c *commonStore) MessagesCount() (int, error) {
|
||||
rows, err := c.db.Query(c.queries.selectMessagesCount)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
var topic string
|
||||
var count int
|
||||
counts := make(map[string]int)
|
||||
for rows.Next() {
|
||||
if err := rows.Scan(&topic, &count); err != nil {
|
||||
return nil, err
|
||||
} else if err := rows.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
counts[topic] = count
|
||||
if !rows.Next() {
|
||||
return 0, errNoRows
|
||||
}
|
||||
return counts, nil
|
||||
var count int
|
||||
if err := rows.Scan(&count); err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (c *commonStore) Topics() ([]string, error) {
|
||||
@@ -352,8 +347,8 @@ func (c *commonStore) Topics() ([]string, error) {
|
||||
}
|
||||
|
||||
func (c *commonStore) DeleteMessages(ids ...string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
tx, err := c.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -370,8 +365,8 @@ func (c *commonStore) DeleteMessages(ids ...string) error {
|
||||
// DeleteScheduledBySequenceID deletes unpublished (scheduled) messages with the given topic and sequence ID.
|
||||
// It returns the message IDs of the deleted messages, which can be used to clean up attachment files.
|
||||
func (c *commonStore) DeleteScheduledBySequenceID(topic, sequenceID string) ([]string, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
tx, err := c.db.Begin()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -406,8 +401,8 @@ func (c *commonStore) DeleteScheduledBySequenceID(topic, sequenceID string) ([]s
|
||||
}
|
||||
|
||||
func (c *commonStore) ExpireMessages(topics ...string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
tx, err := c.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -442,8 +437,8 @@ func (c *commonStore) AttachmentsExpired() ([]string, error) {
|
||||
}
|
||||
|
||||
func (c *commonStore) MarkAttachmentsDeleted(ids ...string) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
tx, err := c.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -488,8 +483,8 @@ func (c *commonStore) readAttachmentBytesUsed(rows *sql.Rows) (int64, error) {
|
||||
}
|
||||
|
||||
func (c *commonStore) UpdateStats(messages int64) error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
c.maybeLock()
|
||||
defer c.maybeUnlock()
|
||||
_, err := c.db.Exec(c.queries.updateStats, messages)
|
||||
return err
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user