@@ -78,7 +78,7 @@ service:
78
78
- debug
79
79
- otlp/elastic`
80
80
81
- func TestOtelHybridFileProcessing (t * testing.T ) {
81
+ func TestOtelFileProcessing (t * testing.T ) {
82
82
define .Require (t , define.Requirements {
83
83
Group : Default ,
84
84
Local : true ,
@@ -89,73 +89,125 @@ func TestOtelHybridFileProcessing(t *testing.T) {
89
89
},
90
90
})
91
91
92
+ // replace default elastic-agent.yml with otel config
93
+ // otel mode should be detected automatically
94
+ tmpDir := t .TempDir ()
95
+ // create input file
96
+ numEvents := 50
97
+ inputFile , err := os .CreateTemp (tmpDir , "input.txt" )
98
+ require .NoError (t , err , "failed to create temp file to hold data to ingest" )
99
+ inputFilePath := inputFile .Name ()
100
+ for i := 0 ; i < numEvents ; i ++ {
101
+ _ , err = inputFile .Write ([]byte (fmt .Sprintf ("Line %d\n " , i )))
102
+ require .NoErrorf (t , err , "failed to write line %d to temp file" , i )
103
+ }
104
+ err = inputFile .Close ()
105
+ require .NoError (t , err , "failed to close data temp file" )
106
+ t .Cleanup (func () {
107
+ if t .Failed () {
108
+ contents , err := os .ReadFile (inputFilePath )
109
+ if err != nil {
110
+ t .Logf ("no data file to import at %s" , inputFilePath )
111
+ return
112
+ }
113
+ t .Logf ("contents of import file:\n %s\n " , string (contents ))
114
+ }
115
+ })
116
+ // create output filename
117
+ outputFilePath := filepath .Join (tmpDir , "output.txt" )
92
118
t .Cleanup (func () {
93
- _ = os .Remove (fileProcessingFilename )
119
+ if t .Failed () {
120
+ contents , err := os .ReadFile (outputFilePath )
121
+ if err != nil {
122
+ t .Logf ("no output data at %s" , inputFilePath )
123
+ return
124
+ }
125
+ t .Logf ("contents of output file:\n %s\n " , string (contents ))
126
+ }
94
127
})
128
+ // create the otel config with input and output
129
+ type otelConfigOptions struct {
130
+ InputPath string
131
+ OutputPath string
132
+ }
133
+ otelConfigTemplate := `receivers:
134
+ filelog:
135
+ include:
136
+ - {{.InputPath}}
137
+ start_at: beginning
95
138
96
- fixture , err := define .NewFixtureFromLocalBuild (t , define .Version ())
139
+ exporters:
140
+ file:
141
+ path: {{.OutputPath}}
142
+ service:
143
+ pipelines:
144
+ logs:
145
+ receivers:
146
+ - filelog
147
+ exporters:
148
+ - file
149
+ `
150
+ otelConfigPath := filepath .Join (tmpDir , "otel.yml" )
151
+ var otelConfigBuffer bytes.Buffer
152
+ require .NoError (t ,
153
+ template .Must (template .New ("otelConfig" ).Parse (otelConfigTemplate )).Execute (& otelConfigBuffer ,
154
+ otelConfigOptions {
155
+ InputPath : inputFilePath ,
156
+ OutputPath : outputFilePath ,
157
+ }))
158
+ require .NoError (t , os .WriteFile (otelConfigPath , otelConfigBuffer .Bytes (), 0o600 ))
159
+ t .Cleanup (func () {
160
+ if t .Failed () {
161
+ contents , err := os .ReadFile (otelConfigPath )
162
+ if err != nil {
163
+ t .Logf ("No otel configuration file at %s" , otelConfigPath )
164
+ return
165
+ }
166
+ t .Logf ("Contents of otel config file:\n %s\n " , string (contents ))
167
+ }
168
+ })
169
+ // now we can actually run the test
170
+
171
+ fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting .WithAdditionalArgs ([]string {"--config" , otelConfigPath }))
97
172
require .NoError (t , err )
98
173
99
174
ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (10 * time .Minute ))
100
175
defer cancel ()
101
176
err = fixture .Prepare (ctx , fakeComponent )
102
177
require .NoError (t , err )
103
178
179
+ // remove elastic-agent.yml, otel should be independent
180
+ require .NoError (t , os .Remove (filepath .Join (fixture .WorkDir (), "elastic-agent.yml" )))
181
+
104
182
var fixtureWg sync.WaitGroup
105
183
fixtureWg .Add (1 )
106
184
go func () {
107
185
defer fixtureWg .Done ()
108
- err = fixture .Run (ctx , aTesting.State {
109
- Configure : string (fileProcessingConfig ),
110
- Reached : func (state * client.AgentState ) bool {
111
- // keep running (context cancel will stop it)
112
- return false
113
- },
114
- })
186
+ err = fixture .RunOtelWithClient (ctx )
115
187
}()
116
188
117
- var content []byte
118
- watchLines := linesTrackMap ([]string {
119
- `"stringValue":"syslog"` , // syslog is being processed
120
- `"stringValue":"system.log"` , // system.log is being processed
121
- })
189
+ validateCommandIsWorking (t , ctx , fixture , tmpDir )
122
190
191
+ var content []byte
123
192
require .Eventually (t ,
124
193
func () bool {
125
194
// verify file exists
126
- content , err = os .ReadFile (fileProcessingFilename )
195
+ content , err = os .ReadFile (outputFilePath )
127
196
if err != nil || len (content ) == 0 {
128
197
return false
129
198
}
130
199
131
- for k , alreadyFound := range watchLines {
132
- if alreadyFound {
133
- continue
134
- }
135
- if bytes .Contains (content , []byte (k )) {
136
- watchLines [k ] = true
137
- }
138
- }
139
-
140
- return mapAtLeastOneTrue (watchLines )
200
+ found := bytes .Count (content , []byte (filepath .Base (inputFilePath )))
201
+ return found == numEvents
141
202
},
142
203
3 * time .Minute , 500 * time .Millisecond ,
143
204
fmt .Sprintf ("there should be exported logs by now" ))
144
-
145
- statusCtx , statusCancel := context .WithTimeout (ctx , 5 * time .Second )
146
- defer statusCancel ()
147
- output , err := fixture .ExecStatus (statusCtx )
148
- require .NoError (t , err , "status command failed" )
149
-
150
205
cancel ()
151
206
fixtureWg .Wait ()
152
207
require .True (t , err == nil || err == context .Canceled || err == context .DeadlineExceeded , "Retrieved unexpected error: %s" , err .Error ())
153
-
154
- assert .NotNil (t , output .Collector )
155
- assert .Equal (t , 2 , output .Collector .Status , "collector status should have been StatusOK" )
156
208
}
157
209
158
- func TestOtelFileProcessing (t * testing.T ) {
210
+ func TestOtelHybridFileProcessing (t * testing.T ) {
159
211
define .Require (t , define.Requirements {
160
212
Group : Default ,
161
213
Local : true ,
@@ -166,7 +218,6 @@ func TestOtelFileProcessing(t *testing.T) {
166
218
},
167
219
})
168
220
169
- // replace default elastic-agent.yml with otel config
170
221
// otel mode should be detected automatically
171
222
tmpDir := t .TempDir ()
172
223
// create input file
@@ -224,47 +275,35 @@ service:
224
275
exporters:
225
276
- file
226
277
`
227
- otelConfigPath := filepath .Join (tmpDir , "otel.yml" )
228
278
var otelConfigBuffer bytes.Buffer
229
279
require .NoError (t ,
230
280
template .Must (template .New ("otelConfig" ).Parse (otelConfigTemplate )).Execute (& otelConfigBuffer ,
231
281
otelConfigOptions {
232
282
InputPath : inputFilePath ,
233
283
OutputPath : outputFilePath ,
234
284
}))
235
- require .NoError (t , os .WriteFile (otelConfigPath , otelConfigBuffer .Bytes (), 0o600 ))
236
- t .Cleanup (func () {
237
- if t .Failed () {
238
- contents , err := os .ReadFile (otelConfigPath )
239
- if err != nil {
240
- t .Logf ("No otel configuration file at %s" , otelConfigPath )
241
- return
242
- }
243
- t .Logf ("Contents of otel config file:\n %s\n " , string (contents ))
244
- }
245
- })
246
- // now we can actually run the test
247
285
248
- fixture , err := define .NewFixtureFromLocalBuild (t , define .Version (), aTesting . WithAdditionalArgs ([] string { "--config" , otelConfigPath }) )
286
+ fixture , err := define .NewFixtureFromLocalBuild (t , define .Version ())
249
287
require .NoError (t , err )
250
288
251
289
ctx , cancel := testcontext .WithDeadline (t , context .Background (), time .Now ().Add (10 * time .Minute ))
252
290
defer cancel ()
253
291
err = fixture .Prepare (ctx , fakeComponent )
254
292
require .NoError (t , err )
255
293
256
- // remove elastic-agent.yml, otel should be independent
257
- require .NoError (t , os .Remove (filepath .Join (fixture .WorkDir (), "elastic-agent.yml" )))
258
-
259
294
var fixtureWg sync.WaitGroup
260
295
fixtureWg .Add (1 )
261
296
go func () {
262
297
defer fixtureWg .Done ()
263
- err = fixture .RunOtelWithClient (ctx )
298
+ err = fixture .Run (ctx , aTesting.State {
299
+ Configure : otelConfigBuffer .String (),
300
+ Reached : func (state * client.AgentState ) bool {
301
+ // keep running (context cancel will stop it)
302
+ return false
303
+ },
304
+ })
264
305
}()
265
306
266
- validateCommandIsWorking (t , ctx , fixture , tmpDir )
267
-
268
307
var content []byte
269
308
require .Eventually (t ,
270
309
func () bool {
@@ -279,9 +318,18 @@ service:
279
318
},
280
319
3 * time .Minute , 500 * time .Millisecond ,
281
320
fmt .Sprintf ("there should be exported logs by now" ))
321
+
322
+ statusCtx , statusCancel := context .WithTimeout (ctx , 5 * time .Second )
323
+ defer statusCancel ()
324
+ output , err := fixture .ExecStatus (statusCtx )
325
+ require .NoError (t , err , "status command failed" )
326
+
282
327
cancel ()
283
328
fixtureWg .Wait ()
284
329
require .True (t , err == nil || err == context .Canceled || err == context .DeadlineExceeded , "Retrieved unexpected error: %s" , err .Error ())
330
+
331
+ assert .NotNil (t , output .Collector )
332
+ assert .Equal (t , 2 , output .Collector .Status , "collector status should have been StatusOK" )
285
333
}
286
334
287
335
func validateCommandIsWorking (t * testing.T , ctx context.Context , fixture * aTesting.Fixture , tempDir string ) {
0 commit comments