diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index bc8607849..fe1fb7807 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func() } func (q *ByteFIFOQueue) readToChan() { + // handle quick cancels + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: + } + + backOffTime := time.Millisecond * 100 + maxBackOffTime := time.Second * 3 for { - select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() - return - default: - q.lock.Lock() - bs, err := q.byteFIFO.Pop() - if err != nil { - q.lock.Unlock() - log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) - time.Sleep(time.Millisecond * 100) - continue - } + success, resetBackoff := q.doPop() + if resetBackoff { + backOffTime = 100 * time.Millisecond + } - if len(bs) == 0 { - q.lock.Unlock() - time.Sleep(time.Millisecond * 100) - continue + if success { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: } - - data, err := unmarshalAs(bs, q.exemplar) - if err != nil { - log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) - q.lock.Unlock() - time.Sleep(time.Millisecond * 100) - continue + } else { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + case <-time.After(backOffTime): + } + backOffTime += backOffTime / 2 + if backOffTime > maxBackOffTime { + backOffTime = maxBackOffTime } - - log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) - q.WorkerPool.Push(data) - q.lock.Unlock() } } } +func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { + q.lock.Lock() + defer q.lock.Unlock() + bs, err := q.byteFIFO.Pop() + if err != nil { + log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) + return + } + if len(bs) == 0 { + return + } + + resetBackoff = true + + data, err := unmarshalAs(bs, q.exemplar) + if err != nil { + log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) + return + } + + log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) + q.WorkerPool.Push(data) + success = true + return +} + // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name)