-
Notifications
You must be signed in to change notification settings - Fork 847
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
289 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
Streams Mode | ||
============ | ||
|
||
A Benthos stream consists of four components; an input, an optional buffer, | ||
processor pipelines and an output. Under normal use a Benthos instance is a | ||
single stream, and these components are configured within the service config | ||
file. | ||
|
||
Alternatively, Benthos can be run in `--streams` mode, where a single running | ||
Benthos instance is able to run multiple entirely isolated streams. Adding | ||
streams in this mode can be done in two ways: | ||
|
||
1. [Static configuration files][static-files] allows you to maintain a directory | ||
of static stream configuration files that will be traversed by Benthos. | ||
|
||
2. An [HTTP REST API][rest-api] allows you to dynamically create, read the | ||
status of, update, and delete streams at runtime. | ||
|
||
These two methods can be use interchangably, i.e. it's possible to update and | ||
delete streams that were created with static files. | ||
|
||
[static-files]: using_config_files.md | ||
[rest-api]: using_REST_API.md |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,241 @@ | ||
Streams Via REST API | ||
==================== | ||
|
||
By using the Benthos `--streams` mode REST API you can dynamically control which | ||
streams are active at runtime. The full spec for the Benthos streams mode REST | ||
API can be [found here][http-interface]. | ||
|
||
Note that stream configs created and updated using this API do *not* benefit | ||
from [environment variable interpolation][interpolation] (function interpolation | ||
will still work). | ||
|
||
## Walkthrough | ||
|
||
Start by running Benthos in streams mode: | ||
|
||
``` bash | ||
$ benthos --streams | ||
``` | ||
|
||
On a separate terminal we can add our first stream `foo` by `POST`ing a JSON or | ||
YAML config to the `/streams/foo` endpoint: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/foo -X POST --data-binary @- <<EOF | ||
input: | ||
type: http_server | ||
buffer: | ||
type: memory | ||
pipeline: | ||
threads: 4 | ||
processors: | ||
- type: jmespath | ||
jmespath: | ||
query: "{id: user.id, content: body.content}" | ||
output: | ||
type: http_server | ||
EOF | ||
``` | ||
|
||
Now we can check the full set of streams loaded by `GET`ing the `/streams` | ||
endpoint: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams | jq '.' | ||
{ | ||
"foo": { | ||
"active": true, | ||
"uptime": 7.223545951, | ||
"uptime_str": "7.223545951s" | ||
} | ||
} | ||
``` | ||
|
||
Good, now let's add another stream `bar` the same way: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/bar -X POST --data-binary @- <<EOF | ||
input: | ||
type: kafka | ||
kafka: | ||
addresses: | ||
- localhost:9092 | ||
topic: my_topic | ||
buffer: | ||
type: none | ||
pipeline: | ||
threads: 1 | ||
processors: | ||
- type: sample | ||
sample: | ||
retain: 10 | ||
output: | ||
type: elasticsearch | ||
elasticsearch: | ||
urls: | ||
- http://localhost:9200 | ||
EOF | ||
``` | ||
|
||
And check the set again: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams | jq '.' | ||
{ | ||
"bar": { | ||
"active": true, | ||
"uptime": 10.121344484, | ||
"uptime_str": "10.121344484s" | ||
}, | ||
"foo": { | ||
"active": true, | ||
"uptime": 19.380582951, | ||
"uptime_str": "19.380583306s" | ||
} | ||
} | ||
``` | ||
|
||
It's also possible to get the configuration of a loaded stream by `GET`ing the | ||
path `/streams/{id}`: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/foo | jq '.' | ||
{ | ||
"active": true, | ||
"uptime": 30.123488951, | ||
"uptime_str": "30.123488951s" | ||
"config": { | ||
"input": { | ||
"type": "http_server", | ||
"http_server": { | ||
"address": "", | ||
"cert_file": "", | ||
"key_file": "", | ||
"path": "/post", | ||
"timeout_ms": 5000 | ||
} | ||
}, | ||
"buffer": { | ||
"type": "memory", | ||
"memory": { | ||
"limit": 10000000 | ||
} | ||
}, | ||
... etc ... | ||
} | ||
} | ||
``` | ||
|
||
Next, we might want to update stream `foo` by `PUT`ing a new config to the path | ||
`/streams/foo`: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/foo -X PUT --data-binary @- <<EOF | ||
input: | ||
type: http_server | ||
buffer: | ||
type: none | ||
pipeline: | ||
threads: 4 | ||
processors: | ||
- type: jmespath | ||
jmespath: | ||
query: "{id: user.id, content: body.content}" | ||
output: | ||
type: http_server | ||
EOF | ||
``` | ||
|
||
We have removed the memory buffer with this change, let's check that the config | ||
has actually been updated: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/foo | jq '.' | ||
{ | ||
"active": true, | ||
"uptime": 12.328482951, | ||
"uptime_str": "12.328482951s" | ||
"config": { | ||
"input": { | ||
"type": "http_server", | ||
"http_server": { | ||
"address": "", | ||
"cert_file": "", | ||
"key_file": "", | ||
"path": "/post", | ||
"timeout_ms": 5000 | ||
} | ||
}, | ||
"buffer": { | ||
"type": "none" | ||
}, | ||
... etc ... | ||
} | ||
} | ||
``` | ||
|
||
Good, we are done with stream `bar` now, so let's delete it by `DELETE`ing the | ||
`/streams/bar` endpoint: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams/bar -X DELETE | ||
``` | ||
|
||
And let's `GET` the `/streams` endpoint to see the new set: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams | jq '.' | ||
{ | ||
"foo": { | ||
"active": true, | ||
"uptime": 31.872448851, | ||
"uptime_str": "31.872448851s" | ||
} | ||
} | ||
``` | ||
|
||
Great. Another useful feature is `POST`ing to `/streams`, this allows us to set | ||
the entire set of streams with a single request. | ||
|
||
The payload is a map of stream ids to configurations and this will become the | ||
exclusive set of active streams. If there are existing streams that are not on | ||
the list they will be removed. | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams -X POST --data-binary @- <<EOF | ||
bar: | ||
input: | ||
type: http_client | ||
http_client: | ||
url: http://localhost:4195/baz/get | ||
output: | ||
type: stdout | ||
baz: | ||
input: | ||
type: http_server | ||
output: | ||
type: http_server | ||
EOF | ||
``` | ||
|
||
Let's check our new set of streams: | ||
|
||
``` bash | ||
$ curl http://localhost:4195/streams | jq '.' | ||
{ | ||
"bar": { | ||
"active": true, | ||
"uptime": 3.183883444, | ||
"uptime_str": "3.183883444s" | ||
}, | ||
"baz": { | ||
"active": true, | ||
"uptime": 3.183883449, | ||
"uptime_str": "3.183883449s" | ||
} | ||
} | ||
``` | ||
|
||
Done. | ||
|
||
[http-interface]: ../api/streams.md |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters