Skip to content

Commit

Permalink
socket sink: configure UDS mode via the mode option instead of unix_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jpovixwm committed Jan 3, 2025
1 parent b648cfe commit a0d1ca1
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:

- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)

This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets.
The `socket` sink now supports `unix_datagram` as a valid `mode`. This feature is unavailable on macOS.

authors: jpovixwm
38 changes: 27 additions & 11 deletions src/sinks/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,14 @@ pub enum Mode {
/// Send over UDP.
Udp(UdpMode),

/// Send over a Unix domain socket (UDS).
/// Send over a Unix domain socket (UDS), in stream mode.
#[cfg(unix)]
Unix(UnixMode),

/// Send over a Unix domain socket (UDS), in datagram mode. Unavailable on macOS.
#[cfg(unix)]
#[cfg_attr(target_os = "macos", serde(skip))]
UnixDatagram(UnixMode),
}

/// TCP configuration.
Expand Down Expand Up @@ -137,7 +142,22 @@ impl SinkConfig for SocketSinkConfig {
let transformer = encoding.transformer();
let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);
config.build(transformer, encoder)
config.build(
transformer,
encoder,
super::util::service::net::UnixMode::Stream,
)
}
#[cfg(unix)]
Mode::UnixDatagram(UnixMode { config, encoding }) => {
let transformer = encoding.transformer();
let (framer, serializer) = encoding.build(SinkType::StreamBased)?;
let encoder = Encoder::<Framer>::new(framer, serializer);
config.build(
transformer,
encoder,
super::util::service::net::UnixMode::Datagram,
)
}
}
}
Expand All @@ -148,6 +168,8 @@ impl SinkConfig for SocketSinkConfig {
Mode::Udp(UdpMode { encoding, .. }) => encoding.config().input_type(),
#[cfg(unix)]
Mode::Unix(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
#[cfg(unix)]
Mode::UnixDatagram(UnixMode { encoding, .. }) => encoding.config().1.input_type(),
};
Input::new(encoder_input_type)
}
Expand Down Expand Up @@ -224,11 +246,8 @@ mod test {
encoding: JsonSerializerConfig::default().into(),
}),
#[cfg(unix)]
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
path.to_path_buf(),
crate::sinks::util::service::net::UnixMode::Datagram,
),
DatagramSocketAddr::Unix(path) => Mode::UnixDatagram(UnixMode {
config: UnixSinkConfig::new(path.to_path_buf()),
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
}),
},
Expand Down Expand Up @@ -335,10 +354,7 @@ mod test {

let config = SocketSinkConfig {
mode: Mode::Unix(UnixMode {
config: UnixSinkConfig::new(
out_path,
crate::sinks::util::service::net::UnixMode::Stream,
),
config: UnixSinkConfig::new(out_path),
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
}),
acknowledgements: Default::default(),
Expand Down
44 changes: 20 additions & 24 deletions src/sinks/util/unix.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,11 @@ pub struct UnixSinkConfig {
/// This should be an absolute path.
#[configurable(metadata(docs::examples = "/path/to/socket"))]
pub path: PathBuf,

/// The Unix socket mode to use.
///
/// Unavailable on macOS, where the mode is always `Stream`.
#[cfg_attr(target_os = "macos", serde(skip))]
#[serde(default = "default_unix_mode")]
unix_mode: UnixMode,
}

const fn default_unix_mode() -> UnixMode {
UnixMode::Stream
}

impl UnixSinkConfig {
pub const fn new(path: PathBuf, unix_mode: UnixMode) -> Self {
Self { path, unix_mode }
pub const fn new(path: PathBuf) -> Self {
Self { path }
}

pub fn build(
Expand All @@ -91,8 +80,9 @@ impl UnixSinkConfig {
+ Send
+ Sync
+ 'static,
unix_mode: UnixMode,
) -> crate::Result<(VectorSink, Healthcheck)> {
let connector = UnixConnector::new(self.path.clone(), self.unix_mode);
let connector = UnixConnector::new(self.path.clone(), unix_mode);
let sink = UnixSink::new(connector.clone(), transformer, encoder);
Ok((
VectorSink::from_event_streamsink(sink),
Expand Down Expand Up @@ -332,20 +322,22 @@ mod tests {
async fn unix_sink_healthcheck() {
let good_path = temp_uds_path("valid_stream_uds");
let _listener = UnixListener::bind(&good_path).unwrap();
assert!(UnixSinkConfig::new(good_path.clone(), UnixMode::Stream)
assert!(UnixSinkConfig::new(good_path.clone())
.build(
Default::default(),
Encoder::<()>::new(TextSerializerConfig::default().build().into())
Encoder::<()>::new(TextSerializerConfig::default().build().into()),
UnixMode::Stream
)
.unwrap()
.1
.await
.is_ok());
assert!(
UnixSinkConfig::new(good_path.clone(), UnixMode::Datagram)
UnixSinkConfig::new(good_path.clone())
.build(
Default::default(),
Encoder::<()>::new(TextSerializerConfig::default().build().into())
Encoder::<()>::new(TextSerializerConfig::default().build().into()),
UnixMode::Datagram
)
.unwrap()
.1
Expand All @@ -355,19 +347,21 @@ mod tests {
);

let bad_path = temp_uds_path("no_one_listening");
assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Stream)
assert!(UnixSinkConfig::new(bad_path.clone())
.build(
Default::default(),
Encoder::<()>::new(TextSerializerConfig::default().build().into())
Encoder::<()>::new(TextSerializerConfig::default().build().into()),
UnixMode::Stream
)
.unwrap()
.1
.await
.is_err());
assert!(UnixSinkConfig::new(bad_path.clone(), UnixMode::Datagram)
assert!(UnixSinkConfig::new(bad_path.clone())
.build(
Default::default(),
Encoder::<()>::new(TextSerializerConfig::default().build().into())
Encoder::<()>::new(TextSerializerConfig::default().build().into()),
UnixMode::Datagram
)
.unwrap()
.1
Expand All @@ -384,14 +378,15 @@ mod tests {
let mut receiver = CountReceiver::receive_lines_unix(out_path.clone());

// Set up Sink
let config = UnixSinkConfig::new(out_path, UnixMode::Stream);
let config = UnixSinkConfig::new(out_path);
let (sink, _healthcheck) = config
.build(
Default::default(),
Encoder::<Framer>::new(
NewlineDelimitedEncoder::default().into(),
TextSerializerConfig::default().build().into(),
),
UnixMode::Stream,
)
.unwrap();

Expand Down Expand Up @@ -438,14 +433,15 @@ mod tests {
ready_rx.await.expect("failed to receive ready signal");

// Set up Sink
let config = UnixSinkConfig::new(out_path.clone(), UnixMode::Datagram);
let config = UnixSinkConfig::new(out_path.clone());
let (sink, _healthcheck) = config
.build(
Default::default(),
Encoder::<Framer>::new(
BytesEncoder.into(),
TextSerializerConfig::default().build().into(),
),
UnixMode::Datagram,
)
.unwrap();

Expand Down
27 changes: 6 additions & 21 deletions website/cue/reference/components/sinks/base/socket.cue
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ base: components: sinks: socket: configuration: {
}
framing: {
description: "Framing configuration."
relevant_when: "mode = \"tcp\" or mode = \"unix\""
relevant_when: "mode = \"tcp\" or mode = \"unix\" or mode = \"unix_datagram\""
required: false
type: object: options: {
character_delimited: {
Expand Down Expand Up @@ -453,9 +453,10 @@ base: components: sinks: socket: configuration: {
description: "The type of socket to use."
required: true
type: string: enum: {
tcp: "Send over TCP."
udp: "Send over UDP."
unix: "Send over a Unix domain socket (UDS)."
tcp: "Send over TCP."
udp: "Send over UDP."
unix: "Send over a Unix domain socket (UDS), in stream mode."
unix_datagram: "Send over a Unix domain socket (UDS), in datagram mode. Unavailable on macOS."
}
}
path: {
Expand All @@ -464,7 +465,7 @@ base: components: sinks: socket: configuration: {
This should be an absolute path.
"""
relevant_when: "mode = \"unix\""
relevant_when: "mode = \"unix\" or mode = \"unix_datagram\""
required: true
type: string: examples: ["/path/to/socket"]
}
Expand Down Expand Up @@ -588,20 +589,4 @@ base: components: sinks: socket: configuration: {
}
}
}
unix_mode: {
description: """
The Unix socket mode to use.
Unavailable on macOS, where the mode is always `Stream`.
"""
relevant_when: "mode = \"unix\""
required: false
type: string: {
default: "Stream"
enum: {
Datagram: "Datagram-oriented (`SOCK_DGRAM`)."
Stream: "Stream-oriented (`SOCK_STREAM`)."
}
}
}
}

0 comments on commit a0d1ca1

Please sign in to comment.