diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go index 212fe6056..812d67399 100644 --- a/modules/eventsource/manager.go +++ b/modules/eventsource/manager.go @@ -13,6 +13,7 @@ type Manager struct { mutex sync.Mutex messengers map[int64]*Messenger + connection chan struct{} } var manager *Manager @@ -20,6 +21,7 @@ var manager *Manager func init() { manager = &Manager{ messengers: make(map[int64]*Messenger), + connection: make(chan struct{}, 1), } } @@ -36,6 +38,10 @@ func (m *Manager) Register(uid int64) <-chan *Event { messenger = NewMessenger(uid) m.messengers[uid] = messenger } + select { + case m.connection <- struct{}{}: + default: + } m.mutex.Unlock() return messenger.Register() } diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go index ccfe2e070..60598ecb4 100644 --- a/modules/eventsource/manager_run.go +++ b/modules/eventsource/manager_run.go @@ -34,6 +34,35 @@ loop: timer.Stop() break loop case <-timer.C: + m.mutex.Lock() + connectionCount := len(m.messengers) + if connectionCount == 0 { + log.Trace("Event source has no listeners") + // empty the connection channel + select { + case <-m.connection: + default: + } + } + m.mutex.Unlock() + if connectionCount == 0 { + // No listeners so the source can be paused + log.Trace("Pausing the eventsource") + select { + case <-ctx.Done(): + break loop + case <-m.connection: + log.Trace("Connection detected - restarting the eventsource") + // OK we're back so lets reset the timer and start again + // We won't change the "then" time because there could be concurrency issues + select { + case <-timer.C: + default: + } + continue + } + } + now := timeutil.TimeStampNow().Add(-2) uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)