From d644709b22bd1872e78a0938dec7eb77d5ed03f3 Mon Sep 17 00:00:00 2001 From: 6543 <6543@obermui.de> Date: Sat, 8 May 2021 20:27:00 +0200 Subject: [PATCH] Exponential Backoff for ByteFIFO (#15724) (#15793) This PR is another in the vein of queue improvements. It suggests an exponential backoff for bytefifo queues to reduce the load from queue polling. This will mostly be useful for redis queues. Signed-off-by: Andrew Thornton Co-authored-by: Lauris BH Co-authored-by: zeripath Co-authored-by: Lauris BH --- modules/queue/queue_bytefifo.go | 88 ++++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index fe5178ff2..6f92e3cb6 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -114,43 +114,73 @@ func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func() } func (q *ByteFIFOQueue) readToChan() { + // handle quick cancels + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: + } + + backOffTime := time.Millisecond * 100 + maxBackOffTime := time.Second * 3 for { - select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() - return - default: - q.lock.Lock() - bs, err := q.byteFIFO.Pop() - if err != nil { - q.lock.Unlock() - log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) - time.Sleep(time.Millisecond * 100) - continue - } + success, resetBackoff := q.doPop() + if resetBackoff { + backOffTime = 100 * time.Millisecond + } - if len(bs) == 0 { - q.lock.Unlock() - time.Sleep(time.Millisecond * 100) - continue + if success { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + default: } - - data, err := unmarshalAs(bs, q.exemplar) - if err != nil { - log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) - q.lock.Unlock() - time.Sleep(time.Millisecond * 100) - continue + } else { + select { + case <-q.closed: + // tell the pool to shutdown. + q.cancel() + return + case <-time.After(backOffTime): + } + backOffTime += backOffTime / 2 + if backOffTime > maxBackOffTime { + backOffTime = maxBackOffTime } - - log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) - q.WorkerPool.Push(data) - q.lock.Unlock() } } } +func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { + q.lock.Lock() + defer q.lock.Unlock() + bs, err := q.byteFIFO.Pop() + if err != nil { + log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) + return + } + if len(bs) == 0 { + return + } + + resetBackoff = true + + data, err := unmarshalAs(bs, q.exemplar) + if err != nil { + log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) + return + } + + log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) + q.WorkerPool.Push(data) + success = true + return +} + // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name)