@@ -16,6 +16,7 @@ use crate::service::Routes;
16
16
pub use conn:: { Connected , TcpConnectInfo } ;
17
17
use hyper_util:: {
18
18
rt:: { TokioExecutor , TokioIo , TokioTimer } ,
19
+ server:: graceful:: GracefulShutdown ,
19
20
service:: TowerToHyperService ,
20
21
} ;
21
22
#[ cfg( feature = "tls" ) ]
@@ -562,10 +563,7 @@ impl<L> Server<L> {
562
563
builder
563
564
} ;
564
565
565
- let ( signal_tx, signal_rx) = tokio:: sync:: watch:: channel ( ( ) ) ;
566
- let signal_tx = Arc :: new ( signal_tx) ;
567
-
568
- let graceful = signal. is_some ( ) ;
566
+ let graceful = signal. is_some ( ) . then ( GracefulShutdown :: new) ;
569
567
let mut sig = pin ! ( Fuse { inner: signal } ) ;
570
568
let mut incoming = pin ! ( incoming) ;
571
569
@@ -601,21 +599,13 @@ impl<L> Server<L> {
601
599
602
600
let hyper_svc = TowerToHyperService :: new( req_svc) ;
603
601
604
- serve_connection( io, hyper_svc, server. clone( ) , graceful. then ( || signal_rx . clone( ) ) ) ;
602
+ serve_connection( io, hyper_svc, server. clone( ) , graceful. clone( ) ) ;
605
603
}
606
604
}
607
605
}
608
606
609
- if graceful {
610
- let _ = signal_tx. send ( ( ) ) ;
611
- drop ( signal_rx) ;
612
- trace ! (
613
- "waiting for {} connections to close" ,
614
- signal_tx. receiver_count( )
615
- ) ;
616
-
617
- // Wait for all connections to close
618
- signal_tx. closed ( ) . await ;
607
+ if let Some ( graceful) = graceful {
608
+ graceful. shutdown ( ) . await ;
619
609
}
620
610
621
611
Ok ( ( ) )
@@ -628,7 +618,7 @@ fn serve_connection<IO, S>(
628
618
io : ServerIo < IO > ,
629
619
hyper_svc : TowerToHyperService < S > ,
630
620
builder : ConnectionBuilder ,
631
- mut watcher : Option < tokio :: sync :: watch :: Receiver < ( ) > > ,
621
+ graceful : Option < GracefulShutdown > ,
632
622
) where
633
623
S : Service < Request < Incoming > , Response = Response < BoxBody > > + Clone + Send + ' static ,
634
624
S :: Future : Send + ' static ,
@@ -638,28 +628,20 @@ fn serve_connection<IO, S>(
638
628
{
639
629
tokio:: spawn ( async move {
640
630
{
641
- let mut sig = pin ! ( Fuse {
642
- inner: watcher. as_mut( ) . map( |w| w. changed( ) ) ,
643
- } ) ;
631
+ let conn = builder. serve_connection ( TokioIo :: new ( io) , hyper_svc) ;
644
632
645
- let mut conn = pin ! ( builder. serve_connection( TokioIo :: new( io) , hyper_svc) ) ;
633
+ let result = if let Some ( graceful) = graceful {
634
+ let conn = graceful. watch ( conn) ;
635
+ conn. await
636
+ } else {
637
+ conn. await
638
+ } ;
646
639
647
- loop {
648
- tokio:: select! {
649
- rv = & mut conn => {
650
- if let Err ( err) = rv {
651
- debug!( "failed serving connection: {:#}" , err) ;
652
- }
653
- break ;
654
- } ,
655
- _ = & mut sig => {
656
- conn. as_mut( ) . graceful_shutdown( ) ;
657
- }
658
- }
640
+ if let Err ( err) = result {
641
+ debug ! ( "failed serving connection: {:#}" , err) ;
659
642
}
660
643
}
661
644
662
- drop ( watcher) ;
663
645
trace ! ( "connection closed" ) ;
664
646
} ) ;
665
647
}
0 commit comments