diff --git a/modules/queue/manager.go b/modules/queue/manager.go index da0fc606e..c3ec735af 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -198,17 +198,20 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error wg.Done() }(mq) } else { - log.Debug("Queue: %s is non-empty but is not flushable - adding 100 millisecond wait", mq.Name) - go func() { - <-time.After(100 * time.Millisecond) - wg.Done() - }() + log.Debug("Queue: %s is non-empty but is not flushable", mq.Name) + wg.Done() } - } if allEmpty { + log.Debug("All queues are empty") break } + // Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign + // but don't delay cancellation here. + select { + case <-ctx.Done(): + case <-time.After(100 * time.Millisecond): + } wg.Wait() } return nil