Browse Source

Queue: Make WorkerPools and Queues flushable (#10001)

* Make WorkerPools and Queues flushable

Adds Flush methods to Queues and the WorkerPool
Further abstracts the WorkerPool
Adds a final step to Flush the queues in the defer from PrintCurrentTest
Fixes an issue with Settings inheritance in queues

Signed-off-by: Andrew Thornton <art27@cantab.net>

* Change to for loop

* Add IsEmpty and begin just making the queues composed WorkerPools

* subsume workerpool into the queues and create a flushable interface

* Add manager command

* Move flushall to queue.Manager and add to testlogger

* As per @guillep2k

* as per @guillep2k

* Just make queues all implement flushable and clean up the wrapped queue flushes

* cope with no timeout

Co-authored-by: Lauris BH <lauris@nix.lv>
mj
zeripath 3 years ago
committed by GitHub
parent
commit
c01221e70f
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 92
      cmd/manager.go
  2. 6
      integrations/testlogger.go
  3. 1
      main.go
  4. 46
      modules/graceful/manager_unix.go
  5. 33
      modules/graceful/manager_windows.go
  6. 83
      modules/private/manager.go
  7. 63
      modules/queue/helper.go
  8. 159
      modules/queue/manager.go
  9. 43
      modules/queue/queue.go
  10. 59
      modules/queue/queue_channel.go
  11. 30
      modules/queue/queue_channel_test.go
  12. 87
      modules/queue/queue_disk.go
  13. 126
      modules/queue/queue_disk_channel.go
  14. 36
      modules/queue/queue_disk_test.go
  15. 102
      modules/queue/queue_redis.go
  16. 97
      modules/queue/queue_wrapped.go
  17. 24
      modules/queue/setting.go
  18. 94
      modules/queue/workerpool.go
  19. 20
      modules/setting/queue.go
  20. 4
      options/locale/locale_en-US.ini
  21. 30
      routers/admin/admin.go
  22. 4
      routers/private/internal.go
  23. 41
      routers/private/manager.go
  24. 28
      routers/private/manager_unix.go
  25. 28
      routers/private/manager_windows.go
  26. 1
      routers/routes/routes.go
  27. 20
      templates/admin/queue.tmpl

92
cmd/manager.go

@ -0,0 +1,92 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package cmd
import (
"fmt"
"net/http"
"os"
"time"
"code.gitea.io/gitea/modules/private"
"github.com/urfave/cli"
)
var (
// CmdManager represents the manager command
CmdManager = cli.Command{
Name: "manager",
Usage: "Manage the running gitea process",
Description: "This is a command for managing the running gitea process",
Subcommands: []cli.Command{
subcmdShutdown,
subcmdRestart,
subcmdFlushQueues,
},
}
subcmdShutdown = cli.Command{
Name: "shutdown",
Usage: "Gracefully shutdown the running process",
Action: runShutdown,
}
subcmdRestart = cli.Command{
Name: "restart",
Usage: "Gracefully restart the running process - (not implemented for windows servers)",
Action: runRestart,
}
subcmdFlushQueues = cli.Command{
Name: "flush-queues",
Usage: "Flush queues in the running process",
Action: runFlushQueues,
Flags: []cli.Flag{
cli.DurationFlag{
Name: "timeout",
Value: 60 * time.Second,
Usage: "Timeout for the flushing process",
},
cli.BoolFlag{
Name: "non-blocking",
Usage: "Set to true to not wait for flush to complete before returning",
},
},
}
)
func runShutdown(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.Shutdown()
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}
fmt.Fprintln(os.Stdout, msg)
return nil
}
func runRestart(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.Restart()
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}
fmt.Fprintln(os.Stdout, msg)
return nil
}
func runFlushQueues(c *cli.Context) error {
setup("manager", false)
statusCode, msg := private.FlushQueues(c.Duration("timeout"), c.Bool("non-blocking"))
switch statusCode {
case http.StatusInternalServerError:
fail("InternalServerError", msg)
}
fmt.Fprintln(os.Stdout, msg)
return nil
}

6
integrations/testlogger.go

@ -5,6 +5,7 @@
package integrations
import (
"context"
"encoding/json"
"fmt"
"os"
@ -12,8 +13,10 @@ import (
"strings"
"sync"
"testing"
"time"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/queue"
)
var prefix string
@ -98,6 +101,9 @@ func PrintCurrentTest(t testing.TB, skip ...int) func() {
}
writerCloser.setT(&t)
return func() {
if err := queue.GetManager().FlushAll(context.Background(), 20*time.Second); err != nil {
t.Errorf("Flushing queues failed with error %v", err)
}
_ = writerCloser.Close()
}
}

1
main.go

@ -69,6 +69,7 @@ arguments - which can alternatively be run by running the subcommand web.`
cmd.CmdKeys,
cmd.CmdConvert,
cmd.CmdDoctor,
cmd.CmdManager,
}
// Now adjust these commands to add our global configuration options

46
modules/graceful/manager_unix.go

@ -110,28 +110,19 @@ func (g *Manager) handleSignals(ctx context.Context) {
case sig := <-signalChannel:
switch sig {
case syscall.SIGHUP:
if setting.GracefulRestartable {
log.Info("PID: %d. Received SIGHUP. Forking...", pid)
err := g.doFork()
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
log.Error("Error whilst forking from PID: %d : %v", pid, err)
}
} else {
log.Info("PID: %d. Received SIGHUP. Not set restartable. Shutting down...", pid)
g.doShutdown()
}
log.Info("PID: %d. Received SIGHUP. Attempting GracefulShutdown...", pid)
g.DoGracefulShutdown()
case syscall.SIGUSR1:
log.Info("PID %d. Received SIGUSR1.", pid)
case syscall.SIGUSR2:
log.Warn("PID %d. Received SIGUSR2. Hammering...", pid)
g.doHammerTime(0 * time.Second)
g.DoImmediateHammer()
case syscall.SIGINT:
log.Warn("PID %d. Received SIGINT. Shutting down...", pid)
g.doShutdown()
g.DoGracefulShutdown()
case syscall.SIGTERM:
log.Warn("PID %d. Received SIGTERM. Shutting down...", pid)
g.doShutdown()
g.DoGracefulShutdown()
case syscall.SIGTSTP:
log.Info("PID %d. Received SIGTSTP.", pid)
default:
@ -139,7 +130,7 @@ func (g *Manager) handleSignals(ctx context.Context) {
}
case <-ctx.Done():
log.Warn("PID: %d. Background context for manager closed - %v - Shutting down...", pid, ctx.Err())
g.doShutdown()
g.DoGracefulShutdown()
}
}
}
@ -160,6 +151,31 @@ func (g *Manager) doFork() error {
return err
}
// DoGracefulRestart causes a graceful restart
func (g *Manager) DoGracefulRestart() {
if setting.GracefulRestartable {
log.Info("PID: %d. Forking...", os.Getpid())
err := g.doFork()
if err != nil && err.Error() != "another process already forked. Ignoring this one" {
log.Error("Error whilst forking from PID: %d : %v", os.Getpid(), err)
}
} else {
log.Info("PID: %d. Not set restartable. Shutting down...", os.Getpid())
g.doShutdown()
}
}
// DoImmediateHammer causes an immediate hammer
func (g *Manager) DoImmediateHammer() {
g.doHammerTime(0 * time.Second)
}
// DoGracefulShutdown causes a graceful shutdown
func (g *Manager) DoGracefulShutdown() {
g.doShutdown()
}
// RegisterServer registers the running of a listening server, in the case of unix this means that the parent process can now die.
// Any call to RegisterServer must be matched by a call to ServerDone
func (g *Manager) RegisterServer() {

33
modules/graceful/manager_windows.go

@ -43,6 +43,7 @@ type Manager struct {
runningServerWaitGroup sync.WaitGroup
createServerWaitGroup sync.WaitGroup
terminateWaitGroup sync.WaitGroup
shutdownRequested chan struct{}
}
func newGracefulManager(ctx context.Context) *Manager {
@ -62,6 +63,7 @@ func (g *Manager) start() {
g.shutdown = make(chan struct{})
g.hammer = make(chan struct{})
g.done = make(chan struct{})
g.shutdownRequested = make(chan struct{})
// Set the running state
g.setState(stateRunning)
@ -107,7 +109,10 @@ loop:
for {
select {
case <-g.ctx.Done():
g.doShutdown()
g.DoGracefulShutdown()
waitTime += setting.GracefulHammerTime
break loop
case <-g.shutdownRequested:
waitTime += setting.GracefulHammerTime
break loop
case change := <-changes:
@ -115,12 +120,12 @@ loop:
case svc.Interrogate:
status <- change.CurrentStatus
case svc.Stop, svc.Shutdown:
g.doShutdown()
g.DoGracefulShutdown()
waitTime += setting.GracefulHammerTime
break loop
case hammerCode:
g.doShutdown()
g.doHammerTime(0 * time.Second)
g.DoGracefulShutdown()
g.DoImmediateHammer()
break loop
default:
log.Debug("Unexpected control request: %v", change.Cmd)
@ -140,7 +145,7 @@ hammerLoop:
case svc.Interrogate:
status <- change.CurrentStatus
case svc.Stop, svc.Shutdown, hammerCmd:
g.doHammerTime(0 * time.Second)
g.DoImmediateHammer()
break hammerLoop
default:
log.Debug("Unexpected control request: %v", change.Cmd)
@ -152,6 +157,24 @@ hammerLoop:
return false, 0
}
// DoImmediateHammer causes an immediate hammer
func (g *Manager) DoImmediateHammer() {
g.doHammerTime(0 * time.Second)
}
// DoGracefulShutdown causes a graceful shutdown
func (g *Manager) DoGracefulShutdown() {
g.lock.Lock()
select {
case <-g.shutdownRequested:
g.lock.Unlock()
default:
close(g.shutdownRequested)
g.lock.Unlock()
g.doShutdown()
}
}
// RegisterServer registers the running of a listening server.
// Any call to RegisterServer must be matched by a call to ServerDone
func (g *Manager) RegisterServer() {

83
modules/private/manager.go

@ -0,0 +1,83 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package private
import (
"encoding/json"
"fmt"
"net/http"
"time"
"code.gitea.io/gitea/modules/setting"
)
// Shutdown calls the internal shutdown function
func Shutdown() (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/shutdown")
req := newInternalRequest(reqURL, "POST")
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}
return http.StatusOK, "Shutting down"
}
// Restart calls the internal restart function
func Restart() (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/restart")
req := newInternalRequest(reqURL, "POST")
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}
return http.StatusOK, "Restarting"
}
// FlushOptions represents the options for the flush call
type FlushOptions struct {
Timeout time.Duration
NonBlocking bool
}
// FlushQueues calls the internal flush-queues function
func FlushQueues(timeout time.Duration, nonBlocking bool) (int, string) {
reqURL := setting.LocalURL + fmt.Sprintf("api/internal/manager/flush-queues")
req := newInternalRequest(reqURL, "POST")
if timeout > 0 {
req.SetTimeout(timeout+10*time.Second, timeout+10*time.Second)
}
req = req.Header("Content-Type", "application/json")
jsonBytes, _ := json.Marshal(FlushOptions{
Timeout: timeout,
NonBlocking: nonBlocking,
})
req.Body(jsonBytes)
resp, err := req.Response()
if err != nil {
return http.StatusInternalServerError, fmt.Sprintf("Unable to contact gitea: %v", err.Error())
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return resp.StatusCode, decodeJSONError(resp).Err
}
return http.StatusOK, "Flushed"
}

63
modules/queue/helper.go

@ -0,0 +1,63 @@
// Copyright 2020 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.
package queue
import (
"encoding/json"
"reflect"
)
// toConfig will attempt to convert a given configuration cfg into the provided exemplar type.
//
// It will tolerate the cfg being passed as a []byte or string of a json representation of the
// exemplar or the correct type of the exemplar itself
func toConfig(exemplar, cfg interface{}) (interface{}, error) {
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
return cfg, nil
}
configBytes, ok := cfg.([]byte)
if !ok {
configStr, ok := cfg.(string)
if !ok {
return nil, ErrInvalidConfiguration{cfg: cfg}
}
configBytes = []byte(configStr)
}
newVal := reflect.New(reflect.TypeOf(exemplar))
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
}
return newVal.Elem().Interface(), nil
}
// unmarshalAs will attempt to unmarshal provided bytes as the provided exemplar
func unmarshalAs(bs []byte, exemplar interface{}) (data Data, err error) {
if exemplar != nil {
t := reflect.TypeOf(exemplar)
n := reflect.New(t)
ne := n.Elem()
err = json.Unmarshal(bs, ne.Addr().Interface())
data = ne.Interface().(Data)
} else {
err = json.Unmarshal(bs, &data)
}
return
}
// assignableTo will check if provided data is assignable to the same type as the exemplar
// if the provided exemplar is nil then it will always return true
func assignableTo(data Data, exemplar interface{}) bool {
if exemplar == nil {
return true
}
// Assert data is of same type as exemplar
t := reflect.TypeOf(data)
exemplarType := reflect.TypeOf(exemplar)
return t.AssignableTo(exemplarType) && data != nil
}

159
modules/queue/manager.go

@ -26,36 +26,57 @@ type Manager struct {
Queues map[int64]*ManagedQueue
}
// ManagedQueue represents a working queue inheriting from Gitea.
// ManagedQueue represents a working queue with a Pool of workers.
//
// Although a ManagedQueue should really represent a Queue this does not
// necessarily have to be the case. This could be used to describe any queue.WorkerPool.
type ManagedQueue struct {
mutex sync.Mutex
QID int64
Queue Queue
Type Type
Name string
Configuration interface{}
ExemplarType string
Pool ManagedPool
Managed interface{}
counter int64
PoolWorkers map[int64]*PoolWorkers
}
// Flushable represents a pool or queue that is flushable
type Flushable interface {
// Flush will add a flush worker to the pool - the worker should be autoregistered with the manager
Flush(time.Duration) error
// FlushWithContext is very similar to Flush
// NB: The worker will not be registered with the manager.
FlushWithContext(ctx context.Context) error
// IsEmpty will return if the managed pool is empty and has no work
IsEmpty() bool
}
// ManagedPool is a simple interface to get certain details from a worker pool
type ManagedPool interface {
// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
AddWorkers(number int, timeout time.Duration) context.CancelFunc
// NumberOfWorkers returns the total number of workers in the pool
NumberOfWorkers() int
// MaxNumberOfWorkers returns the maximum number of workers the pool can dynamically grow to
MaxNumberOfWorkers() int
// SetMaxNumberOfWorkers sets the maximum number of workers the pool can dynamically grow to
SetMaxNumberOfWorkers(int)
// BoostTimeout returns the current timeout for worker groups created during a boost
BoostTimeout() time.Duration
// BlockTimeout returns the timeout the internal channel can block for before a boost would occur
BlockTimeout() time.Duration
// BoostWorkers sets the number of workers to be created during a boost
BoostWorkers() int
SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
// SetPoolSettings sets the user updatable settings for the pool
SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration)
}
// ManagedQueueList implements the sort.Interface
type ManagedQueueList []*ManagedQueue
// PoolWorkers represents a working queue inheriting from Gitea.
// PoolWorkers represents a group of workers working on a queue
type PoolWorkers struct {
PID int64
Workers int
@ -63,9 +84,10 @@ type PoolWorkers struct {
Timeout time.Time
HasTimeout bool
Cancel context.CancelFunc
IsFlusher bool
}
// PoolWorkersList implements the sort.Interface
// PoolWorkersList implements the sort.Interface for PoolWorkers
type PoolWorkersList []*PoolWorkers
func init() {
@ -83,27 +105,28 @@ func GetManager() *Manager {
}
// Add adds a queue to this manager
func (m *Manager) Add(queue Queue,
func (m *Manager) Add(managed interface{},
t Type,
configuration,
exemplar interface{},
pool ManagedPool) int64 {
exemplar interface{}) int64 {
cfg, _ := json.Marshal(configuration)
mq := &ManagedQueue{
Queue: queue,
Type: t,
Configuration: string(cfg),
ExemplarType: reflect.TypeOf(exemplar).String(),
PoolWorkers: make(map[int64]*PoolWorkers),
Pool: pool,
Managed: managed,
}
m.mutex.Lock()
m.counter++
mq.QID = m.counter
mq.Name = fmt.Sprintf("queue-%d", mq.QID)
if named, ok := queue.(Named); ok {
mq.Name = named.Name()
if named, ok := managed.(Named); ok {
name := named.Name()
if len(name) > 0 {
mq.Name = name
}
}
m.Queues[mq.QID] = mq
m.mutex.Unlock()
@ -127,6 +150,64 @@ func (m *Manager) GetManagedQueue(qid int64) *ManagedQueue {
return m.Queues[qid]
}
// FlushAll flushes all the flushable queues attached to this manager
func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error {
var ctx context.Context
var cancel context.CancelFunc
start := time.Now()
end := start
hasTimeout := false
if timeout > 0 {
ctx, cancel = context.WithTimeout(baseCtx, timeout)
end = start.Add(timeout)
hasTimeout = true
} else {
ctx, cancel = context.WithCancel(baseCtx)
}
defer cancel()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
mqs := m.ManagedQueues()
wg := sync.WaitGroup{}
wg.Add(len(mqs))
allEmpty := true
for _, mq := range mqs {
if mq.IsEmpty() {
wg.Done()
continue
}
allEmpty = false
if flushable, ok := mq.Managed.(Flushable); ok {
go func() {
localCtx, localCancel := context.WithCancel(ctx)
pid := mq.RegisterWorkers(1, start, hasTimeout, end, localCancel, true)
err := flushable.FlushWithContext(localCtx)
if err != nil && err != ctx.Err() {
cancel()
}
mq.CancelWorkers(pid)
localCancel()
wg.Done()
}()
} else {
wg.Done()
}
}
if allEmpty {
break
}
wg.Wait()
}
return nil
}
// ManagedQueues returns the managed queues
func (m *Manager) ManagedQueues() []*ManagedQueue {
m.mutex.Lock()
@ -152,7 +233,7 @@ func (q *ManagedQueue) Workers() []*PoolWorkers {
}
// RegisterWorkers registers workers to this queue
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc) int64 {
func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout bool, timeout time.Time, cancel context.CancelFunc, isFlusher bool) int64 {
q.mutex.Lock()
defer q.mutex.Unlock()
q.counter++
@ -163,6 +244,7 @@ func (q *ManagedQueue) RegisterWorkers(number int, start time.Time, hasTimeout b
Timeout: timeout,
HasTimeout: hasTimeout,
Cancel: cancel,
IsFlusher: isFlusher,
}
return q.counter
}
@ -191,57 +273,74 @@ func (q *ManagedQueue) RemoveWorkers(pid int64) {
// AddWorkers adds workers to the queue if it has registered an add worker function
func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.CancelFunc {
if q.Pool != nil {
if pool, ok := q.Managed.(ManagedPool); ok {
// the cancel will be added to the pool workers description above
return q.Pool.AddWorkers(number, timeout)
return pool.AddWorkers(number, timeout)
}
return nil
}
// Flush flushes the queue with a timeout
func (q *ManagedQueue) Flush(timeout time.Duration) error {
if flushable, ok := q.Managed.(Flushable); ok {
// the cancel will be added to the pool workers description above
return flushable.Flush(timeout)
}
return nil
}
// IsEmpty returns if the queue is empty
func (q *ManagedQueue) IsEmpty() bool {
if flushable, ok := q.Managed.(Flushable); ok {
return flushable.IsEmpty()
}
return true
}
// NumberOfWorkers returns the number of workers in the queue
func (q *ManagedQueue) NumberOfWorkers() int {
if q.Pool != nil {
return q.Pool.NumberOfWorkers()
if pool, ok := q.Managed.(ManagedPool); ok {
return pool.NumberOfWorkers()
}
return -1
}
// MaxNumberOfWorkers returns the maximum number of workers for the pool
func (q *ManagedQueue) MaxNumberOfWorkers() int {
if q.Pool != nil {
return q.Pool.MaxNumberOfWorkers()
if pool, ok := q.Managed.(ManagedPool); ok {
return pool.MaxNumberOfWorkers()
}
return 0
}
// BoostWorkers returns the number of workers for a boost
func (q *ManagedQueue) BoostWorkers() int {
if q.Pool != nil {
return q.Pool.BoostWorkers()
if pool, ok := q.Managed.(ManagedPool); ok {
return pool.BoostWorkers()
}
return -1
}
// BoostTimeout returns the timeout of the next boost
func (q *ManagedQueue) BoostTimeout() time.Duration {
if q.Pool != nil {
return q.Pool.BoostTimeout()
if pool, ok := q.Managed.(ManagedPool); ok {
return pool.BoostTimeout()
}
return 0
}
// BlockTimeout returns the timeout til the next boost
func (q *ManagedQueue) BlockTimeout() time.Duration {
if q.Pool != nil {
return q.Pool.BlockTimeout()
if pool, ok := q.Managed.(ManagedPool); ok {
return pool.BlockTimeout()
}
return 0
}
// SetSettings sets the setable boost values
func (q *ManagedQueue) SetSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
if q.Pool != nil {
q.Pool.SetSettings(maxNumberOfWorkers, boostWorkers, timeout)
// SetPoolSettings sets the setable boost values
func (q *ManagedQueue) SetPoolSettings(maxNumberOfWorkers, boostWorkers int, timeout time.Duration) {
if pool, ok := q.Managed.(ManagedPool); ok {
pool.SetPoolSettings(maxNumberOfWorkers, boostWorkers, timeout)
}
}

43
modules/queue/queue.go

@ -6,9 +6,8 @@ package queue
import (
"context"
"encoding/json"
"fmt"
"reflect"
"time"
)
// ErrInvalidConfiguration is called when there is invalid configuration for a queue
@ -53,8 +52,11 @@ type Named interface {
Name() string
}
// Queue defines an interface to save an issue indexer queue
// Queue defines an interface of a queue-like item
//
// Queues will handle their own contents in the Run method
type Queue interface {
Flushable
Run(atShutdown, atTerminate func(context.Context, func()))
Push(Data) error
}
@ -71,32 +73,27 @@ func NewDummyQueue(handler HandlerFunc, opts, exemplar interface{}) (Queue, erro
type DummyQueue struct {
}
// Run starts to run the queue
// Run does nothing
func (b *DummyQueue) Run(_, _ func(context.Context, func())) {}
// Push pushes data to the queue
// Push fakes a push of data to the queue
func (b *DummyQueue) Push(Data) error {
return nil
}
func toConfig(exemplar, cfg interface{}) (interface{}, error) {
if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) {
return cfg, nil
}
// Flush always returns nil
func (b *DummyQueue) Flush(time.Duration) error {
return nil
}
configBytes, ok := cfg.([]byte)
if !ok {
configStr, ok := cfg.(string)
if !ok {
return nil, ErrInvalidConfiguration{cfg: cfg}
}
configBytes = []byte(configStr)
}
newVal := reflect.New(reflect.TypeOf(exemplar))
if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil {
return nil, ErrInvalidConfiguration{cfg: cfg, err: err}
}
return newVal.Elem().Interface(), nil
// FlushWithContext always returns nil
func (b *DummyQueue) FlushWithContext(context.Context) error {
return nil
}
// IsEmpty asserts that the queue is empty
func (b *DummyQueue) IsEmpty() bool {
return true
}
var queuesMap = map[Type]NewQueueFunc{DummyQueueType: NewDummyQueue}
@ -123,7 +120,7 @@ func RegisteredTypesAsString() []string {
return types
}
// NewQueue takes a queue Type and HandlerFunc some options and possibly an exemplar and returns a Queue or an error
// NewQueue takes a queue Type, HandlerFunc, some options and possibly an exemplar and returns a Queue or an error
func NewQueue(queueType Type, handlerFunc HandlerFunc, opts, exemplar interface{}) (Queue, error) {
newFn, ok := queuesMap[queueType]
if !ok {

59
modules/queue/queue_channel.go

@ -7,8 +7,6 @@ package queue
import (
"context"
"fmt"
"reflect"
"time"
"code.gitea.io/gitea/modules/log"
)
@ -18,25 +16,23 @@ const ChannelQueueType Type = "channel"
// ChannelQueueConfiguration is the configuration for a ChannelQueue
type ChannelQueueConfiguration struct {
QueueLength int
BatchLength int
Workers int
MaxWorkers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
Name string
WorkerPoolConfiguration
Workers int
Name string
}
// ChannelQueue implements
// ChannelQueue implements Queue
//
// A channel queue is not persistable and does not shutdown or terminate cleanly
// It is basically a very thin wrapper around a WorkerPool
type ChannelQueue struct {
pool *WorkerPool
*WorkerPool
exemplar interface{}
workers int
name string
}
// NewChannelQueue create a memory channel queue
// NewChannelQueue creates a memory channel queue
func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error) {
configInterface, err := toConfig(ChannelQueueConfiguration{}, cfg)
if err != nil {
@ -46,26 +42,13 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
if config.BatchLength == 0 {
config.BatchLength = 1
}
dataChan := make(chan Data, config.QueueLength)
ctx, cancel := context.WithCancel(context.Background())
queue := &ChannelQueue{
pool: &WorkerPool{
baseCtx: ctx,
cancel: cancel,
batchLength: config.BatchLength,
handle: handle,
dataChan: dataChan,
blockTimeout: config.BlockTimeout,
boostTimeout: config.BoostTimeout,
boostWorkers: config.BoostWorkers,
maxNumberOfWorkers: config.MaxWorkers,
},
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
exemplar: exemplar,
workers: config.Workers,
name: config.Name,
}
queue.pool.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar, queue.pool)
queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
return queue, nil
}
@ -77,22 +60,18 @@ func (c *ChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())
atTerminate(context.Background(), func() {
log.Warn("ChannelQueue: %s is not terminatable!", c.name)
})
log.Debug("ChannelQueue: %s Starting", c.name)
go func() {
_ = c.pool.AddWorkers(c.workers, 0)
_ = c.AddWorkers(c.workers, 0)
}()
}
// Push will push data into the queue
func (c *ChannelQueue) Push(data Data) error {
if c.exemplar != nil {
// Assert data is of same type as r.exemplar
t := reflect.TypeOf(data)
exemplarType := reflect.TypeOf(c.exemplar)
if !t.AssignableTo(exemplarType) || data == nil {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
}
if !assignableTo(data, c.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in queue: %s", data, c.exemplar, c.name)
}
c.pool.Push(data)
c.WorkerPool.Push(data)
return nil
}

30
modules/queue/queue_channel_test.go

@ -25,12 +25,14 @@ func TestChannelQueue(t *testing.T) {
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
QueueLength: 20,
Workers: 1,
MaxWorkers: 10,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 20,
MaxWorkers: 10,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
},
Workers: 1,
}, &testData{})
assert.NoError(t, err)
@ -60,13 +62,15 @@ func TestChannelQueue_Batch(t *testing.T) {
queue, err := NewChannelQueue(handle,
ChannelQueueConfiguration{
QueueLength: 20,
BatchLength: 2,
Workers: 1,
MaxWorkers: 10,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 20,
BatchLength: 2,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
MaxWorkers: 10,
},
Workers: 1,
}, &testData{})
assert.NoError(t, err)

87
modules/queue/queue_disk.go

@ -8,8 +8,8 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@ -22,20 +22,15 @@ const LevelQueueType Type = "level"
// LevelQueueConfiguration is the configuration for a LevelQueue
type LevelQueueConfiguration struct {
DataDir string
QueueLength int
BatchLength int
Workers int
MaxWorkers int
BlockTimeout time.Duration
BoostTimeout time.Duration
BoostWorkers int
Name string
WorkerPoolConfiguration
DataDir string
Workers int
Name string
}
// LevelQueue implements a disk library queue
type LevelQueue struct {
pool *WorkerPool
*WorkerPool
queue *levelqueue.Queue
closed chan struct{}
terminated chan struct{}
@ -58,21 +53,8 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
return nil, err
}
dataChan := make(chan Data, config.QueueLength)
ctx, cancel := context.WithCancel(context.Background())
queue := &LevelQueue{
pool: &WorkerPool{
baseCtx: ctx,
cancel: cancel,
batchLength: config.BatchLength,
handle: handle,
dataChan: dataChan,
blockTimeout: config.BlockTimeout,
boostTimeout: config.BoostTimeout,
boostWorkers: config.BoostWorkers,
maxNumberOfWorkers: config.MaxWorkers,
},
WorkerPool: NewWorkerPool(handle, config.WorkerPoolConfiguration),
queue: internal,
exemplar: exemplar,
closed: make(chan struct{}),
@ -80,7 +62,7 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
workers: config.Workers,
name: config.Name,
}
queue.pool.qid = GetManager().Add(queue, LevelQueueType, config, exemplar, queue.pool)
queue.qid = GetManager().Add(queue, LevelQueueType, config, exemplar)
return queue, nil
}
@ -88,9 +70,10 @@ func NewLevelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
atShutdown(context.Background(), l.Shutdown)
atTerminate(context.Background(), l.Terminate)
log.Debug("LevelQueue: %s Starting", l.name)
go func() {
_ = l.pool.AddWorkers(l.workers, 0)
_ = l.AddWorkers(l.workers, 0)
}()
go l.readToChan()
@ -99,12 +82,12 @@ func (l *LevelQueue) Run(atShutdown, atTerminate func(context.Context, func()))
<-l.closed
log.Trace("LevelQueue: %s Waiting til done", l.name)
l.pool.Wait()
l.Wait()
log.Trace("LevelQueue: %s Waiting til cleaned", l.name)
ctx, cancel := context.WithCancel(context.Background())
atTerminate(ctx, cancel)
l.pool.CleanUp(ctx)
l.CleanUp(ctx)
cancel()
log.Trace("LevelQueue: %s Cleaned", l.name)
@ -115,56 +98,45 @@ func (l *LevelQueue) readToChan() {
select {
case <-l.closed:
// tell the pool to shutdown.
l.pool.cancel()
l.cancel()
return
default:
atomic.AddInt64(&l.numInQueue, 1)
bs, err := l.queue.RPop()
if err != nil {
if err != levelqueue.ErrNotFound {
log.Error("LevelQueue: %s Error on RPop: %v", l.name, err)
}
atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
if len(bs) == 0 {
atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
var data Data
if l.exemplar != nil {
t := reflect.TypeOf(l.exemplar)
n := reflect.New(t)
ne := n.Elem()
err = json.Unmarshal(bs, ne.Addr().Interface())
data = ne.Interface().(Data)
} else {
err = json.Unmarshal(bs, &data)
}
data, err := unmarshalAs(bs, l.exemplar)
if err != nil {
log.Error("LevelQueue: %s Failed to unmarshal with error: %v", l.name, err)
atomic.AddInt64(&l.numInQueue, -1)
time.Sleep(time.Millisecond * 100)
continue
}
log.Trace("LevelQueue %s: Task found: %#v", l.name, data)
l.pool.Push(data)
l.WorkerPool.Push(data)
atomic.AddInt64(&l.numInQueue, -1)
}
}
}
// Push will push the indexer data to queue
func (l *LevelQueue) Push(data Data) error {
if l.exemplar != nil {
// Assert data is of same type as r.exemplar
value := reflect.ValueOf(data)
t := value.Type()
exemplarType := reflect.ValueOf(l.exemplar).Type()
if !t.AssignableTo(exemplarType) || data == nil {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
}
if !assignableTo(data, l.exemplar) {
return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, l.exemplar, l.name)
}
bs, err := json.Marshal(data)
if err != nil {
@ -173,16 +145,25 @@ func (l *LevelQueue) Push(data Data) error {
return l.queue.LPush(bs)
}
// IsEmpty checks whether the queue is empty
func (l *LevelQueue) IsEmpty() bool {
if !l.WorkerPool.IsEmpty() {
return false
}
return l.queue.Len() == 0
}
// Shutdown this queue and stop processing
func (l *LevelQueue) Shutdown() {
l.lock.Lock()
defer l.lock.Unlock()
log.Trace("LevelQueue: %s Shutdown", l.name)
log.Trace("LevelQueue: %s Shutting down", l.name)
select {
case <-l.closed:
default:
close(l.closed)
}
log.Debug("LevelQueue: %s Shutdown", l.name)
}
// Terminate this queue and close the queue
@ -196,11 +177,15 @@ func (l *LevelQueue) Terminate() {
default:
close(l.terminated)
l.lock.Unlock()
if log.IsDebug() {
log.Debug("LevelQueue: %s Closing with %d tasks left in queue", l.name, l.queue.Len())
}
if err := l.queue.Close(); err != nil && err.Error() != "leveldb: closed" {
log.Error("Error whilst closing internal queue in %s: %v", l.name, err)
}
}
log.Debug("LevelQueue: %s Terminated", l.name)
}
// Name returns the name of this queue

126
modules/queue/queue_disk_channel.go

@ -6,7 +6,9 @@ package queue
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"code.gitea.io/gitea/modules/log"
@ -31,8 +33,10 @@ type PersistableChannelQueueConfiguration struct {
}
// PersistableChannelQueue wraps a channel queue and level queue together
// The disk level queue will be used to store data at shutdown and terminate - and will be restored
// on start up.
type PersistableChannelQueue struct {
*ChannelQueue
channelQueue *ChannelQueue
delayedStarter
lock sync.Mutex
closed chan struct{}
@ -48,14 +52,16 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
config := configInterface.(PersistableChannelQueueConfiguration)
channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
Workers: config.Workers,
MaxWorkers: config.MaxWorkers,
BlockTimeout: config.BlockTimeout,
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
Name: config.Name + "-channel",
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
BlockTimeout: config.BlockTimeout,
BoostTimeout: config.BoostTimeout,
BoostWorkers: config.BoostWorkers,
MaxWorkers: config.MaxWorkers,
},
Workers: config.Workers,
Name: config.Name + "-channel",
}, exemplar)
if err != nil {
return nil, err
@ -63,28 +69,30 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
// the level backend only needs temporary workers to catch up with the previously dropped work
levelCfg := LevelQueueConfiguration{
DataDir: config.DataDir,
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
Workers: 1,
MaxWorkers: 6,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
Name: config.Name + "-level",
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: config.QueueLength,
BatchLength: config.BatchLength,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
MaxWorkers: 6,
},
DataDir: config.DataDir,
Workers: 1,
Name: config.Name + "-level",
}
levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
if err == nil {
queue := &PersistableChannelQueue{
ChannelQueue: channelQueue.(*ChannelQueue),
channelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
internal: levelQueue.(*LevelQueue),
name: config.Name,
},
closed: make(chan struct{}),
}
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
return queue, nil
}
if IsErrInvalidConfiguration(err) {
@ -93,7 +101,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
}
queue := &PersistableChannelQueue{
ChannelQueue: channelQueue.(*ChannelQueue),
channelQueue: channelQueue.(*ChannelQueue),
delayedStarter: delayedStarter{
cfg: levelCfg,
underlying: LevelQueueType,
@ -103,7 +111,7 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
},
closed: make(chan struct{}),
}
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar, nil)
_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
return queue, nil
}
@ -118,15 +126,17 @@ func (p *PersistableChannelQueue) Push(data Data) error {
case <-p.closed:
return p.internal.Push(data)
default:
return p.ChannelQueue.Push(data)
return p.channelQueue.Push(data)
}
}
// Run starts to run the queue
func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Context, func())) {
log.Debug("PersistableChannelQueue: %s Starting", p.delayedStarter.name)
p.lock.Lock()
if p.internal == nil {
err := p.setInternal(atShutdown, p.ChannelQueue.pool.handle, p.exemplar)
err := p.setInternal(atShutdown, p.channelQueue.handle, p.channelQueue.exemplar)
p.lock.Unlock()
if err != nil {
log.Fatal("Unable to create internal queue for %s Error: %v", p.Name(), err)
@ -142,31 +152,83 @@ func (p *PersistableChannelQueue) Run(atShutdown, atTerminate func(context.Conte
go p.internal.Run(func(_ context.Context, _ func()) {}, func(_ context.Context, _ func()) {})
go func() {
_ = p.ChannelQueue.pool.AddWorkers(p.workers, 0)
_ = p.channelQueue.AddWorkers(p.channelQueue.workers, 0)
}()
log.Trace("PersistableChannelQueue: %s Waiting til closed", p.delayedStarter.name)
<-p.closed
log.Trace("PersistableChannelQueue: %s Cancelling pools", p.delayedStarter.name)
p.ChannelQueue.pool.cancel()
p.internal.(*LevelQueue).pool.cancel()
p.channelQueue.cancel()
p.internal.(*LevelQueue).cancel()
log.Trace("PersistableChannelQueue: %s Waiting til done", p.delayedStarter.name)
p.ChannelQueue.pool.Wait()
p.internal.(*LevelQueue).pool.Wait()
p.channelQueue.Wait()
p.internal.(*LevelQueue).Wait()
// Redirect all remaining data in the chan to the internal channel
go func() {
log.Trace("PersistableChannelQueue: %s Redirecting remaining data", p.delayedStarter.name)
for data := range p.ChannelQueue.pool.dataChan {
for data := range p.channelQueue.dataChan {
_ = p.internal.Push(data)
atomic.AddInt64(&p.channelQueue.numInQueue, -1)
}
log.Trace("PersistableChannelQueue: %s Done Redirecting remaining data", p.delayedStarter.name)
}()
log.Trace("PersistableChannelQueue: %s Done main loop", p.delayedStarter.name)
}
// Flush flushes the queue and blocks till the queue is empty
func (p *PersistableChannelQueue) Flush(timeout time.Duration) error {
var ctx context.Context
var cancel context.CancelFunc
if timeout > 0 {
ctx, cancel = context.WithTimeout(context.Background(), timeout)
} else {
ctx, cancel = context.WithCancel(context.Background())
}
defer cancel()
return p.FlushWithContext(ctx)
}
// FlushWithContext flushes the queue and blocks till the queue is empty
func (p *PersistableChannelQueue) FlushWithContext(ctx context.Context) error {
errChan := make(chan error, 1)
go func() {
errChan <- p.channelQueue.FlushWithContext(ctx)
}()
go func() {
p.lock.Lock()
if p.internal == nil {
p.lock.Unlock()
errChan <- fmt.Errorf("not ready to flush internal queue %s yet", p.Name())
return
}
p.lock.Unlock()
errChan <- p.internal.FlushWithContext(ctx)
}()
err1 := <-errChan
err2 := <-errChan
if err1 != nil {
return err1
}
return err2
}
// IsEmpty checks if a queue is empty
func (p *PersistableChannelQueue) IsEmpty() bool {
if !p.channelQueue.IsEmpty() {
return false
}
p.lock.Lock()
defer p.lock.Unlock()
if p.internal == nil {
return false
}
return p.internal.IsEmpty()
}
// Shutdown processing this queue
func (p *PersistableChannelQueue) Shutdown() {
log.Trace("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
log.Trace("PersistableChannelQueue: %s Shutting down", p.delayedStarter.name)
select {
case <-p.closed:
default:
@ -177,6 +239,7 @@ func (p *PersistableChannelQueue) Shutdown() {
}
close(p.closed)
}
log.Debug("PersistableChannelQueue: %s Shutdown", p.delayedStarter.name)
}
// Terminate this queue and close the queue
@ -188,6 +251,7 @@ func (p *PersistableChannelQueue) Terminate() {
if p.internal != nil {
p.internal.(*LevelQueue).Terminate()
}
log.Debug("PersistableChannelQueue: %s Terminated", p.delayedStarter.name)
}
func init() {

36
modules/queue/queue_disk_test.go

@ -32,14 +32,16 @@ func TestLevelQueue(t *testing.T) {
defer os.RemoveAll(tmpDir)
queue, err := NewLevelQueue(handle, LevelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 2,
Workers: 1,
MaxWorkers: 10,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 20,
BatchLength: 2,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
MaxWorkers: 10,
},
DataDir: tmpDir,
Workers: 1,
}, &testData{})
assert.NoError(t, err)
@ -92,14 +94,16 @@ func TestLevelQueue(t *testing.T) {
WrappedQueueConfiguration{
Underlying: LevelQueueType,
Config: LevelQueueConfiguration{
DataDir: tmpDir,
BatchLength: 2,
Workers: 1,
MaxWorkers: 10,
QueueLength: 20,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
WorkerPoolConfiguration: WorkerPoolConfiguration{
QueueLength: 20,
BatchLength: 2,
BlockTimeout: 1 * time.Second,
BoostTimeout: 5 * time.Minute,
BoostWorkers: 5,
MaxWorkers: 10,
},
DataDir: tmpDir,
Workers: 1,
},
}, &testData{})
assert.NoError(t, err)

102
modules/queue/queue_redis.go

@ -9,9 +9,9 @@ import (
"encoding/json"
"errors"