diff --git a/ipa-core/src/net/client/dns.rs b/ipa-core/src/net/client/dns.rs new file mode 100644 index 000000000..480f940ab --- /dev/null +++ b/ipa-core/src/net/client/dns.rs @@ -0,0 +1,181 @@ +use std::{ + future::Future, + io, iter, mem, + net::SocketAddr, + pin::Pin, + str::FromStr, + task::{Context, Poll}, +}; + +use dashmap::DashMap; +use hyper_util::client::legacy::connect::dns::{GaiFuture, GaiResolver, Name}; +use pin_project::pin_project; +use tower::Service; + +use crate::sync::Arc; + +/// Wrapper around Hyper's [`GaiResolver`] to cache the DNS response. +/// The nature of IPA dictates clients talking to a very limited set of services. +/// Assuming there are M shards in the system: +/// - Report collector talks to each shard on every helper (3*M) +/// - Shard talks to each other shard and to the shard with the same index on two other helpers (M+2) +/// So the memory usage is proportional to the number of shards. We need to cache the response, +/// because there is no other caching layer in the system: +/// - Hyper uses [`GaiResolver`] that is basically just a call to libc `getaddrinfo` +/// - Linux by default does not have any OS-level DNS caching enabled. There is [`nscd`], but +/// it is disabled by default and is claimed to be broken on some distributions [`issue`]. +/// +/// Given these constraints, it is probably much simpler to cache the DNS response in application +/// layer. With the model where each instance just runs a single query it should simplify things +/// by a lot. +/// +/// This struct does exactly that. +/// +/// [`nscd`]: https://linux.die.net/man/8/nscd +/// [`issue`]: https://bugs.debian.org/cgi-bin/bugreport.cgi?bug=335476 +#[derive(Clone)] +pub(super) struct CachingGaiResolver { + cache: Arc>>, + resolver: GaiResolver, +} + +impl CachingGaiResolver { + pub fn new() -> Self { + Self::seeded(iter::empty()) + } + + pub fn seeded<'a, I: IntoIterator>(items: I) -> Self { + let cache = DashMap::default(); + for (name, addr) in items { + cache.insert( + Name::from_str(name) + .unwrap_or_else(|_| panic!("{name} is not a valid domain name")), + vec![addr.parse().unwrap()], + ); + } + Self { + cache: Arc::new(cache), + resolver: GaiResolver::new(), + } + } +} + +#[derive(Default)] +pub struct IpAddresses { + iter: std::vec::IntoIter, +} + +impl Iterator for IpAddresses { + type Item = SocketAddr; + + fn next(&mut self) -> Option { + self.iter.next() + } +} + +#[pin_project(project = ResolvingFutureEnumProj)] +pub enum ResolvingFuture { + Ready(IpAddresses), + Pending(#[pin] GaiFuture, Name, Arc>>), +} + +impl Future for ResolvingFuture { + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.project() { + ResolvingFutureEnumProj::Ready(addr) => Poll::Ready(Ok(mem::take(addr))), + ResolvingFutureEnumProj::Pending(fut, name, cache) => { + let res = match fut.poll(cx) { + Poll::Ready(Ok(addr)) => addr, + Poll::Ready(Err(err)) => return Poll::Ready(Err(err)), + Poll::Pending => return Poll::Pending, + }; + let addrs = res.collect::>(); + // This should probably be a trace span, once we have full confidence + // that this module works fine + tracing::info!("caching IP addresses for {name}: {addrs:?}"); + assert!( + cache.insert(name.clone(), addrs.clone()).is_none(), + "{name} is in the cache already" + ); + Poll::Ready(Ok(IpAddresses { + iter: addrs.into_iter(), + })) + } + } + } +} + +impl Service for CachingGaiResolver { + type Response = IpAddresses; + type Error = io::Error; + type Future = ResolvingFuture; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Name) -> Self::Future { + if let Some(addr) = self.cache.get(&req) { + ResolvingFuture::Ready(IpAddresses { + iter: addr.clone().into_iter(), + }) + } else { + let fut = self.resolver.call(req.clone()); + ResolvingFuture::Pending(fut, req, Arc::clone(&self.cache)) + } + } +} + +#[cfg(all(test, unit_test))] +mod tests { + use std::str::FromStr; + + use hyper_util::client::legacy::connect::dns::Name; + use tower::Service; + + use crate::{ + net::client::dns::{IpAddresses, ResolvingFuture}, + test_executor::run, + }; + + #[test] + fn cache_data() { + let name = Name::from_str("www.notadomain.com").unwrap(); + let mut resolver = + super::CachingGaiResolver::seeded([(name.as_str(), "172.20.0.254:8000")]); + let fut = resolver.call(name); + let res = futures::executor::block_on(fut) + .unwrap() + .map(|v| v.to_string()) + .collect::>(); + assert_eq!(vec!["172.20.0.254:8000"], res); + } + + #[test] + fn calls_real_resolver() { + fn assert_localhost_present(input: IpAddresses) { + let input = input.into_iter().map(|v| v.to_string()).collect::>(); + assert!( + input.contains(&"127.0.0.1:0".to_string()), + "{input:?} does not include localhost" + ); + } + + run(|| async move { + let name = Name::from_str("localhost").unwrap(); + let mut resolver = super::CachingGaiResolver::new(); + let res = resolver.call(name.clone()).await.unwrap(); + assert_localhost_present(res); + + // call again and make sure it is ready + let fut = resolver.call(name.clone()); + if let ResolvingFuture::Ready(ip_addrs) = fut { + assert_localhost_present(ip_addrs); + } else { + panic!("{name} hasn't been cached"); + } + }); + } +} diff --git a/ipa-core/src/net/client/mod.rs b/ipa-core/src/net/client/mod.rs index d4789b198..1dff106ee 100644 --- a/ipa-core/src/net/client/mod.rs +++ b/ipa-core/src/net/client/mod.rs @@ -1,3 +1,5 @@ +mod dns; + use std::{ collections::HashMap, future::Future, @@ -36,10 +38,15 @@ use crate::{ query::{CompareStatusRequest, PrepareQuery, QueryConfig, QueryInput}, TransportIdentity, }, - net::{error::ShardQueryStatusMismatchError, http_serde, Error, CRYPTO_PROVIDER}, + net::{ + client::dns::CachingGaiResolver, error::ShardQueryStatusMismatchError, http_serde, Error, + CRYPTO_PROVIDER, + }, protocol::{Gate, QueryId}, }; +type DnsResolver = CachingGaiResolver; + #[derive(Default)] pub enum ClientIdentity { /// Claim the specified helper identity without any additional authentication. @@ -179,7 +186,7 @@ async fn response_to_bytes(resp: ResponseFromEndpoint) -> Result { /// client can be configured to talk to all three helpers. #[derive(Debug, Clone)] pub struct IpaHttpClient { - client: Client, Body>, + client: Client>, Body>, scheme: uri::Scheme, authority: uri::Authority, auth_header: Option<(HeaderName, HeaderValue)>, @@ -277,7 +284,7 @@ impl IpaHttpClient { fn new_internal( runtime: IpaRuntime, addr: Uri, - connector: HttpsConnector, + connector: HttpsConnector>, auth_header: Option<(HeaderName, HeaderValue)>, conf: &C, ) -> Self { @@ -536,8 +543,8 @@ impl IpaHttpClient { } } -fn make_http_connector() -> HttpConnector { - let mut connector = HttpConnector::new(); +fn make_http_connector() -> HttpConnector { + let mut connector = HttpConnector::new_with_resolver(DnsResolver::new()); // IPA uses HTTP2 and it is sensitive to those delays especially in high-latency network // configurations. connector.set_nodelay(true);