Delayed message lock shorter
This commit is contained in:
@@ -1080,7 +1080,8 @@ func (s *Server) topicsFromIDs(ids ...string) ([]*topic, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) updateStatsAndPrune() {
|
func (s *Server) updateStatsAndPrune() {
|
||||||
log.Debug("Manager: Running cleanup")
|
log.Debug("Manager: Starting")
|
||||||
|
defer log.Debug("Manager: Finished")
|
||||||
|
|
||||||
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
|
// WARNING: Make sure to only selectively lock with the mutex, and be aware that this
|
||||||
// there is no mutex for the entire function.
|
// there is no mutex for the entire function.
|
||||||
@@ -1232,10 +1233,10 @@ func (s *Server) sendDelayedMessages() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
|
func (s *Server) sendDelayedMessage(v *visitor, m *message) error {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
log.Debug("%s Sending delayed message", logMessagePrefix(v, m))
|
log.Debug("%s Sending delayed message", logMessagePrefix(v, m))
|
||||||
|
s.mu.Lock()
|
||||||
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
|
t, ok := s.topics[m.Topic] // If no subscribers, just mark message as published
|
||||||
|
s.mu.Unlock()
|
||||||
if ok {
|
if ok {
|
||||||
go func() {
|
go func() {
|
||||||
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
|
// We do not rate-limit messages here, since we've rate limited them in the PUT/POST handler
|
||||||
|
|||||||
Reference in New Issue
Block a user