From c8f7a6b7742cf42057b6d220ef93ff7f939bb94f Mon Sep 17 00:00:00 2001 From: zeripath Date: Thu, 15 Oct 2020 22:40:03 +0100 Subject: [PATCH] Slightly simplify the queue settings code to help reduce the risk of problems (#12976) Signed-off-by: Andrew Thornton Co-authored-by: techknowlogick --- modules/queue/helper.go | 39 +++++++++++++++++++++++++++++++++++---- modules/queue/setting.go | 25 ++++--------------------- modules/setting/queue.go | 10 +++++++--- 3 files changed, 46 insertions(+), 28 deletions(-) diff --git a/modules/queue/helper.go b/modules/queue/helper.go index e6fb1b94f..751e0cfad 100644 --- a/modules/queue/helper.go +++ b/modules/queue/helper.go @@ -9,25 +9,56 @@ import ( "reflect" ) +// Mappable represents an interface that can MapTo another interface +type Mappable interface { + MapTo(v interface{}) error +} + // toConfig will attempt to convert a given configuration cfg into the provided exemplar type. // // It will tolerate the cfg being passed as a []byte or string of a json representation of the // exemplar or the correct type of the exemplar itself func toConfig(exemplar, cfg interface{}) (interface{}, error) { + + // First of all check if we've got the same type as the exemplar - if so it's all fine. if reflect.TypeOf(cfg).AssignableTo(reflect.TypeOf(exemplar)) { return cfg, nil } + // Now if not - does it provide a MapTo function we can try? + if mappable, ok := cfg.(Mappable); ok { + newVal := reflect.New(reflect.TypeOf(exemplar)) + if err := mappable.MapTo(newVal.Interface()); err == nil { + return newVal.Elem().Interface(), nil + } + // MapTo has failed us ... let's try the json route ... + } + + // OK we've been passed a byte array right? configBytes, ok := cfg.([]byte) if !ok { - configStr, ok := cfg.(string) - if !ok { - return nil, ErrInvalidConfiguration{cfg: cfg} - } + // oh ... it's a string then? + var configStr string + + configStr, ok = cfg.(string) configBytes = []byte(configStr) } + if !ok { + // hmm ... can we marshal it to json? + var err error + + configBytes, err = json.Marshal(cfg) + ok = (err == nil) + } + if !ok { + // no ... we've tried hard enough at this point - throw an error! + return nil, ErrInvalidConfiguration{cfg: cfg} + } + + // OK unmarshal the byte array into a new copy of the exemplar newVal := reflect.New(reflect.TypeOf(exemplar)) if err := json.Unmarshal(configBytes, newVal.Interface()); err != nil { + // If we can't unmarshal it then return an error! return nil, ErrInvalidConfiguration{cfg: cfg, err: err} } return newVal.Elem().Interface(), nil diff --git a/modules/queue/setting.go b/modules/queue/setting.go index 35c33aeac..9ee1af8c7 100644 --- a/modules/queue/setting.go +++ b/modules/queue/setting.go @@ -27,27 +27,10 @@ func validType(t string) (Type, error) { func getQueueSettings(name string) (setting.QueueSettings, []byte) { q := setting.GetQueueSettings(name) - opts := make(map[string]interface{}) - opts["Name"] = name - opts["QueueLength"] = q.Length - opts["BatchLength"] = q.BatchLength - opts["DataDir"] = q.DataDir - opts["Addresses"] = q.Addresses - opts["Network"] = q.Network - opts["Password"] = q.Password - opts["DBIndex"] = q.DBIndex - opts["QueueName"] = q.QueueName - opts["SetName"] = q.SetName - opts["Workers"] = q.Workers - opts["MaxWorkers"] = q.MaxWorkers - opts["BlockTimeout"] = q.BlockTimeout - opts["BoostTimeout"] = q.BoostTimeout - opts["BoostWorkers"] = q.BoostWorkers - opts["ConnectionString"] = q.ConnectionString - cfg, err := json.Marshal(opts) + cfg, err := json.Marshal(q) if err != nil { - log.Error("Unable to marshall generic options: %v Error: %v", opts, err) + log.Error("Unable to marshall generic options: %v Error: %v", q, err) log.Error("Unable to create queue for %s", name, err) return q, []byte{} } @@ -75,7 +58,7 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue { Timeout: q.Timeout, MaxAttempts: q.MaxAttempts, Config: cfg, - QueueLength: q.Length, + QueueLength: q.QueueLength, Name: name, }, exemplar) } @@ -114,7 +97,7 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un Timeout: q.Timeout, MaxAttempts: q.MaxAttempts, Config: cfg, - QueueLength: q.Length, + QueueLength: q.QueueLength, }, exemplar) } if err != nil { diff --git a/modules/setting/queue.go b/modules/setting/queue.go index fc4397861..236560456 100644 --- a/modules/setting/queue.go +++ b/modules/setting/queue.go @@ -16,8 +16,9 @@ import ( // QueueSettings represent the settings for a queue from the ini type QueueSettings struct { + Name string DataDir string - Length int + QueueLength int `ini:"LENGTH"` BatchLength int ConnectionString string Type string @@ -44,6 +45,8 @@ var Queue = QueueSettings{} func GetQueueSettings(name string) QueueSettings { q := QueueSettings{} sec := Cfg.Section("queue." + name) + q.Name = name + // DataDir is not directly inheritable q.DataDir = filepath.Join(Queue.DataDir, name) // QueueName is not directly inheritable either @@ -65,8 +68,9 @@ func GetQueueSettings(name string) QueueSettings { q.DataDir = filepath.Join(AppDataPath, q.DataDir) } _, _ = sec.NewKey("DATADIR", q.DataDir) + // The rest are... - q.Length = sec.Key("LENGTH").MustInt(Queue.Length) + q.QueueLength = sec.Key("LENGTH").MustInt(Queue.QueueLength) q.BatchLength = sec.Key("BATCH_LENGTH").MustInt(Queue.BatchLength) q.ConnectionString = sec.Key("CONN_STR").MustString(Queue.ConnectionString) q.Type = sec.Key("TYPE").MustString(Queue.Type) @@ -91,7 +95,7 @@ func NewQueueService() { if !filepath.IsAbs(Queue.DataDir) { Queue.DataDir = filepath.Join(AppDataPath, Queue.DataDir) } - Queue.Length = sec.Key("LENGTH").MustInt(20) + Queue.QueueLength = sec.Key("LENGTH").MustInt(20) Queue.BatchLength = sec.Key("BATCH_LENGTH").MustInt(20) Queue.ConnectionString = sec.Key("CONN_STR").MustString("") Queue.Type = sec.Key("TYPE").MustString("persistable-channel")