diff --git a/.ci/smoke-tests.groovy b/.ci/smoke-tests.groovy index e0ae857ee6b..ea0e87322be 100644 --- a/.ci/smoke-tests.groovy +++ b/.ci/smoke-tests.groovy @@ -11,8 +11,8 @@ pipeline { TERRAFORM_VERSION = '1.2.3' CREATED_DATE = "${new Date().getTime()}" SLACK_CHANNEL = "#apm-server" + SMOKETEST_VERSIONS = "${params.SMOKETEST_VERSIONS}" } - options { timeout(time: 3, unit: 'HOURS') buildDiscarder(logRotator(numToKeepStr: '100', artifactNumToKeepStr: '30', daysToKeepStr: '30')) @@ -31,9 +31,10 @@ pipeline { steps { deleteDir() gitCheckout(basedir: "${BASE_DIR}", shallow: false) + setEnvVar('GO_VERSION', readFile(file: "${BASE_DIR}/.go-version").trim()) + stash(allowEmpty: true, name: 'source', useDefaultExcludes: false) } } - stage('Smoke Tests') { options { skipDefaultCheckout() } environment { @@ -49,15 +50,18 @@ pipeline { steps { dir ("${BASE_DIR}") { withTestClusterEnv { - withGoEnv(version: readFile(file: ".go-version").trim()) { - script { - def smokeTests = sh(returnStdout: true, script: 'make smoketest/discover').trim().split('\r?\n') - def smokeTestJobs = [:] - for (smokeTest in smokeTests) { - smokeTestJobs["Run smoke tests in ${smokeTest}"] = runSmokeTest(smokeTest) + script { + def smokeTests = sh(returnStdout: true, script: 'make smoketest/discover').trim().split('\r?\n') + log(level: 'INFO', text: "make smoketest/discover: '${smokeTests}'") + def smokeTestJobs = [:] + for (smokeTest in smokeTests) { + // get the title for the stage based on the basename of the smoke test full path to be run + def title = sh(script: "basename ${smokeTest}", returnStdout:true).trim() + env.SMOKETEST_VERSIONS.trim().split(',').each { version -> + smokeTestJobs["${version}-${title}"] = runSmokeTestWithVersion(smokeTest: smokeTest, version: version, title: title) } - parallel smokeTestJobs } + parallel smokeTestJobs } } } @@ -66,9 +70,7 @@ pipeline { always { dir("${BASE_DIR}") { withTestClusterEnv { - withGoEnv(version: readFile(file: ".go-version").trim()) { - sh(label: 'Teardown smoke tests infra', script: 'make smoketest/all/cleanup') - } + sh(label: 'Teardown smoke tests infra', script: 'make smoketest/all/cleanup') } } } @@ -82,19 +84,30 @@ pipeline { } } -def runSmokeTest(String testDir) { +def runSmokeTestWithVersion(Map args = [:]) { + def testDir = args.smokeTest + def title = args.get('title', testDir) + def version = args.version return { - stage("Run smoke tests in ${testDir}") { - sh(label: 'Run smoke tests', script: "make smoketest/run TEST_DIR=${testDir}") + withNode(labels: 'linux && immutable', forceWorker: true) { + deleteDir() + unstash 'source' + dir("${BASE_DIR}") { + withTestClusterEnv { + sh(label: "Run smoke tests ${testDir} for ${version}", script: "make smoketest/run-version TEST_DIR=${testDir} SMOKETEST_VERSION=${version}") + } + } } } } -def withTestClusterEnv(Closure body) { +def withTestClusterEnv(Closure body) { withAWSEnv(secret: "${AWS_ACCOUNT_SECRET}", version: "2.7.6") { withTerraformEnv(version: "${TERRAFORM_VERSION}", forceInstallation: true) { withSecretVault(secret: "${EC_KEY_SECRET}", data: ['apiKey': 'EC_API_KEY'] ) { - body() + withGoEnv() { + body() + } } } } diff --git a/Makefile b/Makefile index 9a998a6e2c7..70861c04a6b 100644 --- a/Makefile +++ b/Makefile @@ -221,7 +221,7 @@ update-beats-docs: # Linting, style-checking, license header checks, etc. 
############################################################################## -# NOTE(axw) ST1000 is disabled for the moment as many packages do not have +# NOTE(axw) ST1000 is disabled for the moment as many packages do not have # comments. It would be a good idea to add them later, and remove this exception, # so we're a bit more intentional about the meaning of packages and how code is # organised. @@ -325,11 +325,15 @@ SMOKETEST_DIRS = $$(find $(CURRENT_DIR)/testing/smoke -mindepth 1 -maxdepth 1 -t smoketest/discover: @echo "$(SMOKETEST_DIRS)" +.PHONY: smoketest/run-version +smoketest/run-version: + @ echo "-> Running $(TEST_DIR) smoke tests for version $${SMOKETEST_VERSION}..." + @ cd $(TEST_DIR) && ./test.sh "$(SMOKETEST_VERSION)" + .PHONY: smoketest/run smoketest/run: @ for version in $(shell echo $(SMOKETEST_VERSIONS) | tr ',' ' '); do \ - echo "-> Running $(TEST_DIR) smoke tests for version $${version}..."; \ - cd $(TEST_DIR) && ./test.sh $${version}; \ + $(MAKE) smoketest/run-version SMOKETEST_VERSION=$${version}; \ done .PHONY: smoketest/cleanup diff --git a/NOTICE.txt b/NOTICE.txt index 4e2add69a2f..23449730ce3 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -260,11 +260,11 @@ SOFTWARE. -------------------------------------------------------------------------------- Dependency : github.com/elastic/beats/v7 -Version: v7.0.0-alpha2.0.20220928194719-77e4e30df6e2 +Version: v7.0.0-alpha2.0.20220929190017-2eb8bb8f90e7 Licence type (autodetected): Elastic -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20220928194719-77e4e30df6e2/LICENSE.txt: +Contents of probable licence file $GOMODCACHE/github.com/elastic/beats/v7@v7.0.0-alpha2.0.20220929190017-2eb8bb8f90e7/LICENSE.txt: Source code in this repository is variously licensed under the Apache License Version 2.0, an Apache compatible license, or the Elastic License. Outside of @@ -15346,11 +15346,11 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-------------------------------------------------------------------------------- Dependency : google.golang.org/genproto -Version: v0.0.0-20220927151529-dcaddaf36704 +Version: v0.0.0-20220929141241-1ce7b20da813 Licence type (autodetected): Apache-2.0 -------------------------------------------------------------------------------- -Contents of probable licence file $GOMODCACHE/google.golang.org/genproto@v0.0.0-20220927151529-dcaddaf36704/LICENSE: +Contents of probable licence file $GOMODCACHE/google.golang.org/genproto@v0.0.0-20220929141241-1ce7b20da813/LICENSE: Apache License diff --git a/docker-compose.yml b/docker-compose.yml index b8b57cdfa10..474513d5d31 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: '3.9' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0-326f84b0-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.6.0-158a13db-SNAPSHOT ports: - 9200:9200 healthcheck: @@ -31,7 +31,7 @@ services: - "./testing/docker/elasticsearch/ingest-geoip:/usr/share/elasticsearch/config/ingest-geoip" kibana: - image: docker.elastic.co/kibana/kibana:8.6.0-326f84b0-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.6.0-158a13db-SNAPSHOT ports: - 5601:5601 healthcheck: @@ -50,7 +50,7 @@ services: - "./testing/docker/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml" fleet-server: - image: docker.elastic.co/beats/elastic-agent:8.6.0-326f84b0-SNAPSHOT + image: docker.elastic.co/beats/elastic-agent:8.6.0-158a13db-SNAPSHOT ports: - 8220:8220 healthcheck: @@ -78,7 +78,7 @@ services: - "./testing/docker/fleet-server/key.pem:/etc/pki/tls/private/fleet-server-key.pem" metricbeat: - image: docker.elastic.co/beats/metricbeat:8.6.0-326f84b0-SNAPSHOT + image: docker.elastic.co/beats/metricbeat:8.6.0-158a13db-SNAPSHOT environment: ELASTICSEARCH_HOSTS: '["http://elasticsearch:9200"]' ELASTICSEARCH_USERNAME: "${KIBANA_ES_USER:-admin}" diff --git a/go.mod b/go.mod index a460d3ee0da..576e468c7bd 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/dgraph-io/badger/v2 v2.2007.3-0.20201012072640-f5a7e0a1c83b github.com/dustin/go-humanize v1.0.0 github.com/elastic/apm-server/internal/approvaltest v0.0.0-00010101000000-000000000000 - github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220928194719-77e4e30df6e2 + github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220929190017-2eb8bb8f90e7 github.com/elastic/elastic-agent-libs v0.2.11 github.com/elastic/elastic-agent-system-metrics v0.4.4 github.com/elastic/gmux v0.2.0 @@ -151,7 +151,7 @@ require ( golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect golang.org/x/sys v0.0.0-20220928140112-f11e5e49a4ec // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20220927151529-dcaddaf36704 // indirect + google.golang.org/genproto v0.0.0-20220929141241-1ce7b20da813 // indirect gopkg.in/jcmturner/aescts.v1 v1.0.1 // indirect gopkg.in/jcmturner/dnsutils.v1 v1.0.1 // indirect gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect diff --git a/go.sum b/go.sum index 19e0f95ffa7..91af19dbdc0 100644 --- a/go.sum +++ b/go.sum @@ -301,8 +301,8 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/eclipse/paho.mqtt.golang v1.3.5 h1:sWtmgNxYM9P2sP+xEItMozsR3w0cqZFlqnNN1bdl41Y= github.com/elastic/bayeux v1.0.5 h1:UceFq01ipmT3S8DzFK+uVAkbCdiPR0Bqei8qIGmUeY0= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220928194719-77e4e30df6e2 
h1:+Chfw+ssUS5Qnsb6Wf2bsJIU0hVGpjeWNO4NS5pk2JQ= -github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220928194719-77e4e30df6e2/go.mod h1:qERNHLzzEo+GdjCdcoP5Dp4US1xjyLWYVMVkt1PRbCc= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220929190017-2eb8bb8f90e7 h1:fmE2K8ZzsOOSSR3KZ9BPzZzEU3WRCTglgle4MVjgz/A= +github.com/elastic/beats/v7 v7.0.0-alpha2.0.20220929190017-2eb8bb8f90e7/go.mod h1:qERNHLzzEo+GdjCdcoP5Dp4US1xjyLWYVMVkt1PRbCc= github.com/elastic/elastic-agent v0.0.0-20220831162706-5f1e54f40d3e h1:uGDp9HesS9m3T7YwgM0ATE/YP5FXcxxAAKHQDgP/GS0= github.com/elastic/elastic-agent-autodiscover v0.3.0 h1:kdpNnIDnVk7gvQxxR6PzZY7aM8LyMTRkwI/p+FNS17s= github.com/elastic/elastic-agent-client/v7 v7.0.0-20220915231534-3e014f7c0e83 h1:vYpMoGkWOf9Yl3hBahn0WC2G4NaVOJnkMGBuCOpHgPE= @@ -1239,8 +1239,8 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= -google.golang.org/genproto v0.0.0-20220927151529-dcaddaf36704 h1:H1AcWFV69NFCMeBJ8nVLtv8uHZZ5Ozcgoq012hHEFuU= -google.golang.org/genproto v0.0.0-20220927151529-dcaddaf36704/go.mod h1:woMGP53BroOrRY3xTxlbr8Y3eB/nzAvvFM83q7kG2OI= +google.golang.org/genproto v0.0.0-20220929141241-1ce7b20da813 h1:buul04Ikd79A5tP8nGhKEyMfr+/HplsO6nqSUapWZ/M= +google.golang.org/genproto v0.0.0-20220929141241-1ce7b20da813/go.mod h1:woMGP53BroOrRY3xTxlbr8Y3eB/nzAvvFM83q7kG2OI= google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= diff --git a/testing/smoke/legacy-managed/test.sh b/testing/smoke/legacy-managed/test.sh index 49a4a01e6e0..9eb7733913b 100755 --- a/testing/smoke/legacy-managed/test.sh +++ b/testing/smoke/legacy-managed/test.sh @@ -2,8 +2,8 @@ set -eo pipefail -if [[ ${1} != 7.17 ]]; then - echo "-> Skipping smoke test..." +if [[ "${1}" != "7.17" ]]; then + echo "-> Skipping smoke test ['${1}' is not supported]..." exit 0 fi @@ -20,13 +20,14 @@ if [[ -z ${SKIP_DESTROY} ]]; then trap "terraform_destroy" EXIT fi -terraform_apply ${LATEST_VERSION} +INTEGRATIONS_SERVER=false +terraform_apply ${LATEST_VERSION} ${INTEGRATIONS_SERVER} healthcheck 1 send_events legacy_assertions ${LATEST_VERSION} echo "-> Upgrading APM Server to managed mode" -upgrade_managed ${LATEST_VERSION} +upgrade_managed ${LATEST_VERSION} ${INTEGRATIONS_SERVER} healthcheck 1 send_events data_stream_assertions ${LATEST_VERSION} diff --git a/testing/smoke/legacy-standalone-major-managed/test.sh b/testing/smoke/legacy-standalone-major-managed/test.sh index e25bc3cff5e..5aa7223b43a 100755 --- a/testing/smoke/legacy-standalone-major-managed/test.sh +++ b/testing/smoke/legacy-standalone-major-managed/test.sh @@ -2,8 +2,8 @@ set -eo pipefail -if [[ ${1} != 7.17 ]]; then - echo "-> Skipping smoke test..." +if [[ "${1}" != "7.17" ]]; then + echo "-> Skipping smoke test ['${1}' is not supported]..." 
exit 0 fi @@ -21,12 +21,13 @@ if [[ -z ${SKIP_DESTROY} ]]; then trap "terraform_destroy" EXIT fi -terraform_apply ${LATEST_VERSION} +INTEGRATIONS_SERVER=false +terraform_apply ${LATEST_VERSION} ${INTEGRATIONS_SERVER} healthcheck 1 send_events legacy_assertions ${LATEST_VERSION} -terraform_apply ${NEXT_MAJOR_LATEST} +terraform_apply ${NEXT_MAJOR_LATEST} ${INTEGRATIONS_SERVER} healthcheck 1 send_events data_stream_assertions ${NEXT_MAJOR_LATEST} diff --git a/testing/smoke/lib.sh b/testing/smoke/lib.sh index 40c5e0e7634..bcded6c9bae 100644 --- a/testing/smoke/lib.sh +++ b/testing/smoke/lib.sh @@ -22,14 +22,13 @@ get_latest_patch() { terraform_init() { if [[ ! -f main.tf ]]; then cp ../main.tf .; fi - if [[ ! -f .terraform.lock.hcl ]]; then terraform init >> tf.log; fi + terraform init >> tf.log } terraform_apply() { echo "-> Creating / Upgrading deployment to version ${1}" echo stack_version=\"${1}\" > terraform.tfvars if [[ ! -z ${2} ]]; then echo integrations_server=${2} >> terraform.tfvars; fi - terraform_init terraform apply -auto-approve >> tf.log if [[ ${EXPORTED_AUTH} ]]; then diff --git a/x-pack/apm-server/main.go b/x-pack/apm-server/main.go index c3c2d38ba53..c8e2252b96f 100644 --- a/x-pack/apm-server/main.go +++ b/x-pack/apm-server/main.go @@ -6,6 +6,8 @@ package main import ( "context" + "encoding/json" + "net/http" "os" "sync" "time" @@ -240,6 +242,16 @@ func runServerWithProcessors(ctx context.Context, runServer beater.RunServerFunc } func newProfilingCollector(args beater.ServerParams) (*profiling.ElasticCollector, func(context.Context) error, error) { + logger := args.Logger.Named("profiling") + + client, err := args.NewElasticsearchClient(args.Config.Profiling.ESConfig) + if err != nil { + return nil, nil, err + } + clusterName, err := queryElasticsearchClusterName(client, logger) + if err != nil { + return nil, nil, err + } // Elasticsearch should default to 100 MB for the http.max_content_length configuration, // so we flush the buffer when at 16 MiB or every 4 seconds. // This should reduce the lock contention between multiple workers trying to write or flush @@ -248,11 +260,6 @@ func newProfilingCollector(args beater.ServerParams) (*profiling.ElasticCollecto FlushBytes: 1 << 24, FlushInterval: 4 * time.Second, } - - client, err := args.NewElasticsearchClient(args.Config.Profiling.ESConfig) - if err != nil { - return nil, nil, err - } indexer, err := client.NewBulkIndexer(bulkIndexerConfig) if err != nil { return nil, nil, err @@ -270,7 +277,8 @@ func newProfilingCollector(args beater.ServerParams) (*profiling.ElasticCollecto profilingCollector := profiling.NewCollector( indexer, metricsIndexer, - args.Logger.Named("profiling"), + clusterName, + logger, ) cleanup := func(ctx context.Context) error { var errors error @@ -285,6 +293,32 @@ func newProfilingCollector(args beater.ServerParams) (*profiling.ElasticCollecto return profilingCollector, cleanup, nil } +// Fetch the Cluster name from Elasticsearch: Profiling adds it as a field in +// the host-agent metrics documents for debugging purposes. +// In Cloud deployments, the Cluster name is set equal to the Cluster ID. 
+func queryElasticsearchClusterName(client elasticsearch.Client, logger *logp.Logger) (string, error) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/", nil) + if err != nil { + return "", err + } + resp, err := client.Perform(req) + if err != nil { + logger.Warnf("failed to fetch cluster name from Elasticsearch: %v", err) + return "", nil + } + var r struct { + ClusterName string `json:"cluster_name"` + } + if err := json.NewDecoder(resp.Body).Decode(&r); err != nil { + logger.Warnf("failed to parse Elasticsearch JSON response: %v", err) + } + _ = resp.Body.Close() + + return r.ClusterName, nil +} + func wrapServer(args beater.ServerParams, runServer beater.RunServerFunc) (beater.ServerParams, beater.RunServerFunc, error) { processors, err := newProcessors(args) if err != nil { diff --git a/x-pack/apm-server/main_test.go b/x-pack/apm-server/main_test.go index 7e6e51fa019..7b4fcd312c6 100644 --- a/x-pack/apm-server/main_test.go +++ b/x-pack/apm-server/main_test.go @@ -7,9 +7,16 @@ package main // This file is mandatory as otherwise the apm-server.test binary is not generated correctly. import ( + "bytes" "context" + "fmt" + "io" + "net/http" "testing" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -65,3 +72,82 @@ func TestMonitoring(t *testing.T) { assert.NotEqual(t, monitoring.MakeFlatSnapshot(), tailSamplingMonitoringSnapshot) } } + +func TestQueryElasticsearchClusterName(t *testing.T) { + const clusterName = "test-cluster-123" + cases := []struct { + name, + expectedResult, + body string + throwErr bool + expectedLogMsg string + }{ + { + name: "valid_JSON_response_with_cluster_name", + expectedResult: clusterName, + body: fmt.Sprintf(`{"cluster_name":"%s"}`, clusterName), + }, { + name: "valid_JSON_response_without_cluster_name", + expectedResult: "", + body: `{"anything":"42"}`, + expectedLogMsg: "failed to parse Elasticsearch JSON response", + }, { + name: "invalid_JSON_response", + expectedResult: "", + body: "::error::", + expectedLogMsg: "failed to parse Elasticsearch JSON response", + }, { + name: "server_unresponsive", + expectedResult: "", + body: "", + throwErr: true, + expectedLogMsg: "failed to fetch cluster name from Elasticsearch", + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + logger := logp.NewLogger("go_tests_apm_server", zap.Hooks(detectMessageInLog(t, tc.expectedLogMsg))) + mockedClient := mockEsClusterName{io.NopCloser(bytes.NewBufferString(tc.body)), tc.throwErr} + result, err := queryElasticsearchClusterName(&mockedClient, logger) + if tc.throwErr { + // Even when the server does not reply, we don't want to return an error to the caller + require.Nil(t, err) + assert.Empty(t, result) + return + } + assert.Equal(t, tc.expectedResult, result) + }) + } +} + +func detectMessageInLog(t *testing.T, contained string) func(zapcore.Entry) error { + return func(entry zapcore.Entry) error { + assert.Equal(t, entry.Level, logp.WarnLevel) + if !assert.Contains(t, entry.Message, contained) { + t.Fatalf("didn't find '%s' in log Message field", contained) + } + return nil + } +} + +type mockEsClusterName struct { + body io.ReadCloser + throwErr bool +} + +func (c *mockEsClusterName) Perform(r *http.Request) (*http.Response, error) { + if c.throwErr { + return nil, fmt.Errorf("connection closed") + } + return &http.Response{ + StatusCode: 200, + Body: 
c.body, + Request: r, + }, nil +} + +func (c *mockEsClusterName) NewBulkIndexer(_ elasticsearch.BulkIndexerConfig) (elasticsearch.BulkIndexer, error) { + return nil, nil +} diff --git a/x-pack/apm-server/profiling/collector.go b/x-pack/apm-server/profiling/collector.go index c868eb70454..13ab9fee04a 100644 --- a/x-pack/apm-server/profiling/collector.go +++ b/x-pack/apm-server/profiling/collector.go @@ -66,6 +66,8 @@ type ElasticCollector struct { sourceFilesLock sync.Mutex sourceFiles *simplelru.LRU + + clusterID string } // NewCollector returns a new ElasticCollector uses indexer for storing stack trace data in @@ -74,6 +76,7 @@ type ElasticCollector struct { func NewCollector( indexer elasticsearch.BulkIndexer, metricsIndexer elasticsearch.BulkIndexer, + esClusterID string, logger *logp.Logger, ) *ElasticCollector { sourceFiles, err := simplelru.NewLRU(sourceFileCacheSize, nil) @@ -86,6 +89,7 @@ func NewCollector( indexer: indexer, metricsIndexer: metricsIndexer, sourceFiles: sourceFiles, + clusterID: esClusterID, } // Precalculate index names to minimise per-TraceEvent overhead. @@ -668,9 +672,11 @@ func (e *ElasticCollector) AddMetrics(ctx context.Context, in *Metrics) (*empty. body.WriteString(fmt.Sprintf( "{\"project.id\":%d,\"host.id\":%d,\"@timestamp\":%d,"+ - "\"ecs.version\":\"%s\"", + "\"ecs.version\":%q", ProjectID, HostID, metric.Timestamp, ecsVersionString)) - + if e.clusterID != "" { + body.WriteString(fmt.Sprintf(",\"Elasticsearch.cluster.id\":%q", e.clusterID)) + } for i, metricID := range metric.IDs { if int(metricID) >= len(metricTypes) { // Protect against panic on HA / collector version mismatch.
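
Note on the x-pack/apm-server/main.go change above: queryElasticsearchClusterName does a best-effort GET of the Elasticsearch root endpoint and reads cluster_name, logging a warning instead of failing server startup when the request or the JSON decode goes wrong. Below is a minimal, self-contained sketch of that pattern using net/http against a plain Elasticsearch URL; the base URL, http.DefaultClient and the standard log package are stand-ins for the project's elasticsearch.Client and logp logger, not the code the diff actually adds.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"time"
)

// clusterName queries the Elasticsearch root endpoint ("/") and returns the
// cluster_name field. Failures are logged and swallowed, mirroring the
// best-effort behaviour in the diff: the caller gets "" instead of an error.
func clusterName(baseURL string) string {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, baseURL+"/", nil)
	if err != nil {
		log.Printf("failed to build request: %v", err)
		return ""
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Printf("failed to fetch cluster name from Elasticsearch: %v", err)
		return ""
	}
	defer resp.Body.Close()

	var r struct {
		ClusterName string `json:"cluster_name"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&r); err != nil {
		log.Printf("failed to parse Elasticsearch JSON response: %v", err)
	}
	return r.ClusterName
}

func main() {
	// Hypothetical local Elasticsearch instance; adjust the URL as needed.
	fmt.Println(clusterName("http://localhost:9200"))
}
```

Keeping the lookup best-effort means profiling still starts when the cluster is unreachable at boot; the cluster ID field is simply omitted from the metrics documents.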
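The collector.go hunk switches the hand-built metrics JSON from \"%s\" to %q and only emits Elasticsearch.cluster.id when a cluster ID was resolved. %q writes a double-quoted, escaped Go string, which for ordinary cluster IDs is also valid JSON and avoids a malformed document if the value ever contains quotes or backslashes. A simplified sketch of the document-prefix construction follows; the parameter types and the sample values are illustrative, not the real ProjectID/HostID/timestamp definitions from the collector.

```go
package main

import (
	"fmt"
	"strings"
)

// buildMetricPrefix mimics, in simplified form, how the collector hand-builds
// the start of a metrics document in the diff: %q quotes and escapes the value,
// and Elasticsearch.cluster.id is only appended when a cluster ID is known.
// The real code appends the per-metric fields and the closing brace afterwards.
func buildMetricPrefix(projectID, hostID uint32, timestamp int64, ecsVersion, clusterID string) string {
	var body strings.Builder
	body.WriteString(fmt.Sprintf(
		"{\"project.id\":%d,\"host.id\":%d,\"@timestamp\":%d,"+
			"\"ecs.version\":%q",
		projectID, hostID, timestamp, ecsVersion))
	if clusterID != "" {
		body.WriteString(fmt.Sprintf(",\"Elasticsearch.cluster.id\":%q", clusterID))
	}
	return body.String()
}

func main() {
	// With a cluster ID (e.g. from the root-endpoint lookup) the extra field is present.
	fmt.Println(buildMetricPrefix(1, 2, 1664400000, "1.12.0", "abc123"))
	// Without one, the document shape is unchanged from before this diff.
	fmt.Println(buildMetricPrefix(1, 2, 1664400000, "1.12.0", ""))
}
```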
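On the CI side, the Jenkinsfile now fans out one parallel stage per (smoke-test directory, version) pair, each on its own worker, with the stage name built as "${version}-${title}" from the basename of the discovered test directory and the SMOKETEST_VERSIONS parameter, and each stage calling the new make smoketest/run-version target. A rough Go sketch of that fan-out, with a hypothetical directory list and version string standing in for the output of make smoketest/discover and the build parameter:

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// fanOut mirrors the Jenkinsfile logic in the diff: every discovered smoke-test
// directory is combined with every entry in SMOKETEST_VERSIONS, and each pair
// becomes its own parallel stage name (and, after this change, its own worker).
func fanOut(testDirs []string, smoketestVersions string) []string {
	var jobs []string
	for _, dir := range testDirs {
		title := path.Base(dir) // same idea as `basename ${smokeTest}` in the pipeline
		for _, version := range strings.Split(strings.TrimSpace(smoketestVersions), ",") {
			jobs = append(jobs, fmt.Sprintf("%s-%s", version, title))
		}
	}
	return jobs
}

func main() {
	// Hypothetical values: the real list comes from `make smoketest/discover`
	// and from the SMOKETEST_VERSIONS build parameter.
	dirs := []string{
		"testing/smoke/legacy-managed",
		"testing/smoke/legacy-standalone-major-managed",
	}
	fmt.Println(fanOut(dirs, "7.17,8.4"))
}
```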