Skip to content

Commit

Permalink
Merge pull request #25 from Jeffail/feature/wip-refactor-pipelines
Browse files Browse the repository at this point in the history
Feature/wip refactor pipelines
  • Loading branch information
Jeffail authored Mar 1, 2018
2 parents a49f8c0 + acec552 commit c988fce
Show file tree
Hide file tree
Showing 105 changed files with 2,149 additions and 4,190 deletions.
5 changes: 3 additions & 2 deletions cmd/benthos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,8 +232,9 @@ func createPipeline(
poolt1.Add(10, outputPipe)
poolt2.Add(0, outputPipe)

util.Couple(buf, outputPipe)
util.Couple(inputPipe, buf)
outputPipe.StartReceiving(buf.TransactionChan())
buf.StartReceiving(inputPipe.TransactionChan())

closeChan := make(chan struct{})

// If our outputs close down then we should shut down the service
Expand Down
3 changes: 1 addition & 2 deletions cmd/tools/benthos_bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ func createPipeline(
)
pool.Add(10, outputPipe)

util.Couple(inputPipe, outputPipe)

outputPipe.StartReceiving(inputPipe.TransactionChan())
return pool, nil
}

Expand Down
4 changes: 2 additions & 2 deletions config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ output:
files:
path: ${!count:files}-${!timestamp_unix_nano}.txt
http_client:
url: http://localhost:8081/post
url: http://localhost:4195/post
verb: POST
content_type: application/octet-stream
oauth:
Expand Down Expand Up @@ -289,10 +289,10 @@ output:
bind: true
socket_type: PUSH
high_water_mark: 0
poll_timeout_ms: 5000
processors: []
buffer:
type: none
retry_throttle_ms: 1000
memory:
limit: 524288000
mmap_file:
Expand Down
7 changes: 7 additions & 0 deletions config/test/smoke_in.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,12 @@ output:
cluster_id: test-cluster
client_id: benthos_client_1
subject: benthos_messages
- type: mqtt
mqtt:
urls:
- tcp://mqtt:1883
qos: 1
topic: benthos_topic
client_id: benthos_output
logger:
log_level: INFO
8 changes: 8 additions & 0 deletions config/test/smoke_out.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,14 @@ input:
cluster_id: test-cluster
client_id: benthos_client_2
subject: benthos_messages
- type: mqtt
mqtt:
urls:
- tcp://mqtt:1883
qos: 1
topics:
- benthos_topic
client_id: benthos_input
output:
type: http_server
logger:
Expand Down
44 changes: 9 additions & 35 deletions lib/broker/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ package broker

import (
"errors"
"sync/atomic"
"time"

"github.com/Jeffail/benthos/lib/types"
Expand All @@ -39,30 +38,23 @@ var logConfig = log.LoggerConfig{

// MockInputType implements the input.Type interface.
type MockInputType struct {
MsgChan chan types.Message
ResChan <-chan types.Response
TChan chan types.Transaction
}

// StartListening sets the channel used for reading responses.
func (m *MockInputType) StartListening(resChan <-chan types.Response) error {
m.ResChan = resChan
return nil
}

// MessageChan returns the messages channel.
func (m *MockInputType) MessageChan() <-chan types.Message {
return m.MsgChan
// TransactionChan returns the messages channel.
func (m *MockInputType) TransactionChan() <-chan types.Transaction {
return m.TChan
}

// CloseAsync does nothing.
func (m MockInputType) CloseAsync() {
close(m.MsgChan)
close(m.TChan)
}

// WaitForClose does nothing.
func (m MockInputType) WaitForClose(t time.Duration) error {
select {
case _, open := <-m.MsgChan:
case _, open := <-m.TChan:
if open {
return errors.New("received unexpected message")
}
Expand All @@ -76,39 +68,21 @@ func (m MockInputType) WaitForClose(t time.Duration) error {

// MockOutputType implements the output.Type interface.
type MockOutputType struct {
ResChan chan types.Response
resClosed int32
MsgChan <-chan types.Message
TChan <-chan types.Transaction
}

// StartReceiving sets the read channel. This implementation is NOT thread safe.
func (m *MockOutputType) StartReceiving(msgs <-chan types.Message) error {
m.MsgChan = msgs
func (m *MockOutputType) StartReceiving(msgs <-chan types.Transaction) error {
m.TChan = msgs
return nil
}

// ResponseChan returns the errors channel.
func (m *MockOutputType) ResponseChan() <-chan types.Response {
return m.ResChan
}

// CloseAsync does nothing.
func (m *MockOutputType) CloseAsync() {
if atomic.CompareAndSwapInt32(&m.resClosed, 0, 1) {
close(m.ResChan)
}
}

// WaitForClose does nothing.
func (m MockOutputType) WaitForClose(t time.Duration) error {
select {
case _, open := <-m.ResChan:
if open {
return errors.New("received unexpected message")
}
case <-time.After(t):
return types.ErrTimeout
}
return nil
}

Expand Down
93 changes: 18 additions & 75 deletions lib/broker/dynamic_fan_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ import (

// DynamicInput is an interface of input types that must be closable.
type DynamicInput interface {
types.MessageSender
types.ResponderListener
types.Transactor
types.Closable
}

Expand All @@ -57,15 +56,14 @@ type DynamicFanIn struct {
stats metrics.Type
log log.Modular

messageChan chan types.Message
responseChan <-chan types.Response
transactionChan chan types.Transaction
internalTsChan chan types.Transaction

onAdd func(label string)
onRemove func(label string)

wrappedMsgsChan chan wrappedMsg
newInputChan chan wrappedInput
inputs map[string]DynamicInput
newInputChan chan wrappedInput
inputs map[string]DynamicInput

closedChan chan struct{}
closeChan chan struct{}
Expand All @@ -84,15 +82,13 @@ func NewDynamicFanIn(
stats: stats,
log: logger,

messageChan: make(chan types.Message),
responseChan: nil,
transactionChan: make(chan types.Transaction),

onAdd: func(l string) {},
onRemove: func(l string) {},

wrappedMsgsChan: make(chan wrappedMsg),
newInputChan: make(chan wrappedInput),
inputs: make(map[string]DynamicInput),
newInputChan: make(chan wrappedInput),
inputs: make(map[string]DynamicInput),

closedChan: make(chan struct{}),
closeChan: make(chan struct{}),
Expand Down Expand Up @@ -136,20 +132,10 @@ func (d *DynamicFanIn) SetInput(ident string, input DynamicInput, timeout time.D
return <-resChan
}

// StartListening assigns a new responses channel for the broker to read.
func (d *DynamicFanIn) StartListening(responseChan <-chan types.Response) error {
if d.responseChan != nil {
return types.ErrAlreadyStarted
}
d.responseChan = responseChan

go d.readLoop()
return nil
}

// MessageChan returns the channel used for consuming messages from this broker.
func (d *DynamicFanIn) MessageChan() <-chan types.Message {
return d.messageChan
// TransactionChan returns the channel used for consuming messages from this
// broker.
func (d *DynamicFanIn) TransactionChan() <-chan types.Transaction {
return d.transactionChan
}

//------------------------------------------------------------------------------
Expand All @@ -173,28 +159,16 @@ func OptDynamicFanInSetOnRemove(onRemoveFunc func(label string)) func(*DynamicFa
//------------------------------------------------------------------------------

func (d *DynamicFanIn) addInput(ident string, input DynamicInput) error {
// Create unique response channel for each input
resChan := make(chan types.Response)
if err := input.StartListening(resChan); err != nil {
return err
}

// Launch goroutine that async writes input into single channel
go func(in DynamicInput, rChan chan types.Response) {
cancelChan := make(chan struct{})
defer close(cancelChan)
go func(in DynamicInput) {
for {
in, open := <-input.MessageChan()
in, open := <-input.TransactionChan()
if !open {
return
}
d.wrappedMsgsChan <- wrappedMsg{
msg: in,
cancelChan: cancelChan,
resChan: rChan,
}
d.transactionChan <- in
}
}(input, resChan)
}(input)

// Add new input to our map
d.inputs[ident] = input
Expand Down Expand Up @@ -234,7 +208,8 @@ func (d *DynamicFanIn) managerLoop() {
}
}
d.inputs = make(map[string]DynamicInput)
close(d.wrappedMsgsChan)
close(d.transactionChan)
close(d.closedChan)
}()

for {
Expand Down Expand Up @@ -277,38 +252,6 @@ func (d *DynamicFanIn) managerLoop() {
}
}

// readLoop is an internal loop that brokers multiple input streams into a
// single channel.
func (d *DynamicFanIn) readLoop() {
defer func() {
d.CloseAsync()
close(d.messageChan)
close(d.closedChan)
}()

for {
select {
case wrap, open := <-d.wrappedMsgsChan:
if !open {
return
}

d.stats.Incr("broker.dynamic_fan_in.messages.received", 1)
d.messageChan <- wrap.msg

res, open := <-d.responseChan
if !open {
return
}

select {
case wrap.resChan <- res:
case <-wrap.cancelChan:
}
}
}
}

// CloseAsync shuts down the DynamicFanIn broker and stops processing requests.
func (d *DynamicFanIn) CloseAsync() {
if atomic.CompareAndSwapInt32(&d.running, 1, 0) {
Expand Down
Loading

0 comments on commit c988fce

Please sign in to comment.