Skip to content

Commit

Permalink
Lock reader/writer resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jun 29, 2018
1 parent 724a7a8 commit 6a4f66f
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 20 deletions.
2 changes: 1 addition & 1 deletion lib/input/reader/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (a *AMQP) Read() (types.Message, error) {
func (a *AMQP) Acknowledge(err error) error {
a.m.RLock()
defer a.m.RUnlock()
if a.conn != nil {
if a.conn == nil {
return types.ErrNotConnected
}
if err != nil {
Expand Down
7 changes: 7 additions & 0 deletions lib/input/reader/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package reader

import (
"strings"
"sync"
"time"

"github.com/Jeffail/benthos/lib/log"
Expand Down Expand Up @@ -55,6 +56,7 @@ func NewMQTTConfig() MQTTConfig {
// MQTT is an input type that reads MQTT Pub/Sub messages.
type MQTT struct {
client mqtt.Client
cMut sync.Mutex

conf MQTTConfig

Expand Down Expand Up @@ -94,6 +96,9 @@ func NewMQTT(

// Connect establishes a connection to an MQTT server.
func (m *MQTT) Connect() error {
m.cMut.Lock()
defer m.cMut.Unlock()

if m.client != nil {
return nil
}
Expand Down Expand Up @@ -151,11 +156,13 @@ func (m *MQTT) Acknowledge(err error) error {

// CloseAsync shuts down the MQTT input and stops processing requests.
func (m *MQTT) CloseAsync() {
m.cMut.Lock()
if m.client != nil {
m.client.Disconnect(0)
m.client = nil
close(m.interruptChan)
}
m.cMut.Unlock()
}

// WaitForClose blocks until the MQTT input has closed down.
Expand Down
15 changes: 14 additions & 1 deletion lib/input/reader/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package reader

import (
"strings"
"sync"
"time"

"github.com/Jeffail/benthos/lib/log"
Expand Down Expand Up @@ -55,6 +56,8 @@ type NATS struct {
stats metrics.Type
log log.Modular

cMut sync.Mutex

natsConn *nats.Conn
natsSub *nats.Subscription
natsChan chan *nats.Msg
Expand All @@ -78,6 +81,9 @@ func NewNATS(conf NATSConfig, log log.Modular, stats metrics.Type) (Type, error)

// Connect establishes a connection to a NATS server.
func (n *NATS) Connect() error {
n.cMut.Lock()
defer n.cMut.Unlock()

if n.natsConn != nil {
return nil
}
Expand All @@ -103,6 +109,9 @@ func (n *NATS) Connect() error {
}

func (n *NATS) disconnect() {
n.cMut.Lock()
defer n.cMut.Unlock()

if n.natsSub != nil {
n.natsSub.Unsubscribe()
n.natsSub = nil
Expand All @@ -116,10 +125,14 @@ func (n *NATS) disconnect() {

// Read attempts to read a new message from the NATS subject.
func (n *NATS) Read() (types.Message, error) {
n.cMut.Lock()
natsChan := n.natsChan
n.cMut.Unlock()

var msg *nats.Msg
var open bool
select {
case msg, open = <-n.natsChan:
case msg, open = <-natsChan:
case _, open = <-n.interruptChan:
}
if !open {
Expand Down
23 changes: 17 additions & 6 deletions lib/input/reader/scale_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package reader

import (
"strings"
"sync"
"time"

"nanomsg.org/go-mangos"
Expand Down Expand Up @@ -64,6 +65,7 @@ func NewScaleProtoConfig() ScaleProtoConfig {
// ScaleProto is an input type that serves Scalability Protocols messages.
type ScaleProto struct {
socket mangos.Socket
cMut sync.Mutex

urls []string
conf ScaleProtoConfig
Expand Down Expand Up @@ -106,6 +108,9 @@ func getSocketFromType(t string) (mangos.Socket, error) {

// Connect establishes a nanomsg socket.
func (s *ScaleProto) Connect() error {
s.cMut.Lock()
defer s.cMut.Unlock()

if s.socket != nil {
return nil
}
Expand Down Expand Up @@ -184,10 +189,14 @@ func (s *ScaleProto) Connect() error {

// Read attempts to read a new message from the nanomsg socket.
func (s *ScaleProto) Read() (types.Message, error) {
if s.socket == nil {
s.cMut.Lock()
socket := s.socket
s.cMut.Unlock()

if socket == nil {
return nil, types.ErrNotConnected
}
data, err := s.socket.Recv()
data, err := socket.Recv()
if err != nil {
if err == mangos.ErrRecvTimeout {
return nil, types.ErrTimeout
Expand All @@ -205,14 +214,16 @@ func (s *ScaleProto) Acknowledge(err error) error {

// CloseAsync shuts down the ScaleProto input and stops processing requests.
func (s *ScaleProto) CloseAsync() {
}

// WaitForClose blocks until the ScaleProto input has closed down.
func (s *ScaleProto) WaitForClose(timeout time.Duration) error {
s.cMut.Lock()
if s.socket != nil {
s.socket.Close()
s.socket = nil
}
s.cMut.Unlock()
}

// WaitForClose blocks until the ScaleProto input has closed down.
func (s *ScaleProto) WaitForClose(timeout time.Duration) error {
return nil
}

Expand Down
24 changes: 18 additions & 6 deletions lib/output/writer/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ package writer
import (
"fmt"
"strings"
"sync"
"time"

"github.com/Jeffail/benthos/lib/log"
Expand Down Expand Up @@ -80,6 +81,8 @@ type Kafka struct {

producer sarama.SyncProducer
compression sarama.CompressionCodec

connMut sync.RWMutex
}

// NewKafka creates a new Kafka writer type.
Expand Down Expand Up @@ -136,6 +139,9 @@ func strToCompressionCodec(str string) (sarama.CompressionCodec, error) {

// Connect attempts to establish a connection to a Kafka broker.
func (k *Kafka) Connect() error {
k.connMut.Lock()
defer k.connMut.Unlock()

if k.producer != nil {
return nil
}
Expand Down Expand Up @@ -173,7 +179,11 @@ func (k *Kafka) Connect() error {
// Write will attempt to write a message to Kafka, wait for acknowledgement, and
// returns an error if applicable.
func (k *Kafka) Write(msg types.Message) error {
if k.producer == nil {
k.connMut.RLock()
producer := k.producer
k.connMut.RUnlock()

if producer == nil {
return types.ErrNotConnected
}

Expand All @@ -198,7 +208,7 @@ func (k *Kafka) Write(msg types.Message) error {
msgs = append(msgs, nextMsg)
}

err := k.producer.SendMessages(msgs)
err := producer.SendMessages(msgs)
if err != nil {
if pErr, ok := err.(sarama.ProducerErrors); ok && len(pErr) > 0 {
err = fmt.Errorf("failed to send %v parts from message: %v", len(pErr), pErr[0].Err)
Expand All @@ -210,14 +220,16 @@ func (k *Kafka) Write(msg types.Message) error {

// CloseAsync shuts down the Kafka writer and stops processing messages.
func (k *Kafka) CloseAsync() {
}

// WaitForClose blocks until the Kafka writer has closed down.
func (k *Kafka) WaitForClose(timeout time.Duration) error {
k.connMut.Lock()
if nil != k.producer {
k.producer.Close()
k.producer = nil
}
k.connMut.Unlock()
}

// WaitForClose blocks until the Kafka writer has closed down.
func (k *Kafka) WaitForClose(timeout time.Duration) error {
return nil
}

Expand Down
17 changes: 14 additions & 3 deletions lib/output/writer/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package writer

import (
"strings"
"sync"
"time"

"github.com/Jeffail/benthos/lib/log"
Expand Down Expand Up @@ -60,7 +61,8 @@ type MQTT struct {
urls []string
conf MQTTConfig

client mqtt.Client
client mqtt.Client
connMut sync.RWMutex
}

// NewMQTT creates a new MQTT output type.
Expand Down Expand Up @@ -90,6 +92,9 @@ func NewMQTT(

// Connect establishes a connection to an MQTT server.
func (m *MQTT) Connect() error {
m.connMut.Lock()
defer m.connMut.Unlock()

if m.client != nil {
return nil
}
Expand Down Expand Up @@ -120,12 +125,16 @@ func (m *MQTT) Connect() error {

// Write attempts to write a message by pushing it to an MQTT broker.
func (m *MQTT) Write(msg types.Message) error {
if m.client == nil {
m.connMut.RLock()
client := m.client
m.connMut.RUnlock()

if client == nil {
return types.ErrNotConnected
}

for _, part := range msg.GetAll() {
mtok := m.client.Publish(m.conf.Topic, byte(m.conf.QoS), false, part)
mtok := client.Publish(m.conf.Topic, byte(m.conf.QoS), false, part)
mtok.Wait()
if err := mtok.Error(); err != nil {
return err
Expand All @@ -137,10 +146,12 @@ func (m *MQTT) Write(msg types.Message) error {

// CloseAsync shuts down the MQTT output and stops processing messages.
func (m *MQTT) CloseAsync() {
m.connMut.Lock()
if m.client != nil {
m.client.Disconnect(0)
m.client = nil
}
m.connMut.Unlock()
}

// WaitForClose blocks until the MQTT output has closed down.
Expand Down
17 changes: 14 additions & 3 deletions lib/output/writer/redis_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package writer

import (
"net/url"
"sync"
"time"

"github.com/Jeffail/benthos/lib/log"
Expand Down Expand Up @@ -56,7 +57,8 @@ type RedisList struct {
url *url.URL
conf RedisListConfig

client *redis.Client
client *redis.Client
connMut sync.RWMutex
}

// NewRedisList creates a new RedisList output type.
Expand Down Expand Up @@ -85,6 +87,9 @@ func NewRedisList(

// Connect establishes a connection to an RedisList server.
func (r *RedisList) Connect() error {
r.connMut.Lock()
defer r.connMut.Unlock()

var pass string
if r.url.User != nil {
pass, _ = r.url.User.Password()
Expand All @@ -109,12 +114,16 @@ func (r *RedisList) Connect() error {

// Write attempts to write a message by pushing it to the end of a Redis list.
func (r *RedisList) Write(msg types.Message) error {
if r.client == nil {
r.connMut.RLock()
client := r.client
r.connMut.RUnlock()

if client == nil {
return types.ErrNotConnected
}

for _, part := range msg.GetAll() {
if err := r.client.RPush(r.conf.Key, part).Err(); err != nil {
if err := client.RPush(r.conf.Key, part).Err(); err != nil {
r.disconnect()
r.log.Errorf("Error from redis: %v\n", err)
return types.ErrNotConnected
Expand All @@ -126,6 +135,8 @@ func (r *RedisList) Write(msg types.Message) error {

// disconnect safely closes a connection to an RedisList server.
func (r *RedisList) disconnect() error {
r.connMut.Lock()
defer r.connMut.Unlock()
if r.client != nil {
err := r.client.Close()
r.client = nil
Expand Down

0 comments on commit 6a4f66f

Please sign in to comment.