Skip to content

Commit

Permalink
Add dynamic output type, improve docs and API
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Feb 16, 2018
1 parent b0a7069 commit dd1d00f
Show file tree
Hide file tree
Showing 14 changed files with 518 additions and 53 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Currently supported input/output targets:
Setting up multiple outputs or inputs is done by choosing a routing strategy
(fan-in, fan-out, round-robin, etc.)

It is possible to enable a REST API to dynamically change inputs and outputs at
runtime, [you read about that here][11].

For a full and up to date list of all inputs, buffer options, processors, and
outputs [you can find them in the docs][7], or print them from the binary:

Expand Down Expand Up @@ -172,6 +175,7 @@ containers using `docker-compose`.
[8]: resources/docs/config_interpolation.md
[9]: resources/docker/compose_examples
[10]: resources/docs/processors/list.md
[11]: resources/docs/dynamic_inputs_and_outputs.md
[travis-badge]: https://travis-ci.org/Jeffail/benthos.svg?branch=master
[travis-url]: https://travis-ci.org/Jeffail/benthos
[dep]: https://github.com/golang/dep
Expand Down
5 changes: 5 additions & 0 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ AMQP_OUTPUT_QUEUE = benthos-stream
AMQP_OUTPUT_KEY = benthos-key
AMQP_OUTPUT_CONSUMER_TAG = benthos-consumer

DYNAMIC_INPUT_PREFIX =
DYNAMIC_INPUT_TIMEOUT_MS = 5000
DYNAMIC_OUTPUT_PREFIX =
DYNAMIC_OUTPUT_TIMEOUT_MS = 5000

FILE_INPUT_PATH =
FILE_INPUT_MULTIPART = false
FILE_INPUT_MAX_BUFFER = 65536
Expand Down
8 changes: 8 additions & 0 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ input:
queue: ${AMQP_INPUT_QUEUE:benthos-stream}
key: ${AMQP_INPUT_KEY:benthos-key}
consumer_tag: ${AMQP_INPUT_CONSUMER_TAG:benthos-consumer}
dynamic:
inputs: {}
prefix: ${DYNAMIC_INPUT_PREFIX}
timeout_ms: ${DYNAMIC_INPUT_TIMEOUT_MS:5000}
file:
path: ${FILE_INPUT_PATH}
multipart: ${FILE_INPUT_MULTIPART:false}
Expand Down Expand Up @@ -147,6 +151,10 @@ output:
exchange: ${AMQP_OUTPUT_EXCHANGE:benthos-exchange}
exchange_type: ${AMQP_OUTPUT_EXCHANGE_TYPE:direct}
key: ${AMQP_OUTPUT_KEY:benthos-key}
dynamic:
outputs: {}
prefix: ${DYNAMIC_OUTPUT_PREFIX}
timeout_ms: ${DYNAMIC_OUTPUT_TIMEOUT_MS:5000}
file:
path: ${FILE_OUTPUT_PATH}
http_client:
Expand Down
6 changes: 5 additions & 1 deletion config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ input:
consumer_tag: benthos-consumer
prefetch_count: 1
prefetch_size: 0
dynamic_fan_in:
dynamic:
inputs: {}
prefix: ""
timeout_ms: 5000
Expand Down Expand Up @@ -192,6 +192,10 @@ output:
exchange: benthos-exchange
exchange_type: direct
key: benthos-key
dynamic:
outputs: {}
prefix: ""
timeout_ms: 5000
fan_out:
outputs: []
file:
Expand Down
40 changes: 36 additions & 4 deletions lib/broker/dynamic_fan_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type DynamicFanIn struct {
messageChan chan types.Message
responseChan <-chan types.Response

onAdd func(label string)
onRemove func(label string)

wrappedMsgsChan chan wrappedMsg
newInputChan chan wrappedInput
inputs map[string]DynamicInput
Expand All @@ -74,6 +77,7 @@ func NewDynamicFanIn(
inputs map[string]DynamicInput,
logger log.Modular,
stats metrics.Type,
options ...func(*DynamicFanIn),
) (*DynamicFanIn, error) {
d := &DynamicFanIn{
running: 1,
Expand All @@ -83,16 +87,26 @@ func NewDynamicFanIn(
messageChan: make(chan types.Message),
responseChan: nil,

onAdd: func(l string) {},
onRemove: func(l string) {},

wrappedMsgsChan: make(chan wrappedMsg),
newInputChan: make(chan wrappedInput),
inputs: make(map[string]DynamicInput),

closedChan: make(chan struct{}),
closeChan: make(chan struct{}),
}

for _, opt := range options {
opt(d)
}
for key, input := range inputs {
d.addInput(key, input)
if err := d.addInput(key, input); err != nil {
d.stats.Incr("broker.dynamic_fan_in.input.add.error", 1)
d.log.Errorf("Failed to start new dynamic input '%v': %v\n", key, err)
} else {
d.onAdd(key)
}
}
go d.managerLoop()
return d, nil
Expand Down Expand Up @@ -122,8 +136,6 @@ func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.D
return <-resChan
}

//------------------------------------------------------------------------------

// StartListening assigns a new responses channel for the broker to read.
func (d *DynamicFanIn) StartListening(responseChan <-chan types.Response) error {
if d.responseChan != nil {
Expand All @@ -142,6 +154,24 @@ func (d *DynamicFanIn) MessageChan() <-chan types.Message {

//------------------------------------------------------------------------------

// OptDynamicFanInSetOnAdd sets the function that is called whenever a dynamic
// input is added.
func OptDynamicFanInSetOnAdd(onAddFunc func(label string)) func(*DynamicFanIn) {
return func(d *DynamicFanIn) {
d.onAdd = onAddFunc
}
}

// OptDynamicFanInSetOnRemove sets the function that is called whenever a
// dynamic input is removed.
func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanIn) {
return func(d *DynamicFanIn) {
d.onRemove = onRemoveFunc
}
}

//------------------------------------------------------------------------------

func (d *DynamicFanIn) addInput(ident string, input DynamicInput) error {
// Create unique response channel for each input
resChan := make(chan types.Response)
Expand Down Expand Up @@ -222,6 +252,7 @@ func (d *DynamicFanIn) managerLoop() {
d.log.Errorf("Failed to stop old copy of dynamic input '%v': %v\n", wrappedInput.Name, err)
} else {
d.stats.Incr("broker.dynamic_fan_in.input.remove.success", 1)
d.onRemove(wrappedInput.Name)
}
}
if err == nil && wrappedInput.Input != nil {
Expand All @@ -231,6 +262,7 @@ func (d *DynamicFanIn) managerLoop() {
d.log.Errorf("Failed to start new dynamic input '%v': %v\n", wrappedInput.Name, err)
} else {
d.stats.Incr("broker.dynamic_fan_in.input.add.success", 1)
d.onAdd(wrappedInput.Name)
}
}
select {
Expand Down
58 changes: 51 additions & 7 deletions lib/broker/dynamic_fan_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ type outputWithMsgChan struct {
type DynamicFanOut struct {
running int32

logger log.Modular
stats metrics.Type
log log.Modular
stats metrics.Type

throt *throttle.Type

onAdd func(label string)
onRemove func(label string)

messages <-chan types.Message
responseChan chan types.Response

Expand All @@ -83,23 +86,32 @@ func NewDynamicFanOut(
outputs map[string]DynamicOutput,
logger log.Modular,
stats metrics.Type,
options ...func(*DynamicFanOut),
) (*DynamicFanOut, error) {
d := &DynamicFanOut{
running: 1,
stats: stats,
logger: logger.NewModule(".broker.dynamic_fan_out"),
log: logger.NewModule(".broker.dynamic_fan_out"),
onAdd: func(l string) {},
onRemove: func(l string) {},
messages: nil,
responseChan: make(chan types.Response),
newOutputChan: make(chan wrappedOutput),
outputs: make(map[string]outputWithMsgChan, len(outputs)),
closedChan: make(chan struct{}),
closeChan: make(chan struct{}),
}
for _, opt := range options {
opt(d)
}
d.throt = throttle.New(throttle.OptCloseChan(d.closeChan))

for k, v := range outputs {
if err := d.addOutput(k, v); err != nil {
d.logger.Errorf("Failed to initialise dynamic output '%v': %v\n", err)
d.log.Errorf("Failed to initialise dynamic output '%v': %v\n", k, err)
d.stats.Incr("broker.dynamic_fan_out.output.add.error", 1)
} else {
d.onAdd(k)
}
}
return d, nil
Expand Down Expand Up @@ -132,6 +144,24 @@ func (d *DynamicFanOut) SetOutput(ident string, output DynamicOutput, timeout ti

//------------------------------------------------------------------------------

// OptDynamicFanOutSetOnAdd sets the function that is called whenever a dynamic
// output is added.
func OptDynamicFanOutSetOnAdd(onAddFunc func(label string)) func(*DynamicFanOut) {
return func(d *DynamicFanOut) {
d.onAdd = onAddFunc
}
}

// OptDynamicFanOutSetOnRemove sets the function that is called whenever a
// dynamic output is removed.
func OptDynamicFanOutSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFanOut) {
return func(d *DynamicFanOut) {
d.onRemove = onRemoveFunc
}
}

//------------------------------------------------------------------------------

// StartReceiving assigns a new messages channel for the broker to read.
func (d *DynamicFanOut) StartReceiving(msgs <-chan types.Message) error {
if d.messages != nil {
Expand Down Expand Up @@ -218,16 +248,30 @@ func (d *DynamicFanOut) loop() {
if !open {
return
}
d.stats.Incr("broker.dynamic_fan_out.output.count", 1)

if _, exists := d.outputs[wrappedOutput.Name]; exists {
if err := d.removeOutput(wrappedOutput.Name, wrappedOutput.Timeout); err != nil {
d.stats.Incr("broker.dynamic_fan_out.output.remove.error", 1)
d.log.Errorf("Failed to stop old copy of dynamic output '%v': %v\n", wrappedOutput.Name, err)
wrappedOutput.ResChan <- err
continue
}
d.stats.Incr("broker.dynamic_fan_out.output.remove.success", 1)
d.onRemove(wrappedOutput.Name)
}
if wrappedOutput.Output == nil {
wrappedOutput.ResChan <- nil
} else {
wrappedOutput.ResChan <- d.addOutput(wrappedOutput.Name, wrappedOutput.Output)
err := d.addOutput(wrappedOutput.Name, wrappedOutput.Output)
if err != nil {
d.stats.Incr("broker.dynamic_fan_out.output.add.error", 1)
d.log.Errorf("Failed to start new dynamic output '%v': %v\n", wrappedOutput.Name, err)
} else {
d.stats.Incr("broker.dynamic_fan_out.output.add.success", 1)
d.onAdd(wrappedOutput.Name)
}
wrappedOutput.ResChan <- err
}
continue
case msg, open = <-msgChan:
Expand Down Expand Up @@ -260,11 +304,11 @@ func (d *DynamicFanOut) loop() {
select {
case res, open = <-v.output.ResponseChan():
if !open {
d.logger.Warnf("Dynamic output '%v' has closed\n", k)
d.log.Warnf("Dynamic output '%v' has closed\n", k)
d.removeOutput(k, time.Second)
delete(remainingTargets, k)
} else if res.Error() != nil {
d.logger.Errorf("Failed to dispatch dynamic fan out message: %v\n", res.Error())
d.log.Errorf("Failed to dispatch dynamic fan out message: %v\n", res.Error())
d.stats.Incr("broker.dynamic_fan_out.output.error", 1)
if !d.throt.Retry() {
return
Expand Down
4 changes: 2 additions & 2 deletions lib/input/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type Config struct {
AmazonS3 reader.AmazonS3Config `json:"amazon_s3" yaml:"amazon_s3"`
AmazonSQS reader.AmazonSQSConfig `json:"amazon_sqs" yaml:"amazon_sqs"`
AMQP reader.AMQPConfig `json:"amqp" yaml:"amqp"`
DynamicFanIn DynamicFanInConfig `json:"dynamic_fan_in" yaml:"dynamic_fan_in"`
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
FanIn FanInConfig `json:"fan_in" yaml:"fan_in"`
File FileConfig `json:"file" yaml:"file"`
HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"`
Expand All @@ -86,7 +86,7 @@ func NewConfig() Config {
AmazonS3: reader.NewAmazonS3Config(),
AmazonSQS: reader.NewAmazonSQSConfig(),
AMQP: reader.NewAMQPConfig(),
DynamicFanIn: NewDynamicFanInConfig(),
Dynamic: NewDynamicConfig(),
FanIn: NewFanInConfig(),
File: NewFileConfig(),
HTTPClient: NewHTTPClientConfig(),
Expand Down
Loading

0 comments on commit dd1d00f

Please sign in to comment.