
Commit

Add inproc inputs and outputs
Jeffail committed Jul 4, 2018
1 parent fdf5a7b commit 39df06e
Showing 23 changed files with 853 additions and 28 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -5,10 +5,13 @@ All notable changes to this project will be documented in this file.

## Unreleased

## 0.15.4 - 2018-07-04

### Added

- New `http` processor, where payloads can be sent to arbitrary HTTP endpoints
and the result constructed into a new payload.
- New `inproc` inputs and outputs for linking streams together.

## 0.15.3 - 2018-07-03

2 changes: 2 additions & 0 deletions config/env/README.md
@@ -116,6 +116,7 @@ INPUT_HTTP_SERVER_KEY_FILE
INPUT_HTTP_SERVER_PATH = /post
INPUT_HTTP_SERVER_TIMEOUT_MS = 5000
INPUT_HTTP_SERVER_WS_PATH = /post/ws
INPUT_INPROC
INPUT_KAFKA_ADDRESSES = localhost:9092
INPUT_KAFKA_BALANCED_ADDRESSES = localhost:9092
INPUT_KAFKA_BALANCED_CLIENT_ID = benthos_kafka_input
@@ -304,6 +305,7 @@ OUTPUT_HTTP_SERVER_PATH = /get
OUTPUT_HTTP_SERVER_STREAM_PATH = /get/stream
OUTPUT_HTTP_SERVER_TIMEOUT_MS = 5000
OUTPUT_HTTP_SERVER_WS_PATH = /get/ws
OUTPUT_INPROC
OUTPUT_KAFKA_ACK_REPLICAS = false
OUTPUT_KAFKA_ADDRESSES = localhost:9092
OUTPUT_KAFKA_CLIENT_ID = benthos_kafka_output
2 changes: 2 additions & 0 deletions config/env/default.yaml
@@ -84,6 +84,7 @@ input:
path: ${INPUT_HTTP_SERVER_PATH:/post}
timeout_ms: ${INPUT_HTTP_SERVER_TIMEOUT_MS:5000}
ws_path: ${INPUT_HTTP_SERVER_WS_PATH:/post/ws}
inproc: ${INPUT_INPROC}
kafka:
addresses:
- ${INPUT_KAFKA_ADDRESSES:localhost:9092}
@@ -327,6 +328,7 @@ output:
stream_path: ${OUTPUT_HTTP_SERVER_STREAM_PATH:/get/stream}
timeout_ms: ${OUTPUT_HTTP_SERVER_TIMEOUT_MS:5000}
ws_path: ${OUTPUT_HTTP_SERVER_WS_PATH:/get/ws}
inproc: ${OUTPUT_INPROC}
kafka:
ack_replicas: ${OUTPUT_KAFKA_ACK_REPLICAS:false}
addresses:
2 changes: 2 additions & 0 deletions config/everything.yaml
@@ -85,6 +85,7 @@ input:
timeout_ms: 5000
cert_file: ""
key_file: ""
inproc: ""
kafka:
addresses:
- localhost:9092
@@ -435,6 +436,7 @@ output:
timeout_ms: 5000
cert_file: ""
key_file: ""
inproc: ""
kafka:
addresses:
- localhost:9092
42 changes: 42 additions & 0 deletions config/inproc.json
@@ -0,0 +1,42 @@
{
"http": {
"address": "0.0.0.0:4195",
"read_timeout_ms": 5000,
"root_path": "/benthos",
"debug_endpoints": false
},
"input": {
"type": "inproc",
"inproc": ""
},
"buffer": {
"type": "none",
"none": {}
},
"pipeline": {
"processors": [],
"threads": 1
},
"output": {
"type": "inproc",
"inproc": ""
},
"logger": {
"prefix": "benthos",
"log_level": "INFO",
"add_timestamp": true,
"json_format": true
},
"metrics": {
"type": "http_server",
"prefix": "benthos",
"http_server": {},
"prometheus": {},
"statsd": {
"address": "localhost:4040",
"flush_period": "100ms",
"max_packet_size": 1440,
"network": "udp"
}
}
}
33 changes: 33 additions & 0 deletions config/inproc.yaml
@@ -0,0 +1,33 @@
# This file was auto generated by benthos_config_gen.
http:
address: 0.0.0.0:4195
read_timeout_ms: 5000
root_path: /benthos
debug_endpoints: false
input:
type: inproc
inproc: ""
buffer:
type: none
none: {}
pipeline:
processors: []
threads: 1
output:
type: inproc
inproc: ""
logger:
prefix: benthos
log_level: INFO
add_timestamp: true
json_format: true
metrics:
type: http_server
prefix: benthos
http_server: {}
prometheus: {}
statsd:
address: localhost:4040
flush_period: 100ms
max_packet_size: 1440
network: udp
44 changes: 31 additions & 13 deletions docs/inputs/README.md
@@ -45,19 +45,20 @@ level which is only applied to messages from the baz input.
7. [`files`](#files)
8. [`http_client`](#http_client)
9. [`http_server`](#http_server)
10. [`kafka`](#kafka)
11. [`kafka_balanced`](#kafka_balanced)
12. [`mqtt`](#mqtt)
13. [`nats`](#nats)
14. [`nats_stream`](#nats_stream)
15. [`nsq`](#nsq)
16. [`read_until`](#read_until)
17. [`redis_list`](#redis_list)
18. [`redis_pubsub`](#redis_pubsub)
19. [`scalability_protocols`](#scalability_protocols)
20. [`stdin`](#stdin)
21. [`websocket`](#websocket)
22. [`zmq4`](#zmq4)
10. [`inproc`](#inproc)
11. [`kafka`](#kafka)
12. [`kafka_balanced`](#kafka_balanced)
13. [`mqtt`](#mqtt)
14. [`nats`](#nats)
15. [`nats_stream`](#nats_stream)
16. [`nsq`](#nsq)
17. [`read_until`](#read_until)
18. [`redis_list`](#redis_list)
19. [`redis_pubsub`](#redis_pubsub)
20. [`scalability_protocols`](#scalability_protocols)
21. [`stdin`](#stdin)
22. [`websocket`](#websocket)
23. [`zmq4`](#zmq4)

## `amazon_s3`

@@ -348,6 +349,23 @@ which is enabled when key and cert files are specified.
You can leave the 'address' config field blank in order to use the instance wide
HTTP server.

## `inproc`

``` yaml
type: inproc
inproc: ""
```

Directly connect to an output within the same Benthos process by referencing it
by a chosen ID. This allows you to hook up isolated streams whilst running
Benthos in [`--streams` mode](../streams/README.md). It is NOT recommended that
you connect the input of a stream to an output of the same stream, as feedback
loops can lead to deadlocks in your message flow.

It is possible to connect multiple inputs to the same inproc ID, but only one
output can connect to a given ID; if a collision occurs the new output replaces
the existing one.
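
As a sketch, a stream config run under `--streams` mode could consume from a
sibling stream through a shared ID (the file name and the ID `bridge` here are
illustrative, not taken from the commit):

``` yaml
# second.yaml - reads whatever a sibling stream publishes to the 'bridge' ID
# and prints it to stdout.
input:
  type: inproc
  inproc: bridge
output:
  type: stdout
```

A sibling stream would declare an `inproc` output with the same `bridge` ID so
the two pipelines form a chain.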

## `kafka`

``` yaml
40 changes: 29 additions & 11 deletions docs/outputs/README.md
@@ -32,17 +32,18 @@ conditions please [read the docs here](../conditions/README.md)
8. [`files`](#files)
9. [`http_client`](#http_client)
10. [`http_server`](#http_server)
11. [`kafka`](#kafka)
12. [`mqtt`](#mqtt)
13. [`nats`](#nats)
14. [`nats_stream`](#nats_stream)
15. [`nsq`](#nsq)
16. [`redis_list`](#redis_list)
17. [`redis_pubsub`](#redis_pubsub)
18. [`scalability_protocols`](#scalability_protocols)
19. [`stdout`](#stdout)
20. [`websocket`](#websocket)
21. [`zmq4`](#zmq4)
11. [`inproc`](#inproc)
12. [`kafka`](#kafka)
13. [`mqtt`](#mqtt)
14. [`nats`](#nats)
15. [`nats_stream`](#nats_stream)
16. [`nsq`](#nsq)
17. [`redis_list`](#redis_list)
18. [`redis_pubsub`](#redis_pubsub)
19. [`scalability_protocols`](#scalability_protocols)
20. [`stdout`](#stdout)
21. [`websocket`](#websocket)
22. [`zmq4`](#zmq4)

## `amazon_s3`

@@ -332,6 +333,23 @@ You can receive a single, discrete message on the configured 'path' endpoint, or
receive a constant stream of line delimited messages on the configured
'stream_path' endpoint.

## `inproc`

``` yaml
type: inproc
inproc: ""
```

Sends data directly to Benthos inputs by connecting to a unique ID. This allows
you to hook up isolated streams whilst running Benthos in
[`--streams` mode](../streams/README.md). It is NOT recommended that you
connect the input of a stream to an output of the same stream, as feedback
loops can lead to deadlocks in your message flow.

It is possible to connect multiple inputs to the same inproc ID, but only one
output can connect to a given ID; if a collision occurs the new output replaces
the existing one.
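
For example, a stream run under `--streams` mode could hand its messages to a
sibling stream over a shared ID (the file name and the ID `bridge` here are
illustrative, not taken from the commit):

``` yaml
# first.yaml - publishes everything read from stdin to the 'bridge' ID for a
# sibling stream to consume via an inproc input with the same ID.
input:
  type: stdin
output:
  type: inproc
  inproc: bridge
```

The consuming stream declares `input.type: inproc` with `inproc: bridge`, and
the two stream configs together behave like one pipeline.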

## `kafka`

``` yaml
Expand Down
2 changes: 2 additions & 0 deletions lib/input/constructor.go
@@ -71,6 +71,7 @@ type Config struct {
Files reader.FilesConfig `json:"files" yaml:"files"`
HTTPClient HTTPClientConfig `json:"http_client" yaml:"http_client"`
HTTPServer HTTPServerConfig `json:"http_server" yaml:"http_server"`
Inproc InprocConfig `json:"inproc" yaml:"inproc"`
Kafka reader.KafkaConfig `json:"kafka" yaml:"kafka"`
KafkaBalanced reader.KafkaBalancedConfig `json:"kafka_balanced" yaml:"kafka_balanced"`
MQTT reader.MQTTConfig `json:"mqtt" yaml:"mqtt"`
@@ -100,6 +101,7 @@ func NewConfig() Config {
Files: reader.NewFilesConfig(),
HTTPClient: NewHTTPClientConfig(),
HTTPServer: NewHTTPServerConfig(),
Inproc: NewInprocConfig(),
Kafka: reader.NewKafkaConfig(),
KafkaBalanced: reader.NewKafkaBalancedConfig(),
MQTT: reader.NewMQTTConfig(),
