Skip to content

Commit 0d69657

Browse files
authored
support multiple configurations per collector (#441)
* Add configuration id to backend name and configuration path This enables us to use the same backend with multiple configurations * Support multiple configs per assignment Expand each assignment and backend into a unique ID that combines the backendID + configID Most of the stores us a map with this Id that holds everything together. * Replace GetBackendById with simpler GetBackend method * Cleanup * Keep using old assignment struct Multiple configs can still be assigned by using multiple assignments. E.g.: [ { "collector-1": "config-A" }, { "collector-1": "config-B" } ] * Report collector config directory to the server This will be used to create the ${sidecar.spoolDir} config variable. * Only report CollectorConfigurationDirectory to newer Graylog instances Older Graylog servers will refuse unknown NodeDetails attributes Also update go module to version 1.19 * Make collector status ID backwards compatible If we are talking to an older Graylog instance, report the collectors with just the collector ID. Refactor version handling into separate type * Fix and refactor version comparison * Ignore sleep on the first periodical loop Speeds up testing the sidecar * fix benchmarks * build with go 1.19 * fix jeninks file * Fix null pointer deref. The recent code changes triggered an old exisiting bug, because we would only keep a backend list of actually assigned backends. * Prefer pointer receivers * Send tags and collector config regardless of send_status * Remove configuration_file_name support. Refs Graylog2/graylog2-server#5278 * Add configuration_id to each collector status request Newer Graylog servers can use it to differentiate multiple configs on one collector. Older servers won't see this field for backwards compatibility. * fix runner comparision
1 parent b1f2f9b commit 0d69657

15 files changed

+177
-92
lines changed

api/graylog.go

+36-7
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,26 @@ var (
4040
configurationOverride = false
4141
)
4242

43+
func GetServerVersion(httpClient *http.Client, ctx *context.Ctx) (*GraylogVersion, error) {
44+
// In case of an error just assume 4.0.0
45+
fallbackVersion, _ := NewGraylogVersion("4.0.0")
46+
47+
c := rest.NewClient(httpClient, ctx)
48+
c.BaseURL = ctx.ServerUrl
49+
r, err := c.NewRequest("GET", "/", nil, nil)
50+
if err != nil {
51+
log.Errorf("Cannot retrieve server version %v", err)
52+
return fallbackVersion, err
53+
}
54+
versionResponse := graylog.ServerVersionResponse{}
55+
resp, err := c.Do(r, &versionResponse)
56+
if err != nil || resp == nil {
57+
log.Errorf("Error fetching server version %v", err)
58+
return fallbackVersion, err
59+
}
60+
return NewGraylogVersion(versionResponse.Version)
61+
}
62+
4363
func RequestBackendList(httpClient *http.Client, checksum string, ctx *context.Ctx) (graylog.ResponseBackendList, error) {
4464
c := rest.NewClient(httpClient, ctx)
4565
c.BaseURL = ctx.ServerUrl
@@ -137,15 +157,14 @@ func RequestConfiguration(
137157
return configurationResponse, nil
138158
}
139159

140-
func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) {
160+
func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.Ctx, serverVersion *GraylogVersion, status *graylog.StatusRequest) (graylog.ResponseCollectorRegistration, error) {
141161
c := rest.NewClient(httpClient, ctx)
142162
c.BaseURL = ctx.ServerUrl
143163

144164
registration := graylog.RegistrationRequest{}
145165

146166
registration.NodeName = ctx.UserConfig.NodeName
147167
registration.NodeDetails.OperatingSystem = common.GetSystemName()
148-
registration.NodeDetails.Tags = ctx.UserConfig.Tags
149168

150169
if ctx.UserConfig.SendStatus {
151170
metrics := &graylog.MetricsRequest{
@@ -173,6 +192,10 @@ func UpdateRegistration(httpClient *http.Client, checksum string, ctx *context.C
173192
}
174193
}
175194
}
195+
if serverVersion.SupportsExtendedNodeDetails() {
196+
registration.NodeDetails.CollectorConfigurationDirectory = ctx.UserConfig.CollectorConfigurationDirectory
197+
registration.NodeDetails.Tags = ctx.UserConfig.Tags
198+
}
176199

177200
r, err := c.NewRequest("PUT", "/sidecars/"+ctx.NodeId, nil, registration)
178201
if checksum != "" {
@@ -255,18 +278,24 @@ func GetTlsConfig(ctx *context.Ctx) *tls.Config {
255278
return tlsConfig
256279
}
257280

258-
func NewStatusRequest() graylog.StatusRequest {
281+
func NewStatusRequest(serverVersion *GraylogVersion) graylog.StatusRequest {
259282
statusRequest := graylog.StatusRequest{Backends: make([]graylog.StatusRequestBackend, 0)}
260283
combinedStatus := backends.StatusUnknown
261284
runningCount, stoppedCount, errorCount := 0, 0, 0
262285

263286
for id, runner := range daemon.Daemon.Runner {
287+
collectorId := strings.Split(id, "-")[0]
288+
configurationId := ""
289+
if serverVersion.SupportsMultipleBackends() {
290+
configurationId = strings.Split(id, "-")[1]
291+
}
264292
backendStatus := runner.GetBackend().Status()
265293
statusRequest.Backends = append(statusRequest.Backends, graylog.StatusRequestBackend{
266-
Id: id,
267-
Status: backendStatus.Status,
268-
Message: backendStatus.Message,
269-
VerboseMessage: backendStatus.VerboseMessage,
294+
CollectorId: collectorId,
295+
ConfigurationId: configurationId,
296+
Status: backendStatus.Status,
297+
Message: backendStatus.Message,
298+
VerboseMessage: backendStatus.VerboseMessage,
270299
})
271300
switch backendStatus.Status {
272301
case backends.StatusRunning:

api/graylog/requests.go

+12-10
Original file line numberDiff line numberDiff line change
@@ -25,19 +25,21 @@ type RegistrationRequest struct {
2525
}
2626

2727
type NodeDetailsRequest struct {
28-
OperatingSystem string `json:"operating_system"`
29-
IP string `json:"ip,omitempty"`
30-
LogFileList []common.File `json:"log_file_list,omitempty"`
31-
Metrics *MetricsRequest `json:"metrics,omitempty"`
32-
Status *StatusRequest `json:"status,omitempty"`
33-
Tags []string `json:"tags,omitempty"`
28+
OperatingSystem string `json:"operating_system"`
29+
IP string `json:"ip,omitempty"`
30+
LogFileList []common.File `json:"log_file_list,omitempty"`
31+
Metrics *MetricsRequest `json:"metrics,omitempty"`
32+
Status *StatusRequest `json:"status,omitempty"`
33+
CollectorConfigurationDirectory string `json:"collector_configuration_directory,omitempty"`
34+
Tags []string `json:"tags,omitempty"`
3435
}
3536

3637
type StatusRequestBackend struct {
37-
Id string `json:"collector_id"`
38-
Status int `json:"status"`
39-
Message string `json:"message"`
40-
VerboseMessage string `json:"verbose_message"`
38+
CollectorId string `json:"collector_id"`
39+
ConfigurationId string `json:"configuration_id,omitempty"`
40+
Status int `json:"status"`
41+
Message string `json:"message"`
42+
VerboseMessage string `json:"verbose_message"`
4143
}
4244

4345
type StatusRequest struct {

api/graylog/responses.go

+13-8
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,20 @@ type ResponseBackendList struct {
4242
NotModified bool
4343
}
4444

45+
type ServerVersionResponse struct {
46+
ClusterId string `json:"cluster_id"`
47+
NodeId string `json:"node_id"`
48+
Version string `json:"version"`
49+
}
50+
4551
type ResponseCollectorBackend struct {
46-
Id string `json:"id"`
47-
Name string `json:"name"`
48-
ServiceType string `json:"service_type"`
49-
OperatingSystem string `json:"node_operating_system"`
50-
ExecutablePath string `json:"executable_path"`
51-
ConfigurationFileName string `json:"configuration_file_name"`
52-
ExecuteParameters string `json:"execute_parameters"`
53-
ValidationParameters string `json:"validation_parameters"`
52+
Id string `json:"id"`
53+
Name string `json:"name"`
54+
ServiceType string `json:"service_type"`
55+
OperatingSystem string `json:"node_operating_system"`
56+
ExecutablePath string `json:"executable_path"`
57+
ExecuteParameters string `json:"execute_parameters"`
58+
ValidationParameters string `json:"validation_parameters"`
5459
}
5560

5661
type ResponseCollectorConfiguration struct {

api/version.go

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package api
2+
3+
import "github.com/hashicorp/go-version"
4+
5+
type GraylogVersion struct {
6+
*version.Version
7+
}
8+
9+
func NewGraylogVersion(v string) (*GraylogVersion, error) {
10+
newVersion, err := version.NewVersion(v)
11+
if err != nil {
12+
return nil, err
13+
}
14+
return &GraylogVersion{newVersion}, nil
15+
}
16+
17+
func (v *GraylogVersion) SupportsMultipleBackends() bool {
18+
// cannot use version.Constraints because of a bug in comparing pre-releases
19+
return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4
20+
}
21+
22+
func (v *GraylogVersion) SupportsExtendedNodeDetails() bool {
23+
// cannot use version.Constraints because of a bug in comparing pre-releases
24+
return v.Version.Segments()[0] >= 4 && v.Version.Segments()[1] >= 4
25+
}

assignments/assignment.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import (
2121
)
2222

2323
var (
24-
// global store of configuration assignments, [backendId]ConfigurationId
24+
// global store of configuration assignments, [backendId-configurationID]ConfigurationId
2525
Store = &assignmentStore{make(map[string]string)}
2626
)
2727

@@ -34,9 +34,9 @@ type ConfigurationAssignment struct {
3434
ConfigurationId string `json:"configuration_id"`
3535
}
3636

37-
func (as *assignmentStore) SetAssignment(assignment *ConfigurationAssignment) {
38-
if as.assignments[assignment.BackendId] != assignment.ConfigurationId {
39-
as.assignments[assignment.BackendId] = assignment.ConfigurationId
37+
func (as *assignmentStore) SetAssignment(backendId string, configId string) {
38+
if as.assignments[backendId] != configId {
39+
as.assignments[backendId] = configId
4040
}
4141
}
4242

@@ -60,16 +60,28 @@ func (as *assignmentStore) AssignedBackendIds() []string {
6060
return result
6161
}
6262

63+
func expandAssignments(assignments []ConfigurationAssignment) map[string]string {
64+
expandedAssignments := make(map[string]string)
65+
66+
for _, assignment := range assignments {
67+
configId := assignment.ConfigurationId
68+
expandedAssignments[assignment.BackendId+"-"+configId] = configId
69+
}
70+
return expandedAssignments
71+
}
72+
6373
func (as *assignmentStore) Update(assignments []ConfigurationAssignment) bool {
74+
expandedAssignments := expandAssignments(assignments)
75+
6476
beforeUpdate := make(map[string]string)
6577
for k, v := range as.assignments {
6678
beforeUpdate[k] = v
6779
}
68-
if len(assignments) != 0 {
80+
if len(expandedAssignments) != 0 {
6981
var activeIds []string
70-
for _, assignment := range assignments {
71-
Store.SetAssignment(&assignment)
72-
activeIds = append(activeIds, assignment.BackendId)
82+
for backendId, assignment := range expandedAssignments {
83+
Store.SetAssignment(backendId, assignment)
84+
activeIds = append(activeIds, backendId)
7385
}
7486
Store.cleanup(activeIds)
7587
} else {

backends/backend.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ import (
3535
type Backend struct {
3636
Enabled *bool
3737
Id string
38+
ConfigId string
3839
Name string
3940
ServiceType string
4041
OperatingSystem string
@@ -46,27 +47,24 @@ type Backend struct {
4647
backendStatus system.VerboseStatus
4748
}
4849

49-
func BackendFromResponse(response graylog.ResponseCollectorBackend, ctx *context.Ctx) *Backend {
50+
func BackendFromResponse(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) *Backend {
5051
return &Backend{
5152
Enabled: common.NewTrue(),
52-
Id: response.Id,
53-
Name: response.Name,
53+
Id: response.Id + "-" + configId,
54+
ConfigId: configId,
55+
Name: response.Name + "-" + configId,
5456
ServiceType: response.ServiceType,
5557
OperatingSystem: response.OperatingSystem,
5658
ExecutablePath: response.ExecutablePath,
57-
ConfigurationPath: BuildConfigurationPath(response, ctx),
59+
ConfigurationPath: BuildConfigurationPath(response, configId, ctx),
5860
ExecuteParameters: response.ExecuteParameters,
5961
ValidationParameters: response.ValidationParameters,
6062
backendStatus: system.VerboseStatus{},
6163
}
6264
}
6365

64-
func BuildConfigurationPath(response graylog.ResponseCollectorBackend, ctx *context.Ctx) string {
65-
if response.ConfigurationFileName != "" {
66-
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.ConfigurationFileName)
67-
} else {
68-
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, response.Name+".conf")
69-
}
66+
func BuildConfigurationPath(response graylog.ResponseCollectorBackend, configId string, ctx *context.Ctx) string {
67+
return filepath.Join(ctx.UserConfig.CollectorConfigurationDirectory, configId, response.Name+".conf")
7068
}
7169

7270
func (b *Backend) Equals(a *Backend) bool {
@@ -84,6 +82,7 @@ func (b *Backend) EqualSettings(a *Backend) bool {
8482
aBackend := &Backend{
8583
Enabled: b.Enabled,
8684
Id: a.Id,
85+
ConfigId: a.ConfigId,
8786
Name: a.Name,
8887
ServiceType: a.ServiceType,
8988
OperatingSystem: a.OperatingSystem,

backends/registry.go

-9
Original file line numberDiff line numberDiff line change
@@ -50,15 +50,6 @@ func (bs *backendStore) GetBackend(id string) *Backend {
5050
return bs.backends[id]
5151
}
5252

53-
func (bs *backendStore) GetBackendById(id string) *Backend {
54-
for _, backend := range bs.backends {
55-
if backend.Id == id {
56-
return backend
57-
}
58-
}
59-
return nil
60-
}
61-
6253
func (bs *backendStore) Update(backends []Backend) {
6354
if len(backends) != 0 {
6455
var activeIds []string

backends/render.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,9 @@ package backends
1818
import (
1919
"bytes"
2020
"fmt"
21+
"github.com/Graylog2/collector-sidecar/common"
2122
"github.com/Graylog2/collector-sidecar/context"
2223
"io/ioutil"
23-
24-
"github.com/Graylog2/collector-sidecar/common"
2524
)
2625

2726
func (b *Backend) render() []byte {

benchmarks/bench-rest-api.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,10 @@ func startHeartbeat(ctx *context.Ctx, done chan bool, metrics chan time.Duration
6969
return
7070
default:
7171
time.Sleep(time.Duration(ctx.UserConfig.UpdateInterval) * time.Second)
72-
statusRequest := api.NewStatusRequest()
72+
version, _ := api.NewGraylogVersion("4.0.0")
73+
statusRequest := api.NewStatusRequest(version)
7374
t := time.Now()
74-
response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, &statusRequest)
75+
response, err := api.UpdateRegistration(httpClient, "nochecksum", ctx, version, &statusRequest)
7576
if err != nil {
7677
fmt.Printf("[%s] can't register sidecar: %v\n", ctx.UserConfig.NodeId, err)
7778
return

daemon/action_handler.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
func HandleCollectorActions(actions []graylog.ResponseCollectorAction) {
2525
for _, action := range actions {
26-
backend := backends.Store.GetBackendById(action.BackendId)
26+
backend := backends.Store.GetBackend(action.BackendId)
2727
if backend == nil {
2828
log.Errorf("Got action for non-existing collector: %s", action.BackendId)
2929
continue

daemon/daemon.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,14 @@ func (dc *DaemonConfig) SyncWithAssignments(context *context.Ctx) {
142142
}
143143

144144
// cleanup backends that should not run anymore
145-
if backend == nil || assignments.Store.GetAll()[backend.Id] == "" {
146-
log.Info("Removing process runner: " + backend.Name)
145+
if backend == nil || assignments.Store.GetAssignment(backend.Id) == "" {
146+
log.Info("Removing process runner: " + id)
147147
dc.DeleteRunner(id)
148148
}
149149
}
150150
assignedBackends := []*backends.Backend{}
151151
for backendId := range assignments.Store.GetAll() {
152-
backend := backends.Store.GetBackendById(backendId)
152+
backend := backends.Store.GetBackend(backendId)
153153
if backend != nil {
154154
assignedBackends = append(assignedBackends, backend)
155155
}

go.mod

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,29 @@
11
module github.com/Graylog2/collector-sidecar
22

3-
go 1.14
3+
go 1.19
44

55
require (
6-
github.com/BurntSushi/toml v0.3.1 // indirect
76
github.com/Sirupsen/logrus v0.11.0
8-
github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect
9-
github.com/davecgh/go-spew v1.1.1 // indirect
107
github.com/docker/go-units v0.3.3
118
github.com/elastic/go-ucfg v0.7.0
129
github.com/elastic/gosigar v0.0.0-20160829190344-2716c1fe855e
1310
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568
14-
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect
11+
github.com/hashicorp/go-version v1.6.0
1512
github.com/kardianos/service v1.2.1
16-
github.com/kr/text v0.2.0 // indirect
1713
github.com/natefinch/lumberjack v2.0.0+incompatible
18-
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
1914
github.com/pborman/uuid v0.0.0-20160216163710-c55201b03606
2015
github.com/rifflock/lfshook v0.0.0-20161216150210-24f7833daaff
21-
github.com/stretchr/testify v1.6.1 // indirect
2216
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68
17+
)
18+
19+
require (
20+
github.com/BurntSushi/toml v0.3.1 // indirect
21+
github.com/StackExchange/wmi v0.0.0-20160811214555-e54cbda6595d // indirect
22+
github.com/davecgh/go-spew v1.1.1 // indirect
23+
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 // indirect
24+
github.com/kr/text v0.2.0 // indirect
25+
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
26+
github.com/stretchr/testify v1.6.1 // indirect
2327
gopkg.in/check.v1 v1.0.0-20200902074654-038fdea0a05b // indirect
2428
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
2529
gopkg.in/yaml.v2 v2.3.0 // indirect

go.sum

+2
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568 h1:BMXYYRWT
1818
github.com/flynn-archive/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:rZfgFAXFS/z/lEd6LJmf9HVZ1LkgYiHx5pHhV5DR16M=
1919
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78 h1:0afyVEbxVeRz0ioQYn+9oDaeveMBXJi7juWqz2newuY=
2020
github.com/go-ole/go-ole v1.2.1-0.20161116064658-5e9c030faf78/go.mod h1:7FAglXiTm7HKlQRDeOQ6ZNUHidzCWXuZWq/1dTyBNF8=
21+
github.com/hashicorp/go-version v1.6.0 h1:feTTfFNnjP967rlCxM/I9g701jU+RN74YKx2mOkIeek=
22+
github.com/hashicorp/go-version v1.6.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA=
2123
github.com/kardianos/service v1.2.1 h1:AYndMsehS+ywIS6RB9KOlcXzteWUzxgMgBymJD7+BYk=
2224
github.com/kardianos/service v1.2.1/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
2325
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=

0 commit comments

Comments
 (0)