Skip to content

Commit

Permalink
Generate env vars config
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Jun 28, 2018
1 parent a2a3d1e commit f0b1c0b
Show file tree
Hide file tree
Showing 19 changed files with 1,317 additions and 578 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ All notable changes to this project will be documented in this file.

## Unreleased

## 0.15.0 - 2018-06-28

### Added

- Support for PATCH verb on the streams mode `/streams/{id}` endpoint.

### Changed

- Sweeping changes were made to the environment variable configuration file.
This file is now auto generated along with its supporting document. This
change will impact the docker image.

## 0.14.7 - 2018-06-24

### Added
Expand Down
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ Or, with docker:
``` shell
# Send HTTP /POST data to Kafka:
docker run --rm \
-e "BENTHOS_INPUT=http_server" \
-e "BENTHOS_OUTPUT=kafka" \
-e "KAFKA_OUTPUT_BROKER_ADDRESSES=kafka-server:9092" \
-e "KAFKA_OUTPUT_TOPIC=benthos_topic" \
-e "INPUT_TYPE=http_server" \
-e "OUTPUT_TYPE=kafka" \
-e "OUTPUT_KAFKA_ADDRESSES=kafka-server:9092" \
-e "OUTPUT_KAFKA_TOPIC=benthos_topic" \
-p 4195:4195 \
jeffail/benthos

Expand Down
11 changes: 7 additions & 4 deletions cmd/benthos/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,10 +338,13 @@ func main() {
}

// Create our metrics type.
stats, err := metrics.New(config.Metrics, metrics.OptSetLogger(logger))
if err != nil {
logger.Errorf("Metrics error: %v\n", err)
os.Exit(1)
var stats metrics.Type
var err error
stats, err = metrics.New(config.Metrics, metrics.OptSetLogger(logger))
for err != nil {
logger.Errorf("Failed to connect to metrics aggregator: %v\n", err)
<-time.After(time.Second)
stats, err = metrics.New(config.Metrics, metrics.OptSetLogger(logger))
}
defer stats.Close()

Expand Down
123 changes: 111 additions & 12 deletions cmd/tools/benthos_config_gen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func createJSON(t, path string, sanit interface{}) {
fmt.Printf("Generated '%v' config at: %v\n", t, path)
}

func envify(rootPath string, conf interface{}, paths map[string]interface{}) (newConf interface{}) {
func envify(rootPath string, conf interface{}, paths map[string]string) (newConf interface{}) {
genBytes, err := json.Marshal(conf)
if err != nil {
panic(err)
Expand All @@ -163,6 +163,10 @@ func envify(rootPath string, conf interface{}, paths map[string]interface{}) (ne
panic(err)
}

staticlist := []string{
"INPUT_TYPE",
"OUTPUT_TYPE",
}
blacklist := []string{
"READ_UNTIL",
"CONDITIONAL",
Expand All @@ -188,6 +192,13 @@ func envify(rootPath string, conf interface{}, paths map[string]interface{}) (ne
keyIter:
for k, v := range obj {
newPath := path + "_" + strings.ToUpper(k)
for _, b := range staticlist {
if strings.Contains(newPath, b) {
// Preserve values that hit our staticlist.
newMap[k] = v
continue keyIter
}
}
for _, b := range blacklist {
if strings.Contains(newPath, b) {
// Skip values that hit our blacklist.
Expand Down Expand Up @@ -239,7 +250,7 @@ func envify(rootPath string, conf interface{}, paths map[string]interface{}) (ne
case json.Number:
valStr = t.String()
}
paths[path] = from
paths[path] = valStr
if len(valStr) > 0 {
*to = "${" + path + ":" + valStr + "}"
} else {
Expand All @@ -251,22 +262,110 @@ func envify(rootPath string, conf interface{}, paths map[string]interface{}) (ne
return
}

func formatEnvVars(vars map[string]interface{}) []byte {
func formatEnvVars(vars map[string]string) []byte {
categories := []string{
"HTTP", "INPUT", "BUFFER", "PROCESSOR", "OUTPUT", "LOGGER", "METRICS",
}
priorityVars := []string{
"INPUTS", "PROCESSOR_THREADS", "OUTPUTS", "OUTPUTS_PATTERN",
"INPUT_TYPE", "BUFFER_TYPE", "PROCESSOR_TYPE",
"OUTPUT_TYPE", "METRICS_TYPE",
}

sortedVars := []string{}
for k := range vars {
sortedVars = append(sortedVars, k)
}
sort.Strings(sortedVars)

var buf bytes.Buffer

buf.WriteString(`Environment Config
==================
This document was auto generated by ` + "`benthos_config_gen`" + `.
The environment variables config ` + "`default.yaml`" + ` is an auto generated
Benthos configuration where _all_ fields can be set with environment variables.
The architecture of the config is a standard bridge between N replicated
sources, M replicated sinks and an optional buffer and processing pipeline
between them.
The original intent of this config is to be deployed within a docker image, but
as it is a standard config it can be used in other deployments.
In order to use this config simply define your env vars and point Benthos to it.
For example, to send Kafka data to RabbitMQ you can run:
` + "``` sh" + `
INPUT_TYPE=kafka_balanced \
INPUT_KAFKA_ADDRESSES=localhost:9092 \
INPUT_KAFKA_TOPIC=foo-topic \
INPUT_KAFKA_CONSUMER_GROUP=foo-consumer \
OUTPUT_TYPE=amqp \
OUTPUT_AMQP_URL=amqp://guest:guest@localhost:5672/ \
OUTPUT_AMQP_EXCHANGE=foo-exchange \
OUTPUT_AMQP_EXCHANGE_TYPE=direct \
benthos -c ./config/env/default.yaml
` + "```" + `
All variables within the config are listed in this document.
for _, cat := range categories {
sortedVars := []string{}
for k := range vars {
if strings.HasPrefix(k, cat) {
sortedVars = append(sortedVars, k)
## Contents
`)

for _, section := range categories {
buf.WriteByte('\n')
buf.WriteString("- [" + section + "](#" + section + ")")
}
buf.WriteByte('\n')

for _, section := range categories {
buf.WriteString("\n")
buf.WriteString("## " + section)
buf.WriteString("\n\n```\n")

catVars := []string{}

for _, v := range priorityVars {
if !strings.HasPrefix(v, section) {
continue
}
catVars = append(catVars, v)
}
sortedIter:
for _, v := range sortedVars {
if !strings.HasPrefix(v, section) {
continue
}
for _, v2 := range priorityVars {
if v == v2 {
continue sortedIter
}
}
catVars = append(catVars, v)
}

vMaxLen := 0
for _, v := range catVars {
if len(v) > vMaxLen {
vMaxLen = len(v)
}
}
for _, v := range catVars {
buf.WriteString(v)
if defVal := vars[v]; len(defVal) > 0 {
for i := len(v); i < vMaxLen; i++ {
buf.WriteByte(' ')
}
buf.WriteString(" = " + defVal)
}
buf.WriteByte('\n')
}
sort.Strings(sortedVars)
buf.WriteString("```\n")
}

return []byte{} // TODO
return buf.Bytes()
}

func createEnvConf(configsDir string) {
Expand Down Expand Up @@ -332,7 +431,7 @@ func createEnvConf(configsDir string) {
Metrics: metrics.NewConfig(),
}

pathsMap := map[string]interface{}{}
pathsMap := map[string]string{}
envConf.HTTP = envify("HTTP", envConf.HTTP, pathsMap)
envConf.Input = envify("INPUT", envConf.Input, pathsMap)
envConf.Buffer = envify("BUFFER", envConf.Buffer, pathsMap)
Expand Down Expand Up @@ -420,7 +519,7 @@ func main() {
}

// Create Environment Vars Config
// createEnvConf(configsDir)
createEnvConf(configsDir)
}

//------------------------------------------------------------------------------
Loading

0 comments on commit f0b1c0b

Please sign in to comment.