@@ -228,6 +228,148 @@ func validateCommandIsWorking(t *testing.T, ctx context.Context, fixture *aTesti
228
228
require .Contains (t , string (out ), `service::pipelines::logs: references processor "nonexistingprocessor" which is not configured` )
229
229
}
230
230
231
+ var logsIngestionConfigTemplate = `
232
+ exporters:
233
+ debug:
234
+ verbosity: basic
235
+
236
+ elasticsearch:
237
+ api_key: {{.ESApiKey}}
238
+ endpoint: {{.ESEndpoint}}
239
+
240
+ processors:
241
+ resource/add-test-id:
242
+ attributes:
243
+ - key: test.id
244
+ action: insert
245
+ value: {{.TestId}}
246
+
247
+ receivers:
248
+ filelog:
249
+ include:
250
+ - {{.InputFilePath}}
251
+ start_at: beginning
252
+
253
+ service:
254
+ pipelines:
255
+ logs:
256
+ exporters:
257
+ - debug
258
+ - elasticsearch
259
+ processors:
260
+ - resource/add-test-id
261
+ receivers:
262
+ - filelog
263
+ `
264
+
265
+ func TestOtelLogsIngestion (t * testing.T ) {
266
+ info := define .Require (t , define.Requirements {
267
+ Group : Default ,
268
+ Local : true ,
269
+ OS : []define.OS {
270
+ // input path missing on windows
271
+ {Type : define .Linux },
272
+ {Type : define .Darwin },
273
+ },
274
+ Stack : & define.Stack {},
275
+ })
276
+
277
+ // Prepare the OTel config.
278
+ testId := info .Namespace
279
+
280
+ tempDir := t .TempDir ()
281
+ inputFilePath := filepath .Join (tempDir , "input.log" )
282
+
283
+ esHost , err := getESHost ()
284
+ require .NoError (t , err , "failed to get ES host" )
285
+ require .True (t , len (esHost ) > 0 )
286
+
287
+ esClient := info .ESClient
288
+ require .NotNil (t , esClient )
289
+ esApiKey , err := createESApiKey (esClient )
290
+ require .NoError (t , err , "failed to get api key" )
291
+ require .True (t , len (esApiKey .Encoded ) > 1 , "api key is invalid %q" , esApiKey )
292
+
293
+ logsIngestionConfig := logsIngestionConfigTemplate
294
+ logsIngestionConfig = strings .ReplaceAll (logsIngestionConfig , "{{.ESApiKey}}" , esApiKey .Encoded )
295
+ logsIngestionConfig = strings .ReplaceAll (logsIngestionConfig , "{{.ESEndpoint}}" , esHost )
296
+ logsIngestionConfig = strings .ReplaceAll (logsIngestionConfig , "{{.InputFilePath}}" , inputFilePath )
297
+ logsIngestionConfig = strings .ReplaceAll (logsIngestionConfig , "{{.TestId}}" , testId )
298
+
299
+ cfgFilePath := filepath .Join (tempDir , "otel.yml" )
300
+ require .NoError (t , os .WriteFile (cfgFilePath , []byte (logsIngestionConfig ), 0600 ))
301
+
302
+ fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting .WithAdditionalArgs ([]string {"--config" , cfgFilePath }))
303
+ require .NoError (t , err )
304
+
305
+ ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (10 * time .Minute ))
306
+ defer cancel ()
307
+ err = fixture .Prepare (ctx , fakeComponent , fakeShipper )
308
+ require .NoError (t , err )
309
+
310
+ // remove elastic-agent.yml, otel should be independent
311
+ require .NoError (t , os .Remove (filepath .Join (fixture .WorkDir (), "elastic-agent.yml" )))
312
+
313
+ var fixtureWg sync.WaitGroup
314
+ fixtureWg .Add (1 )
315
+ go func () {
316
+ defer fixtureWg .Done ()
317
+ err = fixture .RunOtelWithClient (ctx , false , false )
318
+ }()
319
+
320
+ validateCommandIsWorking (t , ctx , fixture , tempDir )
321
+
322
+ // check `elastic-agent status` returns successfully
323
+ require .Eventuallyf (t , func () bool {
324
+ // This will return errors until it connects to the agent,
325
+ // they're mostly noise because until the agent starts running
326
+ // we will get connection errors. If the test fails
327
+ // the agent logs will be present in the error message
328
+ // which should help to explain why the agent was not
329
+ // healthy.
330
+ err = fixture .IsHealthy (ctx )
331
+ return err == nil
332
+ },
333
+ 2 * time .Minute , time .Second ,
334
+ "Elastic-Agent did not report healthy. Agent status error: \" %v\" " ,
335
+ err ,
336
+ )
337
+
338
+ // Write logs to input file.
339
+ logsCount := 10_000
340
+ inputFile , err := os .OpenFile (inputFilePath , os .O_CREATE | os .O_WRONLY , 0600 )
341
+ require .NoError (t , err )
342
+ for i := 0 ; i < logsCount ; i ++ {
343
+ _ , err = fmt .Fprintf (inputFile , "This is a test log message %d\n " , i + 1 )
344
+ require .NoError (t , err )
345
+ }
346
+ inputFile .Close ()
347
+ t .Cleanup (func () {
348
+ _ = os .Remove (inputFilePath )
349
+ })
350
+
351
+ actualHits := & struct { Hits int }{}
352
+ require .Eventually (t ,
353
+ func () bool {
354
+ findCtx , findCancel := context .WithTimeout (context .Background (), 10 * time .Second )
355
+ defer findCancel ()
356
+
357
+ docs , err := estools .GetLogsForIndexWithContext (findCtx , esClient , ".ds-logs-generic-default*" , map [string ]interface {}{
358
+ "Resource.test.id" : testId ,
359
+ })
360
+ require .NoError (t , err )
361
+
362
+ actualHits .Hits = docs .Hits .Total .Value
363
+ return actualHits .Hits == logsCount
364
+ },
365
+ 2 * time .Minute , 1 * time .Second ,
366
+ "Expected %v logs, got %v" , logsCount , actualHits )
367
+
368
+ cancel ()
369
+ fixtureWg .Wait ()
370
+ require .True (t , err == nil || err == context .Canceled || err == context .DeadlineExceeded , "Retrieved unexpected error: %s" , err .Error ())
371
+ }
372
+
231
373
func TestOtelAPMIngestion (t * testing.T ) {
232
374
info := define .Require (t , define.Requirements {
233
375
Group : Default ,
@@ -280,13 +422,13 @@ func TestOtelAPMIngestion(t *testing.T) {
280
422
esClient := info .ESClient
281
423
esApiKey , err := createESApiKey (esClient )
282
424
require .NoError (t , err , "failed to get api key" )
283
- require .True (t , len (esApiKey ) > 1 , "api key is invalid %q" , esApiKey )
425
+ require .True (t , len (esApiKey . APIKey ) > 1 , "api key is invalid %q" , esApiKey )
284
426
285
427
apmArgs := []string {
286
428
"run" ,
287
429
"-e" ,
288
430
"-E" , "output.elasticsearch.hosts=['" + esHost + "']" ,
289
- "-E" , "output.elasticsearch.api_key=" + esApiKey ,
431
+ "-E" , "output.elasticsearch.api_key=" + fmt . Sprintf ( "%s:%s" , esApiKey . Id , esApiKey . APIKey ) ,
290
432
"-E" , "apm-server.host=127.0.0.1:8200" ,
291
433
"-E" , "apm-server.ssl.enabled=false" ,
292
434
}
@@ -415,13 +557,8 @@ func getESHost() (string, error) {
415
557
return fixedESHost , nil
416
558
}
417
559
418
- func createESApiKey (esClient * elasticsearch.Client ) (string , error ) {
419
- apiResp , err := estools .CreateAPIKey (context .Background (), esClient , estools.APIKeyRequest {Name : "test-api-key" , Expiration : "1d" })
420
- if err != nil {
421
- return "" , err
422
- }
423
-
424
- return fmt .Sprintf ("%s:%s" , apiResp .Id , apiResp .APIKey ), nil
560
+ func createESApiKey (esClient * elasticsearch.Client ) (estools.APIKeyResponse , error ) {
561
+ return estools .CreateAPIKey (context .Background (), esClient , estools.APIKeyRequest {Name : "test-api-key" , Expiration : "1d" })
425
562
}
426
563
427
564
func linesTrackMap (lines []string ) map [string ]bool {
0 commit comments