From ba526ceffe33a54b6015cdfbdc9bba920484dc23 Mon Sep 17 00:00:00 2001 From: zeripath Date: Sat, 15 May 2021 15:22:26 +0100 Subject: [PATCH] Multiple Queue improvements: LevelDB Wait on empty, shutdown empty shadow level queue, reduce goroutines etc (#15693) * move shutdownfns, terminatefns and hammerfns out of separate goroutines Coalesce the shutdownfns etc into a list of functions that get run at shutdown rather then have them run at goroutines blocked on selects. This may help reduce the background select/poll load in certain configurations. * The LevelDB queues can actually wait on empty instead of polling Slight refactor to cause leveldb queues to wait on empty instead of polling. * Shutdown the shadow level queue once it is empty * Remove bytefifo additional goroutine for readToChan as it can just be run in run * Remove additional removeWorkers goroutine for workers * Simplify the AtShutdown and AtTerminate functions and add Channel Flusher * Add shutdown flusher to CUQ * move persistable channel shutdown stuff to Shutdown Fn * Ensure that UPCQ has the correct config * handle shutdown during the flushing * reduce risk of race between zeroBoost and addWorkers * prevent double shutdown Signed-off-by: Andrew Thornton --- modules/graceful/context.go | 23 +-- modules/graceful/manager.go | 154 ++++++++------- modules/graceful/manager_unix.go | 26 ++- modules/graceful/manager_windows.go | 28 ++- modules/indexer/code/indexer.go | 8 +- modules/indexer/issues/indexer.go | 4 +- modules/queue/bytefifo.go | 18 +- modules/queue/manager.go | 6 +- modules/queue/queue.go | 6 +- modules/queue/queue_bytefifo.go | 217 +++++++++++++-------- modules/queue/queue_channel.go | 76 ++++++-- modules/queue/queue_channel_test.go | 5 +- modules/queue/queue_disk.go | 9 +- modules/queue/queue_disk_channel.go | 87 +++++---- modules/queue/queue_disk_channel_test.go | 49 +++-- modules/queue/queue_disk_test.go | 9 +- modules/queue/queue_redis.go | 20 +- modules/queue/queue_wrapped.go | 8 +- modules/queue/unique_queue_channel.go | 77 ++++++-- modules/queue/unique_queue_disk.go | 11 +- modules/queue/unique_queue_disk_channel.go | 88 +++++---- modules/queue/unique_queue_redis.go | 21 +- modules/queue/workerpool.go | 55 ++---- services/pull/check_test.go | 5 +- 24 files changed, 598 insertions(+), 412 deletions(-) diff --git a/modules/graceful/context.go b/modules/graceful/context.go index 1ad1109b4..9d955329a 100644 --- a/modules/graceful/context.go +++ b/modules/graceful/context.go @@ -6,17 +6,9 @@ package graceful import ( "context" - "fmt" "time" ) -// Errors for context.Err() -var ( - ErrShutdown = fmt.Errorf("Graceful Manager called Shutdown") - ErrHammer = fmt.Errorf("Graceful Manager called Hammer") - ErrTerminate = fmt.Errorf("Graceful Manager called Terminate") -) - // ChannelContext is a context that wraps a channel and error as a context type ChannelContext struct { done <-chan struct{} @@ -63,28 +55,19 @@ func (ctx *ChannelContext) Value(key interface{}) interface{} { // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) ShutdownContext() context.Context { - return &ChannelContext{ - done: g.IsShutdown(), - err: ErrShutdown, - } + return g.shutdownCtx } // HammerContext returns a context.Context that is Done at hammer // Callers using this context should ensure that they are registered as a running server // in order that they are waited for. func (g *Manager) HammerContext() context.Context { - return &ChannelContext{ - done: g.IsHammer(), - err: ErrHammer, - } + return g.hammerCtx } // TerminateContext returns a context.Context that is Done at terminate // Callers using this context should ensure that they are registered as a terminating server // in order that they are waited for. func (g *Manager) TerminateContext() context.Context { - return &ChannelContext{ - done: g.IsTerminate(), - err: ErrTerminate, - } + return g.terminateCtx } diff --git a/modules/graceful/manager.go b/modules/graceful/manager.go index 903d05ed2..8c3b95c4a 100644 --- a/modules/graceful/manager.go +++ b/modules/graceful/manager.go @@ -54,8 +54,8 @@ func InitManager(ctx context.Context) { }) } -// CallbackWithContext is combined runnable and context to watch to see if the caller has finished -type CallbackWithContext func(ctx context.Context, callback func()) +// WithCallback is a runnable to call when the caller has finished +type WithCallback func(callback func()) // RunnableWithShutdownFns is a runnable with functions to run at shutdown and terminate // After the callback to atShutdown is called and is complete, the main function must return. @@ -63,7 +63,7 @@ type CallbackWithContext func(ctx context.Context, callback func()) // Please note that use of the atShutdown and atTerminate callbacks will create go-routines that will wait till their respective signals // - users must therefore be careful to only call these as necessary. // If run is not expected to run indefinitely RunWithShutdownChan is likely to be more appropriate. -type RunnableWithShutdownFns func(atShutdown, atTerminate func(context.Context, func())) +type RunnableWithShutdownFns func(atShutdown, atTerminate func(func())) // RunWithShutdownFns takes a function that has both atShutdown and atTerminate callbacks // After the callback to atShutdown is called and is complete, the main function must return. @@ -80,17 +80,21 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { g.doShutdown() } }() - run(func(ctx context.Context, atShutdown func()) { - go func() { - select { - case <-g.IsShutdown(): + run(func(atShutdown func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunWithShutdownFns: %v\nStacktrace: %s", err, log.Stack(2)) + g.doShutdown() + } + }() atShutdown() - case <-ctx.Done(): - return - } - }() - }, func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + }) + }, func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -99,7 +103,7 @@ func (g *Manager) RunWithShutdownFns(run RunnableWithShutdownFns) { // (Optionally IsHammer may be waited for instead however, this should be avoided if possible.) // The callback function provided to atTerminate must return once termination is complete. // Please note that use of the atTerminate function will create a go-routine that will wait till terminate - users must therefore be careful to only call this as necessary. -type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate CallbackWithContext) +type RunnableWithShutdownChan func(atShutdown <-chan struct{}, atTerminate WithCallback) // RunWithShutdownChan takes a function that has channel to watch for shutdown and atTerminate callbacks // After the atShutdown channel is closed, the main function must return once shutdown is complete. @@ -115,8 +119,8 @@ func (g *Manager) RunWithShutdownChan(run RunnableWithShutdownChan) { g.doShutdown() } }() - run(g.IsShutdown(), func(ctx context.Context, atTerminate func()) { - g.RunAtTerminate(ctx, atTerminate) + run(g.IsShutdown(), func(atTerminate func()) { + g.RunAtTerminate(atTerminate) }) } @@ -136,60 +140,65 @@ func (g *Manager) RunWithShutdownContext(run func(context.Context)) { } // RunAtTerminate adds to the terminate wait group and creates a go-routine to run the provided function at termination -func (g *Manager) RunAtTerminate(ctx context.Context, terminate func()) { +func (g *Manager) RunAtTerminate(terminate func()) { g.terminateWaitGroup.Add(1) - go func() { - defer g.terminateWaitGroup.Done() - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsTerminate(): + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtTerminate = append(g.toRunAtTerminate, + func() { + defer g.terminateWaitGroup.Done() + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtTerminate: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() terminate() - case <-ctx.Done(): - } - }() + }) } // RunAtShutdown creates a go-routine to run the provided function at shutdown func (g *Manager) RunAtShutdown(ctx context.Context, shutdown func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtShutdown = append(g.toRunAtShutdown, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtShutdown: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() + select { + case <-ctx.Done(): + return + default: + shutdown() } - }() - select { - case <-g.IsShutdown(): - shutdown() - case <-ctx.Done(): - } - }() + }) } // RunAtHammer creates a go-routine to run the provided function at shutdown -func (g *Manager) RunAtHammer(ctx context.Context, hammer func()) { - go func() { - defer func() { - if err := recover(); err != nil { - log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) - } - }() - select { - case <-g.IsHammer(): +func (g *Manager) RunAtHammer(hammer func()) { + g.lock.Lock() + defer g.lock.Unlock() + g.toRunAtHammer = append(g.toRunAtHammer, + func() { + defer func() { + if err := recover(); err != nil { + log.Critical("PANIC during RunAtHammer: %v\nStacktrace: %s", err, log.Stack(2)) + } + }() hammer() - case <-ctx.Done(): - } - }() + }) } func (g *Manager) doShutdown() { if !g.setStateTransition(stateRunning, stateShuttingDown) { return } g.lock.Lock() - close(g.shutdown) + g.shutdownCtxCancel() + for _, fn := range g.toRunAtShutdown { + go fn() + } g.lock.Unlock() if setting.GracefulHammerTime >= 0 { @@ -203,7 +212,7 @@ func (g *Manager) doShutdown() { g.doTerminate() g.WaitForTerminate() g.lock.Lock() - close(g.done) + g.doneCtxCancel() g.lock.Unlock() }() } @@ -212,10 +221,13 @@ func (g *Manager) doHammerTime(d time.Duration) { time.Sleep(d) g.lock.Lock() select { - case <-g.hammer: + case <-g.hammerCtx.Done(): default: log.Warn("Setting Hammer condition") - close(g.hammer) + g.hammerCtxCancel() + for _, fn := range g.toRunAtHammer { + go fn() + } } g.lock.Unlock() } @@ -226,10 +238,13 @@ func (g *Manager) doTerminate() { } g.lock.Lock() select { - case <-g.terminate: + case <-g.terminateCtx.Done(): default: log.Warn("Terminating") - close(g.terminate) + g.terminateCtxCancel() + for _, fn := range g.toRunAtTerminate { + go fn() + } } g.lock.Unlock() } @@ -242,7 +257,7 @@ func (g *Manager) IsChild() bool { // IsShutdown returns a channel which will be closed at shutdown. // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate func (g *Manager) IsShutdown() <-chan struct{} { - return g.shutdown + return g.shutdownCtx.Done() } // IsHammer returns a channel which will be closed at hammer @@ -250,14 +265,14 @@ func (g *Manager) IsShutdown() <-chan struct{} { // Servers running within the running server wait group should respond to IsHammer // if not shutdown already func (g *Manager) IsHammer() <-chan struct{} { - return g.hammer + return g.hammerCtx.Done() } // IsTerminate returns a channel which will be closed at terminate // The order of closure is IsShutdown, IsHammer (potentially), IsTerminate // IsTerminate will only close once all running servers have stopped func (g *Manager) IsTerminate() <-chan struct{} { - return g.terminate + return g.terminateCtx.Done() } // ServerDone declares a running server done and subtracts one from the @@ -314,25 +329,20 @@ func (g *Manager) InformCleanup() { // Done allows the manager to be viewed as a context.Context, it returns a channel that is closed when the server is finished terminating func (g *Manager) Done() <-chan struct{} { - return g.done + return g.doneCtx.Done() } -// Err allows the manager to be viewed as a context.Context done at Terminate, it returns ErrTerminate +// Err allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Err() error { - select { - case <-g.Done(): - return ErrTerminate - default: - return nil - } + return g.doneCtx.Err() } -// Value allows the manager to be viewed as a context.Context done at Terminate, it has no values +// Value allows the manager to be viewed as a context.Context done at Terminate func (g *Manager) Value(key interface{}) interface{} { - return nil + return g.doneCtx.Value(key) } // Deadline returns nil as there is no fixed Deadline for the manager, it allows the manager to be viewed as a context.Context func (g *Manager) Deadline() (deadline time.Time, ok bool) { - return + return g.doneCtx.Deadline() } diff --git a/modules/graceful/manager_unix.go b/modules/graceful/manager_unix.go index 540974454..20d9b3905 100644 --- a/modules/graceful/manager_unix.go +++ b/modules/graceful/manager_unix.go @@ -25,13 +25,21 @@ type Manager struct { forked bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -45,11 +53,11 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start(ctx context.Context) { - // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) + // Make contexts + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(ctx) // Set the running state & handle signals g.setState(stateRunning) diff --git a/modules/graceful/manager_windows.go b/modules/graceful/manager_windows.go index 14923c2a9..51f29778b 100644 --- a/modules/graceful/manager_windows.go +++ b/modules/graceful/manager_windows.go @@ -36,14 +36,22 @@ type Manager struct { isChild bool lock *sync.RWMutex state state - shutdown chan struct{} - hammer chan struct{} - terminate chan struct{} - done chan struct{} + shutdownCtx context.Context + hammerCtx context.Context + terminateCtx context.Context + doneCtx context.Context + shutdownCtxCancel context.CancelFunc + hammerCtxCancel context.CancelFunc + terminateCtxCancel context.CancelFunc + doneCtxCancel context.CancelFunc runningServerWaitGroup sync.WaitGroup createServerWaitGroup sync.WaitGroup terminateWaitGroup sync.WaitGroup shutdownRequested chan struct{} + + toRunAtShutdown []func() + toRunAtHammer []func() + toRunAtTerminate []func() } func newGracefulManager(ctx context.Context) *Manager { @@ -58,11 +66,13 @@ func newGracefulManager(ctx context.Context) *Manager { } func (g *Manager) start() { + // Make contexts + g.terminateCtx, g.terminateCtxCancel = context.WithCancel(g.ctx) + g.shutdownCtx, g.shutdownCtxCancel = context.WithCancel(g.ctx) + g.hammerCtx, g.hammerCtxCancel = context.WithCancel(g.ctx) + g.doneCtx, g.doneCtxCancel = context.WithCancel(g.ctx) + // Make channels - g.terminate = make(chan struct{}) - g.shutdown = make(chan struct{}) - g.hammer = make(chan struct{}) - g.done = make(chan struct{}) g.shutdownRequested = make(chan struct{}) // Set the running state @@ -171,7 +181,7 @@ hammerLoop: default: log.Debug("Unexpected control request: %v", change.Cmd) } - case <-g.hammer: + case <-g.hammerCtx.Done(): break hammerLoop } } diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go index a7d78e9fd..67fa43eda 100644 --- a/modules/indexer/code/indexer.go +++ b/modules/indexer/code/indexer.go @@ -115,7 +115,13 @@ func Init() { ctx, cancel := context.WithCancel(context.Background()) - graceful.GetManager().RunAtTerminate(ctx, func() { + graceful.GetManager().RunAtTerminate(func() { + select { + case <-ctx.Done(): + return + default: + } + cancel() log.Debug("Closing repository indexer") indexer.Close() log.Info("PID: %d Repository Indexer closed", os.Getpid()) diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go index 9edaef6bd..676b6686e 100644 --- a/modules/indexer/issues/indexer.go +++ b/modules/indexer/issues/indexer.go @@ -160,7 +160,7 @@ func InitIssueIndexer(syncReindex bool) { } populate = !exist holder.set(issueIndexer) - graceful.GetManager().RunAtTerminate(context.Background(), func() { + graceful.GetManager().RunAtTerminate(func() { log.Debug("Closing issue indexer") issueIndexer := holder.get() if issueIndexer != nil { @@ -170,7 +170,7 @@ func InitIssueIndexer(syncReindex bool) { }) log.Debug("Created Bleve Indexer") case "elasticsearch": - graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(context.Context, func())) { + graceful.GetManager().RunWithShutdownFns(func(_, atTerminate func(func())) { issueIndexer, err := NewElasticSearchIndexer(setting.Indexer.IssueConnStr, setting.Indexer.IssueIndexerName) if err != nil { log.Fatal("Unable to initialize Elastic Search Issue Indexer at connection: %s Error: %v", setting.Indexer.IssueConnStr, err) diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go index 94478e6f0..3a10c8e12 100644 --- a/modules/queue/bytefifo.go +++ b/modules/queue/bytefifo.go @@ -4,14 +4,16 @@ package queue +import "context" + // ByteFIFO defines a FIFO that takes a byte array type ByteFIFO interface { // Len returns the length of the fifo - Len() int64 + Len(ctx context.Context) int64 // PushFunc pushes data to the end of the fifo and calls the callback if it is added - PushFunc(data []byte, fn func() error) error + PushFunc(ctx context.Context, data []byte, fn func() error) error // Pop pops data from the start of the fifo - Pop() ([]byte, error) + Pop(ctx context.Context) ([]byte, error) // Close this fifo Close() error } @@ -20,7 +22,7 @@ type ByteFIFO interface { type UniqueByteFIFO interface { ByteFIFO // Has returns whether the fifo contains this data - Has(data []byte) (bool, error) + Has(ctx context.Context, data []byte) (bool, error) } var _ ByteFIFO = &DummyByteFIFO{} @@ -29,12 +31,12 @@ var _ ByteFIFO = &DummyByteFIFO{} type DummyByteFIFO struct{} // PushFunc returns nil -func (*DummyByteFIFO) PushFunc(data []byte, fn func() error) error { +func (*DummyByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return nil } // Pop returns nil -func (*DummyByteFIFO) Pop() ([]byte, error) { +func (*DummyByteFIFO) Pop(ctx context.Context) ([]byte, error) { return []byte{}, nil } @@ -44,7 +46,7 @@ func (*DummyByteFIFO) Close() error { } // Len is always 0 -func (*DummyByteFIFO) Len() int64 { +func (*DummyByteFIFO) Len(ctx context.Context) int64 { return 0 } @@ -56,6 +58,6 @@ type DummyUniqueByteFIFO struct { } // Has always returns false -func (*DummyUniqueByteFIFO) Has([]byte) (bool, error) { +func (*DummyUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return false, nil } diff --git a/modules/queue/manager.go b/modules/queue/manager.go index c3ec735af..a6d48575a 100644 --- a/modules/queue/manager.go +++ b/modules/queue/manager.go @@ -187,14 +187,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error if flushable, ok := mq.Managed.(Flushable); ok { log.Debug("Flushing (flushable) queue: %s", mq.Name) go func(q *ManagedQueue) { - localCtx, localCancel := context.WithCancel(ctx) - pid := q.RegisterWorkers(1, start, hasTimeout, end, localCancel, true) + localCtx, localCtxCancel := context.WithCancel(ctx) + pid := q.RegisterWorkers(1, start, hasTimeout, end, localCtxCancel, true) err := flushable.FlushWithContext(localCtx) if err != nil && err != ctx.Err() { cancel() } q.CancelWorkers(pid) - localCancel() + localCtxCancel() wg.Done() }(mq) } else { diff --git a/modules/queue/queue.go b/modules/queue/queue.go index d08cba35a..7159048c1 100644 --- a/modules/queue/queue.go +++ b/modules/queue/queue.go @@ -57,7 +57,7 @@ type Named interface { // Queues will handle their own contents in the Run method type Queue interface { Flushable - Run(atShutdown, atTerminate func(context.Context, func())) + Run(atShutdown, atTerminate func(func())) Push(Data) error } @@ -74,7 +74,7 @@ type DummyQueue struct { } // Run does nothing -func (*DummyQueue) Run(_, _ func(context.Context, func())) {} +func (*DummyQueue) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (*DummyQueue) Push(Data) error { @@ -122,7 +122,7 @@ type Immediate struct { } // Run does nothing -func (*Immediate) Run(_, _ func(context.Context, func())) {} +func (*Immediate) Run(_, _ func(func())) {} // Push fakes a push of data to the queue func (q *Immediate) Push(data Data) error { diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go index fe1fb7807..3ea61aad0 100644 --- a/modules/queue/queue_bytefifo.go +++ b/modules/queue/queue_bytefifo.go @@ -17,8 +17,9 @@ import ( // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue type ByteFIFOQueueConfiguration struct { WorkerPoolConfiguration - Workers int - Name string + Workers int + Name string + WaitOnEmpty bool } var _ Queue = &ByteFIFOQueue{} @@ -26,14 +27,18 @@ var _ Queue = &ByteFIFOQueue{} // ByteFIFOQueue is a Queue formed from a ByteFIFO and WorkerPool type ByteFIFOQueue struct { *WorkerPool - byteFIFO ByteFIFO - typ Type - closed chan struct{} - terminated chan struct{} - exemplar interface{} - workers int - name string - lock sync.Mutex + byteFIFO ByteFIFO + typ Type + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string + lock sync.Mutex + waitOnEmpty bool + pushed chan struct{} } // NewByteFIFOQueue creates a new ByteFIFOQueue @@ -44,15 +49,22 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + return &ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, + waitOnEmpty: config.WaitOnEmpty, + pushed: make(chan struct{}, 1), }, nil } @@ -76,7 +88,15 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error { if err != nil { return err } - return q.byteFIFO.PushFunc(bs, fn) + if q.waitOnEmpty { + defer func() { + select { + case q.pushed <- struct{}{}: + default: + } + }() + } + return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn) } // IsEmpty checks if the queue is empty @@ -86,135 +106,160 @@ func (q *ByteFIFOQueue) IsEmpty() bool { if !q.WorkerPool.IsEmpty() { return false } - return q.byteFIFO.Len() == 0 + return q.byteFIFO.Len(q.terminateCtx) == 0 } // Run runs the bytefifo queue -func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) +func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("%s: %s Starting", q.typ, q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) - go q.readToChan() + log.Trace("%s: %s Now running", q.typ, q.name) + q.readToChan() - log.Trace("%s: %s Waiting til closed", q.typ, q.name) - <-q.closed + <-q.shutdownCtx.Done() log.Trace("%s: %s Waiting til done", q.typ, q.name) q.Wait() log.Trace("%s: %s Waiting til cleaned", q.typ, q.name) - ctx, cancel := context.WithCancel(context.Background()) - atTerminate(ctx, cancel) - q.CleanUp(ctx) - cancel() + q.CleanUp(q.terminateCtx) + q.terminateCtxCancel() } +const maxBackOffTime = time.Second * 3 + func (q *ByteFIFOQueue) readToChan() { // handle quick cancels select { - case <-q.closed: + case <-q.shutdownCtx.Done(): // tell the pool to shutdown. - q.cancel() + q.baseCtxCancel() return default: } + // Default backoff values backOffTime := time.Millisecond * 100 - maxBackOffTime := time.Second * 3 - for { - success, resetBackoff := q.doPop() - if resetBackoff { - backOffTime = 100 * time.Millisecond - } - if success { +loop: + for { + err := q.doPop() + if err == errQueueEmpty { + log.Trace("%s: %s Waiting on Empty", q.typ, q.name) select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.pushed: + // reset backOffTime + backOffTime = 100 * time.Millisecond + continue loop + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst waiting + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return - default: } - } else { + } + + // Reset the backOffTime if there is no error or an unmarshalError + if err == nil || err == errUnmarshal { + backOffTime = 100 * time.Millisecond + } + + if err != nil { + // Need to Backoff select { - case <-q.closed: - // tell the pool to shutdown. - q.cancel() + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown whilst backing off + // Make sure the worker pool is shutdown too + q.baseCtxCancel() return case <-time.After(backOffTime): - } - backOffTime += backOffTime / 2 - if backOffTime > maxBackOffTime { - backOffTime = maxBackOffTime + // OK we've waited - so backoff a bit + backOffTime += backOffTime / 2 + if backOffTime > maxBackOffTime { + backOffTime = maxBackOffTime + } + continue loop } } + select { + case <-q.shutdownCtx.Done(): + // Oops we've been shutdown + // Make sure the worker pool is shutdown too + q.baseCtxCancel() + return + default: + continue loop + } } } -func (q *ByteFIFOQueue) doPop() (success, resetBackoff bool) { +var errQueueEmpty = fmt.Errorf("empty queue") +var errEmptyBytes = fmt.Errorf("empty bytes") +var errUnmarshal = fmt.Errorf("failed to unmarshal") + +func (q *ByteFIFOQueue) doPop() error { q.lock.Lock() defer q.lock.Unlock() - bs, err := q.byteFIFO.Pop() + bs, err := q.byteFIFO.Pop(q.shutdownCtx) if err != nil { + if err == context.Canceled { + q.baseCtxCancel() + return err + } log.Error("%s: %s Error on Pop: %v", q.typ, q.name, err) - return + return err } if len(bs) == 0 { - return + if q.waitOnEmpty && q.byteFIFO.Len(q.shutdownCtx) == 0 { + return errQueueEmpty + } + return errEmptyBytes } - resetBackoff = true - data, err := unmarshalAs(bs, q.exemplar) if err != nil { log.Error("%s: %s Failed to unmarshal with error: %v", q.typ, q.name, err) - return + return errUnmarshal } log.Trace("%s %s: Task found: %#v", q.typ, q.name, data) q.WorkerPool.Push(data) - success = true - return + return nil } // Shutdown processing from this queue func (q *ByteFIFOQueue) Shutdown() { log.Trace("%s: %s Shutting down", q.typ, q.name) - q.lock.Lock() select { - case <-q.closed: + case <-q.shutdownCtx.Done(): + return default: - close(q.closed) } - q.lock.Unlock() + q.shutdownCtxCancel() log.Debug("%s: %s Shutdown", q.typ, q.name) } // IsShutdown returns a channel which is closed when this Queue is shutdown func (q *ByteFIFOQueue) IsShutdown() <-chan struct{} { - return q.closed + return q.shutdownCtx.Done() } // Terminate this queue and close the queue func (q *ByteFIFOQueue) Terminate() { log.Trace("%s: %s Terminating", q.typ, q.name) q.Shutdown() - q.lock.Lock() select { - case <-q.terminated: - q.lock.Unlock() + case <-q.terminateCtx.Done(): return default: } - close(q.terminated) - q.lock.Unlock() if log.IsDebug() { - log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len()) + log.Debug("%s: %s Closing with %d tasks left in queue", q.typ, q.name, q.byteFIFO.Len(q.terminateCtx)) } + q.terminateCtxCancel() if err := q.byteFIFO.Close(); err != nil { log.Error("Error whilst closing internal byte fifo in %s: %s: %v", q.typ, q.name, err) } @@ -223,7 +268,7 @@ func (q *ByteFIFOQueue) Terminate() { // IsTerminated returns a channel which is closed when this Queue is terminated func (q *ByteFIFOQueue) IsTerminated() <-chan struct{} { - return q.terminated + return q.terminateCtx.Done() } var _ UniqueQueue = &ByteFIFOUniqueQueue{} @@ -240,17 +285,21 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun return nil, err } config := configInterface.(ByteFIFOQueueConfiguration) + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) return &ByteFIFOUniqueQueue{ ByteFIFOQueue: ByteFIFOQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - byteFIFO: byteFIFO, - typ: typ, - closed: make(chan struct{}), - terminated: make(chan struct{}), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + byteFIFO: byteFIFO, + typ: typ, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, }, }, nil } @@ -265,5 +314,5 @@ func (q *ByteFIFOUniqueQueue) Has(data Data) (bool, error) { if err != nil { return false, err } - return q.byteFIFO.(UniqueByteFIFO).Has(bs) + return q.byteFIFO.(UniqueByteFIFO).Has(q.terminateCtx, bs) } diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go index d7a11e79f..4df64b69e 100644 --- a/modules/queue/queue_channel.go +++ b/modules/queue/queue_channel.go @@ -27,9 +27,13 @@ type ChannelQueueConfiguration struct { // It is basically a very thin wrapper around a WorkerPool type ChannelQueue struct { *WorkerPool - exemplar interface{} - workers int - name string + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelQueue creates a memory channel queue @@ -42,28 +46,30 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelQueue{ - WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration), + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar) return queue, nil } // Run starts to run the queue -func (q *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue @@ -75,6 +81,42 @@ func (q *ChannelQueue) Push(data Data) error { return nil } +// Shutdown processing from this queue +func (q *ChannelQueue) Shutdown() { + q.lock.Lock() + defer q.lock.Unlock() + select { + case <-q.shutdownCtx.Done(): + log.Trace("ChannelQueue: %s Already Shutting down", q.name) + return + default: + } + log.Trace("ChannelQueue: %s Shutting down", q.name) + go func() { + log.Trace("ChannelQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelQueue) Terminate() { + log.Trace("ChannelQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelQueue) Name() string { return q.name diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go index bca81d50f..e7abe5b50 100644 --- a/modules/queue/queue_channel_test.go +++ b/modules/queue/queue_channel_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "testing" "time" @@ -21,7 +20,7 @@ func TestChannelQueue(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ @@ -61,7 +60,7 @@ func TestChannelQueue_Batch(t *testing.T) { } } - nilFn := func(_ context.Context, _ func()) {} + nilFn := func(_ func()) {} queue, err := NewChannelQueue(handle, ChannelQueueConfiguration{ diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go index 6c15a8e63..911233a5d 100644 --- a/modules/queue/queue_disk.go +++ b/modules/queue/queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -37,6 +39,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { @@ -82,7 +85,7 @@ func NewLevelQueueByteFIFO(connection, prefix string) (*LevelQueueByteFIFO, erro } // PushFunc will push data into the fifo -func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err @@ -92,7 +95,7 @@ func (fifo *LevelQueueByteFIFO) PushFunc(data []byte, fn func() error) error { } // Pop pops data from the start of the fifo -func (fifo *LevelQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -108,7 +111,7 @@ func (fifo *LevelQueueByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *LevelQueueByteFIFO) Len() int64 { +func (fifo *LevelQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go index 801fd8a12..c3a1c5781 100644 --- a/modules/queue/queue_disk_channel.go +++ b/modules/queue/queue_disk_channel.go @@ -133,8 +133,9 @@ func (q *PersistableChannelQueue) Push(data Data) error { } // Run starts to run the queue -func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) q.lock.Lock() if q.internal == nil { @@ -147,34 +148,32 @@ func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - }() + if lq, ok := q.internal.(*LevelQueue); ok && lq.byteFIFO.Len(lq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + for !q.IsEmpty() { + _ = q.internal.Flush(0) + select { + case <-time.After(100 * time.Millisecond): + case <-q.internal.(*LevelQueue).shutdownCtx.Done(): + log.Warn("LevelQueue: %s shut down before completely flushed", q.internal.(*LevelQueue).Name()) + return + } + } + log.Debug("LevelQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + }() + } else { + log.Debug("PersistableChannelQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelQueue).qid) + } - log.Trace("PersistableChannelQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) - q.channelQueue.cancel() - q.internal.(*LevelQueue).cancel() - log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) - q.channelQueue.Wait() - q.internal.(*LevelQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.channelQueue.dataChan { - _ = q.internal.Push(data) - atomic.AddInt64(&q.channelQueue.numInQueue, -1) - } - log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue and blocks till the queue is empty @@ -232,16 +231,37 @@ func (q *PersistableChannelQueue) IsEmpty() bool { func (q *PersistableChannelQueue) Shutdown() { log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() + select { case <-q.closed: + q.lock.Unlock() + return default: - if q.internal != nil { - q.internal.(*LevelQueue).Shutdown() - } - close(q.closed) - log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } + q.channelQueue.Shutdown() + if q.internal != nil { + q.internal.(*LevelQueue).Shutdown() + } + close(q.closed) + q.lock.Unlock() + + log.Trace("PersistableChannelQueue: %s Cancelling pools", q.delayedStarter.name) + q.channelQueue.baseCtxCancel() + q.internal.(*LevelQueue).baseCtxCancel() + log.Trace("PersistableChannelQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + atomic.AddInt64(&q.channelQueue.numInQueue, -1) + } + log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + + log.Debug("PersistableChannelQueue: %s Shutdown", q.delayedStarter.name) } // Terminate this queue and close the queue @@ -250,6 +270,7 @@ func (q *PersistableChannelQueue) Terminate() { q.Shutdown() q.lock.Lock() defer q.lock.Unlock() + q.channelQueue.Terminate() if q.internal != nil { q.internal.(*LevelQueue).Terminate() } diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go index 93061bffc..561f98ca9 100644 --- a/modules/queue/queue_disk_channel_test.go +++ b/modules/queue/queue_disk_channel_test.go @@ -5,10 +5,8 @@ package queue import ( - "context" "io/ioutil" "testing" - "time" "code.gitea.io/gitea/modules/util" "github.com/stretchr/testify/assert" @@ -32,17 +30,19 @@ func TestPersistableChannelQueue(t *testing.T) { defer util.RemoveAll(tmpDir) queue, err := NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "first", }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) @@ -64,13 +64,18 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Equal(t, test2.TestString, result2.TestString) assert.Equal(t, test2.TestInt, result2.TestInt) + // test1 is a testData not a *testData so will be rejected err = queue.Push(test1) assert.Error(t, err) + // Now shutdown the queue for _, callback := range queueShutdown { callback() } - time.Sleep(200 * time.Millisecond) + + // Wait til it is closed + <-queue.(*PersistableChannelQueue).closed + err = queue.Push(&test1) assert.NoError(t, err) err = queue.Push(&test2) @@ -80,23 +85,33 @@ func TestPersistableChannelQueue(t *testing.T) { assert.Fail(t, "Handler processing should have stopped") default: } + + // terminate the queue for _, callback := range queueTerminate { callback() } + select { + case <-handleChan: + assert.Fail(t, "Handler processing should have stopped") + default: + } + // Reopen queue queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{ - DataDir: tmpDir, - BatchLength: 2, - QueueLength: 20, - Workers: 1, - MaxWorkers: 10, + DataDir: tmpDir, + BatchLength: 2, + QueueLength: 20, + Workers: 1, + BoostWorkers: 0, + MaxWorkers: 10, + Name: "second", }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) }) diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go index edaed49a5..1f884d4f8 100644 --- a/modules/queue/queue_disk_test.go +++ b/modules/queue/queue_disk_test.go @@ -5,7 +5,6 @@ package queue import ( - "context" "io/ioutil" "sync" "testing" @@ -49,11 +48,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() @@ -123,11 +122,11 @@ func TestLevelQueue(t *testing.T) { }, &testData{}) assert.NoError(t, err) - go queue.Run(func(_ context.Context, shutdown func()) { + go queue.Run(func(shutdown func()) { lock.Lock() queueShutdown = append(queueShutdown, shutdown) lock.Unlock() - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { lock.Lock() queueTerminate = append(queueTerminate, terminate) lock.Unlock() diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go index af2cc3033..a5fb866dc 100644 --- a/modules/queue/queue_redis.go +++ b/modules/queue/queue_redis.go @@ -6,7 +6,6 @@ package queue import ( "context" - "fmt" "code.gitea.io/gitea/modules/graceful" "code.gitea.io/gitea/modules/log" @@ -47,8 +46,6 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisQueue{ ByteFIFOQueue: byteFIFOQueue, } @@ -73,8 +70,8 @@ var _ ByteFIFO = &RedisByteFIFO{} // RedisByteFIFO represents a ByteFIFO formed from a redisClient type RedisByteFIFO struct { - ctx context.Context - client redisClient + client redisClient + queueName string } @@ -89,7 +86,6 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) fifo := &RedisByteFIFO{ queueName: config.QueueName, } - fifo.ctx = graceful.GetManager().TerminateContext() fifo.client = nosql.GetManager().GetRedisClient(config.ConnectionString) if err := fifo.client.Ping(graceful.GetManager().ShutdownContext()).Err(); err != nil { return nil, err @@ -98,18 +94,18 @@ func NewRedisByteFIFO(config RedisByteFIFOConfiguration) (*RedisByteFIFO, error) } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *RedisByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { if fn != nil { if err := fn(); err != nil { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err == nil || err == redis.Nil { return data, nil } @@ -122,8 +118,8 @@ func (fifo *RedisByteFIFO) Close() error { } // Len returns the length of the fifo -func (fifo *RedisByteFIFO) Len() int64 { - val, err := fifo.client.LLen(fifo.ctx, fifo.queueName).Result() +func (fifo *RedisByteFIFO) Len(ctx context.Context) int64 { + val, err := fifo.client.LLen(ctx, fifo.queueName).Result() if err != nil { log.Error("Error whilst getting length of redis queue %s: Error: %v", fifo.queueName, err) return -1 diff --git a/modules/queue/queue_wrapped.go b/modules/queue/queue_wrapped.go index 88d64e824..ec30ab028 100644 --- a/modules/queue/queue_wrapped.go +++ b/modules/queue/queue_wrapped.go @@ -38,7 +38,7 @@ type delayedStarter struct { } // setInternal must be called with the lock locked. -func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), handle HandlerFunc, exemplar interface{}) error { +func (q *delayedStarter) setInternal(atShutdown func(func()), handle HandlerFunc, exemplar interface{}) error { var ctx context.Context var cancel context.CancelFunc if q.timeout > 0 { @@ -49,9 +49,7 @@ func (q *delayedStarter) setInternal(atShutdown func(context.Context, func()), h defer cancel() // Ensure we also stop at shutdown - atShutdown(ctx, func() { - cancel() - }) + atShutdown(cancel) i := 1 for q.internal == nil { @@ -221,7 +219,7 @@ func (q *WrappedQueue) IsEmpty() bool { } // Run starts to run the queue and attempts to create the internal queue -func (q *WrappedQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *WrappedQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("WrappedQueue: %s Starting", q.name) q.lock.Lock() if q.internal == nil { diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go index dec1cfc5c..5bec67c4d 100644 --- a/modules/queue/unique_queue_channel.go +++ b/modules/queue/unique_queue_channel.go @@ -28,11 +28,15 @@ type ChannelUniqueQueueConfiguration ChannelQueueConfiguration // only guaranteed whilst the task is waiting in the queue. type ChannelUniqueQueue struct { *WorkerPool - lock sync.Mutex - table map[Data]bool - exemplar interface{} - workers int - name string + lock sync.Mutex + table map[Data]bool + shutdownCtx context.Context + shutdownCtxCancel context.CancelFunc + terminateCtx context.Context + terminateCtxCancel context.CancelFunc + exemplar interface{} + workers int + name string } // NewChannelUniqueQueue create a memory channel queue @@ -45,11 +49,19 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue if config.BatchLength == 0 { config.BatchLength = 1 } + + terminateCtx, terminateCtxCancel := context.WithCancel(context.Background()) + shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx) + queue := &ChannelUniqueQueue{ - table: map[Data]bool{}, - exemplar: exemplar, - workers: config.Workers, - name: config.Name, + table: map[Data]bool{}, + shutdownCtx: shutdownCtx, + shutdownCtxCancel: shutdownCtxCancel, + terminateCtx: terminateCtx, + terminateCtxCancel: terminateCtxCancel, + exemplar: exemplar, + workers: config.Workers, + name: config.Name, } queue.WorkerPool = NewWorkerPool(func(data ...Data) { for _, datum := range data { @@ -65,17 +77,11 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue } // Run starts to run the queue -func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { - atShutdown(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not shutdownable!", q.name) - }) - atTerminate(context.Background(), func() { - log.Warn("ChannelUniqueQueue: %s is not terminatable!", q.name) - }) +func (q *ChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { + atShutdown(q.Shutdown) + atTerminate(q.Terminate) log.Debug("ChannelUniqueQueue: %s Starting", q.name) - go func() { - _ = q.AddWorkers(q.workers, 0) - }() + _ = q.AddWorkers(q.workers, 0) } // Push will push data into the queue if the data is not already in the queue @@ -122,6 +128,39 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) { return has, nil } +// Shutdown processing from this queue +func (q *ChannelUniqueQueue) Shutdown() { + log.Trace("ChannelUniqueQueue: %s Shutting down", q.name) + select { + case <-q.shutdownCtx.Done(): + return + default: + } + go func() { + log.Trace("ChannelUniqueQueue: %s Flushing", q.name) + if err := q.FlushWithContext(q.terminateCtx); err != nil { + log.Warn("ChannelUniqueQueue: %s Terminated before completed flushing", q.name) + return + } + log.Debug("ChannelUniqueQueue: %s Flushed", q.name) + }() + q.shutdownCtxCancel() + log.Debug("ChannelUniqueQueue: %s Shutdown", q.name) +} + +// Terminate this queue and close the queue +func (q *ChannelUniqueQueue) Terminate() { + log.Trace("ChannelUniqueQueue: %s Terminating", q.name) + q.Shutdown() + select { + case <-q.terminateCtx.Done(): + return + default: + } + q.terminateCtxCancel() + log.Debug("ChannelUniqueQueue: %s Terminated", q.name) +} + // Name returns the name of this queue func (q *ChannelUniqueQueue) Name() string { return q.name diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go index 8ec8848bc..bb0eb7d95 100644 --- a/modules/queue/unique_queue_disk.go +++ b/modules/queue/unique_queue_disk.go @@ -5,6 +5,8 @@ package queue import ( + "context" + "code.gitea.io/gitea/modules/nosql" "gitea.com/lunny/levelqueue" @@ -41,6 +43,7 @@ func NewLevelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, if len(config.ConnectionString) == 0 { config.ConnectionString = config.DataDir } + config.WaitOnEmpty = true byteFIFO, err := NewLevelUniqueQueueByteFIFO(config.ConnectionString, config.QueueName) if err != nil { @@ -86,12 +89,12 @@ func NewLevelUniqueQueueByteFIFO(connection, prefix string) (*LevelUniqueQueueBy } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *LevelUniqueQueueByteFIFO) PushFunc(data []byte, fn func() error) error { +func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { return fifo.internal.LPushFunc(data, fn) } // Pop pops data from the start of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { +func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) { data, err := fifo.internal.RPop() if err != nil && err != levelqueue.ErrNotFound { return nil, err @@ -100,12 +103,12 @@ func (fifo *LevelUniqueQueueByteFIFO) Pop() ([]byte, error) { } // Len returns the length of the fifo -func (fifo *LevelUniqueQueueByteFIFO) Len() int64 { +func (fifo *LevelUniqueQueueByteFIFO) Len(ctx context.Context) int64 { return fifo.internal.Len() } // Has returns whether the fifo contains this data -func (fifo *LevelUniqueQueueByteFIFO) Has(data []byte) (bool, error) { +func (fifo *LevelUniqueQueueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { return fifo.internal.Has(data) } diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go index 47c4f2bdd..65a394151 100644 --- a/modules/queue/unique_queue_disk_channel.go +++ b/modules/queue/unique_queue_disk_channel.go @@ -36,7 +36,7 @@ type PersistableChannelUniqueQueueConfiguration struct { // task cannot be processed twice or more at the same time. Uniqueness is // only guaranteed whilst the task is waiting in the queue. type PersistableChannelUniqueQueue struct { - *ChannelUniqueQueue + channelQueue *ChannelUniqueQueue delayedStarter lock sync.Mutex closed chan struct{} @@ -85,8 +85,8 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac } queue := &PersistableChannelUniqueQueue{ - ChannelUniqueQueue: channelUniqueQueue.(*ChannelUniqueQueue), - closed: make(chan struct{}), + channelQueue: channelUniqueQueue.(*ChannelUniqueQueue), + closed: make(chan struct{}), } levelQueue, err := NewLevelUniqueQueue(func(data ...Data) { @@ -138,14 +138,14 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err case <-q.closed: return q.internal.(UniqueQueue).PushFunc(data, fn) default: - return q.ChannelUniqueQueue.PushFunc(data, fn) + return q.channelQueue.PushFunc(data, fn) } } // Has will test if the queue has the data func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { // This is more difficult... - has, err := q.ChannelUniqueQueue.Has(data) + has, err := q.channelQueue.Has(data) if err != nil || has { return has, err } @@ -158,7 +158,7 @@ func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) { } // Run starts to run the queue -func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context.Context, func())) { +func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())) { log.Debug("PersistableChannelUniqueQueue: %s Starting", q.delayedStarter.name) q.lock.Lock() @@ -170,7 +170,7 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context log.Error("Unable push to channelled queue: %v", err) } } - }, q.exemplar) + }, q.channelQueue.exemplar) q.lock.Unlock() if err != nil { log.Fatal("Unable to create internal queue for %s Error: %v", q.Name(), err) @@ -179,53 +179,73 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(context } else { q.lock.Unlock() } - atShutdown(context.Background(), q.Shutdown) - atTerminate(context.Background(), q.Terminate) + atShutdown(q.Shutdown) + atTerminate(q.Terminate) + _ = q.channelQueue.AddWorkers(q.channelQueue.workers, 0) - // Just run the level queue - we shut it down later - go q.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {}) - - go func() { - _ = q.ChannelUniqueQueue.AddWorkers(q.workers, 0) - }() + if luq, ok := q.internal.(*LevelUniqueQueue); ok && luq.ByteFIFOUniqueQueue.byteFIFO.Len(luq.shutdownCtx) != 0 { + // Just run the level queue - we shut it down once it's flushed + go q.internal.Run(func(_ func()) {}, func(_ func()) {}) + go func() { + _ = q.internal.Flush(0) + log.Debug("LevelUniqueQueue: %s flushed so shutting down", q.internal.(*LevelQueue).Name()) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + }() + } else { + log.Debug("PersistableChannelUniqueQueue: %s Skipping running the empty level queue", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).Shutdown() + GetManager().Remove(q.internal.(*LevelUniqueQueue).qid) + } - log.Trace("PersistableChannelUniqueQueue: %s Waiting til closed", q.delayedStarter.name) - <-q.closed - log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) - q.internal.(*LevelUniqueQueue).cancel() - q.ChannelUniqueQueue.cancel() - log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) - q.ChannelUniqueQueue.Wait() - q.internal.(*LevelUniqueQueue).Wait() - // Redirect all remaining data in the chan to the internal channel - go func() { - log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) - for data := range q.ChannelUniqueQueue.dataChan { - _ = q.internal.Push(data) - } - log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) - }() - log.Trace("PersistableChannelUniqueQueue: %s Done main loop", q.delayedStarter.name) } // Flush flushes the queue func (q *PersistableChannelUniqueQueue) Flush(timeout time.Duration) error { - return q.ChannelUniqueQueue.Flush(timeout) + return q.channelQueue.Flush(timeout) +} + +// FlushWithContext flushes the queue +func (q *PersistableChannelUniqueQueue) FlushWithContext(ctx context.Context) error { + return q.channelQueue.FlushWithContext(ctx) +} + +// IsEmpty checks if a queue is empty +func (q *PersistableChannelUniqueQueue) IsEmpty() bool { + return q.channelQueue.IsEmpty() } // Shutdown processing this queue func (q *PersistableChannelUniqueQueue) Shutdown() { log.Trace("PersistableChannelUniqueQueue: %s Shutting down", q.delayedStarter.name) q.lock.Lock() - defer q.lock.Unlock() select { case <-q.closed: + q.lock.Unlock() + return default: if q.internal != nil { q.internal.(*LevelUniqueQueue).Shutdown() } close(q.closed) + q.lock.Unlock() } + + log.Trace("PersistableChannelUniqueQueue: %s Cancelling pools", q.delayedStarter.name) + q.internal.(*LevelUniqueQueue).baseCtxCancel() + q.channelQueue.baseCtxCancel() + log.Trace("PersistableChannelUniqueQueue: %s Waiting til done", q.delayedStarter.name) + q.channelQueue.Wait() + q.internal.(*LevelUniqueQueue).Wait() + // Redirect all remaining data in the chan to the internal channel + go func() { + log.Trace("PersistableChannelUniqueQueue: %s Redirecting remaining data", q.delayedStarter.name) + for data := range q.channelQueue.dataChan { + _ = q.internal.Push(data) + } + log.Trace("PersistableChannelUniqueQueue: %s Done Redirecting remaining data", q.delayedStarter.name) + }() + log.Debug("PersistableChannelUniqueQueue: %s Shutdown", q.delayedStarter.name) } diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go index 20a50cc1f..7474c0966 100644 --- a/modules/queue/unique_queue_redis.go +++ b/modules/queue/unique_queue_redis.go @@ -5,9 +5,8 @@ package queue import ( - "fmt" + "context" - "code.gitea.io/gitea/modules/graceful" "github.com/go-redis/redis/v8" ) @@ -51,8 +50,6 @@ func NewRedisUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, return nil, err } - byteFIFO.ctx = graceful.NewChannelContext(byteFIFOQueue.IsTerminated(), fmt.Errorf("queue has been terminated")) - queue := &RedisUniqueQueue{ ByteFIFOUniqueQueue: byteFIFOQueue, } @@ -92,8 +89,8 @@ func NewRedisUniqueByteFIFO(config RedisUniqueByteFIFOConfiguration) (*RedisUniq } // PushFunc pushes data to the end of the fifo and calls the callback if it is added -func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { - added, err := fifo.client.SAdd(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn func() error) error { + added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result() if err != nil { return err } @@ -105,12 +102,12 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(data []byte, fn func() error) error { return err } } - return fifo.client.RPush(fifo.ctx, fifo.queueName, data).Err() + return fifo.client.RPush(ctx, fifo.queueName, data).Err() } // Pop pops data from the start of the fifo -func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { - data, err := fifo.client.LPop(fifo.ctx, fifo.queueName).Bytes() +func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) { + data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes() if err != nil && err != redis.Nil { return data, err } @@ -119,13 +116,13 @@ func (fifo *RedisUniqueByteFIFO) Pop() ([]byte, error) { return data, nil } - err = fifo.client.SRem(fifo.ctx, fifo.setName, data).Err() + err = fifo.client.SRem(ctx, fifo.setName, data).Err() return data, err } // Has returns whether the fifo contains this data -func (fifo *RedisUniqueByteFIFO) Has(data []byte) (bool, error) { - return fifo.client.SIsMember(fifo.ctx, fifo.setName, data).Result() +func (fifo *RedisUniqueByteFIFO) Has(ctx context.Context, data []byte) (bool, error) { + return fifo.client.SIsMember(ctx, fifo.setName, data).Result() } func init() { diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go index 0f15ccac9..0176e2e0b 100644 --- a/modules/queue/workerpool.go +++ b/modules/queue/workerpool.go @@ -21,7 +21,7 @@ import ( type WorkerPool struct { lock sync.Mutex baseCtx context.Context - cancel context.CancelFunc + baseCtxCancel context.CancelFunc cond *sync.Cond qid int64 maxNumberOfWorkers int @@ -52,7 +52,7 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo dataChan := make(chan Data, config.QueueLength) pool := &WorkerPool{ baseCtx: ctx, - cancel: cancel, + baseCtxCancel: cancel, batchLength: config.BatchLength, dataChan: dataChan, handle: handle, @@ -83,7 +83,7 @@ func (p *WorkerPool) Push(data Data) { } func (p *WorkerPool) zeroBoost() { - ctx, cancel := context.WithCancel(p.baseCtx) + ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -94,26 +94,14 @@ func (p *WorkerPool) zeroBoost() { start := time.Now() pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } + cancel = func() { mq.RemoveWorkers(pid) - cancel() - }() + } } else { log.Warn("WorkerPool: %d has zero workers - adding %d temporary workers for %s", p.qid, p.boostWorkers, p.boostTimeout) - go func() { - select { - case <-ctx.Done(): - case <-time.After(p.boostTimeout): - } - cancel() - }() } p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(ctx, cancel, boost) } func (p *WorkerPool) pushBoost(data Data) { @@ -140,7 +128,7 @@ func (p *WorkerPool) pushBoost(data Data) { return } p.blockTimeout *= 2 - ctx, cancel := context.WithCancel(p.baseCtx) + boostCtx, boostCtxCancel := context.WithCancel(p.baseCtx) mq := GetManager().GetManagedQueue(p.qid) boost := p.boostWorkers if (boost+p.numberOfWorkers) > p.maxNumberOfWorkers && p.maxNumberOfWorkers >= 0 { @@ -150,24 +138,24 @@ func (p *WorkerPool) pushBoost(data Data) { log.Warn("WorkerPool: %d (for %s) Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, mq.Name, ourTimeout, boost, p.boostTimeout, p.blockTimeout) start := time.Now() - pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), cancel, false) + pid := mq.RegisterWorkers(boost, start, true, start.Add(p.boostTimeout), boostCtxCancel, false) go func() { - <-ctx.Done() + <-boostCtx.Done() mq.RemoveWorkers(pid) - cancel() + boostCtxCancel() }() } else { log.Warn("WorkerPool: %d Channel blocked for %v - adding %d temporary workers for %s, block timeout now %v", p.qid, ourTimeout, p.boostWorkers, p.boostTimeout, p.blockTimeout) } go func() { <-time.After(p.boostTimeout) - cancel() + boostCtxCancel() p.lock.Lock() p.blockTimeout /= 2 p.lock.Unlock() }() p.lock.Unlock() - p.addWorkers(ctx, boost) + p.addWorkers(boostCtx, boostCtxCancel, boost) p.dataChan <- data } } @@ -243,28 +231,25 @@ func (p *WorkerPool) commonRegisterWorkers(number int, timeout time.Duration, is mq := GetManager().GetManagedQueue(p.qid) if mq != nil { pid := mq.RegisterWorkers(number, start, hasTimeout, end, cancel, isFlusher) - go func() { - <-ctx.Done() - mq.RemoveWorkers(pid) - cancel() - }() log.Trace("WorkerPool: %d (for %s) adding %d workers with group id: %d", p.qid, mq.Name, number, pid) - } else { - log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) - + return ctx, func() { + mq.RemoveWorkers(pid) + } } + log.Trace("WorkerPool: %d adding %d workers (no group id)", p.qid, number) + return ctx, cancel } // AddWorkers adds workers to the pool - this allows the number of workers to go above the limit func (p *WorkerPool) AddWorkers(number int, timeout time.Duration) context.CancelFunc { ctx, cancel := p.commonRegisterWorkers(number, timeout, false) - p.addWorkers(ctx, number) + p.addWorkers(ctx, cancel, number) return cancel } // addWorkers adds workers to the pool -func (p *WorkerPool) addWorkers(ctx context.Context, number int) { +func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc, number int) { for i := 0; i < number; i++ { p.lock.Lock() if p.cond == nil { @@ -279,11 +264,13 @@ func (p *WorkerPool) addWorkers(ctx context.Context, number int) { p.numberOfWorkers-- if p.numberOfWorkers == 0 { p.cond.Broadcast() + cancel() } else if p.numberOfWorkers < 0 { // numberOfWorkers can't go negative but... log.Warn("Number of Workers < 0 for QID %d - this shouldn't happen", p.qid) p.numberOfWorkers = 0 p.cond.Broadcast() + cancel() } p.lock.Unlock() }() diff --git a/services/pull/check_test.go b/services/pull/check_test.go index 33a230e5a..f6614ea0a 100644 --- a/services/pull/check_test.go +++ b/services/pull/check_test.go @@ -6,7 +6,6 @@ package pull import ( - "context" "strconv" "testing" "time" @@ -54,9 +53,9 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) { assert.True(t, has) assert.NoError(t, err) - prQueue.Run(func(_ context.Context, shutdown func()) { + prQueue.Run(func(shutdown func()) { queueShutdown = append(queueShutdown, shutdown) - }, func(_ context.Context, terminate func()) { + }, func(terminate func()) { queueTerminate = append(queueTerminate, terminate) })