diff --git a/.gitignore b/.gitignore index ae659826e2..90cd73c780 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,5 @@ tracker TODO target vendor +key.pem +cert.pem diff --git a/lib/input/file.go b/lib/input/file.go index e8c601c739..ecb40d1f49 100644 --- a/lib/input/file.go +++ b/lib/input/file.go @@ -23,10 +23,7 @@ package input import ( "bufio" "os" - "sync/atomic" - "time" - "github.com/jeffail/benthos/lib/types" "github.com/jeffail/util/log" "github.com/jeffail/util/metrics" ) @@ -39,7 +36,10 @@ func init() { description: ` The file type reads input from a file. If multipart is set to false each line is read as a separate message. If multipart is set to true each line is read as -a message part, and an empty line indicates the end of a message.`, +a message part, and an empty line indicates the end of a message. + +Alternatively, a custom delimiter can be set that is used instead of line +breaks.`, } } @@ -47,153 +47,41 @@ a message part, and an empty line indicates the end of a message.`, // FileConfig is configuration values for the File input type. type FileConfig struct { - Path string `json:"path" yaml:"path"` - Multipart bool `json:"multipart" yaml:"multipart"` - MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` + Path string `json:"path" yaml:"path"` + Multipart bool `json:"multipart" yaml:"multipart"` + MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` + CustomDelim string `json:"custom_delimiter" yaml:"custom_delimiter"` } // NewFileConfig creates a new FileConfig with default values. func NewFileConfig() FileConfig { return FileConfig{ - Path: "", - Multipart: false, - MaxBuffer: bufio.MaxScanTokenSize, + Path: "", + Multipart: false, + MaxBuffer: bufio.MaxScanTokenSize, + CustomDelim: "", } } //------------------------------------------------------------------------------ -// File is an input type that reads lines from a file, creating a message per -// line. -type File struct { - running int32 - - conf Config - log log.Modular - stats metrics.Type - - messages chan types.Message - responses <-chan types.Response - - closeChan chan struct{} - closedChan chan struct{} -} - // NewFile creates a new File input type. func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error) { - f := File{ - running: 1, - conf: conf, - log: log.NewModule(".input.file"), - stats: stats, - messages: make(chan types.Message), - responses: nil, - closeChan: make(chan struct{}), - closedChan: make(chan struct{}), - } - return &f, nil -} - -//------------------------------------------------------------------------------ - -// loop is an internal loop that brokers incoming messages to output pipe. -func (f *File) loop() { - defer func() { - atomic.StoreInt32(&f.running, 0) - - close(f.messages) - close(f.closedChan) - }() - - file, err := os.Open(f.conf.File.Path) + file, err := os.Open(conf.File.Path) if err != nil { - f.log.Errorf("Read %v error: %v\n", f.conf.File.Path, err) - return - } - defer file.Close() - - scanner := bufio.NewScanner(file) - if f.conf.File.MaxBuffer != bufio.MaxScanTokenSize { - scanner.Buffer([]byte{}, f.conf.File.MaxBuffer) + return nil, err } - - f.log.Infof("Reading messages from: %v\n", f.conf.File.Path) - - var partsToSend, parts [][]byte - - for atomic.LoadInt32(&f.running) == 1 { - if len(partsToSend) == 0 { - if !scanner.Scan() { - if err = scanner.Err(); err != nil { - f.log.Errorf("File read error: %v\n", err) - } - return - } - data := make([]byte, len(scanner.Bytes())) - copy(data, scanner.Bytes()) - if len(data) > 0 { - if f.conf.File.Multipart { - parts = append(parts, data) - } else { - partsToSend = append(partsToSend, data) - } - } else if f.conf.File.Multipart { - partsToSend = parts - parts = nil - } - f.stats.Incr("input.file.count", 1) - } - if len(partsToSend) > 0 { - select { - case f.messages <- types.Message{Parts: partsToSend}: - case <-f.closeChan: - return - } - res, open := <-f.responses - if !open { - return - } - if res.Error() == nil { - f.stats.Incr("input.file.send.success", 1) - partsToSend = nil - } else { - f.stats.Incr("input.file.send.error", 1) - } - } - } -} - -// StartListening sets the channel used by the input to validate message -// receipt. -func (f *File) StartListening(responses <-chan types.Response) error { - if f.responses != nil { - return types.ErrAlreadyStarted - } - f.responses = responses - go f.loop() - return nil -} - -// MessageChan returns the messages channel. -func (f *File) MessageChan() <-chan types.Message { - return f.messages -} - -// CloseAsync shuts down the File input and stops processing requests. -func (f *File) CloseAsync() { - if atomic.CompareAndSwapInt32(&f.running, 1, 0) { - close(f.closeChan) - } -} - -// WaitForClose blocks until the File input has closed down. -func (f *File) WaitForClose(timeout time.Duration) error { - select { - case <-f.closedChan: - case <-time.After(timeout): - return types.ErrTimeout + delim := []byte("\n") + if len(conf.File.CustomDelim) > 0 { + delim = []byte(conf.File.CustomDelim) } - return nil + return newReader( + file, + conf.File.MaxBuffer, + conf.File.Multipart, + delim, + log, stats, + ) } //------------------------------------------------------------------------------ diff --git a/lib/input/http_server.go b/lib/input/http_server.go index d7be5aabb5..7dcece1a73 100644 --- a/lib/input/http_server.go +++ b/lib/input/http_server.go @@ -116,11 +116,15 @@ func NewHTTPServer(conf Config, log log.Modular, stats metrics.Type) (Type, erro //------------------------------------------------------------------------------ func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if atomic.LoadInt32(&h.running) != 1 { http.Error(w, "Server closing", http.StatusServiceUnavailable) return } + h.stats.Incr("input.http_server.count", 1) + if r.Method != "POST" { http.Error(w, "Incorrect method", http.StatusMethodNotAllowed) return @@ -169,8 +173,6 @@ func (h *HTTPServer) postHandler(w http.ResponseWriter, r *http.Request) { msg.Parts = [][]byte{msgBytes} } - h.stats.Incr("input.http_server.count", 1) - select { case h.messages <- msg: case <-time.After(time.Millisecond * time.Duration(h.conf.HTTPServer.TimeoutMS)): diff --git a/lib/input/interface_test.go b/lib/input/interface_test.go deleted file mode 100644 index c8e0427bc6..0000000000 --- a/lib/input/interface_test.go +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright (c) 2014 Ashley Jeffs -// -// Permission is hereby granted, free of charge, to any person obtaining a copy -// of this software and associated documentation files (the "Software"), to deal -// in the Software without restriction, including without limitation the rights -// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -// copies of the Software, and to permit persons to whom the Software is -// furnished to do so, subject to the following conditions: -// -// The above copyright notice and this permission notice shall be included in -// all copies or substantial portions of the Software. -// -// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN -// THE SOFTWARE. - -package input - -import "testing" - -//------------------------------------------------------------------------------ - -func TestInterfaces(t *testing.T) { - s := &STDIN{} - if Type(s) == nil { - t.Errorf("stdin: nil Type") - } -} - -//------------------------------------------------------------------------------ diff --git a/lib/input/reader.go b/lib/input/reader.go new file mode 100644 index 0000000000..8dd426fee6 --- /dev/null +++ b/lib/input/reader.go @@ -0,0 +1,235 @@ +// Copyright (c) 2014 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package input + +import ( + "bufio" + "bytes" + "io" + "sync/atomic" + "time" + + "github.com/jeffail/benthos/lib/types" + "github.com/jeffail/util/log" + "github.com/jeffail/util/metrics" +) + +//------------------------------------------------------------------------------ + +// reader is an input type that reads messages from an io.Reader type. +type reader struct { + running int32 + + handle io.Reader + + maxBuffer int + multipart bool + customDelim []byte + + log log.Modular + stats metrics.Type + + internalMessages chan [][]byte + + messages chan types.Message + responses <-chan types.Response + + closeChan chan struct{} + closedChan chan struct{} +} + +// newReader creates a new reader input type. +func newReader( + handle io.Reader, + maxBuffer int, + multipart bool, + customDelim []byte, + log log.Modular, + stats metrics.Type, +) (Type, error) { + s := reader{ + running: 1, + handle: handle, + maxBuffer: maxBuffer, + multipart: multipart, + customDelim: customDelim, + log: log.NewModule(".input.reader"), + stats: stats, + internalMessages: make(chan [][]byte), + messages: make(chan types.Message), + responses: nil, + closeChan: make(chan struct{}), + closedChan: make(chan struct{}), + } + + return &s, nil +} + +//------------------------------------------------------------------------------ + +// readLoop reads from input pipe and sends to internal messages chan. +func (s *reader) readLoop() { + defer func() { + close(s.internalMessages) + if closer, ok := s.handle.(io.ReadCloser); ok { + closer.Close() + } + }() + scanner := bufio.NewScanner(s.handle) + if s.maxBuffer != bufio.MaxScanTokenSize { + scanner.Buffer([]byte{}, s.maxBuffer) + } + scanner.Split(func(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + + if i := bytes.Index(data, s.customDelim); i >= 0 { + // We have a full terminated line. + return i + len(s.customDelim), data[0:i], nil + } + + // If we're at EOF, we have a final, non-terminated line. Return it. + if atEOF { + return len(data), data, nil + } + + // Request more data. + return 0, nil, nil + }) + + var partsToSend, parts [][]byte + + for atomic.LoadInt32(&s.running) == 1 { + // If no bytes then read a line + if len(partsToSend) == 0 { + if scanner.Scan() { + newPart := make([]byte, len(scanner.Bytes())) + copy(newPart, scanner.Bytes()) + if len(newPart) > 0 { + if s.multipart { + parts = append(parts, newPart) + } else { + partsToSend = append(partsToSend, newPart) + } + } else if s.multipart { + // Empty line means we're finished reading parts for this + // message. + partsToSend = parts + parts = nil + } + } else { + return + } + } + + // If we have a line to push out + if len(partsToSend) != 0 { + select { + case s.internalMessages <- partsToSend: + partsToSend = nil + case <-time.After(time.Second): + } + } + } +} + +// loop is the internal loop that brokers incoming messages to output pipe. +func (s *reader) loop() { + defer func() { + atomic.StoreInt32(&s.running, 0) + close(s.messages) + close(s.closedChan) + }() + + var data [][]byte + var open bool + + readChan := s.internalMessages + + for atomic.LoadInt32(&s.running) == 1 { + if data == nil { + select { + case data, open = <-readChan: + if !open { + return + } + s.stats.Incr("input.reader.count", 1) + case <-s.closeChan: + return + } + } + if data != nil { + select { + case s.messages <- types.Message{Parts: data}: + case <-s.closeChan: + return + } + + var res types.Response + if res, open = <-s.responses; !open { + return + } + if res.Error() == nil { + s.stats.Incr("input.reader.send.success", 1) + data = nil + } else { + s.stats.Incr("input.reader.send.error", 1) + } + } + } +} + +// StartListening sets the channel used by the input to validate message +// receipt. +func (s *reader) StartListening(responses <-chan types.Response) error { + if s.responses != nil { + return types.ErrAlreadyStarted + } + s.responses = responses + go s.readLoop() + go s.loop() + return nil +} + +// MessageChan returns the messages channel. +func (s *reader) MessageChan() <-chan types.Message { + return s.messages +} + +// CloseAsync shuts down the reader input and stops processing requests. +func (s *reader) CloseAsync() { + if atomic.CompareAndSwapInt32(&s.running, 1, 0) { + close(s.closeChan) + } +} + +// WaitForClose blocks until the reader input has closed down. +func (s *reader) WaitForClose(timeout time.Duration) error { + select { + case <-s.closedChan: + case <-time.After(timeout): + return types.ErrTimeout + } + return nil +} + +//------------------------------------------------------------------------------ diff --git a/lib/input/reader_test.go b/lib/input/reader_test.go new file mode 100644 index 0000000000..5cfe8f5341 --- /dev/null +++ b/lib/input/reader_test.go @@ -0,0 +1,341 @@ +// Copyright (c) 2014 Ashley Jeffs +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package input + +import ( + "bufio" + "bytes" + "os" + "testing" + "time" + + "github.com/jeffail/benthos/lib/types" + "github.com/jeffail/util/log" + "github.com/jeffail/util/metrics" +) + +func TestReaderSinglePart(t *testing.T) { + messages := []string{ + "first message", + "second message", + "third message", + } + + var handle bytes.Buffer + + for _, msg := range messages { + handle.Write([]byte(msg)) + handle.Write([]byte("\n")) + handle.Write([]byte("\n")) // Try some empty messages + } + + f, err := newReader( + &handle, + bufio.MaxScanTokenSize, + false, + []byte("\n"), + log.NewLogger(os.Stdout, logConfig), metrics.DudType{}, + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + f.CloseAsync() + if err := f.WaitForClose(time.Second); err != nil { + t.Error(err) + } + }() + + resChan := make(chan types.Response) + + if err = f.StartListening(resChan); err != nil { + t.Error(err) + return + } + + for _, msg := range messages { + select { + case resMsg, open := <-f.MessageChan(): + if !open { + t.Error("channel closed early") + } else if res := string(resMsg.Parts[0]); res != msg { + t.Errorf("Wrong result, %v != %v", res, msg) + } + case <-time.After(time.Second): + t.Error("Timed out waiting for message") + } + select { + case resChan <- types.NewSimpleResponse(nil): + case <-time.After(time.Second): + t.Error("Timed out waiting for response") + } + } + + select { + case _, open := <-f.MessageChan(): + if open { + t.Error("Channel not closed at end of messages") + } + case <-time.After(time.Second): + t.Error("Timed out waiting for channel close") + } +} + +func TestReaderSinglePartCustomDelim(t *testing.T) { + messages := []string{ + "first message", + "second message", + "third message", + } + + var handle bytes.Buffer + + for _, msg := range messages { + handle.Write([]byte(msg)) + handle.Write([]byte("")) + handle.Write([]byte("")) // Try some empty messages + } + + f, err := newReader( + &handle, + bufio.MaxScanTokenSize, + false, + []byte(""), + log.NewLogger(os.Stdout, logConfig), metrics.DudType{}, + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + f.CloseAsync() + if err := f.WaitForClose(time.Second); err != nil { + t.Error(err) + } + }() + + resChan := make(chan types.Response) + + if err = f.StartListening(resChan); err != nil { + t.Error(err) + return + } + + for _, msg := range messages { + select { + case resMsg, open := <-f.MessageChan(): + if !open { + t.Error("channel closed early") + } else if res := string(resMsg.Parts[0]); res != msg { + t.Errorf("Wrong result, %v != %v", res, msg) + } + case <-time.After(time.Second): + t.Error("Timed out waiting for message") + } + select { + case resChan <- types.NewSimpleResponse(nil): + case <-time.After(time.Second): + t.Error("Timed out waiting for response") + } + } + + select { + case _, open := <-f.MessageChan(): + if open { + t.Error("Channel not closed at end of messages") + } + case <-time.After(time.Second): + t.Error("Timed out waiting for channel close") + } +} + +func TestReaderMultiPart(t *testing.T) { + var handle bytes.Buffer + + messages := [][]string{ + { + "first message", + "1", + "2", + }, + { + "second message", + "1", + "2", + }, + { + "third message", + "1", + "2", + }, + } + + for _, msg := range messages { + for _, part := range msg { + handle.Write([]byte(part)) + handle.Write([]byte("\n")) + } + handle.Write([]byte("\n")) + } + + f, err := newReader( + &handle, + bufio.MaxScanTokenSize, + true, + []byte("\n"), + log.NewLogger(os.Stdout, logConfig), metrics.DudType{}, + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + f.CloseAsync() + if err := f.WaitForClose(time.Second); err != nil { + t.Error(err) + } + }() + + resChan := make(chan types.Response) + + if err := f.StartListening(resChan); err != nil { + t.Error(err) + return + } + + for _, msg := range messages { + select { + case resMsg, open := <-f.MessageChan(): + if !open { + t.Error("channel closed early") + } else { + for i, part := range msg { + if res := string(resMsg.Parts[i]); res != part { + t.Errorf("Wrong result, %v != %v", res, part) + } + } + } + case <-time.After(time.Second): + t.Error("Timed out waiting for message") + } + select { + case resChan <- types.NewSimpleResponse(nil): + case <-time.After(time.Second): + t.Error("Timed out waiting for response") + } + } + + select { + case _, open := <-f.MessageChan(): + if open { + t.Error("Channel not closed at end of messages") + } + case <-time.After(time.Second): + t.Error("Timed out waiting for channel close") + } +} + +func TestReaderMultiPartCustomDelim(t *testing.T) { + var handle bytes.Buffer + + messages := [][]string{ + { + "first message", + "1", + "2", + }, + { + "second message", + "1", + "2", + }, + { + "third message", + "1", + "2", + }, + } + + for _, msg := range messages { + for _, part := range msg { + handle.Write([]byte(part)) + handle.Write([]byte("")) + } + handle.Write([]byte("")) + } + + f, err := newReader( + &handle, + bufio.MaxScanTokenSize, + true, + []byte(""), + log.NewLogger(os.Stdout, logConfig), metrics.DudType{}, + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + f.CloseAsync() + if err := f.WaitForClose(time.Second); err != nil { + t.Error(err) + } + }() + + resChan := make(chan types.Response) + + if err := f.StartListening(resChan); err != nil { + t.Error(err) + return + } + + for _, msg := range messages { + select { + case resMsg, open := <-f.MessageChan(): + if !open { + t.Error("channel closed early") + } else { + for i, part := range msg { + if res := string(resMsg.Parts[i]); res != part { + t.Errorf("Wrong result, %v != %v", res, part) + } + } + } + case <-time.After(time.Second): + t.Error("Timed out waiting for message") + } + select { + case resChan <- types.NewSimpleResponse(nil): + case <-time.After(time.Second): + t.Error("Timed out waiting for response") + } + } + + select { + case _, open := <-f.MessageChan(): + if open { + t.Error("Channel not closed at end of messages") + } + case <-time.After(time.Second): + t.Error("Timed out waiting for channel close") + } +} diff --git a/lib/input/stdin.go b/lib/input/stdin.go index 44ef9c1255..3426403ba4 100644 --- a/lib/input/stdin.go +++ b/lib/input/stdin.go @@ -22,12 +22,8 @@ package input import ( "bufio" - "io" "os" - "sync/atomic" - "time" - "github.com/jeffail/benthos/lib/types" "github.com/jeffail/util/log" "github.com/jeffail/util/metrics" ) @@ -41,7 +37,10 @@ func init() { The stdin input simply reads any data piped to stdin as messages. By default the messages are assumed single part and are line delimited. If the multipart option is set to true then lines are interpretted as message parts, and an empty line -indicates the end of the message.`, +indicates the end of the message. + +Alternatively, a custom delimiter can be set that is used instead of line +breaks.`, } } @@ -49,186 +48,35 @@ indicates the end of the message.`, // STDINConfig contains config fields for the STDIN input type. type STDINConfig struct { - Multipart bool `json:"multipart" yaml:"multipart"` - MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` + Multipart bool `json:"multipart" yaml:"multipart"` + MaxBuffer int `json:"max_buffer" yaml:"max_buffer"` + CustomDelim string `json:"custom_delimiter" yaml:"custom_delimiter"` } // NewSTDINConfig creates a STDINConfig populated with default values. func NewSTDINConfig() STDINConfig { return STDINConfig{ - Multipart: false, - MaxBuffer: bufio.MaxScanTokenSize, + Multipart: false, + MaxBuffer: bufio.MaxScanTokenSize, + CustomDelim: "", } } //------------------------------------------------------------------------------ -// STDIN is an input type that reads lines from STDIN. -type STDIN struct { - running int32 - - handle io.Reader - - conf Config - log log.Modular - stats metrics.Type - - internalMessages chan [][]byte - - messages chan types.Message - responses <-chan types.Response - - closeChan chan struct{} - closedChan chan struct{} -} - // NewSTDIN creates a new STDIN input type. func NewSTDIN(conf Config, log log.Modular, stats metrics.Type) (Type, error) { - s := STDIN{ - running: 1, - handle: os.Stdin, - conf: conf, - log: log.NewModule(".input.stdin"), - stats: stats, - internalMessages: make(chan [][]byte), - messages: make(chan types.Message), - responses: nil, - closeChan: make(chan struct{}), - closedChan: make(chan struct{}), - } - - go s.readLoop() - - return &s, nil -} - -//------------------------------------------------------------------------------ - -// readLoop reads from stdin pipe and sends to internal messages chan. -func (s *STDIN) readLoop() { - defer func() { - close(s.internalMessages) - }() - stdin := bufio.NewScanner(s.handle) - if s.conf.STDIN.MaxBuffer != bufio.MaxScanTokenSize { - stdin.Buffer([]byte{}, s.conf.STDIN.MaxBuffer) - } - - var partsToSend, parts [][]byte - - for atomic.LoadInt32(&s.running) == 1 { - // If no bytes then read a line - if len(partsToSend) == 0 { - if stdin.Scan() { - newPart := make([]byte, len(stdin.Bytes())) - copy(newPart, stdin.Bytes()) - if len(newPart) > 0 { - if s.conf.STDIN.Multipart { - parts = append(parts, newPart) - } else { - partsToSend = append(partsToSend, newPart) - } - } else if s.conf.STDIN.Multipart { - // Empty line means we're finished reading parts for this - // message. - partsToSend = parts - parts = nil - } - } else { - return - } - } - - // If we have a line to push out - if len(partsToSend) != 0 { - select { - case s.internalMessages <- partsToSend: - partsToSend = nil - case <-time.After(time.Second): - } - } - } -} - -// loop is the internal loop that brokers incoming messages to output pipe. -func (s *STDIN) loop() { - defer func() { - atomic.StoreInt32(&s.running, 0) - close(s.messages) - close(s.closedChan) - }() - - var data [][]byte - var open bool - - readChan := s.internalMessages - - s.log.Infoln("Receiving messages through STDIN") - - for atomic.LoadInt32(&s.running) == 1 { - if data == nil { - select { - case data, open = <-readChan: - if !open { - return - } - s.stats.Incr("input.stdin.count", 1) - case <-s.closeChan: - return - } - } - if data != nil { - select { - case s.messages <- types.Message{Parts: data}: - case <-s.closeChan: - return - } - - var res types.Response - if res, open = <-s.responses; !open { - return - } - if res.Error() == nil { - s.stats.Incr("input.stdin.send.success", 1) - data = nil - } else { - s.stats.Incr("input.stdin.send.error", 1) - } - } - } -} - -// StartListening sets the channel used by the input to validate message -// receipt. -func (s *STDIN) StartListening(responses <-chan types.Response) error { - if s.responses != nil { - return types.ErrAlreadyStarted - } - s.responses = responses - go s.loop() - return nil -} - -// MessageChan returns the messages channel. -func (s *STDIN) MessageChan() <-chan types.Message { - return s.messages -} - -// CloseAsync shuts down the STDIN input and stops processing requests. -func (s *STDIN) CloseAsync() { - if atomic.CompareAndSwapInt32(&s.running, 1, 0) { - close(s.closeChan) - } -} - -// WaitForClose blocks until the STDIN input has closed down. -func (s *STDIN) WaitForClose(timeout time.Duration) error { - select { - case <-s.closedChan: - case <-time.After(timeout): - return types.ErrTimeout + delim := []byte("\n") + if len(conf.STDIN.CustomDelim) > 0 { + delim = []byte(conf.STDIN.CustomDelim) } - return nil + return newReader( + os.Stdin, + conf.STDIN.MaxBuffer, + conf.STDIN.Multipart, + delim, + log, stats, + ) } //------------------------------------------------------------------------------ diff --git a/lib/input/stdin_test.go b/lib/input/stdin_test.go index 4f942b3f3a..e49a936f4a 100644 --- a/lib/input/stdin_test.go +++ b/lib/input/stdin_test.go @@ -21,8 +21,6 @@ package input import ( - "bytes" - "io" "os" "testing" "time" @@ -32,25 +30,6 @@ import ( "github.com/jeffail/util/metrics" ) -func getTestSTDIN(handle io.ReadWriter, conf Config) *STDIN { - s := STDIN{ - running: 1, - handle: handle, - conf: conf, - log: log.NewLogger(os.Stdout, logConfig), - stats: metrics.DudType{}, - internalMessages: make(chan [][]byte), - messages: make(chan types.Message), - responses: nil, - closeChan: make(chan struct{}), - closedChan: make(chan struct{}), - } - - go s.readLoop() - - return &s -} - func TestSTDINClose(t *testing.T) { s, err := NewSTDIN(NewConfig(), log.NewLogger(os.Stdout, logConfig), metrics.DudType{}) if err != nil { @@ -71,142 +50,3 @@ func TestSTDINClose(t *testing.T) { t.Error(err) } } - -func TestSTDINSinglePart(t *testing.T) { - messages := []string{ - "first message", - "second message", - "third message", - } - - var handle bytes.Buffer - - for _, msg := range messages { - handle.Write([]byte(msg)) - handle.Write([]byte("\n")) - handle.Write([]byte("\n")) // Try some empty messages - } - - f := getTestSTDIN(&handle, NewConfig()) - - defer func() { - f.CloseAsync() - if err := f.WaitForClose(time.Second); err != nil { - t.Error(err) - } - }() - - resChan := make(chan types.Response) - - if err := f.StartListening(resChan); err != nil { - t.Error(err) - return - } - - for _, msg := range messages { - select { - case resMsg, open := <-f.MessageChan(): - if !open { - t.Error("channel closed early") - } else if res := string(resMsg.Parts[0]); res != msg { - t.Errorf("Wrong result, %v != %v", res, msg) - } - case <-time.After(time.Second): - t.Error("Timed out waiting for message") - } - select { - case resChan <- types.NewSimpleResponse(nil): - case <-time.After(time.Second): - t.Error("Timed out waiting for response") - } - } - - select { - case _, open := <-f.MessageChan(): - if open { - t.Error("Channel not closed at end of messages") - } - case <-time.After(time.Second): - t.Error("Timed out waiting for channel close") - } -} - -func TestSTDINMultiPart(t *testing.T) { - var handle bytes.Buffer - - messages := [][]string{ - { - "first message", - "1", - "2", - }, - { - "second message", - "1", - "2", - }, - { - "third message", - "1", - "2", - }, - } - - for _, msg := range messages { - for _, part := range msg { - handle.Write([]byte(part)) - handle.Write([]byte("\n")) - } - handle.Write([]byte("\n")) - } - - conf := NewConfig() - conf.STDIN.Multipart = true - - f := getTestSTDIN(&handle, conf) - - defer func() { - f.CloseAsync() - if err := f.WaitForClose(time.Second); err != nil { - t.Error(err) - } - }() - - resChan := make(chan types.Response) - - if err := f.StartListening(resChan); err != nil { - t.Error(err) - return - } - - for _, msg := range messages { - select { - case resMsg, open := <-f.MessageChan(): - if !open { - t.Error("channel closed early") - } else { - for i, part := range msg { - if res := string(resMsg.Parts[i]); res != part { - t.Errorf("Wrong result, %v != %v", res, part) - } - } - } - case <-time.After(time.Second): - t.Error("Timed out waiting for message") - } - select { - case resChan <- types.NewSimpleResponse(nil): - case <-time.After(time.Second): - t.Error("Timed out waiting for response") - } - } - - select { - case _, open := <-f.MessageChan(): - if open { - t.Error("Channel not closed at end of messages") - } - case <-time.After(time.Second): - t.Error("Timed out waiting for channel close") - } -} diff --git a/lib/input/zmq4.go b/lib/input/zmq4.go index c75fa4f648..414c6fa446 100644 --- a/lib/input/zmq4.go +++ b/lib/input/zmq4.go @@ -190,11 +190,11 @@ func (z *ZMQ4) loop() { z.stats.Incr("input.zmq4.receive.error", 1) z.log.Errorf("Failed to receive message bytes: %v\n", err) data = nil - } else { - z.stats.Incr("input.zmq4.count", 1) } if len(data) == 0 { data = nil + } else { + z.stats.Incr("input.zmq4.count", 1) } } @@ -210,6 +210,7 @@ func (z *ZMQ4) loop() { return } if resErr := res.Error(); resErr == nil { + z.stats.Incr("input.zmq4.send.success", 1) data = nil } else { z.stats.Incr("input.zmq4.send.error", 1) diff --git a/lib/output/constructor.go b/lib/output/constructor.go index d13b5d6e88..9fa8e0e3a9 100644 --- a/lib/output/constructor.go +++ b/lib/output/constructor.go @@ -58,7 +58,7 @@ type Config struct { NATS NATSConfig `json:"nats" yaml:"nats"` ZMQ4 *ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"` File FileConfig `json:"file" yaml:"file"` - STDOUT struct{} `json:"stdout" yaml:"stdout"` + STDOUT STDOUTConfig `json:"stdout" yaml:"stdout"` FanOut FanOutConfig `json:"fan_out" yaml:"fan_out"` RoundRobin RoundRobinConfig `json:"round_robin" yaml:"round_robin"` Processors []processor.Config `json:"processors" yaml:"processors"` @@ -77,7 +77,7 @@ func NewConfig() Config { NATS: NewNATSConfig(), ZMQ4: NewZMQ4Config(), File: NewFileConfig(), - STDOUT: struct{}{}, + STDOUT: NewSTDOUTConfig(), FanOut: NewFanOutConfig(), RoundRobin: NewRoundRobinConfig(), Processors: []processor.Config{}, diff --git a/lib/output/file.go b/lib/output/file.go index 7e0b3c5408..72d988f351 100644 --- a/lib/output/file.go +++ b/lib/output/file.go @@ -41,7 +41,10 @@ as: foo\n bar\n -baz\n\n`, +baz\n\n + +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above.`, } } @@ -49,13 +52,15 @@ baz\n\n`, // FileConfig is configuration values for the file based output type. type FileConfig struct { - Path string `json:"path" yaml:"path"` + Path string `json:"path" yaml:"path"` + CustomDelim string `json:"custom_delimiter" yaml:"custom_delimiter"` } // NewFileConfig creates a new FileConfig with default values. func NewFileConfig() FileConfig { return FileConfig{ - Path: "", + Path: "", + CustomDelim: "", } } @@ -67,7 +72,7 @@ func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error) { if err != nil { return nil, err } - return newWriter(file, log, stats) + return newWriter(file, []byte(conf.File.CustomDelim), log, stats) } //------------------------------------------------------------------------------ diff --git a/lib/output/http_client.go b/lib/output/http_client.go index cef88c06eb..5a46a2fb66 100644 --- a/lib/output/http_client.go +++ b/lib/output/http_client.go @@ -22,6 +22,7 @@ package output import ( "bytes" + "crypto/tls" "io" "mime/multipart" "net/http" @@ -30,6 +31,7 @@ import ( "time" "github.com/jeffail/benthos/lib/types" + "github.com/jeffail/benthos/lib/util/oauth" "github.com/jeffail/util/log" "github.com/jeffail/util/metrics" ) @@ -53,19 +55,23 @@ multipart, please read the 'docs/using_http.md' document.`, // HTTPClientConfig is configuration for the HTTPClient output type. type HTTPClientConfig struct { - URL string `json:"url" yaml:"url"` - TimeoutMS int64 `json:"timeout_ms" yaml:"timeout_ms"` - RetryMS int64 `json:"retry_period_ms" yaml:"retry_period_ms"` - NumRetries int `json:"retries" yaml:"retries"` + URL string `json:"url" yaml:"url"` + OAuth oauth.ClientConfig `json:"oauth" yaml:"oauth"` + TimeoutMS int64 `json:"timeout_ms" yaml:"timeout_ms"` + RetryMS int64 `json:"retry_period_ms" yaml:"retry_period_ms"` + NumRetries int `json:"retries" yaml:"retries"` + SkipCertVerify bool `json:"skip_cert_verify" yaml:"skip_cert_verify"` } // NewHTTPClientConfig creates a new HTTPClientConfig with default values. func NewHTTPClientConfig() HTTPClientConfig { return HTTPClientConfig{ - URL: "http://localhost:8081/post", - TimeoutMS: 5000, - RetryMS: 1000, - NumRetries: 3, + URL: "http://localhost:8081/post", + OAuth: oauth.NewClientConfig(), + TimeoutMS: 5000, + RetryMS: 1000, + NumRetries: 3, + SkipCertVerify: false, } } @@ -106,7 +112,9 @@ func NewHTTPClient(conf Config, log log.Modular, stats metrics.Type) (Type, erro //------------------------------------------------------------------------------ // createRequest creates an HTTP request out of a single message. -func createRequest(url string, msg *types.Message) (req *http.Request, err error) { +func createRequest( + url string, msg *types.Message, oauthConfig oauth.ClientConfig, +) (req *http.Request, err error) { if len(msg.Parts) == 1 { body := bytes.NewBuffer(msg.Parts[0]) if req, err = http.NewRequest("POST", url, body); err == nil { @@ -130,6 +138,9 @@ func createRequest(url string, msg *types.Message) (req *http.Request, err error req.Header.Add("Content-Type", writer.FormDataContentType()) } } + if oauthConfig.Enabled { + err = oauthConfig.Sign(req) + } return } @@ -145,6 +156,15 @@ func (h *HTTPClient) loop() { h.log.Infof("Sending HTTP Post messages to: %s\n", h.conf.HTTPClient.URL) + var client http.Client + client.Timeout = time.Duration(h.conf.HTTPClient.TimeoutMS) * time.Millisecond + + if h.conf.HTTPClient.SkipCertVerify { + client.Transport = &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + } + var open bool for atomic.LoadInt32(&h.running) == 1 { var msg types.Message @@ -154,30 +174,33 @@ func (h *HTTPClient) loop() { h.stats.Incr("output.http_client.count", 1) // POST message - var client http.Client - client.Timeout = time.Duration(h.conf.HTTPClient.TimeoutMS) * time.Millisecond - var req *http.Request var res *http.Response var err error - if req, err = createRequest(h.conf.HTTPClient.URL, &msg); err == nil { - if res, err = client.Do(req); err == nil && - (res.StatusCode < 200 || res.StatusCode > 299) { - err = types.ErrUnexpectedHTTPRes{Code: res.StatusCode, S: res.Status} + if req, err = createRequest( + h.conf.HTTPClient.URL, &msg, h.conf.HTTPClient.OAuth, + ); err == nil { + if res, err = client.Do(req); err == nil { + if res.StatusCode < 200 || res.StatusCode > 299 { + err = types.ErrUnexpectedHTTPRes{Code: res.StatusCode, S: res.Status} + } + res.Body.Close() } i, j := 0, h.conf.HTTPClient.NumRetries for i < j && err != nil { - req, _ = createRequest(h.conf.HTTPClient.URL, &msg) + req, _ = createRequest(h.conf.HTTPClient.URL, &msg, h.conf.HTTPClient.OAuth) select { case <-time.After(time.Duration(h.conf.HTTPClient.RetryMS) * time.Millisecond): case <-h.closeChan: return } - if res, err = client.Do(req); err == nil && - (res.StatusCode < 200 || res.StatusCode > 299) { - err = types.ErrUnexpectedHTTPRes{Code: res.StatusCode, S: res.Status} + if res, err = client.Do(req); err == nil { + if res.StatusCode < 200 || res.StatusCode > 299 { + err = types.ErrUnexpectedHTTPRes{Code: res.StatusCode, S: res.Status} + } + res.Body.Close() } i++ } diff --git a/lib/output/http_server.go b/lib/output/http_server.go index 28e9766341..531b8356b1 100644 --- a/lib/output/http_server.go +++ b/lib/output/http_server.go @@ -112,6 +112,8 @@ func NewHTTPServer(conf Config, log log.Modular, stats metrics.Type) (Type, erro //------------------------------------------------------------------------------ func (h *HTTPServer) getHandler(w http.ResponseWriter, r *http.Request) { + defer r.Body.Close() + if atomic.LoadInt32(&h.running) != 1 { http.Error(w, "Server closed", http.StatusServiceUnavailable) return diff --git a/lib/output/stdout.go b/lib/output/stdout.go index 03dbc28a8a..5a7ea99048 100644 --- a/lib/output/stdout.go +++ b/lib/output/stdout.go @@ -40,7 +40,24 @@ a multipart message [ "foo", "bar", "baz" ] would be written as: foo\n bar\n -baz\n\n`, +baz\n\n + +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above.`, + } +} + +//------------------------------------------------------------------------------ + +// STDOUTConfig is configuration values for the stdout based output type. +type STDOUTConfig struct { + CustomDelim string `json:"custom_delimiter" yaml:"custom_delimiter"` +} + +// NewSTDOUTConfig creates a new STDOUTConfig with default values. +func NewSTDOUTConfig() STDOUTConfig { + return STDOUTConfig{ + CustomDelim: "", } } @@ -48,7 +65,7 @@ baz\n\n`, // NewSTDOUT creates a new STDOUT output type. func NewSTDOUT(conf Config, log log.Modular, stats metrics.Type) (Type, error) { - return newWriter(os.Stdout, log, stats) + return newWriter(os.Stdout, []byte(conf.STDOUT.CustomDelim), log, stats) } //------------------------------------------------------------------------------ diff --git a/lib/output/writer.go b/lib/output/writer.go index b7cfb075bd..3c648b5d21 100644 --- a/lib/output/writer.go +++ b/lib/output/writer.go @@ -34,14 +34,15 @@ import ( //------------------------------------------------------------------------------ -// writer is an output type that pushes messages to a Writer type. +// writer is an output type that pushes messages to a io.WriterCloser type. type writer struct { running int32 - conf Config log log.Modular stats metrics.Type + customDelim []byte + messages <-chan types.Message responseChan chan types.Response @@ -52,11 +53,17 @@ type writer struct { } // newWriter creates a new writer output type. -func newWriter(handle io.WriteCloser, log log.Modular, stats metrics.Type) (Type, error) { +func newWriter( + handle io.WriteCloser, + customDelimiter []byte, + log log.Modular, + stats metrics.Type, +) (Type, error) { return &writer{ running: 1, - log: log.NewModule(".output.file"), + log: log.NewModule(".output.writer"), stats: stats, + customDelim: customDelimiter, messages: nil, responseChan: make(chan types.Response), handle: handle, @@ -76,6 +83,11 @@ func (w *writer) loop() { close(w.closedChan) }() + delim := []byte("\n") + if len(w.customDelim) > 0 { + delim = w.customDelim + } + for atomic.LoadInt32(&w.running) == 1 { var msg types.Message var open bool @@ -90,9 +102,9 @@ func (w *writer) loop() { } var err error if len(msg.Parts) == 1 { - _, err = fmt.Fprintf(w.handle, "%s\n", msg.Parts[0]) + _, err = fmt.Fprintf(w.handle, "%s%s", msg.Parts[0], delim) } else { - _, err = fmt.Fprintf(w.handle, "%s\n\n", bytes.Join(msg.Parts, []byte("\n"))) + _, err = fmt.Fprintf(w.handle, "%s%s%s", bytes.Join(msg.Parts, delim), delim, delim) } if err != nil { w.stats.Incr("output.writer.send.error", 1) diff --git a/lib/output/writer_test.go b/lib/output/writer_test.go index 635a970e08..1ca1cb422a 100644 --- a/lib/output/writer_test.go +++ b/lib/output/writer_test.go @@ -47,7 +47,7 @@ func TestWriterBasic(t *testing.T) { msgChan := make(chan types.Message) - writer, err := newWriter(&buf, log.NewLogger(os.Stdout, logConfig), metrics.DudType{}) + writer, err := newWriter(&buf, []byte{}, log.NewLogger(os.Stdout, logConfig), metrics.DudType{}) if err != nil { t.Error(err) return @@ -114,3 +114,76 @@ func TestWriterBasic(t *testing.T) { t.Error("Buffer was not closed by writer") } } + +func TestWriterCustomDelim(t *testing.T) { + var buf testBuffer + + msgChan := make(chan types.Message) + + writer, err := newWriter(&buf, []byte(""), log.NewLogger(os.Stdout, logConfig), metrics.DudType{}) + if err != nil { + t.Error(err) + return + } + + if err = writer.StartReceiving(msgChan); err != nil { + t.Error(err) + } + if err = writer.StartReceiving(msgChan); err == nil { + t.Error("Expected error from duplicate receiver call") + } + + testCases := []struct { + message []string + expectedOutput string + }{ + { + []string{`hello world`}, + "hello world", + }, + { + []string{`hello world`, `part 2`}, + "hello worldpart 2", + }, + } + + for _, c := range testCases { + msg := types.Message{} + for _, part := range c.message { + msg.Parts = append(msg.Parts, []byte(part)) + } + + select { + case msgChan <- msg: + case <-time.After(time.Second): + t.Error("Timed out sending message") + } + + select { + case res, open := <-writer.ResponseChan(): + if !open { + t.Error("writer closed early") + return + } + if res.Error() != nil { + t.Error(res.Error()) + } + case <-time.After(time.Second): + t.Error("Timed out waiting for response") + } + + if exp, act := c.expectedOutput, buf.String(); exp != act { + t.Errorf("Unexpected output from writer: %v != %v", exp, act) + } + buf.Reset() + } + + writer.CloseAsync() + if err = writer.WaitForClose(time.Second); err != nil { + t.Error(err) + } + + if !buf.closed { + t.Error("Buffer was not closed by writer") + } +} diff --git a/lib/util/oauth/oauth.go b/lib/util/oauth/oauth.go new file mode 100644 index 0000000000..b67bcb107e --- /dev/null +++ b/lib/util/oauth/oauth.go @@ -0,0 +1,96 @@ +package oauth + +import ( + "crypto/hmac" + "crypto/sha1" + "encoding/base64" + "fmt" + "math/rand" + "net/http" + "net/url" + "strconv" + "time" +) + +// ClientConfig holds the configuration parameters for an OAuth exchange. +type ClientConfig struct { + ConsumerKey string `json:"consumer_key" yaml:"consumer_key"` + ConsumerSecret string `json:"consumer_secret" yaml:"consumer_secret"` + AccessToken string `json:"access_token" yaml:"access_token"` + AccessTokenSecret string `json:"access_token_secret" yaml:"access_token_secret"` + RequestURL string `json:"request_url" yaml:"request_url"` + Enabled bool `json:"enabled" yaml:"enabled"` +} + +// NewClientConfig returns a new ClientConfig with default values. +func NewClientConfig() ClientConfig { + return ClientConfig{ + ConsumerKey: "", + ConsumerSecret: "", + AccessToken: "", + AccessTokenSecret: "", + RequestURL: "", + Enabled: false, + } +} + +// Sign method to sign an HTTP request for an OAuth exchange. +func (oauth ClientConfig) Sign(req *http.Request) error { + nonceGenerator := rand.New(rand.NewSource(time.Now().UnixNano())) + nonce := strconv.FormatInt(nonceGenerator.Int63(), 10) + ts := fmt.Sprintf("%d", time.Now().Unix()) + + params := &url.Values{} + params.Add("oauth_consumer_key", oauth.ConsumerKey) + params.Add("oauth_nonce", nonce) + params.Add("oauth_signature_method", "HMAC-SHA1") + params.Add("oauth_timestamp", ts) + params.Add("oauth_token", oauth.AccessToken) + params.Add("oauth_version", "1.0") + + sig, err := oauth.getSignature(req, params) + if err != nil { + return err + } + + str := fmt.Sprintf( + ` oauth_consumer_key="%s", oauth_nonce="%s", oauth_signature="%s",`+ + ` oauth_signature_method="%s", oauth_timestamp="%s",`+ + ` oauth_token="%s", oauth_version="%s"`, + url.QueryEscape(oauth.ConsumerKey), + nonce, + url.QueryEscape(sig), + "HMAC-SHA1", + ts, + url.QueryEscape(oauth.AccessToken), + "1.0", + ) + req.Header.Add("Authorization", str) + + return nil +} + +func (oauth ClientConfig) getSignature( + req *http.Request, + params *url.Values, +) (string, error) { + baseSignatureString := req.Method + "&" + + url.QueryEscape(req.URL.String()) + "&" + + url.QueryEscape(params.Encode()) + + signingKey := url.QueryEscape(oauth.ConsumerSecret) + "&" + + url.QueryEscape(oauth.AccessTokenSecret) + + return oauth.computeHMAC(baseSignatureString, signingKey) +} + +func (oauth ClientConfig) computeHMAC( + message string, + key string, +) (string, error) { + h := hmac.New(sha1.New, []byte(key)) + if _, err := h.Write([]byte(message)); nil != err { + return "", err + } + return base64.StdEncoding.EncodeToString(h.Sum(nil)), nil +} diff --git a/resources/docs/inputs/list.md b/resources/docs/inputs/list.md index 80832d42bd..aa14ae891c 100644 --- a/resources/docs/inputs/list.md +++ b/resources/docs/inputs/list.md @@ -70,6 +70,9 @@ The file type reads input from a file. If multipart is set to false each line is read as a separate message. If multipart is set to true each line is read as a message part, and an empty line indicates the end of a message. +Alternatively, a custom delimiter can be set that is used instead of line +breaks. + ## `http_server` In order to receive messages over HTTP Benthos hosts a server. Messages should @@ -131,3 +134,6 @@ The stdin input simply reads any data piped to stdin as messages. By default the messages are assumed single part and are line delimited. If the multipart option is set to true then lines are interpretted as message parts, and an empty line indicates the end of the message. + +Alternatively, a custom delimiter can be set that is used instead of line +breaks. diff --git a/resources/docs/outputs/list.md b/resources/docs/outputs/list.md index 913484faff..336aef9826 100644 --- a/resources/docs/outputs/list.md +++ b/resources/docs/outputs/list.md @@ -32,6 +32,9 @@ foo\n bar\n baz\n\n +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above. + ## `http_client` The HTTP client output type connects to a server and sends POST requests for @@ -105,3 +108,6 @@ a multipart message [ "foo", "bar", "baz" ] would be written as: foo\n bar\n baz\n\n + +You can alternatively specify a custom delimiter that will follow the same rules +as '\n' above. diff --git a/resources/scripts/create_test_certs.sh b/resources/scripts/create_test_certs.sh new file mode 100755 index 0000000000..a024f4c65a --- /dev/null +++ b/resources/scripts/create_test_certs.sh @@ -0,0 +1,2 @@ +#!/bin/bash +openssl req -newkey rsa:4096 -nodes -sha512 -x509 -days 3650 -nodes -out ./cert.pem -keyout ./key.pem