diff --git a/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md b/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md deleted file mode 100644 index 13e41be657c1d..0000000000000 --- a/changelog.d/5269_support_unix_datagram_mode_in_socket_sink.enhancement.md +++ /dev/null @@ -1,8 +0,0 @@ -The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values: - -- `Stream` (default) - Stream-oriented (`SOCK_STREAM`) -- `Datagram` - Datagram-oriented (`SOCK_DGRAM`) - -This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets. - -authors: jpovixwm diff --git a/src/internal_events/unix.rs b/src/internal_events/unix.rs index a45777ab8f2e1..febe72eaa0853 100644 --- a/src/internal_events/unix.rs +++ b/src/internal_events/unix.rs @@ -65,12 +65,14 @@ impl InternalEvent for UnixSocketError<'_, E> { } } +#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] #[derive(Debug)] pub struct UnixSocketSendError<'a, E> { pub(crate) error: &'a E, pub path: &'a std::path::Path, } +#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))] impl InternalEvent for UnixSocketSendError<'_, E> { fn emit(self) { let reason = "Unix socket send error."; diff --git a/src/sinks/socket.rs b/src/sinks/socket.rs index 4805f06af7261..64369e67b856e 100644 --- a/src/sinks/socket.rs +++ b/src/sinks/socket.rs @@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig { #[cfg(test)] mod test { + #[cfg(unix)] + use std::path::PathBuf; use std::{ future::ready, net::{SocketAddr, UdpSocket}, }; - #[cfg(unix)] - use std::{os::unix::net::UnixDatagram, path::PathBuf}; use futures::stream::StreamExt; use futures_util::stream; @@ -196,42 +196,14 @@ mod test { crate::test_util::test_generate_config::(); } - enum DatagramSocket { - Udp(UdpSocket), - #[cfg(unix)] - Unix(UnixDatagram), - } - - enum DatagramSocketAddr { - Udp(SocketAddr), - #[cfg(unix)] - Unix(PathBuf), - } - - async fn test_datagram(datagram_addr: DatagramSocketAddr) { - let receiver = match &datagram_addr { - DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()), - #[cfg(unix)] - DatagramSocketAddr::Unix(path) => { - DatagramSocket::Unix(UnixDatagram::bind(path).unwrap()) - } - }; + async fn test_udp(addr: SocketAddr) { + let receiver = UdpSocket::bind(addr).unwrap(); let config = SocketSinkConfig { - mode: match &datagram_addr { - DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode { - config: UdpSinkConfig::from_address(addr.to_string()), - encoding: JsonSerializerConfig::default().into(), - }), - #[cfg(unix)] - DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode { - config: UnixSinkConfig::new( - path.to_path_buf(), - crate::sinks::util::service::net::UnixMode::Datagram, - ), - encoding: (None::, JsonSerializerConfig::default()).into(), - }), - }, + mode: Mode::Udp(UdpMode { + config: UdpSinkConfig::from_address(addr.to_string()), + encoding: JsonSerializerConfig::default().into(), + }), acknowledgements: Default::default(), }; @@ -246,13 +218,9 @@ mod test { .expect("Running sink failed"); let mut buf = [0; 256]; - let size = match &receiver { - DatagramSocket::Udp(sock) => { - sock.recv_from(&mut buf).expect("Did not receive message").0 - } - #[cfg(unix)] - DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"), - }; + let (size, _src_addr) = receiver + .recv_from(&mut buf) + .expect("Did not receive message"); let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received"); let data = serde_json::from_str::(&packet).expect("Invalid JSON received"); @@ -266,25 +234,14 @@ mod test { async fn udp_ipv4() { trace_init(); - test_datagram(DatagramSocketAddr::Udp(next_addr())).await; + test_udp(next_addr()).await; } #[tokio::test] async fn udp_ipv6() { trace_init(); - test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await; - } - - #[cfg(unix)] - #[tokio::test] - async fn unix_datagram() { - trace_init(); - - test_datagram(DatagramSocketAddr::Unix(temp_uds_path( - "unix_datagram_socket_test", - ))) - .await; + test_udp(next_addr_v6()).await; } #[tokio::test] @@ -335,10 +292,7 @@ mod test { let config = SocketSinkConfig { mode: Mode::Unix(UnixMode { - config: UnixSinkConfig::new( - out_path, - crate::sinks::util::service::net::UnixMode::Stream, - ), + config: UnixSinkConfig::new(out_path), encoding: (None::, NativeJsonSerializerConfig).into(), }), acknowledgements: Default::default(), diff --git a/src/sinks/util/datagram.rs b/src/sinks/util/datagram.rs deleted file mode 100644 index c9baf175855e8..0000000000000 --- a/src/sinks/util/datagram.rs +++ /dev/null @@ -1,106 +0,0 @@ -#[cfg(unix)] -use std::path::PathBuf; - -use bytes::BytesMut; -use futures::{stream::BoxStream, StreamExt}; -use futures_util::stream::Peekable; -use tokio::net::UdpSocket; -#[cfg(unix)] -use tokio::net::UnixDatagram; -use tokio_util::codec::Encoder; -use vector_lib::internal_event::RegisterInternalEvent; -use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle}; -use vector_lib::EstimatedJsonEncodedSizeOf; - -use crate::{ - codecs::Transformer, - event::{Event, EventStatus, Finalizable}, - internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError}, -}; - -#[cfg(unix)] -use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError}; - -pub enum DatagramSocket { - Udp(UdpSocket), - #[cfg(unix)] - Unix(UnixDatagram, PathBuf), -} - -pub async fn send_datagrams>( - input: &mut Peekable>, - mut socket: DatagramSocket, - transformer: &Transformer, - encoder: &mut E, - bytes_sent: &::Handle, -) { - while let Some(mut event) = input.next().await { - let byte_size = event.estimated_json_encoded_size_of(); - - transformer.transform(&mut event); - - let finalizers = event.take_finalizers(); - let mut bytes = BytesMut::new(); - - // Errors are handled by `Encoder`. - if encoder.encode(event, &mut bytes).is_err() { - continue; - } - - match send_datagram(&mut socket, &bytes).await { - Ok(()) => { - emit!(SocketEventsSent { - mode: match socket { - DatagramSocket::Udp(_) => SocketMode::Udp, - #[cfg(unix)] - DatagramSocket::Unix(..) => SocketMode::Unix, - }, - count: 1, - byte_size, - }); - - bytes_sent.emit(ByteSize(bytes.len())); - finalizers.update_status(EventStatus::Delivered); - } - Err(error) => { - match socket { - DatagramSocket::Udp(_) => emit!(SocketSendError { - mode: SocketMode::Udp, - error - }), - #[cfg(unix)] - DatagramSocket::Unix(_, path) => { - emit!(UnixSocketSendError { - path: path.as_path(), - error: &error - }) - } - }; - finalizers.update_status(EventStatus::Errored); - return; - } - } - } -} - -async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> { - let sent = match socket { - DatagramSocket::Udp(udp) => udp.send(buf).await, - #[cfg(unix)] - DatagramSocket::Unix(uds, _) => uds.send(buf).await, - }?; - if sent != buf.len() { - match socket { - DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError { - data_size: buf.len(), - sent, - }), - #[cfg(unix)] - DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError { - data_size: buf.len(), - sent, - }), - } - } - Ok(()) -} diff --git a/src/sinks/util/mod.rs b/src/sinks/util/mod.rs index 990095ed3452f..c71b49163f0d2 100644 --- a/src/sinks/util/mod.rs +++ b/src/sinks/util/mod.rs @@ -6,7 +6,6 @@ pub mod batch; pub mod buffer; pub mod builder; pub mod compressor; -pub mod datagram; pub mod encoding; pub mod http; pub mod metadata; @@ -24,7 +23,7 @@ pub mod tcp; #[cfg(any(test, feature = "test-utils"))] pub mod test; pub mod udp; -#[cfg(unix)] +#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))] pub mod unix; pub mod uri; pub mod zstd; diff --git a/src/sinks/util/service/net/mod.rs b/src/sinks/util/service/net/mod.rs index aa5f6451a4a69..3ad532f747994 100644 --- a/src/sinks/util/service/net/mod.rs +++ b/src/sinks/util/service/net/mod.rs @@ -12,7 +12,7 @@ use std::{ }; #[cfg(unix)] -use {crate::sinks::util::unix::UnixEither, std::path::PathBuf}; +use std::path::PathBuf; use crate::{ internal_events::{ @@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode}; use self::tcp::TcpConnector; use self::udp::UdpConnector; #[cfg(unix)] -use self::unix::UnixConnector; +use self::unix::{UnixConnector, UnixEither}; use futures_util::{future::BoxFuture, FutureExt}; use snafu::{ResultExt, Snafu}; diff --git a/src/sinks/util/service/net/unix.rs b/src/sinks/util/service/net/unix.rs index 0cae976b04630..f0655015f06fe 100644 --- a/src/sinks/util/service/net/unix.rs +++ b/src/sinks/util/service/net/unix.rs @@ -1,11 +1,18 @@ -use std::path::{Path, PathBuf}; +use std::{ + io, + os::fd::{AsFd, BorrowedFd}, + path::{Path, PathBuf}, +}; use snafu::ResultExt; -use tokio::net::{UnixDatagram, UnixStream}; +use tokio::{ + io::AsyncWriteExt, + net::{UnixDatagram, UnixStream}, +}; use vector_lib::configurable::configurable_component; -use crate::{net, sinks::util::unix::UnixEither}; +use crate::net; use super::{net_error::*, ConnectorType, NetError, NetworkConnector}; @@ -67,6 +74,29 @@ impl UnixConnectorConfig { } } +pub(super) enum UnixEither { + Datagram(UnixDatagram), + Stream(UnixStream), +} + +impl UnixEither { + pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result { + match self { + Self::Datagram(datagram) => datagram.send(buf).await, + Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()), + } + } +} + +impl AsFd for UnixEither { + fn as_fd(&self) -> BorrowedFd<'_> { + match self { + Self::Datagram(datagram) => datagram.as_fd(), + Self::Stream(stream) => stream.as_fd(), + } + } +} + #[derive(Clone)] pub(super) struct UnixConnector { path: PathBuf, diff --git a/src/sinks/util/udp.rs b/src/sinks/util/udp.rs index e681e7fd0dc66..96b0c0393e37c 100644 --- a/src/sinks/util/udp.rs +++ b/src/sinks/util/udp.rs @@ -5,22 +5,24 @@ use std::{ }; use async_trait::async_trait; +use bytes::BytesMut; use futures::{stream::BoxStream, FutureExt, StreamExt}; use snafu::{ResultExt, Snafu}; use tokio::{net::UdpSocket, time::sleep}; use tokio_util::codec::Encoder; use vector_lib::configurable::configurable_component; -use vector_lib::internal_event::{BytesSent, Protocol, Registered}; +use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle, Protocol, Registered}; +use vector_lib::EstimatedJsonEncodedSizeOf; -use super::{ - datagram::{send_datagrams, DatagramSocket}, - SinkBuildError, -}; +use super::SinkBuildError; use crate::{ codecs::Transformer, dns, - event::Event, - internal_events::{UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError}, + event::{Event, EventStatus, Finalizable}, + internal_events::{ + SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError, + UdpSocketConnectionEstablished, UdpSocketOutgoingConnectionError, + }, net, sinks::{ util::{retries::ExponentialBackoff, StreamSink}, @@ -196,21 +198,58 @@ where let mut encoder = self.encoder.clone(); while Pin::new(&mut input).peek().await.is_some() { - let socket = self.connector.connect_backoff().await; - send_datagrams( - &mut input, - DatagramSocket::Udp(socket), - &self.transformer, - &mut encoder, - &self.bytes_sent, - ) - .await; + let mut socket = self.connector.connect_backoff().await; + while let Some(mut event) = input.next().await { + let byte_size = event.estimated_json_encoded_size_of(); + + self.transformer.transform(&mut event); + + let finalizers = event.take_finalizers(); + let mut bytes = BytesMut::new(); + + // Errors are handled by `Encoder`. + if encoder.encode(event, &mut bytes).is_err() { + continue; + } + + match udp_send(&mut socket, &bytes).await { + Ok(()) => { + emit!(SocketEventsSent { + mode: SocketMode::Udp, + count: 1, + byte_size, + }); + + self.bytes_sent.emit(ByteSize(bytes.len())); + finalizers.update_status(EventStatus::Delivered); + } + Err(error) => { + emit!(SocketSendError { + mode: SocketMode::Udp, + error + }); + finalizers.update_status(EventStatus::Errored); + break; + } + } + } } Ok(()) } } +async fn udp_send(socket: &mut UdpSocket, buf: &[u8]) -> tokio::io::Result<()> { + let sent = socket.send(buf).await?; + if sent != buf.len() { + emit!(UdpSendIncompleteError { + data_size: buf.len(), + sent, + }); + } + Ok(()) +} + pub(super) const fn find_bind_address(remote_addr: &SocketAddr) -> SocketAddr { match remote_addr { SocketAddr::V4(_) => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), diff --git a/src/sinks/util/unix.rs b/src/sinks/util/unix.rs index 9afde6d053ad3..b12434c347cd6 100644 --- a/src/sinks/util/unix.rs +++ b/src/sinks/util/unix.rs @@ -1,26 +1,13 @@ -use std::{ - io, - os::fd::{AsFd, BorrowedFd}, - path::PathBuf, - pin::Pin, - time::Duration, -}; +use std::{path::PathBuf, pin::Pin, time::Duration}; use async_trait::async_trait; use bytes::{Bytes, BytesMut}; use futures::{stream::BoxStream, SinkExt, StreamExt}; use snafu::{ResultExt, Snafu}; -use tokio::{ - io::AsyncWriteExt, - net::{UnixDatagram, UnixStream}, - time::sleep, -}; +use tokio::{net::UnixStream, time::sleep}; use tokio_util::codec::Encoder; +use vector_lib::configurable::configurable_component; use vector_lib::json_size::JsonSize; -use vector_lib::{ - configurable::configurable_component, - internal_event::{BytesSent, Protocol}, -}; use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; use crate::{ @@ -34,7 +21,6 @@ use crate::{ sinks::{ util::{ retries::ExponentialBackoff, - service::net::UnixMode, socket_bytes_sink::{BytesSink, ShutdownCheck}, EncodedEvent, StreamSink, }, @@ -42,8 +28,6 @@ use crate::{ }, }; -use super::datagram::{send_datagrams, DatagramSocket}; - #[derive(Debug, Snafu)] pub enum UnixError { #[snafu(display("Failed connecting to socket at path {}: {}", path.display(), source))] @@ -51,9 +35,6 @@ pub enum UnixError { source: tokio::io::Error, path: PathBuf, }, - - #[snafu(display("Failed to bind socket: {}.", source))] - FailedToBind { source: std::io::Error }, } /// A Unix Domain Socket sink. @@ -65,22 +46,11 @@ pub struct UnixSinkConfig { /// This should be an absolute path. #[configurable(metadata(docs::examples = "/path/to/socket"))] pub path: PathBuf, - - /// The Unix socket mode to use. - /// - /// Unavailable on macOS, where the mode is always `Stream`. - #[cfg_attr(target_os = "macos", serde(skip))] - #[serde(default = "default_unix_mode")] - unix_mode: UnixMode, -} - -const fn default_unix_mode() -> UnixMode { - UnixMode::Stream } impl UnixSinkConfig { - pub const fn new(path: PathBuf, unix_mode: UnixMode) -> Self { - Self { path, unix_mode } + pub const fn new(path: PathBuf) -> Self { + Self { path } } pub fn build( @@ -92,7 +62,7 @@ impl UnixSinkConfig { + Sync + 'static, ) -> crate::Result<(VectorSink, Healthcheck)> { - let connector = UnixConnector::new(self.path.clone(), self.unix_mode); + let connector = UnixConnector::new(self.path.clone()); let sink = UnixSink::new(connector.clone(), transformer, encoder); Ok(( VectorSink::from_event_streamsink(sink), @@ -101,38 +71,14 @@ impl UnixSinkConfig { } } -pub enum UnixEither { - Datagram(UnixDatagram), - Stream(UnixStream), -} - -impl UnixEither { - pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result { - match self { - Self::Datagram(datagram) => datagram.send(buf).await, - Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()), - } - } -} - -impl AsFd for UnixEither { - fn as_fd(&self) -> BorrowedFd<'_> { - match self { - Self::Datagram(datagram) => datagram.as_fd(), - Self::Stream(stream) => stream.as_fd(), - } - } -} - #[derive(Debug, Clone)] struct UnixConnector { pub path: PathBuf, - mode: UnixMode, } impl UnixConnector { - const fn new(path: PathBuf, mode: UnixMode) -> Self { - Self { path, mode } + const fn new(path: PathBuf) -> Self { + Self { path } } const fn fresh_backoff() -> ExponentialBackoff { @@ -142,30 +88,15 @@ impl UnixConnector { .max_delay(Duration::from_secs(60)) } - async fn connect(&self) -> Result { - match self.mode { - UnixMode::Stream => UnixStream::connect(&self.path) - .await - .context(ConnectionSnafu { - path: self.path.clone(), - }) - .map(UnixEither::Stream), - UnixMode::Datagram => { - UnixDatagram::unbound() - .context(FailedToBindSnafu) - .and_then(|datagram| { - datagram - .connect(&self.path) - .context(ConnectionSnafu { - path: self.path.clone(), - }) - .map(|_| UnixEither::Datagram(datagram)) - }) - } - } + async fn connect(&self) -> Result { + UnixStream::connect(&self.path) + .await + .context(ConnectionSnafu { + path: self.path.clone(), + }) } - async fn connect_backoff(&self) -> UnixEither { + async fn connect_backoff(&self) -> UnixStream { let mut backoff = Self::fresh_backoff(); loop { match self.connect().await { @@ -208,22 +139,18 @@ where } async fn connect(&mut self) -> BytesSink { - let stream = match self.connector.connect_backoff().await { - UnixEither::Stream(stream) => stream, - UnixEither::Datagram(_) => unreachable!("connect is only called with Stream mode"), - }; + let stream = self.connector.connect_backoff().await; BytesSink::new(stream, |_| ShutdownCheck::Alive, SocketMode::Unix) } +} - async fn run_internal(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - match self.connector.mode { - UnixMode::Stream => self.run_stream(input).await, - UnixMode::Datagram => self.run_datagram(input).await, - } - } - +#[async_trait] +impl StreamSink for UnixSink +where + E: Encoder + Clone + Send + Sync, +{ // Same as TcpSink, more details there. - async fn run_stream(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { let mut encoder = self.encoder.clone(); let transformer = self.transformer.clone(); let mut input = input @@ -270,50 +197,12 @@ where Ok(()) } - - async fn run_datagram(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - let bytes_sent = register!(BytesSent::from(Protocol::UNIX)); - let mut input = input.peekable(); - - let mut encoder = self.encoder.clone(); - while Pin::new(&mut input).peek().await.is_some() { - let socket = match self.connector.connect_backoff().await { - UnixEither::Datagram(datagram) => datagram, - UnixEither::Stream(_) => { - unreachable!("run_datagram is only called with Datagram mode") - } - }; - - send_datagrams( - &mut input, - DatagramSocket::Unix(socket, self.connector.path.clone()), - &self.transformer, - &mut encoder, - &bytes_sent, - ) - .await; - } - - Ok(()) - } -} - -#[async_trait] -impl StreamSink for UnixSink -where - E: Encoder + Clone + Send + Sync, -{ - async fn run(mut self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - self.run_internal(input).await - } } #[cfg(test)] mod tests { use tokio::net::UnixListener; - use vector_lib::codecs::{ - encoding::Framer, BytesEncoder, NewlineDelimitedEncoder, TextSerializerConfig, - }; + use vector_lib::codecs::{encoding::Framer, NewlineDelimitedEncoder, TextSerializerConfig}; use super::*; use crate::{ @@ -330,9 +219,9 @@ mod tests { #[tokio::test] async fn unix_sink_healthcheck() { - let good_path = temp_uds_path("valid_stream_uds"); + let good_path = temp_uds_path("valid_uds"); let _listener = UnixListener::bind(&good_path).unwrap(); - assert!(UnixSinkConfig::new(good_path.clone(), UnixMode::Stream) + assert!(UnixSinkConfig::new(good_path) .build( Default::default(), Encoder::<()>::new(TextSerializerConfig::default().build().into()) @@ -341,30 +230,9 @@ mod tests { .1 .await .is_ok()); - assert!( - UnixSinkConfig::new(good_path.clone(), UnixMode::Datagram) - .build( - Default::default(), - Encoder::<()>::new(TextSerializerConfig::default().build().into()) - ) - .unwrap() - .1 - .await - .is_err(), - "datagram mode should fail when attempting to send into a stream mode UDS" - ); let bad_path = temp_uds_path("no_one_listening"); - assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Stream) - .build( - Default::default(), - Encoder::<()>::new(TextSerializerConfig::default().build().into()) - ) - .unwrap() - .1 - .await - .is_err()); - assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Datagram) + assert!(UnixSinkConfig::new(bad_path) .build( Default::default(), Encoder::<()>::new(TextSerializerConfig::default().build().into()) @@ -384,7 +252,7 @@ mod tests { let mut receiver = CountReceiver::receive_lines_unix(out_path.clone()); // Set up Sink - let config = UnixSinkConfig::new(out_path, UnixMode::Stream); + let config = UnixSinkConfig::new(out_path); let (sink, _healthcheck) = config .build( Default::default(), @@ -408,57 +276,4 @@ mod tests { // Receive the data sent by the Sink to the receiver assert_eq!(input_lines, receiver.await); } - - #[cfg_attr(target_os = "macos", ignore)] - #[tokio::test] - async fn basic_unix_datagram_sink() { - let num_lines = 1000; - let out_path = temp_uds_path("unix_datagram_test"); - - // Set up listener to receive events from the Sink. - let receiver = std::os::unix::net::UnixDatagram::bind(out_path.clone()).unwrap(); - let (ready_tx, ready_rx) = tokio::sync::oneshot::channel(); - - // Listen in the background to avoid blocking - let handle = tokio::task::spawn_blocking(move || { - let mut output_lines = Vec::::with_capacity(num_lines); - - ready_tx.send(()).expect("failed to signal readiness"); - for _ in 0..num_lines { - let mut buf = [0; 101]; - let (size, _) = receiver - .recv_from(&mut buf) - .expect("Did not receive message"); - let line = String::from_utf8_lossy(&buf[..size]).to_string(); - output_lines.push(line); - } - - output_lines - }); - ready_rx.await.expect("failed to receive ready signal"); - - // Set up Sink - let config = UnixSinkConfig::new(out_path.clone(), UnixMode::Datagram); - let (sink, _healthcheck) = config - .build( - Default::default(), - Encoder::::new( - BytesEncoder.into(), - TextSerializerConfig::default().build().into(), - ), - ) - .unwrap(); - - // Send the test data - let (input_lines, events) = random_lines_with_stream(100, num_lines, None); - - assert_sink_compliance(&SINK_TAGS, async move { sink.run(events).await }) - .await - .expect("Running sink failed"); - - // Receive the data sent by the Sink to the receiver - let output_lines = handle.await.expect("UDS Datagram receiver failed"); - - assert_eq!(input_lines, output_lines); - } } diff --git a/website/cue/reference/components/sinks/base/socket.cue b/website/cue/reference/components/sinks/base/socket.cue index 55fa8321652c3..df3f0c6aae57c 100644 --- a/website/cue/reference/components/sinks/base/socket.cue +++ b/website/cue/reference/components/sinks/base/socket.cue @@ -588,20 +588,4 @@ base: components: sinks: socket: configuration: { } } } - unix_mode: { - description: """ - The Unix socket mode to use. - - Unavailable on macOS, where the mode is always `Stream`. - """ - relevant_when: "mode = \"unix\"" - required: false - type: string: { - default: "Stream" - enum: { - Datagram: "Datagram-oriented (`SOCK_DGRAM`)." - Stream: "Stream-oriented (`SOCK_STREAM`)." - } - } - } }