Skip to content

Commit

Permalink
Add http_client input
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jan 28, 2018
1 parent f25d601 commit 0c11725
Show file tree
Hide file tree
Showing 20 changed files with 1,930 additions and 406 deletions.
207 changes: 116 additions & 91 deletions config/env/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,104 +10,129 @@ and [here][1] respectively.
The full list of variables and their default values:

``` sh
BENTHOS_HTTP_ADDRESS = 0.0.0.0:4195
BENTHOS_INPUT =
BENTHOS_OUTPUT =
BENTHOS_LOG_LEVEL = INFO

METRICS_TYPE = http_server
STATSD_ADDRESS = localhost:9125
STATSD_NETWORK = udp

BENTHOS_MAX_PARTS = 100
BENTHOS_MIN_PARTS = 1
BENTHOS_MAX_PART_SIZE = 1073741824
BENTHOS_INPUT_PROCESSOR = noop
BENTHOS_OUTPUT_PROCESSOR = noop

ZMQ_INPUT_URLS =
ZMQ_INPUT_BIND = true
ZMQ_INPUT_SOCKET = PULL
ZMQ_OUTPUT_URLS =
ZMQ_OUTPUT_BIND = true
ZMQ_OUTPUT_SOCKET = PULL

SCALE_PROTO_INPUT_URLS =
SCALE_PROTO_INPUT_BIND = true
SCALE_PROTO_INPUT_SOCKET = PULL
SCALE_PROTO_OUTPUT_URLS =
SCALE_PROTO_OUTPUT_BIND = false
SCALE_PROTO_OUTPUT_SOCKET = PUSH

KAFKA_INPUT_BROKER_ADDRESSES =
KAFKA_INPUT_CLIENT_ID = benthos-client
KAFKA_INPUT_CONSUMER_GROUP = benthos-group
KAFKA_INPUT_TOPIC = benthos-stream
KAFKA_INPUT_PARTITION = 0
KAFKA_INPUT_START_OLDEST = true
KAFKA_OUTPUT_BROKER_ADDRESSES =
KAFKA_OUTPUT_CLIENT_ID = benthos-client
KAFKA_OUTPUT_CONSUMER_GROUP = benthos-group
KAFKA_OUTPUT_TOPIC = benthos-stream
KAFKA_OUTPUT_PARTITION = 0
KAFKA_OUTPUT_MAX_MSG_BYTES = 1000000
KAFKA_OUTPUT_START_OLDEST = true
KAFKA_OUTPUT_ACK_REP = true

AMQP_INPUT_URL =
AMQP_INPUT_EXCHANGE = benthos-exchange
AMQP_INPUT_EXCHANGE_TYPE = direct
AMQP_INPUT_QUEUE = benthos-stream
AMQP_INPUT_KEY = benthos-key
AMQP_INPUT_CONSUMER_TAG = benthos-consumer
AMQP_OUTPUT_URL =
AMQP_OUTPUT_EXCHANGE = benthos-exchange
AMQP_OUTPUT_EXCHANGE_TYPE = direct
AMQP_OUTPUT_QUEUE = benthos-stream
AMQP_OUTPUT_KEY = benthos-key
AMQP_OUTPUT_CONSUMER_TAG = benthos-consumer

NSQD_INPUT_TCP_ADDRESSES =
NSQD_INPUT_LOOKUP_ADDRESSES =
NSQ_INPUT_TOPIC = benthos-messages
NSQ_INPUT_CHANNEL = benthos-stream
NSQ_INPUT_USER_AGENT = benthos-consumer
NSQ_OUTPUT_TCP_ADDRESS =
NSQ_OUTPUT_TOPIC = benthos-messages
NSQ_OUTPUT_CHANNEL = benthos-stream
NSQ_OUTPUT_USER_AGENT = benthos-consumer

NATS_INPUT_URLS =
NATS_INPUT_SUBJECT = benthos-stream
NATS_INPUT_CLUSTER_ID = benthos-cluster # Used only for nats_stream
NATS_INPUT_CLIENT_ID = benthos-consumer # ^
NATS_INPUT_QUEUE = benthos-queue # ^
NATS_INPUT_DURABLE_NAME = benthos-offset # ^
NATS_OUTPUT_URLS =
NATS_OUTPUT_SUBJECT = benthos-stream
NATS_OUTPUT_CLUSTER_ID = benthos-cluster # Used only for nats_stream
NATS_OUTPUT_CLIENT_ID = benthos-consumer # ^

REDIS_INPUT_URL =
REDIS_INPUT_CHANNEL = benthos-stream
REDIS_OUTPUT_URL =
REDIS_OUTPUT_CHANNEL = benthos-stream
BENTHOS_HTTP_ADDRESS = 0.0.0.0:4195
BENTHOS_INPUT =
BENTHOS_OUTPUT =
BENTHOS_LOG_LEVEL = INFO

METRICS_TYPE = http_server
STATSD_ADDRESS = localhost:9125
STATSD_NETWORK = udp

BENTHOS_MAX_PARTS = 100
BENTHOS_MIN_PARTS = 1
BENTHOS_MAX_PART_SIZE = 1073741824
BENTHOS_INPUT_PROCESSOR = noop
BENTHOS_OUTPUT_PROCESSOR = noop

ZMQ_INPUT_URLS =
ZMQ_INPUT_BIND = true
ZMQ_INPUT_SOCKET = PULL
ZMQ_OUTPUT_URLS =
ZMQ_OUTPUT_BIND = true
ZMQ_OUTPUT_SOCKET = PULL

SCALE_PROTO_INPUT_URLS =
SCALE_PROTO_INPUT_BIND = true
SCALE_PROTO_INPUT_SOCKET = PULL
SCALE_PROTO_OUTPUT_URLS =
SCALE_PROTO_OUTPUT_BIND = false
SCALE_PROTO_OUTPUT_SOCKET = PUSH

KAFKA_INPUT_BROKER_ADDRESSES =
KAFKA_INPUT_CLIENT_ID = benthos-client
KAFKA_INPUT_CONSUMER_GROUP = benthos-group
KAFKA_INPUT_TOPIC = benthos-stream
KAFKA_INPUT_PARTITION = 0
KAFKA_INPUT_START_OLDEST = true
KAFKA_OUTPUT_BROKER_ADDRESSES =
KAFKA_OUTPUT_CLIENT_ID = benthos-client
KAFKA_OUTPUT_CONSUMER_GROUP = benthos-group
KAFKA_OUTPUT_TOPIC = benthos-stream
KAFKA_OUTPUT_PARTITION = 0
KAFKA_OUTPUT_MAX_MSG_BYTES = 1000000
KAFKA_OUTPUT_START_OLDEST = true
KAFKA_OUTPUT_ACK_REP = true

AMQP_INPUT_URL =
AMQP_INPUT_EXCHANGE = benthos-exchange
AMQP_INPUT_EXCHANGE_TYPE = direct
AMQP_INPUT_QUEUE = benthos-stream
AMQP_INPUT_KEY = benthos-key
AMQP_INPUT_CONSUMER_TAG = benthos-consumer
AMQP_OUTPUT_URL =
AMQP_OUTPUT_EXCHANGE = benthos-exchange
AMQP_OUTPUT_EXCHANGE_TYPE = direct
AMQP_OUTPUT_QUEUE = benthos-stream
AMQP_OUTPUT_KEY = benthos-key
AMQP_OUTPUT_CONSUMER_TAG = benthos-consumer

NSQD_INPUT_TCP_ADDRESSES =
NSQD_INPUT_LOOKUP_ADDRESSES =
NSQ_INPUT_TOPIC = benthos-messages
NSQ_INPUT_CHANNEL = benthos-stream
NSQ_INPUT_USER_AGENT = benthos-consumer
NSQ_OUTPUT_TCP_ADDRESS =
NSQ_OUTPUT_TOPIC = benthos-messages
NSQ_OUTPUT_CHANNEL = benthos-stream
NSQ_OUTPUT_USER_AGENT = benthos-consumer

NATS_INPUT_URLS =
NATS_INPUT_SUBJECT = benthos-stream
NATS_INPUT_CLUSTER_ID = benthos-cluster # Used only for nats_stream
NATS_INPUT_CLIENT_ID = benthos-consumer # ^
NATS_INPUT_QUEUE = benthos-queue # ^
NATS_INPUT_DURABLE_NAME = benthos-offset # ^
NATS_OUTPUT_URLS =
NATS_OUTPUT_SUBJECT = benthos-stream
NATS_OUTPUT_CLUSTER_ID = benthos-cluster # Used only for nats_stream
NATS_OUTPUT_CLIENT_ID = benthos-consumer # ^

REDIS_INPUT_URL =
REDIS_INPUT_CHANNEL = benthos-stream
REDIS_OUTPUT_URL =
REDIS_OUTPUT_CHANNEL = benthos-stream

HTTP_SERVER_INPUT_ADDRESS =
HTTP_SERVER_INPUT_PATH = /post
HTTP_SERVER_OUTPUT_ADDRESS =
HTTP_SERVER_OUTPUT_PATH = /get
HTTP_SERVER_OUTPUT_STREAM_PATH = /get/stream

HTTP_CLIENT_INPUT_URL =
HTTP_CLIENT_OUTPUT_URL =

FILE_INPUT_PATH =
FILE_INPUT_MULTIPART = false
FILE_INPUT_MAX_BUFFER = 65536
FILE_OUTPUT_PATH =
FILE_OUTPUT_MULTIPART = false
FILE_OUTPUT_MAX_BUFFER = 65536
HTTP_CLIENT_INPUT_URL =
HTTP_CLIENT_INPUT_VERB = GET
HTTP_CLIENT_INPUT_PAYLOAD =
HTTP_CLIENT_INPUT_CONTENT_TYPE = application/octet-stream
HTTP_CLIENT_INPUT_STREAM = false
HTTP_CLIENT_INPUT_STREAM_MULTIPART = false
HTTP_CLIENT_INPUT_STREAM_MAX_BUFFER = 65536
HTTP_CLIENT_INPUT_STREAM_DELIMITER =
HTTP_CLIENT_INPUT_OAUTH_KEY =
HTTP_CLIENT_INPUT_OAUTH_SECRET =
HTTP_CLIENT_INPUT_OAUTH_TOKEN =
HTTP_CLIENT_INPUT_OAUTH_TOKEN_SECRET =
HTTP_CLIENT_INPUT_OAUTH_URL =
HTTP_CLIENT_INPUT_OAUTH_ENABLED = false
HTTP_CLIENT_INPUT_TIMEOUT_MS = 5000
HTTP_CLIENT_INPUT_SKIP_CERT_VERIFY = false
HTTP_CLIENT_OUTPUT_URL =
HTTP_CLIENT_OUTPUT_VERB = POST
HTTP_CLIENT_OUTPUT_CONTENT_TYPE = application/octet-stream
HTTP_CLIENT_OUTPUT_OAUTH_KEY =
HTTP_CLIENT_OUTPUT_OAUTH_SECRET =
HTTP_CLIENT_OUTPUT_OAUTH_TOKEN =
HTTP_CLIENT_OUTPUT_OAUTH_TOKEN_SECRET =
HTTP_CLIENT_OUTPUT_OAUTH_URL =
HTTP_CLIENT_OUTPUT_OAUTH_ENABLED = false
HTTP_CLIENT_OUTPUT_TIMEOUT_MS = 5000
HTTP_CLIENT_OUTPUT_SKIP_CERT_VERIFY = false

FILE_INPUT_PATH =
FILE_INPUT_MULTIPART = false
FILE_INPUT_MAX_BUFFER = 65536
FILE_OUTPUT_PATH =
FILE_OUTPUT_MULTIPART = false
FILE_OUTPUT_MAX_BUFFER = 65536
```

[0]: ../../resources/docs/inputs/list.md
Expand Down
29 changes: 29 additions & 0 deletions config/env/default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,24 @@ input:
http_server:
address: ${HTTP_SERVER_INPUT_ADDRESS}
path: ${HTTP_SERVER_INPUT_PATH:/post}
http_client:
url: ${HTTP_CLIENT_INPUT_URL}
verb: ${HTTP_CLIENT_INPUT_VERB:GET}
payload: ${HTTP_CLIENT_INPUT_PAYLOAD}
content_type: ${HTTP_CLIENT_INPUT_CONTENT_TYPE:application/octet-stream}
stream: ${HTTP_CLIENT_INPUT_STREAM:false}
stream_multipart: ${HTTP_CLIENT_INPUT_STREAM_MULTIPART:false}
stream_max_buffer: ${HTTP_CLIENT_INPUT_STREAM_MAX_BUFFER:65536}
stream_custom_delimiter: ${HTTP_CLIENT_INPUT_STREAM_DELIMITER}
oauth:
consumer_key: ${HTTP_CLIENT_INPUT_OAUTH_KEY}
consumer_secret: ${HTTP_CLIENT_INPUT_OAUTH_SECRET}
access_token: ${HTTP_CLIENT_INPUT_OAUTH_TOKEN}
access_token_secret: ${HTTP_CLIENT_INPUT_OAUTH_TOKEN_SECRET}
request_url: ${HTTP_CLIENT_INPUT_OAUTH_URL}
enabled: ${HTTP_CLIENT_INPUT_OAUTH_ENABLED:false}
timeout_ms: ${HTTP_CLIENT_INPUT_TIMEOUT_MS:5000}
skip_cert_verify: ${HTTP_CLIENT_INPUT_SKIP_CERT_VERIFY:false}
zmq4:
urls:
- ${ZMQ_INPUT_URLS}
Expand Down Expand Up @@ -94,6 +112,17 @@ output:
type: ${BENTHOS_OUTPUT:http_server}
http_client:
url: ${HTTP_CLIENT_OUTPUT_URL}
verb: ${HTTP_CLIENT_OUTPUT_VERB:POST}
content_type: ${HTTP_CLIENT_OUTPUT_CONTENT_TYPE:application/octet-stream}
oauth:
consumer_key: ${HTTP_CLIENT_OUTPUT_OAUTH_KEY}
consumer_secret: ${HTTP_CLIENT_OUTPUT_OAUTH_SECRET}
access_token: ${HTTP_CLIENT_OUTPUT_OAUTH_TOKEN}
access_token_secret: ${HTTP_CLIENT_OUTPUT_OAUTH_TOKEN_SECRET}
request_url: ${HTTP_CLIENT_OUTPUT_OAUTH_URL}
enabled: ${HTTP_CLIENT_OUTPUT_OAUTH_ENABLED:false}
timeout_ms: ${HTTP_CLIENT_OUTPUT_TIMEOUT_MS:5000}
skip_cert_verify: ${HTTP_CLIENT_OUTPUT_SKIP_CERT_VERIFY:false}
http_server:
address: ${HTTP_SERVER_OUTPUT_ADDRESS}
path: ${HTTP_SERVER_OUTPUT_PATH:/get}
Expand Down
24 changes: 23 additions & 1 deletion config/everything.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,26 @@ input:
timeout_ms: 5000
cert_file: ""
key_file: ""
http_client:
url: http://localhost:4195/get/stream
verb: GET
payload: ""
content_type: application/octet-stream
stream: false
stream_multipart: false
stream_max_buffer: 65536
stream_custom_delimiter: ""
oauth:
consumer_key: ""
consumer_secret: ""
access_token: ""
access_token_secret: ""
request_url: ""
enabled: false
timeout_ms: 5000
retry_period_ms: 1000
retries: 3
skip_cert_verify: false
scalability_protocols:
urls:
- tcp://*:5555
Expand Down Expand Up @@ -116,6 +136,8 @@ output:
type: stdout
http_client:
url: http://localhost:8081/post
verb: POST
content_type: application/octet-stream
oauth:
consumer_key: ""
consumer_secret: ""
Expand All @@ -130,7 +152,7 @@ output:
http_server:
address: ""
path: /get
stream_path: /stream
stream_path: /get/stream
timeout_ms: 5000
cert_file: ""
key_file: ""
Expand Down
2 changes: 2 additions & 0 deletions lib/input/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ var constructors = map[string]typeSpec{}
type Config struct {
Type string `json:"type" yaml:"type"`
HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"`
ScaleProto ScaleProtoConfig `json:"scalability_protocols" yaml:"scalability_protocols"`
ZMQ4 *ZMQ4Config `json:"zmq4,omitempty" yaml:"zmq4,omitempty"`
Kafka KafkaConfig `json:"kafka" yaml:"kafka"`
Expand All @@ -76,6 +77,7 @@ func NewConfig() Config {
return Config{
Type: "stdin",
HTTPServer: NewHTTPServerConfig(),
HTTPClient: NewHTTPClientConfig(),
ScaleProto: NewScaleProtoConfig(),
ZMQ4: NewZMQ4Config(),
Kafka: NewKafkaConfig(),
Expand Down
26 changes: 19 additions & 7 deletions lib/input/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package input

import (
"bufio"
"io"
"os"

"github.com/Jeffail/benthos/lib/util/service/log"
Expand Down Expand Up @@ -71,16 +72,27 @@ func NewFile(conf Config, log log.Modular, stats metrics.Type) (Type, error) {
if err != nil {
return nil, err
}
delim := []byte("\n")
delim := "\n"
if len(conf.File.CustomDelim) > 0 {
delim = []byte(conf.File.CustomDelim)
delim = conf.File.CustomDelim
}
return newReader(
file,
conf.File.MaxBuffer,
conf.File.Multipart,
delim,
return NewLineReader(
"file",
func() (io.Reader, error) {
// Swap so this only works once since we don't want to read the file
// multiple times.
if file == nil {
return nil, io.EOF
}
sendFile := file
file = nil
return sendFile, nil
},
func() {},
log, stats,
OptLineReaderSetDelimiter(delim),
OptLineReaderSetMaxBuffer(conf.File.MaxBuffer),
OptLineReaderSetMultipart(conf.File.Multipart),
)
}

Expand Down
Loading

0 comments on commit 0c11725

Please sign in to comment.