@@ -16,6 +16,7 @@ use crate::service::Routes;
16
16
pub use conn:: { Connected , TcpConnectInfo } ;
17
17
use hyper_util:: {
18
18
rt:: { TokioExecutor , TokioIo , TokioTimer } ,
19
+ server:: graceful:: GracefulShutdown ,
19
20
service:: TowerToHyperService ,
20
21
} ;
21
22
#[ cfg( feature = "tls" ) ]
@@ -562,10 +563,7 @@ impl<L> Server<L> {
562
563
builder
563
564
} ;
564
565
565
- let ( signal_tx, signal_rx) = tokio:: sync:: watch:: channel ( ( ) ) ;
566
- let signal_tx = Arc :: new ( signal_tx) ;
567
-
568
- let graceful = signal. is_some ( ) ;
566
+ let graceful = signal. is_some ( ) . then ( GracefulShutdown :: new) ;
569
567
let mut sig = pin ! ( Fuse { inner: signal } ) ;
570
568
let mut incoming = pin ! ( incoming) ;
571
569
@@ -602,21 +600,13 @@ impl<L> Server<L> {
602
600
let hyper_io = TokioIo :: new( io) ;
603
601
let hyper_svc = TowerToHyperService :: new( req_svc) ;
604
602
605
- serve_connection( hyper_io, hyper_svc, server. clone( ) , graceful. then ( || signal_rx . clone( ) ) ) ;
603
+ serve_connection( hyper_io, hyper_svc, server. clone( ) , graceful. clone( ) ) ;
606
604
}
607
605
}
608
606
}
609
607
610
- if graceful {
611
- let _ = signal_tx. send ( ( ) ) ;
612
- drop ( signal_rx) ;
613
- trace ! (
614
- "waiting for {} connections to close" ,
615
- signal_tx. receiver_count( )
616
- ) ;
617
-
618
- // Wait for all connections to close
619
- signal_tx. closed ( ) . await ;
608
+ if let Some ( graceful) = graceful {
609
+ graceful. shutdown ( ) . await ;
620
610
}
621
611
622
612
Ok ( ( ) )
@@ -629,7 +619,7 @@ fn serve_connection<IO, S>(
629
619
hyper_io : IO ,
630
620
hyper_svc : S ,
631
621
builder : ConnectionBuilder ,
632
- mut watcher : Option < tokio :: sync :: watch :: Receiver < ( ) > > ,
622
+ graceful : Option < GracefulShutdown > ,
633
623
) where
634
624
IO : hyper:: rt:: Read + hyper:: rt:: Write + Unpin + Send + ' static ,
635
625
S : HyperService < Request < Incoming > , Response = Response < BoxBody > > + Clone + Send + ' static ,
@@ -638,28 +628,20 @@ fn serve_connection<IO, S>(
638
628
{
639
629
tokio:: spawn ( async move {
640
630
{
641
- let mut sig = pin ! ( Fuse {
642
- inner: watcher. as_mut( ) . map( |w| w. changed( ) ) ,
643
- } ) ;
631
+ let conn = builder. serve_connection ( hyper_io, hyper_svc) ;
644
632
645
- let mut conn = pin ! ( builder. serve_connection( hyper_io, hyper_svc) ) ;
633
+ let result = if let Some ( graceful) = graceful {
634
+ let conn = graceful. watch ( conn) ;
635
+ conn. await
636
+ } else {
637
+ conn. await
638
+ } ;
646
639
647
- loop {
648
- tokio:: select! {
649
- rv = & mut conn => {
650
- if let Err ( err) = rv {
651
- debug!( "failed serving connection: {:#}" , err) ;
652
- }
653
- break ;
654
- } ,
655
- _ = & mut sig => {
656
- conn. as_mut( ) . graceful_shutdown( ) ;
657
- }
658
- }
640
+ if let Err ( err) = result {
641
+ debug ! ( "failed serving connection: {:#}" , err) ;
659
642
}
660
643
}
661
644
662
- drop ( watcher) ;
663
645
trace ! ( "connection closed" ) ;
664
646
} ) ;
665
647
}
0 commit comments