@@ -16,6 +16,7 @@ use crate::service::Routes;
16
16
pub use conn:: { Connected , TcpConnectInfo } ;
17
17
use hyper_util:: {
18
18
rt:: { TokioExecutor , TokioIo , TokioTimer } ,
19
+ server:: graceful:: GracefulShutdown ,
19
20
service:: TowerToHyperService ,
20
21
} ;
21
22
#[ cfg( feature = "tls" ) ]
@@ -562,10 +563,9 @@ impl<L> Server<L> {
562
563
builder
563
564
} ;
564
565
565
- let ( signal_tx, signal_rx) = tokio:: sync:: watch:: channel ( ( ) ) ;
566
- let signal_tx = Arc :: new ( signal_tx) ;
567
-
568
- let graceful = signal. is_some ( ) ;
566
+ let graceful = signal
567
+ . is_some ( )
568
+ . then ( || hyper_util:: server:: graceful:: GracefulShutdown :: new ( ) ) ;
569
569
let mut sig = pin ! ( Fuse { inner: signal } ) ;
570
570
let mut incoming = pin ! ( incoming) ;
571
571
@@ -601,21 +601,13 @@ impl<L> Server<L> {
601
601
602
602
let hyper_svc = TowerToHyperService :: new( req_svc) ;
603
603
604
- serve_connection( io, hyper_svc, server. clone( ) , graceful. then ( || signal_rx . clone( ) ) ) ;
604
+ serve_connection( io, hyper_svc, server. clone( ) , graceful. clone( ) ) ;
605
605
}
606
606
}
607
607
}
608
608
609
- if graceful {
610
- let _ = signal_tx. send ( ( ) ) ;
611
- drop ( signal_rx) ;
612
- trace ! (
613
- "waiting for {} connections to close" ,
614
- signal_tx. receiver_count( )
615
- ) ;
616
-
617
- // Wait for all connections to close
618
- signal_tx. closed ( ) . await ;
609
+ if let Some ( graceful) = graceful {
610
+ graceful. shutdown ( ) . await ;
619
611
}
620
612
621
613
Ok ( ( ) )
@@ -628,7 +620,7 @@ fn serve_connection<IO, S>(
628
620
io : ServerIo < IO > ,
629
621
hyper_svc : TowerToHyperService < S > ,
630
622
builder : ConnectionBuilder ,
631
- mut watcher : Option < tokio :: sync :: watch :: Receiver < ( ) > > ,
623
+ graceful : Option < GracefulShutdown > ,
632
624
) where
633
625
S : Service < Request < Incoming > , Response = Response < BoxBody > > + Clone + Send + ' static ,
634
626
S :: Future : Send + ' static ,
@@ -638,28 +630,20 @@ fn serve_connection<IO, S>(
638
630
{
639
631
tokio:: spawn ( async move {
640
632
{
641
- let mut sig = pin ! ( Fuse {
642
- inner: watcher. as_mut( ) . map( |w| w. changed( ) ) ,
643
- } ) ;
633
+ let conn = builder. serve_connection ( TokioIo :: new ( io) , hyper_svc) ;
644
634
645
- let mut conn = pin ! ( builder. serve_connection( TokioIo :: new( io) , hyper_svc) ) ;
635
+ let result = if let Some ( graceful) = graceful {
636
+ let conn = graceful. watch ( conn) ;
637
+ conn. await
638
+ } else {
639
+ conn. await
640
+ } ;
646
641
647
- loop {
648
- tokio:: select! {
649
- rv = & mut conn => {
650
- if let Err ( err) = rv {
651
- debug!( "failed serving connection: {:#}" , err) ;
652
- }
653
- break ;
654
- } ,
655
- _ = & mut sig => {
656
- conn. as_mut( ) . graceful_shutdown( ) ;
657
- }
658
- }
642
+ if let Err ( err) = result {
643
+ debug ! ( "failed serving connection: {:#}" , err) ;
659
644
}
660
645
}
661
646
662
- drop ( watcher) ;
663
647
trace ! ( "connection closed" ) ;
664
648
} ) ;
665
649
}
0 commit comments