diff --git a/modules/queue/manager.go b/modules/queue/manager.go index a6734787a..3e9f8fc8d 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -183,17 +183,17 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error } allEmpty = false if flushable, ok := mq.Managed.(Flushable); ok { - go func() { + go func(q *ManagedQueue) { localCtx, localCancel := context.WithCancel(ctx) - pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) err := flushable.FlushWithContext(localCtx) if err != nil && err != ctx.Err() { cancel() } - mq.CancelWorkers(pid) + q.CancelWorkers(pid) localCancel() wg.Done() - }() + }(mq) } else { wg.Done() }