Skip to content

Commit 7c6d0c9

Browse files
jpovixwmpront
andauthored
enhancement(socket sink): support unix datagram mode (vectordotdev#21762)
* enhancement(socket sink): support unix datagram mode * 5269_support_unix_datagram_mode_in_socket_sink.enhancement.md: fix linter error * sinks/util/{udp,unix}.rs: abstract out common logic into sinks/util/datagram.rs * sinks/util/service/net/unix: use sinks/util/unix/UnixEither and move impls there * remove problematic feature gates for 'sinks-socket' and 'sinks-statsd' * use std type and spawn blocking * basic_unix_datagram_sink: attempt to reduce flakiness * socket sink: ignore unix_mode on macOS --------- Co-authored-by: Pavlos Rontidis <pavlos.rontidis@gmail.com>
1 parent 1275f1a commit 7c6d0c9

File tree

10 files changed

+426
-135
lines changed

10 files changed

+426
-135
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
The `socket` sink now supports the `unix_mode` configuration option that specifies the Unix socket mode to use. Valid values:
2+
3+
- `Stream` (default) - Stream-oriented (`SOCK_STREAM`)
4+
- `Datagram` - Datagram-oriented (`SOCK_DGRAM`)
5+
6+
This option only applies when `mode = "unix"`, and is unavailable on macOS, where `SOCK_STREAM` is always used for Unix sockets.
7+
8+
authors: jpovixwm

src/internal_events/unix.rs

-2
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,12 @@ impl<E: std::fmt::Display> InternalEvent for UnixSocketError<'_, E> {
6565
}
6666
}
6767

68-
#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
6968
#[derive(Debug)]
7069
pub struct UnixSocketSendError<'a, E> {
7170
pub(crate) error: &'a E,
7271
pub path: &'a std::path::Path,
7372
}
7473

75-
#[cfg(all(unix, any(feature = "sinks-socket", feature = "sinks-statsd")))]
7674
impl<E: std::fmt::Display> InternalEvent for UnixSocketSendError<'_, E> {
7775
fn emit(self) {
7876
let reason = "Unix socket send error.";

src/sinks/socket.rs

+60-14
Original file line numberDiff line numberDiff line change
@@ -159,12 +159,12 @@ impl SinkConfig for SocketSinkConfig {
159159

160160
#[cfg(test)]
161161
mod test {
162-
#[cfg(unix)]
163-
use std::path::PathBuf;
164162
use std::{
165163
future::ready,
166164
net::{SocketAddr, UdpSocket},
167165
};
166+
#[cfg(unix)]
167+
use std::{os::unix::net::UnixDatagram, path::PathBuf};
168168

169169
use futures::stream::StreamExt;
170170
use futures_util::stream;
@@ -196,14 +196,42 @@ mod test {
196196
crate::test_util::test_generate_config::<SocketSinkConfig>();
197197
}
198198

199-
async fn test_udp(addr: SocketAddr) {
200-
let receiver = UdpSocket::bind(addr).unwrap();
199+
enum DatagramSocket {
200+
Udp(UdpSocket),
201+
#[cfg(unix)]
202+
Unix(UnixDatagram),
203+
}
204+
205+
enum DatagramSocketAddr {
206+
Udp(SocketAddr),
207+
#[cfg(unix)]
208+
Unix(PathBuf),
209+
}
210+
211+
async fn test_datagram(datagram_addr: DatagramSocketAddr) {
212+
let receiver = match &datagram_addr {
213+
DatagramSocketAddr::Udp(addr) => DatagramSocket::Udp(UdpSocket::bind(addr).unwrap()),
214+
#[cfg(unix)]
215+
DatagramSocketAddr::Unix(path) => {
216+
DatagramSocket::Unix(UnixDatagram::bind(path).unwrap())
217+
}
218+
};
201219

202220
let config = SocketSinkConfig {
203-
mode: Mode::Udp(UdpMode {
204-
config: UdpSinkConfig::from_address(addr.to_string()),
205-
encoding: JsonSerializerConfig::default().into(),
206-
}),
221+
mode: match &datagram_addr {
222+
DatagramSocketAddr::Udp(addr) => Mode::Udp(UdpMode {
223+
config: UdpSinkConfig::from_address(addr.to_string()),
224+
encoding: JsonSerializerConfig::default().into(),
225+
}),
226+
#[cfg(unix)]
227+
DatagramSocketAddr::Unix(path) => Mode::Unix(UnixMode {
228+
config: UnixSinkConfig::new(
229+
path.to_path_buf(),
230+
crate::sinks::util::service::net::UnixMode::Datagram,
231+
),
232+
encoding: (None::<FramingConfig>, JsonSerializerConfig::default()).into(),
233+
}),
234+
},
207235
acknowledgements: Default::default(),
208236
};
209237

@@ -218,9 +246,13 @@ mod test {
218246
.expect("Running sink failed");
219247

220248
let mut buf = [0; 256];
221-
let (size, _src_addr) = receiver
222-
.recv_from(&mut buf)
223-
.expect("Did not receive message");
249+
let size = match &receiver {
250+
DatagramSocket::Udp(sock) => {
251+
sock.recv_from(&mut buf).expect("Did not receive message").0
252+
}
253+
#[cfg(unix)]
254+
DatagramSocket::Unix(sock) => sock.recv(&mut buf).expect("Did not receive message"),
255+
};
224256

225257
let packet = String::from_utf8(buf[..size].to_vec()).expect("Invalid data received");
226258
let data = serde_json::from_str::<Value>(&packet).expect("Invalid JSON received");
@@ -234,14 +266,25 @@ mod test {
234266
async fn udp_ipv4() {
235267
trace_init();
236268

237-
test_udp(next_addr()).await;
269+
test_datagram(DatagramSocketAddr::Udp(next_addr())).await;
238270
}
239271

240272
#[tokio::test]
241273
async fn udp_ipv6() {
242274
trace_init();
243275

244-
test_udp(next_addr_v6()).await;
276+
test_datagram(DatagramSocketAddr::Udp(next_addr_v6())).await;
277+
}
278+
279+
#[cfg(unix)]
280+
#[tokio::test]
281+
async fn unix_datagram() {
282+
trace_init();
283+
284+
test_datagram(DatagramSocketAddr::Unix(temp_uds_path(
285+
"unix_datagram_socket_test",
286+
)))
287+
.await;
245288
}
246289

247290
#[tokio::test]
@@ -292,7 +335,10 @@ mod test {
292335

293336
let config = SocketSinkConfig {
294337
mode: Mode::Unix(UnixMode {
295-
config: UnixSinkConfig::new(out_path),
338+
config: UnixSinkConfig::new(
339+
out_path,
340+
crate::sinks::util::service::net::UnixMode::Stream,
341+
),
296342
encoding: (None::<FramingConfig>, NativeJsonSerializerConfig).into(),
297343
}),
298344
acknowledgements: Default::default(),

src/sinks/util/datagram.rs

+106
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#[cfg(unix)]
2+
use std::path::PathBuf;
3+
4+
use bytes::BytesMut;
5+
use futures::{stream::BoxStream, StreamExt};
6+
use futures_util::stream::Peekable;
7+
use tokio::net::UdpSocket;
8+
#[cfg(unix)]
9+
use tokio::net::UnixDatagram;
10+
use tokio_util::codec::Encoder;
11+
use vector_lib::internal_event::RegisterInternalEvent;
12+
use vector_lib::internal_event::{ByteSize, BytesSent, InternalEventHandle};
13+
use vector_lib::EstimatedJsonEncodedSizeOf;
14+
15+
use crate::{
16+
codecs::Transformer,
17+
event::{Event, EventStatus, Finalizable},
18+
internal_events::{SocketEventsSent, SocketMode, SocketSendError, UdpSendIncompleteError},
19+
};
20+
21+
#[cfg(unix)]
22+
use crate::internal_events::{UnixSendIncompleteError, UnixSocketSendError};
23+
24+
pub enum DatagramSocket {
25+
Udp(UdpSocket),
26+
#[cfg(unix)]
27+
Unix(UnixDatagram, PathBuf),
28+
}
29+
30+
pub async fn send_datagrams<E: Encoder<Event, Error = vector_lib::codecs::encoding::Error>>(
31+
input: &mut Peekable<BoxStream<'_, Event>>,
32+
mut socket: DatagramSocket,
33+
transformer: &Transformer,
34+
encoder: &mut E,
35+
bytes_sent: &<BytesSent as RegisterInternalEvent>::Handle,
36+
) {
37+
while let Some(mut event) = input.next().await {
38+
let byte_size = event.estimated_json_encoded_size_of();
39+
40+
transformer.transform(&mut event);
41+
42+
let finalizers = event.take_finalizers();
43+
let mut bytes = BytesMut::new();
44+
45+
// Errors are handled by `Encoder`.
46+
if encoder.encode(event, &mut bytes).is_err() {
47+
continue;
48+
}
49+
50+
match send_datagram(&mut socket, &bytes).await {
51+
Ok(()) => {
52+
emit!(SocketEventsSent {
53+
mode: match socket {
54+
DatagramSocket::Udp(_) => SocketMode::Udp,
55+
#[cfg(unix)]
56+
DatagramSocket::Unix(..) => SocketMode::Unix,
57+
},
58+
count: 1,
59+
byte_size,
60+
});
61+
62+
bytes_sent.emit(ByteSize(bytes.len()));
63+
finalizers.update_status(EventStatus::Delivered);
64+
}
65+
Err(error) => {
66+
match socket {
67+
DatagramSocket::Udp(_) => emit!(SocketSendError {
68+
mode: SocketMode::Udp,
69+
error
70+
}),
71+
#[cfg(unix)]
72+
DatagramSocket::Unix(_, path) => {
73+
emit!(UnixSocketSendError {
74+
path: path.as_path(),
75+
error: &error
76+
})
77+
}
78+
};
79+
finalizers.update_status(EventStatus::Errored);
80+
return;
81+
}
82+
}
83+
}
84+
}
85+
86+
async fn send_datagram(socket: &mut DatagramSocket, buf: &[u8]) -> tokio::io::Result<()> {
87+
let sent = match socket {
88+
DatagramSocket::Udp(udp) => udp.send(buf).await,
89+
#[cfg(unix)]
90+
DatagramSocket::Unix(uds, _) => uds.send(buf).await,
91+
}?;
92+
if sent != buf.len() {
93+
match socket {
94+
DatagramSocket::Udp(_) => emit!(UdpSendIncompleteError {
95+
data_size: buf.len(),
96+
sent,
97+
}),
98+
#[cfg(unix)]
99+
DatagramSocket::Unix(..) => emit!(UnixSendIncompleteError {
100+
data_size: buf.len(),
101+
sent,
102+
}),
103+
}
104+
}
105+
Ok(())
106+
}

src/sinks/util/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub mod batch;
66
pub mod buffer;
77
pub mod builder;
88
pub mod compressor;
9+
pub mod datagram;
910
pub mod encoding;
1011
pub mod http;
1112
pub mod metadata;
@@ -23,7 +24,7 @@ pub mod tcp;
2324
#[cfg(any(test, feature = "test-utils"))]
2425
pub mod test;
2526
pub mod udp;
26-
#[cfg(all(any(feature = "sinks-socket", feature = "sinks-statsd"), unix))]
27+
#[cfg(unix)]
2728
pub mod unix;
2829
pub mod uri;
2930
pub mod zstd;

src/sinks/util/service/net/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
};
1313

1414
#[cfg(unix)]
15-
use std::path::PathBuf;
15+
use {crate::sinks::util::unix::UnixEither, std::path::PathBuf};
1616

1717
use crate::{
1818
internal_events::{
@@ -33,7 +33,7 @@ pub use self::unix::{UnixConnectorConfig, UnixMode};
3333
use self::tcp::TcpConnector;
3434
use self::udp::UdpConnector;
3535
#[cfg(unix)]
36-
use self::unix::{UnixConnector, UnixEither};
36+
use self::unix::UnixConnector;
3737

3838
use futures_util::{future::BoxFuture, FutureExt};
3939
use snafu::{ResultExt, Snafu};

src/sinks/util/service/net/unix.rs

+3-33
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,11 @@
1-
use std::{
2-
io,
3-
os::fd::{AsFd, BorrowedFd},
4-
path::{Path, PathBuf},
5-
};
1+
use std::path::{Path, PathBuf};
62

73
use snafu::ResultExt;
8-
use tokio::{
9-
io::AsyncWriteExt,
10-
net::{UnixDatagram, UnixStream},
11-
};
4+
use tokio::net::{UnixDatagram, UnixStream};
125

136
use vector_lib::configurable::configurable_component;
147

15-
use crate::net;
8+
use crate::{net, sinks::util::unix::UnixEither};
169

1710
use super::{net_error::*, ConnectorType, NetError, NetworkConnector};
1811

@@ -74,29 +67,6 @@ impl UnixConnectorConfig {
7467
}
7568
}
7669

77-
pub(super) enum UnixEither {
78-
Datagram(UnixDatagram),
79-
Stream(UnixStream),
80-
}
81-
82-
impl UnixEither {
83-
pub(super) async fn send(&mut self, buf: &[u8]) -> io::Result<usize> {
84-
match self {
85-
Self::Datagram(datagram) => datagram.send(buf).await,
86-
Self::Stream(stream) => stream.write_all(buf).await.map(|_| buf.len()),
87-
}
88-
}
89-
}
90-
91-
impl AsFd for UnixEither {
92-
fn as_fd(&self) -> BorrowedFd<'_> {
93-
match self {
94-
Self::Datagram(datagram) => datagram.as_fd(),
95-
Self::Stream(stream) => stream.as_fd(),
96-
}
97-
}
98-
}
99-
10070
#[derive(Clone)]
10171
pub(super) struct UnixConnector {
10272
path: PathBuf,

0 commit comments

Comments
 (0)