@@ -54,7 +54,7 @@ pub struct ApplicationConfig {
54
54
}
55
55
56
56
pub struct Application {
57
- pub require_healthy : Option < bool > ,
57
+ pub root_opts : RootOpts ,
58
58
pub config : ApplicationConfig ,
59
59
pub signals : SignalPair ,
60
60
}
@@ -73,6 +73,7 @@ impl ApplicationConfig {
73
73
& config_paths,
74
74
opts. watch_config ,
75
75
opts. require_healthy ,
76
+ opts. allow_empty_config ,
76
77
graceful_shutdown_duration,
77
78
signal_handler,
78
79
)
@@ -211,7 +212,7 @@ impl Application {
211
212
Ok ( (
212
213
runtime,
213
214
Self {
214
- require_healthy : opts. root . require_healthy ,
215
+ root_opts : opts. root ,
215
216
config,
216
217
signals,
217
218
} ,
@@ -227,7 +228,7 @@ impl Application {
227
228
handle. spawn ( heartbeat:: heartbeat ( ) ) ;
228
229
229
230
let Self {
230
- require_healthy ,
231
+ root_opts ,
231
232
config,
232
233
signals,
233
234
} = self ;
@@ -237,7 +238,7 @@ impl Application {
237
238
api_server : config. setup_api ( handle) ,
238
239
topology : config. topology ,
239
240
config_paths : config. config_paths . clone ( ) ,
240
- require_healthy,
241
+ require_healthy : root_opts . require_healthy ,
241
242
#[ cfg( feature = "enterprise" ) ]
242
243
enterprise_reporter : config. enterprise ,
243
244
} ) ;
@@ -248,6 +249,7 @@ impl Application {
248
249
graceful_crash_receiver : config. graceful_crash_receiver ,
249
250
signals,
250
251
topology_controller,
252
+ allow_empty_config : root_opts. allow_empty_config ,
251
253
} )
252
254
}
253
255
}
@@ -258,6 +260,7 @@ pub struct StartedApplication {
258
260
pub graceful_crash_receiver : ShutdownErrorReceiver ,
259
261
pub signals : SignalPair ,
260
262
pub topology_controller : SharedTopologyController ,
263
+ pub allow_empty_config : bool ,
261
264
}
262
265
263
266
impl StartedApplication {
@@ -272,6 +275,7 @@ impl StartedApplication {
272
275
signals,
273
276
topology_controller,
274
277
internal_topologies,
278
+ allow_empty_config,
275
279
} = self ;
276
280
277
281
let mut graceful_crash = UnboundedReceiverStream :: new ( graceful_crash_receiver) ;
@@ -280,18 +284,20 @@ impl StartedApplication {
280
284
let mut signal_rx = signals. receiver ;
281
285
282
286
let signal = loop {
287
+ let has_sources = !topology_controller. lock ( ) . await . topology . config . is_empty ( ) ;
283
288
tokio:: select! {
284
289
signal = signal_rx. recv( ) => if let Some ( signal) = handle_signal(
285
290
signal,
286
291
& topology_controller,
287
292
& config_paths,
288
293
& mut signal_handler,
294
+ allow_empty_config,
289
295
) . await {
290
296
break signal;
291
297
} ,
292
298
// Trigger graceful shutdown if a component crashed, or all sources have ended.
293
299
error = graceful_crash. next( ) => break SignalTo :: Shutdown ( error) ,
294
- _ = TopologyController :: sources_finished( topology_controller. clone( ) ) => {
300
+ _ = TopologyController :: sources_finished( topology_controller. clone( ) ) , if has_sources => {
295
301
info!( "All sources have finished." ) ;
296
302
break SignalTo :: Shutdown ( None )
297
303
} ,
@@ -313,6 +319,7 @@ async fn handle_signal(
313
319
topology_controller : & SharedTopologyController ,
314
320
config_paths : & [ ConfigPath ] ,
315
321
signal_handler : & mut SignalHandler ,
322
+ allow_empty_config : bool ,
316
323
) -> Option < SignalTo > {
317
324
match signal {
318
325
Ok ( SignalTo :: ReloadFromConfigBuilder ( config_builder) ) => {
@@ -335,6 +342,7 @@ async fn handle_signal(
335
342
let new_config = config:: load_from_paths_with_provider_and_secrets (
336
343
& topology_controller. config_paths ,
337
344
signal_handler,
345
+ allow_empty_config,
338
346
)
339
347
. await
340
348
. map_err ( handle_config_errors)
@@ -479,6 +487,7 @@ pub async fn load_configs(
479
487
config_paths : & [ ConfigPath ] ,
480
488
watch_config : bool ,
481
489
require_healthy : Option < bool > ,
490
+ allow_empty_config : bool ,
482
491
graceful_shutdown_duration : Option < Duration > ,
483
492
signal_handler : & mut SignalHandler ,
484
493
) -> Result < Config , ExitCode > {
@@ -503,10 +512,13 @@ pub async fn load_configs(
503
512
#[ cfg( not( feature = "enterprise-tests" ) ) ]
504
513
config:: init_log_schema ( & config_paths, true ) . map_err ( handle_config_errors) ?;
505
514
506
- let mut config =
507
- config:: load_from_paths_with_provider_and_secrets ( & config_paths, signal_handler)
508
- . await
509
- . map_err ( handle_config_errors) ?;
515
+ let mut config = config:: load_from_paths_with_provider_and_secrets (
516
+ & config_paths,
517
+ signal_handler,
518
+ allow_empty_config,
519
+ )
520
+ . await
521
+ . map_err ( handle_config_errors) ?;
510
522
511
523
config:: init_telemetry ( config. global . telemetry . clone ( ) , true ) ;
512
524
0 commit comments