diff --git a/README.md b/README.md index d7d74a608c..a9b0a8ca01 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,9 @@ Currently supported input/output targets: Setting up multiple outputs or inputs is done by choosing a routing strategy (fan-in, fan-out, round-robin, etc.) +It is possible to enable a REST API to dynamically change inputs and outputs at +runtime, [you read about that here][11]. + For a full and up to date list of all inputs, buffer options, processors, and outputs [you can find them in the docs][7], or print them from the binary: @@ -172,6 +175,7 @@ containers using `docker-compose`. [8]: resources/docs/config_interpolation.md [9]: resources/docker/compose_examples [10]: resources/docs/processors/list.md +[11]: resources/docs/dynamic_inputs_and_outputs.md [travis-badge]: https://travis-ci.org/Jeffail/benthos.svg?branch=master [travis-url]: https://travis-ci.org/Jeffail/benthos [dep]: https://github.com/golang/dep diff --git a/config/env/README.md b/config/env/README.md index 034830e506..b3fdb3f4f8 100644 --- a/config/env/README.md +++ b/config/env/README.md @@ -55,6 +55,11 @@ AMQP_OUTPUT_QUEUE = benthos-stream AMQP_OUTPUT_KEY = benthos-key AMQP_OUTPUT_CONSUMER_TAG = benthos-consumer +DYNAMIC_INPUT_PREFIX = +DYNAMIC_INPUT_TIMEOUT_MS = 5000 +DYNAMIC_OUTPUT_PREFIX = +DYNAMIC_OUTPUT_TIMEOUT_MS = 5000 + FILE_INPUT_PATH = FILE_INPUT_MULTIPART = false FILE_INPUT_MAX_BUFFER = 65536 diff --git a/config/env/default.yaml b/config/env/default.yaml index 1fd1274c36..7481b7be80 100644 --- a/config/env/default.yaml +++ b/config/env/default.yaml @@ -46,6 +46,10 @@ input: queue: ${AMQP_INPUT_QUEUE:benthos-stream} key: ${AMQP_INPUT_KEY:benthos-key} consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer} + dynamic: + inputs: {} + prefix: ${DYNAMIC_INPUT_PREFIX} + timeout_ms: ${DYNAMIC_INPUT_TIMEOUT_MS:5000} file: path: ${FILE_INPUT_PATH} multipart: ${FILE_INPUT_MULTIPART:false} @@ -147,6 +151,10 @@ output: exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange} exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct} key: ${AMQP_OUTPUT_KEY:benthos-key} + dynamic: + outputs: {} + prefix: ${DYNAMIC_OUTPUT_PREFIX} + timeout_ms: ${DYNAMIC_OUTPUT_TIMEOUT_MS:5000} file: path: ${FILE_OUTPUT_PATH} http_client: diff --git a/config/everything.yaml b/config/everything.yaml index 78eb9ec1b1..19f6508660 100644 --- a/config/everything.yaml +++ b/config/everything.yaml @@ -32,7 +32,7 @@ input: consumer_tag: benthos-consumer prefetch_count: 1 prefetch_size: 0 - dynamic_fan_in: + dynamic: inputs: {} prefix: "" timeout_ms: 5000 @@ -192,6 +192,10 @@ output: exchange: benthos-exchange exchange_type: direct key: benthos-key + dynamic: + outputs: {} + prefix: "" + timeout_ms: 5000 fan_out: outputs: [] file: diff --git a/lib/broker/dynamic_fan_in.go b/lib/broker/dynamic_fan_in.go index 13fcd00821..8114f9fe32 100644 --- a/lib/broker/dynamic_fan_in.go +++ b/lib/broker/dynamic_fan_in.go @@ -60,6 +60,9 @@ type DynamicFanIn struct { messageChan chan types.Message responseChan <-chan types.Response + onAdd func(label string) + onRemove func(label string) + wrappedMsgsChan chan wrappedMsg newInputChan chan wrappedInput inputs map[string]DynamicInput @@ -74,6 +77,7 @@ func NewDynamicFanIn( inputs map[string]DynamicInput, logger log.Modular, stats metrics.Type, + options ...func(*DynamicFanIn), ) (*DynamicFanIn, error) { d := &DynamicFanIn{ running: 1, @@ -83,6 +87,9 @@ func NewDynamicFanIn( messageChan: make(chan types.Message), responseChan: nil, + onAdd: func(l string) {}, + onRemove: func(l string) {}, + wrappedMsgsChan: make(chan wrappedMsg), newInputChan: make(chan wrappedInput), inputs: make(map[string]DynamicInput), @@ -90,9 +97,16 @@ func NewDynamicFanIn( closedChan: make(chan struct{}), closeChan: make(chan struct{}), } - + for _, opt := range options { + opt(d) + } for key, input := range inputs { - d.addInput(key, input) + if err := d.addInput(key, input); err != nil { + d.stats.Incr("broker.dynamic_fan_in.input.add.error", 1) + d.log.Errorf("Failed to start new dynamic input '%v': %v\n", key, err) + } else { + d.onAdd(key) + } } go d.managerLoop() return d, nil @@ -122,8 +136,6 @@ func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.D return <-resChan } -//------------------------------------------------------------------------------ - // StartListening assigns a new responses channel for the broker to read. func (d *DynamicFanIn) StartListening(responseChan <-chan types.Response) error { if d.responseChan != nil { @@ -142,6 +154,24 @@ func (d *DynamicFanIn) MessageChan() <-chan types.Message { //------------------------------------------------------------------------------ +// OptDynamicFanInSetOnAdd sets the function that is called whenever a dynamic +// input is added. +func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn) { + return func(d *DynamicFanIn) { + d.onAdd = onAddFunc + } +} + +// OptDynamicFanInSetOnRemove sets the function that is called whenever a +// dynamic input is removed. +func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn) { + return func(d *DynamicFanIn) { + d.onRemove = onRemoveFunc + } +} + +//------------------------------------------------------------------------------ + func (d *DynamicFanIn) addInput(ident string, input DynamicInput) error { // Create unique response channel for each input resChan := make(chan types.Response) @@ -222,6 +252,7 @@ func (d *DynamicFanIn) managerLoop() { d.log.Errorf("Failed to stop old copy of dynamic input '%v': %v\n", wrappedInput.Name, err) } else { d.stats.Incr("broker.dynamic_fan_in.input.remove.success", 1) + d.onRemove(wrappedInput.Name) } } if err == nil && wrappedInput.Input != nil { @@ -231,6 +262,7 @@ func (d *DynamicFanIn) managerLoop() { d.log.Errorf("Failed to start new dynamic input '%v': %v\n", wrappedInput.Name, err) } else { d.stats.Incr("broker.dynamic_fan_in.input.add.success", 1) + d.onAdd(wrappedInput.Name) } } select { diff --git a/lib/broker/dynamic_fan_out.go b/lib/broker/dynamic_fan_out.go index e98142c36e..a7880aeb0d 100644 --- a/lib/broker/dynamic_fan_out.go +++ b/lib/broker/dynamic_fan_out.go @@ -63,11 +63,14 @@ type outputWithMsgChan struct { type DynamicFanOut struct { running int32 - logger log.Modular - stats metrics.Type + log log.Modular + stats metrics.Type throt *throttle.Type + onAdd func(label string) + onRemove func(label string) + messages <-chan types.Message responseChan chan types.Response @@ -83,11 +86,14 @@ func NewDynamicFanOut( outputs map[string]DynamicOutput, logger log.Modular, stats metrics.Type, + options ...func(*DynamicFanOut), ) (*DynamicFanOut, error) { d := &DynamicFanOut{ running: 1, stats: stats, - logger: logger.NewModule(".broker.dynamic_fan_out"), + log: logger.NewModule(".broker.dynamic_fan_out"), + onAdd: func(l string) {}, + onRemove: func(l string) {}, messages: nil, responseChan: make(chan types.Response), newOutputChan: make(chan wrappedOutput), @@ -95,11 +101,17 @@ func NewDynamicFanOut( closedChan: make(chan struct{}), closeChan: make(chan struct{}), } + for _, opt := range options { + opt(d) + } d.throt = throttle.New(throttle.OptCloseChan(d.closeChan)) for k, v := range outputs { if err := d.addOutput(k, v); err != nil { - d.logger.Errorf("Failed to initialise dynamic output '%v': %v\n", err) + d.log.Errorf("Failed to initialise dynamic output '%v': %v\n", k, err) + d.stats.Incr("broker.dynamic_fan_out.output.add.error", 1) + } else { + d.onAdd(k) } } return d, nil @@ -132,6 +144,24 @@ func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout ti //------------------------------------------------------------------------------ +// OptDynamicFanOutSetOnAdd sets the function that is called whenever a dynamic +// output is added. +func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut) { + return func(d *DynamicFanOut) { + d.onAdd = onAddFunc + } +} + +// OptDynamicFanOutSetOnRemove sets the function that is called whenever a +// dynamic output is removed. +func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut) { + return func(d *DynamicFanOut) { + d.onRemove = onRemoveFunc + } +} + +//------------------------------------------------------------------------------ + // StartReceiving assigns a new messages channel for the broker to read. func (d *DynamicFanOut) StartReceiving(msgs <-chan types.Message) error { if d.messages != nil { @@ -218,16 +248,30 @@ func (d *DynamicFanOut) loop() { if !open { return } + d.stats.Incr("broker.dynamic_fan_out.output.count", 1) + if _, exists := d.outputs[wrappedOutput.Name]; exists { if err := d.removeOutput(wrappedOutput.Name, wrappedOutput.Timeout); err != nil { + d.stats.Incr("broker.dynamic_fan_out.output.remove.error", 1) + d.log.Errorf("Failed to stop old copy of dynamic output '%v': %v\n", wrappedOutput.Name, err) wrappedOutput.ResChan <- err continue } + d.stats.Incr("broker.dynamic_fan_out.output.remove.success", 1) + d.onRemove(wrappedOutput.Name) } if wrappedOutput.Output == nil { wrappedOutput.ResChan <- nil } else { - wrappedOutput.ResChan <- d.addOutput(wrappedOutput.Name, wrappedOutput.Output) + err := d.addOutput(wrappedOutput.Name, wrappedOutput.Output) + if err != nil { + d.stats.Incr("broker.dynamic_fan_out.output.add.error", 1) + d.log.Errorf("Failed to start new dynamic output '%v': %v\n", wrappedOutput.Name, err) + } else { + d.stats.Incr("broker.dynamic_fan_out.output.add.success", 1) + d.onAdd(wrappedOutput.Name) + } + wrappedOutput.ResChan <- err } continue case msg, open = <-msgChan: @@ -260,11 +304,11 @@ func (d *DynamicFanOut) loop() { select { case res, open = <-v.output.ResponseChan(): if !open { - d.logger.Warnf("Dynamic output '%v' has closed\n", k) + d.log.Warnf("Dynamic output '%v' has closed\n", k) d.removeOutput(k, time.Second) delete(remainingTargets, k) } else if res.Error() != nil { - d.logger.Errorf("Failed to dispatch dynamic fan out message: %v\n", res.Error()) + d.log.Errorf("Failed to dispatch dynamic fan out message: %v\n", res.Error()) d.stats.Incr("broker.dynamic_fan_out.output.error", 1) if !d.throt.Retry() { return diff --git a/lib/input/constructor.go b/lib/input/constructor.go index dc28f113f8..b9b4172ca0 100644 --- a/lib/input/constructor.go +++ b/lib/input/constructor.go @@ -61,7 +61,7 @@ type Config struct { AmazonS3 reader.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"` AmazonSQS reader.AmazonSQSConfig `json:"amazon_sqs" yaml:"amazon_sqs"` AMQP reader.AMQPConfig `json:"amqp" yaml:"amqp"` - DynamicFanIn DynamicFanInConfig `json:"dynamic_fan_in" yaml:"dynamic_fan_in"` + Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` FanIn FanInConfig `json:"fan_in" yaml:"fan_in"` File FileConfig `json:"file" yaml:"file"` HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"` @@ -86,7 +86,7 @@ func NewConfig() Config { AmazonS3: reader.NewAmazonS3Config(), AmazonSQS: reader.NewAmazonSQSConfig(), AMQP: reader.NewAMQPConfig(), - DynamicFanIn: NewDynamicFanInConfig(), + Dynamic: NewDynamicConfig(), FanIn: NewFanInConfig(), File: NewFileConfig(), HTTPClient: NewHTTPClientConfig(), diff --git a/lib/input/dynamic_fan_in.go b/lib/input/dynamic.go similarity index 68% rename from lib/input/dynamic_fan_in.go rename to lib/input/dynamic.go index aaed9216a5..93d17d7435 100644 --- a/lib/input/dynamic_fan_in.go +++ b/lib/input/dynamic.go @@ -25,7 +25,6 @@ import ( "io/ioutil" "net/http" "path" - "sort" "sync" "time" @@ -40,13 +39,14 @@ import ( //------------------------------------------------------------------------------ func init() { - constructors["dynamic_fan_in"] = typeSpec{ - brokerConstructor: NewDynamicFanIn, + constructors["dynamic"] = typeSpec{ + brokerConstructor: NewDynamic, description: ` -The dynamic fan in type is similar to the regular fan in type except the inputs -can be changed during runtime via a REST HTTP interface. +The dynamic type is similar to the 'fan_in' type except the inputs can be +changed during runtime via a REST HTTP interface. -To GET the full list of input identifiers use the '/inputs' endpoint. +To GET a JSON map of input identifiers with their current uptimes use the +'/inputs' endpoint. To perform CRUD actions on the inputs themselves use POST, DELETE, and GET methods on the '/input/{input_id}' endpoint. When using POST the body of the @@ -57,16 +57,16 @@ exists it will be changed.`, //------------------------------------------------------------------------------ -// DynamicFanInConfig is configuration for the DynamicFanIn input type. -type DynamicFanInConfig struct { +// DynamicConfig is configuration for the Dynamic input type. +type DynamicConfig struct { Inputs map[string]Config `json:"inputs" yaml:"inputs"` Prefix string `json:"prefix" yaml:"prefix"` TimeoutMS int `json:"timeout_ms" yaml:"timeout_ms"` } -// NewDynamicFanInConfig creates a new DynamicFanInConfig with default values. -func NewDynamicFanInConfig() DynamicFanInConfig { - return DynamicFanInConfig{ +// NewDynamicConfig creates a new DynamicConfig with default values. +func NewDynamicConfig() DynamicConfig { + return DynamicConfig{ Inputs: map[string]Config{}, Prefix: "", TimeoutMS: 5000, @@ -75,16 +75,19 @@ func NewDynamicFanInConfig() DynamicFanInConfig { //------------------------------------------------------------------------------ -// NewDynamicFanIn creates a new DynamicFanIn input type. -func NewDynamicFanIn( +// NewDynamic creates a new Dynamic input type. +func NewDynamic( conf Config, mgr types.Manager, log log.Modular, stats metrics.Type, pipelines ...pipeline.ConstructorFunc, ) (Type, error) { + inputMap := map[string]time.Time{} + inputMapMut := sync.Mutex{} + inputs := map[string]broker.DynamicInput{} - for k, v := range conf.DynamicFanIn.Inputs { + for k, v := range conf.Dynamic.Inputs { newInput, err := New(v, mgr, log, stats, pipelines...) if err != nil { return nil, err @@ -92,19 +95,33 @@ func NewDynamicFanIn( inputs[k] = newInput } - fanIn, err := broker.NewDynamicFanIn(inputs, log, stats) + fanIn, err := broker.NewDynamicFanIn( + inputs, log, stats, + broker.OptDynamicFanInSetOnAdd(func(l string) { + inputMapMut.Lock() + inputMap[l] = time.Now() + inputMapMut.Unlock() + }), + broker.OptDynamicFanInSetOnRemove(func(l string) { + inputMapMut.Lock() + delete(inputMap, l) + inputMapMut.Unlock() + }), + ) if err != nil { return nil, err } + inputs = nil - reqTimeout := time.Millisecond * time.Duration(conf.DynamicFanIn.TimeoutMS) + reqTimeout := time.Millisecond * time.Duration(conf.Dynamic.TimeoutMS) - inputConfigs := conf.DynamicFanIn.Inputs + inputConfigs := conf.Dynamic.Inputs inputConfigsMut := sync.RWMutex{} + mgr.RegisterEndpoint( - path.Join(conf.DynamicFanIn.Prefix, "/input/{input_id}"), + path.Join(conf.Dynamic.Prefix, "/input/{input_id}"), "Perform CRUD operations on the configuration of dynamic inputs. For"+ - " more information read the `dynamic_fan_in` documentation.", + " more information read the `dynamic` input type documentation.", func(w http.ResponseWriter, r *http.Request) { var httpErr error defer func() { @@ -142,12 +159,28 @@ func NewDynamicFanIn( inputConfigs[inputID] = newConf } case "GET": - if _, exists := inputConfigs[inputID]; !exists { + getConf, exists := inputConfigs[inputID] + if !exists { http.Error(w, "Input does not exist", http.StatusBadRequest) return } var cBytes []byte - cBytes, httpErr = json.Marshal(inputConfigs[inputID]) + cBytes, httpErr = json.Marshal(getConf) + if httpErr != nil { + return + } + + hashMap := map[string]interface{}{} + if httpErr = json.Unmarshal(cBytes, &hashMap); httpErr != nil { + return + } + + outputMap := map[string]interface{}{} + outputMap["type"] = hashMap["type"] + outputMap[getConf.Type] = hashMap[getConf.Type] + outputMap["processors"] = hashMap["processors"] + + cBytes, httpErr = json.Marshal(outputMap) if httpErr != nil { return } @@ -164,8 +197,8 @@ func NewDynamicFanIn( }, ) mgr.RegisterEndpoint( - path.Join(conf.DynamicFanIn.Prefix, "/inputs"), - "Get a full list of all input identifiers.", + path.Join(conf.Dynamic.Prefix, "/inputs"), + "Get a map of running input identifiers with their current uptimes.", func(w http.ResponseWriter, r *http.Request) { var httpErr error defer func() { @@ -176,17 +209,16 @@ func NewDynamicFanIn( } }() - inputConfigsMut.Lock() - defer inputConfigsMut.Unlock() + inputMapMut.Lock() + defer inputMapMut.Unlock() - labels := []string{} - for k := range inputConfigs { - labels = append(labels, k) + uptimes := map[string]string{} + for k, v := range inputMap { + uptimes[k] = time.Since(v).String() } - sort.Strings(labels) var resBytes []byte - if resBytes, httpErr = json.Marshal(labels); httpErr == nil { + if resBytes, httpErr = json.Marshal(uptimes); httpErr == nil { w.Write(resBytes) } }, diff --git a/lib/output/broker_out_common.go b/lib/output/broker_out_common.go index 80526eb283..f4152f3e47 100644 --- a/lib/output/broker_out_common.go +++ b/lib/output/broker_out_common.go @@ -43,16 +43,18 @@ import ( // formats that we do not know at this stage (JSON, YAML, etc), therefore we use // the more hacky method as performance is not an issue at this stage. func parseOutputConfsWithDefaults(outConfs []interface{}) ([]Config, error) { + type confAlias Config + outputConfs := []Config{} for i, boxedConfig := range outConfs { - newConfs := []Config{NewConfig()} + newConfs := []confAlias{confAlias(NewConfig())} if i > 0 { // If the type of this output is 'ditto' we want to start with a // duplicate of the previous config. newConfsFromDitto := func(label string) error { // Remove the vanilla config. - newConfs = []Config{} + newConfs = []confAlias{} if len(label) > 5 && label[5] == '_' { if label[6:] == "0" { @@ -65,10 +67,10 @@ func parseOutputConfsWithDefaults(outConfs []interface{}) ([]Config, error) { return fmt.Errorf("failed to parse ditto multiplier: %v", err) } for j := 0; j < n; j++ { - newConfs = append(newConfs, outputConfs[i-1]) + newConfs = append(newConfs, confAlias(outputConfs[i-1])) } } else { - newConfs = append(newConfs, outputConfs[i-1]) + newConfs = append(newConfs, confAlias(outputConfs[i-1])) } return nil } @@ -101,7 +103,7 @@ func parseOutputConfsWithDefaults(outConfs []interface{}) ([]Config, error) { if err = yaml.Unmarshal(rawBytes, &conf); err != nil { return nil, err } - outputConfs = append(outputConfs, conf) + outputConfs = append(outputConfs, Config(conf)) } } diff --git a/lib/output/constructor.go b/lib/output/constructor.go index 2acc31baf1..f643bfcc41 100644 --- a/lib/output/constructor.go +++ b/lib/output/constructor.go @@ -22,6 +22,7 @@ package output import ( "bytes" + "encoding/json" "sort" "strings" @@ -52,6 +53,7 @@ type Config struct { Type string `json:"type" yaml:"type"` AmazonS3 writer.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"` AMQP AMQPConfig `json:"amqp" yaml:"amqp"` + Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"` FanOut FanOutConfig `json:"fan_out" yaml:"fan_out"` File FileConfig `json:"file" yaml:"file"` Files writer.FilesConfig `json:"files" yaml:"files"` @@ -76,6 +78,7 @@ func NewConfig() Config { Type: "stdout", AmazonS3: writer.NewAmazonS3Config(), AMQP: NewAMQPConfig(), + Dynamic: NewDynamicConfig(), FanOut: NewFanOutConfig(), File: NewFileConfig(), Files: writer.NewFilesConfig(), @@ -97,6 +100,36 @@ func NewConfig() Config { //------------------------------------------------------------------------------ +// UnmarshalJSON ensures that when parsing configs that are in a map or slice +// the default values are still applied. +func (c *Config) UnmarshalJSON(bytes []byte) error { + type confAlias Config + aliased := confAlias(NewConfig()) + + if err := json.Unmarshal(bytes, &aliased); err != nil { + return err + } + + *c = Config(aliased) + return nil +} + +// UnmarshalYAML ensures that when parsing configs that are in a map or slice +// the default values are still applied. +func (c *Config) UnmarshalYAML(unmarshal func(interface{}) error) error { + type confAlias Config + aliased := confAlias(NewConfig()) + + if err := unmarshal(&aliased); err != nil { + return err + } + + *c = Config(aliased) + return nil +} + +//------------------------------------------------------------------------------ + // Descriptions returns a formatted string of collated descriptions of each // type. func Descriptions() string { diff --git a/lib/output/dynamic.go b/lib/output/dynamic.go new file mode 100644 index 0000000000..64217ffba8 --- /dev/null +++ b/lib/output/dynamic.go @@ -0,0 +1,227 @@ +// Copyright (c) 2018 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package output + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "path" + "sync" + "time" + + "github.com/Jeffail/benthos/lib/broker" + "github.com/Jeffail/benthos/lib/types" + "github.com/Jeffail/benthos/lib/util/service/log" + "github.com/Jeffail/benthos/lib/util/service/metrics" + "github.com/gorilla/mux" +) + +//------------------------------------------------------------------------------ + +func init() { + constructors["dynamic"] = typeSpec{ + constructor: NewDynamic, + description: ` +The dynamic type is similar to the 'fan_out' type except the outputs can be +changed during runtime via a REST HTTP interface. + +To GET a JSON map of output identifiers with their current uptimes use the +'/outputs' endpoint. + +To perform CRUD actions on the outputs themselves use POST, DELETE, and GET +methods on the '/output/{output_id}' endpoint. When using POST the body of the +request should be a JSON configuration for the output, if the output already +exists it will be changed.`, + } +} + +//------------------------------------------------------------------------------ + +// DynamicConfig is configuration for the Dynamic input type. +type DynamicConfig struct { + Outputs map[string]Config `json:"outputs" yaml:"outputs"` + Prefix string `json:"prefix" yaml:"prefix"` + TimeoutMS int `json:"timeout_ms" yaml:"timeout_ms"` +} + +// NewDynamicConfig creates a new DynamicConfig with default values. +func NewDynamicConfig() DynamicConfig { + return DynamicConfig{ + Outputs: map[string]Config{}, + Prefix: "", + TimeoutMS: 5000, + } +} + +//------------------------------------------------------------------------------ + +// NewDynamic creates a new Dynamic output type. +func NewDynamic( + conf Config, + mgr types.Manager, + log log.Modular, + stats metrics.Type, +) (Type, error) { + outputMap := map[string]time.Time{} + outputMapMut := sync.Mutex{} + + outputs := map[string]broker.DynamicOutput{} + for k, v := range conf.Dynamic.Outputs { + newOutput, err := New(v, mgr, log, stats) + if err != nil { + return nil, err + } + outputs[k] = newOutput + } + + fanOut, err := broker.NewDynamicFanOut( + outputs, log, stats, + broker.OptDynamicFanOutSetOnAdd(func(l string) { + outputMapMut.Lock() + outputMap[l] = time.Now() + outputMapMut.Unlock() + }), + broker.OptDynamicFanOutSetOnRemove(func(l string) { + outputMapMut.Lock() + delete(outputMap, l) + outputMapMut.Unlock() + }), + ) + if err != nil { + return nil, err + } + outputs = nil + + reqTimeout := time.Millisecond * time.Duration(conf.Dynamic.TimeoutMS) + + outputConfigs := conf.Dynamic.Outputs + outputConfigsMut := sync.RWMutex{} + + mgr.RegisterEndpoint( + path.Join(conf.Dynamic.Prefix, "/output/{output_id}"), + "Perform CRUD operations on the configuration of dynamic outputs. For"+ + " more information read the `dynamic` output type documentation.", + func(w http.ResponseWriter, r *http.Request) { + var httpErr error + defer func() { + r.Body.Close() + if httpErr != nil { + log.Warnf("Request error: %v\n", httpErr) + http.Error(w, "Internal server error", http.StatusBadGateway) + } + }() + + outputConfigsMut.Lock() + defer outputConfigsMut.Unlock() + + outputID := mux.Vars(r)["output_id"] + if len(outputID) == 0 { + http.Error(w, "Var `output_id` must be set", http.StatusBadRequest) + return + } + + switch r.Method { + case "POST": + newConf := NewConfig() + var reqBytes []byte + if reqBytes, httpErr = ioutil.ReadAll(r.Body); httpErr != nil { + return + } + if httpErr = json.Unmarshal(reqBytes, &newConf); httpErr != nil { + return + } + var newOutput Type + if newOutput, httpErr = New(newConf, mgr, log, stats); httpErr != nil { + return + } + if httpErr = fanOut.SetOutput(outputID, newOutput, reqTimeout); httpErr == nil { + outputConfigs[outputID] = newConf + } + case "GET": + getConf, exists := outputConfigs[outputID] + if !exists { + http.Error(w, "Output does not exist", http.StatusBadRequest) + return + } + var cBytes []byte + cBytes, httpErr = json.Marshal(getConf) + if httpErr != nil { + return + } + + hashMap := map[string]interface{}{} + if httpErr = json.Unmarshal(cBytes, &hashMap); httpErr != nil { + return + } + + outputMap := map[string]interface{}{} + outputMap["type"] = hashMap["type"] + outputMap[getConf.Type] = hashMap[getConf.Type] + outputMap["processors"] = hashMap["processors"] + + cBytes, httpErr = json.Marshal(outputMap) + if httpErr != nil { + return + } + w.Write(cBytes) + case "DELETE": + if _, exists := outputConfigs[outputID]; !exists { + http.Error(w, "Output does not exist", http.StatusBadRequest) + return + } + if httpErr = fanOut.SetOutput(outputID, nil, reqTimeout); httpErr == nil { + delete(outputConfigs, outputID) + } + } + }, + ) + mgr.RegisterEndpoint( + path.Join(conf.Dynamic.Prefix, "/outputs"), + "Get a map of running output identifiers with their current uptimes.", + func(w http.ResponseWriter, r *http.Request) { + var httpErr error + defer func() { + r.Body.Close() + if httpErr != nil { + log.Warnf("Request error: %v\n", httpErr) + http.Error(w, "Internal server error", http.StatusBadGateway) + } + }() + + outputMapMut.Lock() + defer outputMapMut.Unlock() + + uptimes := map[string]string{} + for k, v := range outputMap { + uptimes[k] = time.Since(v).String() + } + + var resBytes []byte + if resBytes, httpErr = json.Marshal(uptimes); httpErr == nil { + w.Write(resBytes) + } + }, + ) + return fanOut, nil +} + +//------------------------------------------------------------------------------ diff --git a/resources/docs/dynamic_inputs_and_outputs.md b/resources/docs/dynamic_inputs_and_outputs.md new file mode 100644 index 0000000000..f99b8a6ac8 --- /dev/null +++ b/resources/docs/dynamic_inputs_and_outputs.md @@ -0,0 +1,60 @@ +Dynamic Inputs and Outputs +========================== + +It is possible to have sets of inputs and outputs in Benthos that can be added, +updated and removed during runtime with the [dynamic fan in][dynamic_inputs] and +[dynamic fan out][dynamic_outputs] types. + +Dynamic inputs and outputs are each identified by unique string labels, which +are specified when adding them either in configuration or via the HTTP API. The +labels are useful when querying which types are active. + +## API + +The API for dynamic types (both inputs and outputs) is a collection of HTTP REST +endpoints: + +### `/inputs` + +Returns a JSON object of labels to uptime of the currently active inputs. + +### `/input/{input_label}` + +GET returns the configuration of the input idenfified by `input_label`. + +POST sets the input `input_label` to the body of the request parsed as a JSON +configuration. If the input label already exists the previous input is first +stopped and removed. + +DELETE stops and removes the input identified by `input_label`. + +### `/outputs` + +Returns a JSON object of labels to uptime of the currently active outputs. + +### `/output/{output_label}` + +GET returns the configuration of the output idenfified by `output_label`. + +POST sets the output `output_label` to the body of the request parsed as a JSON +configuration. If the output label already exists the previous output is first +stopped and removed. + +DELETE stops and removes the output identified by `output_label`. + +A custom prefix can be set for these endpoints in configuration. + +## Applications + +Dynamic types are useful when a platforms data streams might need to change +regularly and automatically. It is also useful for triggering batches of +platform data, e.g. a cron job can be created to send hourly curl requests that +adds a dynamic input to read a file directory of sample data. + +Some inputs have a finite lifetime, e.g. `amazon_s3` without an SQS queue +configured will close once the whole bucket has been read. When a dynamic types +lifetime ends the label will be no longer appear in queries. You can use this to +write tools that trigger new inputs (to move onto the next bucket, for example). + +[dynamic_inputs]: ./inputs/README.md#dynamic +[dynamic_outputs]: ./outputs/README.md#dynamic diff --git a/resources/docs/inputs/README.md b/resources/docs/inputs/README.md index 7e50234a57..39bcb7ed91 100644 --- a/resources/docs/inputs/README.md +++ b/resources/docs/inputs/README.md @@ -28,12 +28,13 @@ brokers, including RabbitMQ. Exchange type options are: direct|fanout|topic|x-custom -## `dynamic_fan_in` +## `dynamic` -The dynamic fan in type is similar to the regular fan in type except the inputs -can be changed during runtime via a REST HTTP interface. +The dynamic type is similar to the 'fan_in' type except the inputs can be +changed during runtime via a REST HTTP interface. -To GET the full list of input identifiers use the '/inputs' endpoint. +To GET a JSON map of input identifiers with their current uptimes use the +'/inputs' endpoint. To perform CRUD actions on the inputs themselves use POST, DELETE, and GET methods on the '/input/{input_id}' endpoint. When using POST the body of the diff --git a/resources/docs/outputs/README.md b/resources/docs/outputs/README.md index 423e937d04..c52923a8b8 100644 --- a/resources/docs/outputs/README.md +++ b/resources/docs/outputs/README.md @@ -15,6 +15,19 @@ for each object you should use function interpolations described AMQP (0.91) is the underlying messaging protocol that is used by various message brokers, including RabbitMQ. +## `dynamic` + +The dynamic type is similar to the 'fan_out' type except the outputs can be +changed during runtime via a REST HTTP interface. + +To GET a JSON map of output identifiers with their current uptimes use the +'/outputs' endpoint. + +To perform CRUD actions on the outputs themselves use POST, DELETE, and GET +methods on the '/output/{output_id}' endpoint. When using POST the body of the +request should be a JSON configuration for the output, if the output already +exists it will be changed. + ## `fan_out` The fan out output type allows you to configure multiple output targets. With