Vastly improve metrics
Jeffail committed Oct 17, 2017
1 parent 3c0083b commit 272a419
Showing 27 changed files with 232 additions and 49 deletions.
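Every change below threads instrumentation through the `stats` handle each component already receives (declared as `metrics.Type` in the file input's constructor). Only three calls are exercised: `Incr` for counters, `Gauge` for the buffer backlog, and `Timing` for latency. A minimal sketch of the surface those calls imply is below; the method signatures and return types are an assumption, not copied from `lib/metrics`, which almost certainly carries more methods.

```go
// Sketch of the metric calls used by this commit. Only Incr, Gauge and
// Timing appear in the diff; their exact signatures in lib/metrics are
// assumed here, not taken from the repository.
package metricsketch

// Aggregator approximates the surface of metrics.Type that the new
// instrumentation relies on.
type Aggregator interface {
	// Incr bumps a counter at a dot-delimited path, e.g.
	// "input.kafka.count" or "buffer.send.error".
	Incr(path string, count int64) error

	// Gauge records an instantaneous value, e.g. "buffer.backlog".
	Gauge(path string, value int64) error

	// Timing records a latency sample (nanoseconds from time.Since),
	// e.g. "input.kafka.timing".
	Timing(path string, delta int64) error
}
```

The components ignore the return values throughout the diff, so a no-op implementation of this surface is enough to exercise the instrumented code paths in isolation.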
2 changes: 1 addition & 1 deletion Gopkg.lock

Diff not rendered (generated file).

3 changes: 3 additions & 0 deletions README.md
@@ -85,6 +85,8 @@ output:
There are also configuration sections for logging and metrics; if you print an
example config you will see the available options.
For a list of metrics within Benthos [check out this spec][6].
## Speed and Benchmarks
Benthos isn't doing much, so it's reasonable to expect low latencies and high
@@ -144,6 +146,7 @@ docker run --rm -v ~/benthos.yaml:/config.yaml -v /tmp/data:/data -p 8080:8080 \
[3]: https://goreportcard.com/badge/github.com/jeffail/benthos
[4]: https://goreportcard.com/report/jeffail/benthos
[5]: resources/docs/multipart.md
[6]: resources/docs/metrics.md
[dep]: https://github.com/golang/dep
[zmq]: http://zeromq.org/
[nanomsg]: http://nanomsg.org/
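The metrics spec referenced as [6] above is not part of this page. As a rough illustration only: the paths added by this commit follow a `<section>.<component>.<event>` convention, with `.count` incremented when a message is received or dequeued, `.send.success`/`.send.error` recorded on the downstream response, and `buffer.backlog` maintained as a gauge. The small in-memory collector below is hypothetical (its type and fields are not from the repository); it satisfies the three-call surface sketched earlier, and the paths in `main` mirror ones that appear in the diff.

```go
// Hypothetical in-memory collector, for illustration only; not part of
// Benthos. It satisfies the Incr/Gauge/Timing surface used by the diff.
package main

import (
	"fmt"
	"sync"
)

type memStats struct {
	mu       sync.Mutex
	counters map[string]int64
	gauges   map[string]int64
	timings  map[string]int64
}

func newMemStats() *memStats {
	return &memStats{
		counters: map[string]int64{},
		gauges:   map[string]int64{},
		timings:  map[string]int64{},
	}
}

func (m *memStats) Incr(path string, count int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.counters[path] += count
	return nil
}

func (m *memStats) Gauge(path string, value int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.gauges[path] = value
	return nil
}

func (m *memStats) Timing(path string, delta int64) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.timings[path] = delta // keep only the latest sample for simplicity
	return nil
}

func main() {
	stats := newMemStats()

	// Paths below mirror ones added in the diff.
	stats.Incr("input.kafka.count", 1)
	stats.Incr("input.kafka.send.success", 1)
	stats.Incr("output.kafka.count", 1)
	stats.Incr("output.kafka.send.success", 1)
	stats.Gauge("buffer.backlog", 3)

	fmt.Println(stats.counters)
	fmt.Println(stats.gauges)
}
```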
9 changes: 9 additions & 0 deletions lib/buffer/output_wrapper.go
@@ -91,7 +91,10 @@ func (m *OutputWrapper) inputLoop() {
}
backlog, err := m.buffer.PushMessage(msg)
if err == nil {
m.stats.Incr("buffer.write.count", 1)
m.stats.Gauge("buffer.backlog", int64(backlog))
} else {
m.stats.Incr("buffer.write.error", 1)
}
select {
case m.responsesOut <- types.NewSimpleResponse(err):
@@ -119,6 +122,8 @@ func (m *OutputWrapper) outputLoop() {
var err error
if msg, err = m.buffer.NextMessage(); err != nil {
if err != types.ErrTypeClosed {
m.stats.Incr("buffer.read.error", 1)

// Unconventional errors here should always indicate some
// sort of corruption. Hopefully the corruption was message
// specific and not the whole buffer, so we can try shifting
@@ -132,6 +137,8 @@
// If our buffer is closed then we exit.
return
}
} else {
m.stats.Incr("buffer.read.count", 1)
}
}

@@ -148,8 +155,10 @@ func (m *OutputWrapper) outputLoop() {
if res.Error() == nil {
msg = types.Message{}
backlog, _ := m.buffer.ShiftMessage()
m.stats.Incr("buffer.send.success", 1)
m.stats.Gauge("buffer.backlog", int64(backlog))
} else {
m.stats.Incr("buffer.send.error", 1)
if _, exists := errMap[res.Error()]; !exists {
errMap[res.Error()] = struct{}{}
errs = append(errs, res.Error())
6 changes: 2 additions & 4 deletions lib/input/amqp.go
@@ -217,6 +217,7 @@ func (a *AMQP) loop() {
select {
case msg := <-a.consumerChan:
data = &msg
a.stats.Incr("input.amqp.count", 1)
case <-a.closeChan:
return
}
@@ -234,13 +235,10 @@
return
}
if resErr := res.Error(); resErr == nil {
a.stats.Incr("input.amqp.count", 1)
if !res.SkipAck() {
data.Ack(true)
}
data = nil
} else if resErr == types.ErrMessageTooLarge {
a.stats.Incr("input.amqp.send.rejected", 1)
a.stats.Incr("input.amqp.send.success", 1)
data = nil
} else {
a.stats.Incr("input.amqp.send.error", 1)
10 changes: 8 additions & 2 deletions lib/input/file.go
@@ -68,8 +68,9 @@ func NewFileConfig() FileConfig {
type File struct {
running int32

conf Config
log log.Modular
conf Config
log log.Modular
stats metrics.Type

messages chan types.Message
responses <-chan types.Response
@@ -84,6 +85,7 @@ func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
running: 1,
conf: conf,
log: log.NewModule(".input.file"),
stats: stats,
messages: make(chan types.Message),
responses: nil,
closeChan: make(chan struct{}),
@@ -139,6 +141,7 @@ func (f *File) loop() {
partsToSend = parts
parts = nil
}
f.stats.Incr("input.file.count", 1)
}
if len(partsToSend) > 0 {
select {
@@ -151,7 +154,10 @@
return
}
if res.Error() == nil {
f.stats.Incr("input.file.send.success", 1)
partsToSend = nil
} else {
f.stats.Incr("input.file.send.error", 1)
}
}
}
13 changes: 12 additions & 1 deletion lib/input/http_server.go
@@ -171,9 +171,12 @@ func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) {
msg.Parts = [][]byte{msgBytes}
}

h.stats.Incr("input.http_server.count", 1)

select {
case h.messages <- msg:
case <-time.After(time.Millisecond * time.Duration(h.conf.HTTPServer.TimeoutMS)):
h.stats.Incr("input.http_server.send.timeout", 1)
http.Error(w, "Request timed out", http.StatusRequestTimeout)
return
case <-h.closeChan:
@@ -187,14 +190,22 @@ func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "Server closing", http.StatusServiceUnavailable)
return
} else if res.Error() != nil {
h.stats.Incr("input.http_server.send.error", 1)
http.Error(w, res.Error().Error(), http.StatusBadGateway)
return
}
h.stats.Incr("input.http_server.send.success", 1)
case <-time.After(time.Millisecond * time.Duration(h.conf.HTTPServer.TimeoutMS)):
h.stats.Incr("input.http_server.send.timeout", 1)
http.Error(w, "Request timed out", http.StatusRequestTimeout)
go func() {
// Even if the request times out, we still need to drain a response.
<-h.responses
resAsync := <-h.responses
if resAsync.Error() != nil {
h.stats.Incr("input.http_server.send.async_error", 1)
} else {
h.stats.Incr("input.http_server.send.async_success", 1)
}
}()
return
}
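The goroutine added in the timeout branch above exists because the pipeline behind `h.responses` appears to deliver exactly one response for every message pushed to `h.messages`; if the handler returned without reading it, that response would block the pipeline or be misread by the next request, which is why the drain also records an `async_error`/`async_success` counter. A minimal, self-contained sketch of the pattern (channel names and timings here are illustrative, not Benthos code):

```go
// Hypothetical illustration of the timeout/drain pattern used above:
// one response is produced per submitted message, so a timed-out
// request must still consume its response asynchronously.
package main

import (
	"fmt"
	"time"
)

func main() {
	messages := make(chan string)
	responses := make(chan error)

	// Stand-in pipeline: one response per message, after a delay that
	// may exceed the handler's timeout.
	go func() {
		for range messages {
			time.Sleep(50 * time.Millisecond)
			responses <- nil
		}
	}()

	messages <- "payload"

	select {
	case err := <-responses:
		fmt.Println("sync response:", err)
	case <-time.After(10 * time.Millisecond):
		fmt.Println("request timed out; draining asynchronously")
		go func() {
			// Without this drain the pipeline would block on its send,
			// and the next request would read a stale response.
			err := <-responses
			fmt.Println("async response:", err)
		}()
	}

	time.Sleep(100 * time.Millisecond) // give the async drain time to run
}
```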
8 changes: 2 additions & 6 deletions lib/input/kafka.go
@@ -253,6 +253,7 @@ func (k *Kafka) loop() {
if data, open = <-k.topicConsumer.Messages(); !open {
return
}
k.stats.Incr("input.kafka.count", 1)
}

// If bytes are read then try and propagate.
@@ -269,18 +270,13 @@
}
if resErr := res.Error(); resErr == nil {
k.stats.Timing("input.kafka.timing", int64(time.Since(start)))
k.stats.Incr("input.kafka.count", 1)
k.offset = data.Offset + 1
if !res.SkipAck() {
if err := k.commitOffset(); err != nil {
k.log.Errorf("Failed to commit offset: %v\n", err)
}
}
data = nil
} else if resErr == types.ErrMessageTooLarge {
k.stats.Incr("input.kafka.send.rejected", 1)
k.log.Errorf("Kafka message was rejected: %v\n", resErr)
k.log.Errorf("Message content: %s\n", data.Value)
k.stats.Incr("input.kafka.send.success", 1)
data = nil
} else {
k.stats.Incr("input.kafka.send.error", 1)
6 changes: 2 additions & 4 deletions lib/input/nats.go
@@ -134,10 +134,7 @@ func (n *NATS) loop() {
return
}
if resErr := res.Error(); resErr == nil {
n.stats.Incr("input.nats.count", 1)
msg = nil
} else if resErr == types.ErrMessageTooLarge {
n.stats.Incr("input.nats.send.rejected", 1)
n.stats.Incr("input.nats.send.success", 1)
msg = nil
} else {
n.stats.Incr("input.nats.send.error", 1)
@@ -149,6 +146,7 @@
if !open {
return
}
n.stats.Incr("input.nats.count", 1)
case <-n.closeChan:
return
}
7 changes: 2 additions & 5 deletions lib/input/nsq.go
@@ -174,6 +174,7 @@ func (n *NSQ) loop() {
if msg == nil {
select {
case msg = <-n.internalMessages:
n.stats.Incr("input.nsq.count", 1)
case <-n.closeChan:
return
}
@@ -193,7 +194,7 @@
return
}
if resErr := res.Error(); resErr == nil {
n.stats.Incr("input.nsq.count", 1)
n.stats.Incr("input.nsq.send.success", 1)
if !res.SkipAck() {
msg.Finish()
if len(unAck) > 0 {
@@ -206,10 +207,6 @@
unAck = append(unAck, msg)
}
msg = nil
} else if resErr == types.ErrMessageTooLarge {
n.stats.Incr("input.nsq.send.rejected", 1)
msg.Finish()
msg = nil
} else {
n.stats.Incr("input.nsq.send.error", 1)
}
9 changes: 3 additions & 6 deletions lib/input/scale_proto.go
@@ -201,6 +201,8 @@ func (s *ScaleProto) loop() {
if err != nil && err != mangos.ErrRecvTimeout {
s.log.Errorf("ScaleProto Socket recv error: %v\n", err)
s.stats.Incr("input.scale_proto.socket.recv.error", 1)
} else {
s.stats.Incr("input.scale_proto.count", 1)
}
}

@@ -219,12 +221,7 @@
}
if resErr := res.Error(); resErr == nil {
s.stats.Timing("input.scale_proto.timing", int64(time.Since(start)))
s.stats.Incr("input.scale_proto.count", 1)
data = nil
} else if resErr == types.ErrMessageTooLarge {
s.stats.Incr("input.scale_proto.send.rejected", 1)
s.log.Errorf("ScaleProto message was rejected: %v\n", resErr)
s.log.Errorf("Message content: %s\n", data)
s.stats.Incr("input.scale_proto.send.success", 1)
data = nil
} else {
s.stats.Incr("input.scale_proto.send.error", 1)
5 changes: 4 additions & 1 deletion lib/input/stdin.go
@@ -143,7 +143,6 @@ func (s *STDIN) readLoop() {
if len(partsToSend) != 0 {
select {
case s.internalMessages <- partsToSend:
s.stats.Incr("stdin.message.sent", 1)
partsToSend = nil
case <-time.After(time.Second):
}
@@ -173,6 +172,7 @@ func (s *STDIN) loop() {
if !open {
return
}
s.stats.Incr("input.stdin.count", 1)
case <-s.closeChan:
return
}
@@ -189,7 +189,10 @@
return
}
if res.Error() == nil {
s.stats.Incr("input.stdin.send.success", 1)
data = nil
} else {
s.stats.Incr("input.stdin.send.error", 1)
}
}
}
7 changes: 2 additions & 5 deletions lib/input/zmq4.go
@@ -190,6 +190,8 @@ func (z *ZMQ4) loop() {
z.stats.Incr("input.zmq4.receive.error", 1)
z.log.Errorf("Failed to receive message bytes: %v\n", err)
data = nil
} else {
z.stats.Incr("input.zmq4.count", 1)
}
if len(data) == 0 {
data = nil
@@ -208,11 +210,6 @@
return
}
if resErr := res.Error(); resErr == nil {
z.stats.Incr("input.zmq4.count", 1)
data = nil
} else if resErr == types.ErrMessageTooLarge {
z.stats.Incr("input.zmq4.send.rejected", 1)
z.log.Errorf("ZMQ4 message was rejected: %v\nMessage content: %v\n", resErr, data)
data = nil
} else {
z.stats.Incr("input.zmq4.send.error", 1)
6 changes: 6 additions & 0 deletions lib/output/amqp.go
@@ -176,6 +176,7 @@ func (a *AMQP) loop() {
if msg, open = <-a.messages; !open {
return
}
a.stats.Incr("output.amqp.count", 1)
var err error
var sending []byte
var contentType string
@@ -201,6 +202,11 @@
// a bunch of application/implementation-specific fields
},
)
if err != nil {
a.stats.Incr("output.amqp.send.error", 1)
} else {
a.stats.Incr("output.amqp.send.success", 1)
}
select {
case a.responseChan <- types.NewSimpleResponse(err):
case <-a.closeChan:
5 changes: 4 additions & 1 deletion lib/output/http_client.go
@@ -151,6 +151,7 @@ func (h *HTTPClient) loop() {
if msg, open = <-h.messages; !open {
return
}
h.stats.Incr("output.http_client.count", 1)

// POST message
var client http.Client
@@ -184,7 +185,9 @@

if err != nil {
h.log.Errorf("POST request failed: %v\n", err)
h.stats.Incr("output.http_client.post.error", 1)
h.stats.Incr("output.http_client.send.error", 1)
} else {
h.stats.Incr("output.http_client.send.success", 1)
}
select {
case h.responseChan <- types.NewSimpleResponse(err):
2 changes: 2 additions & 0 deletions lib/output/http_server.go
@@ -130,6 +130,7 @@ func (h *HTTPServer) getHandler(w http.ResponseWriter, r *http.Request) {
go h.CloseAsync()
return
}
h.stats.Incr("output.http_server.count", 1)
case <-time.After(tOutDuration - time.Since(tStart)):
http.Error(w, "Timed out waiting for message", http.StatusRequestTimeout)
return
@@ -157,6 +158,7 @@ func (h *HTTPServer) getHandler(w http.ResponseWriter, r *http.Request) {
}

h.responseChan <- types.NewSimpleResponse(nil)
h.stats.Incr("output.http_server.send.success", 1)
}

//------------------------------------------------------------------------------
2 changes: 2 additions & 0 deletions lib/output/kafka.go
@@ -122,12 +122,14 @@ func (k *Kafka) loop() {
if msg, open = <-k.messages; !open {
return
}
k.stats.Incr("output.kafka.count", 1)
for _, part := range msg.Parts {
select {
case k.producer.Input() <- &sarama.ProducerMessage{
Topic: k.conf.Kafka.Topic,
Value: sarama.ByteEncoder(part),
}:
k.stats.Incr("output.kafka.send.success", 1)
case <-k.closeChan:
return
}
(Remaining changed files not shown.)
