-
Notifications
You must be signed in to change notification settings - Fork 71
/
Copy pathmain.go
176 lines (142 loc) · 4.33 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package main
import (
"bufio"
"context"
"io"
"os"
"os/signal"
"strings"
"syscall"
"github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"golang.org/x/crypto/ssh/terminal"
"github.com/batchcorp/plumber-schemas/build/go/protos/opts"
"github.com/streamdal/plumber/actions"
"github.com/streamdal/plumber/config"
"github.com/streamdal/plumber/kv"
"github.com/streamdal/plumber/options"
"github.com/streamdal/plumber/plumber"
"github.com/streamdal/plumber/printer"
"github.com/streamdal/plumber/prometheus"
)
// main is the plumber entry point: it parses CLI options, optionally ingests
// piped stdin for the write action, configures logging, wires up the
// persistent config, signal handling and metrics, and finally hands control
// over to plumber.Run.
func main() {
	kongCtx, cliOpts, err := options.New(os.Args[1:])
	if err != nil {
		logrus.Fatalf("Unable to handle CLI input: %s", err)
	}

	// TODO: STDIN write should be continuous; punting for now.
	readFromStdin(cliOpts)

	// Debug takes precedence over quiet when both flags are set.
	if cliOpts.Global.Debug {
		logrus.SetLevel(logrus.DebugLevel)
	} else if cliOpts.Global.Quiet {
		logrus.SetLevel(logrus.ErrorLevel)
	}

	serviceCtx, serviceShutdownFunc := context.WithCancel(context.Background())
	mainShutdownCtx, mainShutdownFunc := context.WithCancel(context.Background())

	// The KV store is only created for server mode with clustering enabled,
	// so that the config is stored in NATS instead of on disk. In every other
	// case the store stays nil.
	var store kv.IKV

	if cliOpts.Global.XAction == "server" && cliOpts.Server.EnableCluster {
		var kvErr error

		store, kvErr = kv.New(cliOpts.Server)
		if kvErr != nil {
			logrus.Fatalf("unable to create KV store: %s", kvErr)
		}
	}

	persistentConfig, err := config.New(cliOpts.Server.EnableCluster, store)
	if err != nil {
		logrus.Fatalf("unable to create persistent config: %s", err)
	}

	// Persist the config when main returns.
	defer persistentConfig.Save()

	// Signals are intercepted only for the relay, server and read actions.
	switch cliOpts.Global.XAction {
	case "relay", "server", "read":
		logrus.Debug("Intercepting signals")

		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM)

		// The first interrupt/SIGTERM cancels the service context.
		go func() {
			logrus.Debugf("Received system call: %+v", <-sigCh)
			logrus.Info("Shutting down plumber...")

			serviceShutdownFunc()
		}()

		// Create prometheus counters/gauges
		prometheus.InitPrometheusMetrics()
	}

	// TODO: This probably should be updated for server
	// Start stats reporting when relay options are present and stats are enabled.
	if cliOpts.Relay != nil && cliOpts.Relay.StatsEnable {
		prometheus.Start(cliOpts.Relay.StatsReportIntervalSec)
	}

	// Print the logo only when stderr is a terminal; otherwise (ie. in a
	// container, detached) switch logging to JSON.
	if terminal.IsTerminal(int(os.Stderr.Fd())) {
		printer.PrintLogo()
	} else {
		logrus.SetFormatter(&logrus.JSONFormatter{})
	}

	// Actions contains server-related methods that are used by both the gRPC
	// server and the etcd consumer.
	actionsCfg := &actions.Config{
		PersistentConfig: persistentConfig,
	}

	a, err := actions.New(actionsCfg)
	if err != nil {
		logrus.Fatal(err)
	}

	plumberCfg := &plumber.Config{
		PersistentConfig:   persistentConfig,
		ServiceShutdownCtx: serviceCtx,
		KongCtx:            kongCtx,
		CLIOptions:         cliOpts,
		Actions:            a,
		MainShutdownFunc:   mainShutdownFunc,
		MainShutdownCtx:    mainShutdownCtx,
	}

	p, err := plumber.New(plumberCfg)
	if err != nil {
		logrus.Fatal(err)
	}

	p.Run()
}
// readFromStdin collects data piped into stdin and stores it on the write
// options (opts.Write.XCliOptions.InputStdin).
//
// It does nothing unless the action is "write" and stdin is not a character
// device (typically meaning data was piped or redirected in rather than typed
// at a terminal). Input is split on '\n' and each line, with its newline
// removed, becomes one entry. When InputAsJsonArray is set, only the first
// line is used: it is parsed as a JSON array by convertJSONInput and the
// array elements become the entries.
//
// The process exits via logrus.Fatal if stdin cannot be stat'ed or read, or
// if a JSON array was requested but no data arrived on stdin.
func readFromStdin(opts *opts.CLIOptions) {
	if opts.Global.XAction != "write" {
		return
	}

	info, err := os.Stdin.Stat()
	if err != nil {
		logrus.Fatal(err)
	}

	// Stdin is a character device: nothing was piped in, so don't block on it.
	if info.Mode()&os.ModeCharDevice != 0 {
		return
	}

	inputData := make([]string, 0)
	reader := bufio.NewReader(os.Stdin)

	for {
		line, err := reader.ReadString('\n')
		if err != nil && err != io.EOF {
			// A real read failure would otherwise keep the loop spinning.
			logrus.Fatalf("unable to read from stdin: %s", err)
		}

		// ReadString returns the data read before EOF together with io.EOF,
		// so a final line without a trailing newline must still be kept.
		if err == nil || line != "" {
			inputData = append(inputData, strings.Trim(line, "\n"))
		}

		if err == io.EOF {
			break
		}
	}

	// Treat input as a JSON array
	if opts.Write.XCliOptions.InputAsJsonArray {
		// Guard against indexing into an empty slice when stdin had no data.
		if len(inputData) == 0 {
			logrus.Fatal("--input-as-json-array option was passed, but no data was received on stdin")
		}

		opts.Write.XCliOptions.InputStdin = convertJSONInput(inputData[0])
		return
	}

	// Treat input as new object on each line
	opts.Write.XCliOptions.InputStdin = inputData
}
// convertJSONInput parses value as a JSON array and returns the raw JSON text
// of each element, in order, for the writer to consume. If value is not a
// JSON array, the process exits via logrus.Fatal.
func convertJSONInput(value string) []string {
	parsed := gjson.Parse(value)

	if !parsed.IsArray() {
		logrus.Fatal("--input-as-json-array option was passed, but input data is not a valid JSON array")
	}

	elements := make([]string, 0)

	// Returning true from the callback keeps the iteration going, so every
	// element is collected.
	collect := func(_, element gjson.Result) bool {
		elements = append(elements, element.Raw)
		return true
	}

	parsed.ForEach(collect)

	return elements
}