From 8e32eeb5deed2da2dc2a648f62cba2613b566f71 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 15 May 2021 22:46:13 +0100 Subject: [PATCH] Hold the event source when there are no listeners (#15725) * Hold the event source when there are no listeners The event source does not need to run when there are no listeners. Therefore pause it when there are none. * add some more logging Signed-off-by: Andrew Thornton --- modules/eventsource/manager.go | 6 ++++++ modules/eventsource/manager_run.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+) diff --git a/modules/eventsource/manager.go b/modules/eventsource/manager.go index 212fe6056..812d67399 100644 --- a/modules/eventsource/manager.go +++ b/modules/eventsource/manager.go @@ -13,6 +13,7 @@ type Manager struct { mutex sync.Mutex messengers map[int64]*Messenger + connection chan struct{} } var manager *Manager @@ -20,6 +21,7 @@ var manager *Manager func init() { manager = &Manager{ messengers: make(map[int64]*Messenger), + connection: make(chan struct{}, 1), } } @@ -36,6 +38,10 @@ func (m *Manager) Register(uid int64) <-chan *Event { messenger = NewMessenger(uid) m.messengers[uid] = messenger } + select { + case m.connection <- struct{}{}: + default: + } m.mutex.Unlock() return messenger.Register() } diff --git a/modules/eventsource/manager_run.go b/modules/eventsource/manager_run.go index ccfe2e070..60598ecb4 100644 --- a/modules/eventsource/manager_run.go +++ b/modules/eventsource/manager_run.go @@ -34,6 +34,35 @@ loop: timer.Stop() break loop case <-timer.C: + m.mutex.Lock() + connectionCount := len(m.messengers) + if connectionCount == 0 { + log.Trace("Event source has no listeners") + // empty the connection channel + select { + case <-m.connection: + default: + } + } + m.mutex.Unlock() + if connectionCount == 0 { + // No listeners so the source can be paused + log.Trace("Pausing the eventsource") + select { + case <-ctx.Done(): + break loop + case <-m.connection: + log.Trace("Connection detected - restarting the eventsource") + // OK we're back so lets reset the timer and start again + // We won't change the "then" time because there could be concurrency issues + select { + case <-timer.C: + default: + } + continue + } + } + now := timeutil.TimeStampNow().Add(-2) uidCounts, err := models.GetUIDsAndNotificationCounts(then, now)