Skip to content

Commit

Permalink
Add initial rabbitmq support
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Oct 18, 2016
1 parent 3e794d0 commit 63e913a
Show file tree
Hide file tree
Showing 12 changed files with 571 additions and 15 deletions.
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,6 @@
[submodule "vendor/golang.org/x/net"]
path = vendor/golang.org/x/net
url = https://github.com/golang/net
[submodule "vendor/github.com/streadway/amqp"]
path = vendor/github.com/streadway/amqp
url = https://github.com/streadway/amqp
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Currently supported input/output targets:

- ZMQ4 (PUSH, PULL, SUB, PUB)
- Nanomsg/Scalability Protocols (PUSH, PULL, SUB, PUB)
- RabbitMQ (AMQP)
- HTTP 1.1 POST
- STDIN/STDOUT
- File
Expand Down
13 changes: 13 additions & 0 deletions config/rabbitmq_to_stdout.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# You can easily test this with docker:
# docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
input:
type: amqp
amqp:
uri: amqp://guest:guest@localhost:5672/
exchange: benthos-exchange
exchange_type: direct
queue: benthos-queue
key: benthos-key
consumer_tag: benthos-consumer
output:
type: stdout
11 changes: 11 additions & 0 deletions config/stdin_to_rabbitmq.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# You can easily test this with docker:
# docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 rabbitmq:3
input:
type: stdin
output:
type: amqp
amqp:
uri: amqp://guest:guest@localhost:5672/
exchange: benthos-exchange
exchange_type: direct
key: benthos-key
5 changes: 2 additions & 3 deletions lib/buffer/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ func Descriptions() string {

buf := bytes.Buffer{}
buf.WriteString("BUFFERS\n")
buf.WriteString(strings.Repeat("=", 80))
buf.WriteString(strings.Repeat("=", 7))
buf.WriteString("\n\n")

// Append each description
for i, name := range names {
buf.WriteString("## ")
buf.WriteString(name)
buf.WriteString("\n")
buf.WriteString(strings.Repeat("-", 80))
buf.WriteString("\n")
buf.WriteString(constructors[name].description)
buf.WriteString("\n")
if i != (len(names) - 1) {
Expand Down
12 changes: 9 additions & 3 deletions lib/buffer/mmap_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,15 @@ func init() {
constructor: NewMmapFile,
description: `
The 'mmap_file' buffer type uses memory mapped files to perform low-latency,
file-persisted buffering of messages. This protects the pipeline against
backpressure until the target disk space is full. Since the buffers are flushed
to disk the buffer is persisted across service restarts.`,
file-persisted buffering of messages.
To configure the mmap_file buffer you need to designate a writeable directory
for storing the mapped files. Benthos will create multiple files in this
directory as it fills them.
When files are fully read from they will be deleted. You can disable this
feature if you wish to preserve the data indefinitely, but the directory will
fill up as fast as data passes through.`,
}
}

Expand Down
274 changes: 274 additions & 0 deletions lib/input/amqp.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
/*
Copyright (c) 2014 Ashley Jeffs
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
*/

package input

import (
"fmt"
"sync/atomic"
"time"

"github.com/jeffail/benthos/lib/types"
"github.com/jeffail/util/log"
"github.com/jeffail/util/metrics"
"github.com/streadway/amqp"
)

//--------------------------------------------------------------------------------------------------

func init() {
constructors["amqp"] = typeSpec{
constructor: NewAMQP,
description: `
AMQP is the underlying messaging protocol that is used my RabbitMQ. Support is
currently rather limited, but more configuration options are on the way.
Exchange type options are: direct|fanout|topic|x-custom`,
}
}

//--------------------------------------------------------------------------------------------------

// AMQPConfig - Configuration for the AMQP input type.
type AMQPConfig struct {
URI string `json:"uri" yaml:"uri"`
Exchange string `json:"exchange" yaml:"exchange"`
ExchangeType string `json:"exchange_type" yaml:"exchange_type"`
Queue string `json:"queue" yaml:"queue"`
BindingKey string `json:"key" yaml:"key"`
ConsumerTag string `json:"consumer_tag" yaml:"consumer_tag"`
}

// NewAMQPConfig - Creates a new AMQPConfig with default values.
func NewAMQPConfig() AMQPConfig {
return AMQPConfig{
URI: "amqp://guest:guest@localhost:5672/",
Exchange: "benthos-exchange",
ExchangeType: "direct",
Queue: "benthos-queue",
BindingKey: "benthos-key",
ConsumerTag: "benthos-consumer",
}
}

//--------------------------------------------------------------------------------------------------

// AMQP - An input type that serves Scalability Protocols messages.
type AMQP struct {
running int32

conn *amqp.Connection
amqpChan *amqp.Channel
consumerChan <-chan amqp.Delivery

conf Config
stats metrics.Type
log log.Modular

messages chan types.Message
responses <-chan types.Response

closeChan chan struct{}
closedChan chan struct{}
}

// NewAMQP - Create a new AMQP input type.
func NewAMQP(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
a := AMQP{
running: 1,
conf: conf,
stats: stats,
log: log.NewModule(".input.amqp"),
messages: make(chan types.Message),
responses: nil,
closeChan: make(chan struct{}),
closedChan: make(chan struct{}),
}

if err := a.connect(); err != nil {
return nil, err
}

return &a, nil
}

//--------------------------------------------------------------------------------------------------

// connect - Establish a connection to an AMQP server.
func (a *AMQP) connect() (err error) {
a.conn, err = amqp.Dial(a.conf.AMQP.URI)
if err != nil {
return fmt.Errorf("AMQP Connect: %s", err)
}

a.amqpChan, err = a.conn.Channel()
if err != nil {
return fmt.Errorf("AMQP Channel: %s", err)
}

if err = a.amqpChan.ExchangeDeclare(
a.conf.AMQP.Exchange, // name of the exchange
a.conf.AMQP.ExchangeType, // type
true, // durable
false, // delete when complete
false, // internal
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Exchange Declare: %s", err)
}

if _, err = a.amqpChan.QueueDeclare(
a.conf.AMQP.Queue, // name of the queue
true, // durable
false, // delete when usused
false, // exclusive
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Queue Declare: %s", err)
}

if err = a.amqpChan.QueueBind(
a.conf.AMQP.Queue, // name of the queue
a.conf.AMQP.BindingKey, // bindingKey
a.conf.AMQP.Exchange, // sourceExchange
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Queue Bind: %s", err)
}

if a.consumerChan, err = a.amqpChan.Consume(
a.conf.AMQP.Queue, // name
a.conf.AMQP.ConsumerTag, // consumerTag,
false, // noAck
false, // exclusive
false, // noLocal
false, // noWait
nil, // arguments
); err != nil {
return fmt.Errorf("Queue Consume: %s", err)
}

return
}

// disconnect - Safely close a connection to an AMQP server.
func (a *AMQP) disconnect() error {
if a.amqpChan != nil {
if err := a.amqpChan.Cancel(a.conf.AMQP.ConsumerTag, true); err != nil {
return fmt.Errorf("Consumer cancel failed: %s", err)
}
a.amqpChan = nil
}
if a.conn != nil {
if err := a.conn.Close(); err != nil {
return fmt.Errorf("AMQP connection close error: %s", err)
}
a.conn = nil
}
return nil
}

//--------------------------------------------------------------------------------------------------

func (a *AMQP) loop() {
defer func() {
atomic.StoreInt32(&a.running, 0)
a.disconnect()

close(a.messages)
close(a.closedChan)
}()

var data []byte

for atomic.LoadInt32(&a.running) == 1 {
// If no bytes then read a message
if data == nil {
select {
case msg := <-a.consumerChan:
data = msg.Body
case <-a.closeChan:
return
}
}

// If bytes are read then try and propagate.
if data != nil {
select {
case a.messages <- types.Message{Parts: [][]byte{data}}:
case <-a.closeChan:
return
}
res, open := <-a.responses
if !open {
return
}
if resErr := res.Error(); resErr == nil {
a.stats.Incr("input.amqp.count", 1)
data = nil
} else if resErr == types.ErrMessageTooLarge {
a.stats.Incr("input.amqp.send.rejected", 1)
data = nil
} else {
a.stats.Incr("input.amqp.send.error", 1)
}
}
}

}

// StartListening - Sets the channel used by the input to validate message receipt.
func (a *AMQP) StartListening(responses <-chan types.Response) error {
if a.responses != nil {
return types.ErrAlreadyStarted
}
a.responses = responses
go a.loop()
return nil
}

// MessageChan - Returns the messages channel.
func (a *AMQP) MessageChan() <-chan types.Message {
return a.messages
}

// CloseAsync - Shuts down the AMQP input and stops processing requests.
func (a *AMQP) CloseAsync() {
if atomic.CompareAndSwapInt32(&a.running, 1, 0) {
close(a.closeChan)
}
}

// WaitForClose - Blocks until the AMQP input has closed down.
func (a *AMQP) WaitForClose(timeout time.Duration) error {
select {
case <-a.closedChan:
case <-time.After(timeout):
return types.ErrTimeout
}
return nil
}

//--------------------------------------------------------------------------------------------------
7 changes: 4 additions & 3 deletions lib/input/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Config struct {
ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"`
ZMQ4 *ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
Kafka KafkaConfig `json:"kafka" yaml:"kafka"`
AMQP AMQPConfig `json:"amqp" yaml:"amqp"`
File FileConfig `json:"file" yaml:"file"`
STDIN STDINConfig `json:"stdin" yaml:"stdin"`
FanIn FanInConfig `json:"fan_in" yaml:"fan_in"`
Expand All @@ -65,6 +66,7 @@ func NewConfig() Config {
ScaleProto: NewScaleProtoConfig(),
ZMQ4: NewZMQ4Config(),
Kafka: NewKafkaConfig(),
AMQP: NewAMQPConfig(),
File: NewFileConfig(),
STDIN: NewSTDINConfig(),
FanIn: NewFanInConfig(),
Expand All @@ -84,15 +86,14 @@ func Descriptions() string {

buf := bytes.Buffer{}
buf.WriteString("INPUTS\n")
buf.WriteString(strings.Repeat("=", 80))
buf.WriteString(strings.Repeat("=", 6))
buf.WriteString("\n\n")

// Append each description
for i, name := range names {
buf.WriteString("## ")
buf.WriteString(name)
buf.WriteString("\n")
buf.WriteString(strings.Repeat("-", 80))
buf.WriteString("\n")
buf.WriteString(constructors[name].description)
buf.WriteString("\n")
if i != (len(names) - 1) {
Expand Down
Loading

0 comments on commit 63e913a

Please sign in to comment.