Skip to content

Commit 029a2ff

Browse files
authored
feat(aws_s3 sink): add option to use virtual addressing (vectordotdev#21999)
* feat(aws_s3 sink): add option to use virtual addressing Signed-off-by: Scott Miller <smiller1@coreweave.com> * fix(aws_s3 sink): move force_path_style to S3ClientBuilder Signed-off-by: Scott Miller <smiller1@coreweave.com> * fix(aws_s3 sink): simplify S3ClientBuilder logic --------- Signed-off-by: Scott Miller <smiller1@coreweave.com>
1 parent 7c6d0c9 commit 029a2ff

File tree

22 files changed

+112
-25
lines changed

22 files changed

+112
-25
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Adds a `force_path_style` option to the `aws_s3` sink that allows users to configure virtual host style addressing. The value defaults to `true` to maintain existing behavior.
2+
3+
authors: sam6258

src/aws/mod.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ pub trait ClientBuilder {
123123
type Client;
124124

125125
/// Build the client using the given config settings.
126-
fn build(config: &SdkConfig) -> Self::Client;
126+
fn build(&self, config: &SdkConfig) -> Self::Client;
127127
}
128128

129129
fn region_provider(
@@ -161,28 +161,36 @@ async fn resolve_region(
161161
}
162162

163163
/// Create the SDK client using the provided settings.
164-
pub async fn create_client<T: ClientBuilder>(
164+
pub async fn create_client<T>(
165+
builder: &T,
165166
auth: &AwsAuthentication,
166167
region: Option<Region>,
167168
endpoint: Option<String>,
168169
proxy: &ProxyConfig,
169170
tls_options: &Option<TlsConfig>,
170171
timeout: &Option<AwsTimeout>,
171-
) -> crate::Result<T::Client> {
172-
create_client_and_region::<T>(auth, region, endpoint, proxy, tls_options, timeout)
172+
) -> crate::Result<T::Client>
173+
where
174+
T: ClientBuilder,
175+
{
176+
create_client_and_region::<T>(builder, auth, region, endpoint, proxy, tls_options, timeout)
173177
.await
174178
.map(|(client, _)| client)
175179
}
176180

177181
/// Create the SDK client and resolve the region using the provided settings.
178-
pub async fn create_client_and_region<T: ClientBuilder>(
182+
pub async fn create_client_and_region<T>(
183+
builder: &T,
179184
auth: &AwsAuthentication,
180185
region: Option<Region>,
181186
endpoint: Option<String>,
182187
proxy: &ProxyConfig,
183188
tls_options: &Option<TlsConfig>,
184189
timeout: &Option<AwsTimeout>,
185-
) -> crate::Result<(T::Client, Region)> {
190+
) -> crate::Result<(T::Client, Region)>
191+
where
192+
T: ClientBuilder,
193+
{
186194
let retry_config = RetryConfig::disabled();
187195

188196
// The default credentials chains will look for a region if not given but we'd like to
@@ -239,7 +247,7 @@ pub async fn create_client_and_region<T: ClientBuilder>(
239247

240248
let config = config_builder.build();
241249

242-
Ok((T::build(&config), region))
250+
Ok((T::build(builder, &config), region))
243251
}
244252

245253
#[derive(Snafu, Debug)]

src/common/s3.rs

+7-4
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,16 @@ use aws_sdk_s3::config;
22

33
use crate::aws::ClientBuilder;
44

5-
pub(crate) struct S3ClientBuilder;
5+
pub(crate) struct S3ClientBuilder {
6+
pub force_path_style: Option<bool>,
7+
}
68

79
impl ClientBuilder for S3ClientBuilder {
810
type Client = aws_sdk_s3::client::Client;
911

10-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
11-
let config = config::Builder::from(config).force_path_style(true).build();
12-
aws_sdk_s3::client::Client::from_conf(config)
12+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
13+
let builder =
14+
config::Builder::from(config).force_path_style(self.force_path_style.unwrap_or(true));
15+
aws_sdk_s3::client::Client::from_conf(builder.build())
1316
}
1417
}

src/common/sqs.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ pub(crate) struct SqsClientBuilder;
55
impl ClientBuilder for SqsClientBuilder {
66
type Client = aws_sdk_sqs::client::Client;
77

8-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
8+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
99
aws_sdk_sqs::client::Client::new(config)
1010
}
1111
}

src/secrets/aws_secrets_manager.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ pub(crate) struct SecretsManagerClientBuilder;
1313
impl ClientBuilder for SecretsManagerClientBuilder {
1414
type Client = Client;
1515

16-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
16+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
1717
let config = config::Builder::from(config).build();
1818
Client::from_conf(config)
1919
}
@@ -57,6 +57,7 @@ impl SecretBackend for AwsSecretsManagerBackend {
5757
_: &mut signal::SignalRx,
5858
) -> crate::Result<HashMap<String, String>> {
5959
let client = create_client::<SecretsManagerClientBuilder>(
60+
&SecretsManagerClientBuilder {},
6061
&self.auth,
6162
self.region.region(),
6263
self.region.endpoint(),

src/sinks/aws_cloudwatch_logs/config.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ pub struct CloudwatchLogsClientBuilder;
3333
impl ClientBuilder for CloudwatchLogsClientBuilder {
3434
type Client = aws_sdk_cloudwatchlogs::client::Client;
3535

36-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
36+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
3737
aws_sdk_cloudwatchlogs::client::Client::new(config)
3838
}
3939
}
@@ -169,6 +169,7 @@ pub struct CloudwatchLogsSinkConfig {
169169
impl CloudwatchLogsSinkConfig {
170170
pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<CloudwatchLogsClient> {
171171
create_client::<CloudwatchLogsClientBuilder>(
172+
&CloudwatchLogsClientBuilder {},
172173
&self.auth,
173174
self.region.region(),
174175
self.region.endpoint(),

src/sinks/aws_cloudwatch_logs/integration_tests.rs

+11-3
Original file line numberDiff line numberDiff line change
@@ -467,9 +467,17 @@ async fn create_client_test() -> CloudwatchLogsClient {
467467
let endpoint = Some(cloudwatch_address());
468468
let proxy = ProxyConfig::default();
469469

470-
create_client::<CloudwatchLogsClientBuilder>(&auth, region, endpoint, &proxy, &None, &None)
471-
.await
472-
.unwrap()
470+
create_client::<CloudwatchLogsClientBuilder>(
471+
&CloudwatchLogsClientBuilder {},
472+
&auth,
473+
region,
474+
endpoint,
475+
&proxy,
476+
&None,
477+
&None,
478+
)
479+
.await
480+
.unwrap()
473481
}
474482

475483
async fn ensure_group() {

src/sinks/aws_cloudwatch_metrics/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ struct CloudwatchMetricsClientBuilder;
119119
impl ClientBuilder for CloudwatchMetricsClientBuilder {
120120
type Client = aws_sdk_cloudwatch::client::Client;
121121

122-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
122+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
123123
aws_sdk_cloudwatch::client::Client::new(config)
124124
}
125125
}
@@ -172,6 +172,7 @@ impl CloudWatchMetricsSinkConfig {
172172
};
173173

174174
create_client::<CloudwatchMetricsClientBuilder>(
175+
&CloudwatchMetricsClientBuilder {},
175176
&self.auth,
176177
region,
177178
self.region.endpoint(),

src/sinks/aws_kinesis/firehose/config.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ pub struct KinesisFirehoseClientBuilder;
3838
impl ClientBuilder for KinesisFirehoseClientBuilder {
3939
type Client = KinesisClient;
4040

41-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
41+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
4242
Self::Client::new(config)
4343
}
4444
}
@@ -102,6 +102,7 @@ impl KinesisFirehoseSinkConfig {
102102

103103
pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
104104
create_client::<KinesisFirehoseClientBuilder>(
105+
&KinesisFirehoseClientBuilder {},
105106
&self.base.auth,
106107
self.base.region.region(),
107108
self.base.region.endpoint(),

src/sinks/aws_kinesis/firehose/integration_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ async fn firehose_client() -> aws_sdk_firehose::Client {
255255
let proxy = ProxyConfig::default();
256256

257257
create_client::<KinesisFirehoseClientBuilder>(
258+
&KinesisFirehoseClientBuilder {},
258259
&auth,
259260
region_endpoint.region(),
260261
region_endpoint.endpoint(),

src/sinks/aws_kinesis/streams/config.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct KinesisClientBuilder;
3737
impl ClientBuilder for KinesisClientBuilder {
3838
type Client = KinesisClient;
3939

40-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
40+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
4141
KinesisClient::new(config)
4242
}
4343
}
@@ -99,6 +99,7 @@ impl KinesisStreamsSinkConfig {
9999

100100
pub async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<KinesisClient> {
101101
create_client::<KinesisClientBuilder>(
102+
&KinesisClientBuilder {},
102103
&self.base.auth,
103104
self.base.region.region(),
104105
self.base.region.endpoint(),

src/sinks/aws_kinesis/streams/integration_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ async fn client() -> aws_sdk_kinesis::Client {
177177
let proxy = ProxyConfig::default();
178178
let region = RegionOrEndpoint::with_both("us-east-1", kinesis_address());
179179
create_client::<KinesisClientBuilder>(
180+
&KinesisClientBuilder {},
180181
&auth,
181182
region.region(),
182183
region.endpoint(),

src/sinks/aws_s3/config.rs

+15-1
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,12 @@ pub struct S3SinkConfig {
139139
#[configurable(derived)]
140140
#[serde(default)]
141141
pub timezone: Option<TimeZone>,
142+
143+
/// Specifies which addressing style to use.
144+
///
145+
/// This controls if the bucket name is in the hostname or part of the URL.
146+
#[serde(default = "crate::serde::default_true")]
147+
pub force_path_style: bool,
142148
}
143149

144150
pub(super) fn default_key_prefix() -> String {
@@ -167,6 +173,7 @@ impl GenerateConfig for S3SinkConfig {
167173
auth: AwsAuthentication::default(),
168174
acknowledgements: Default::default(),
169175
timezone: Default::default(),
176+
force_path_style: Default::default(),
170177
})
171178
.unwrap()
172179
}
@@ -251,7 +258,14 @@ impl S3SinkConfig {
251258
}
252259

253260
pub async fn create_service(&self, proxy: &ProxyConfig) -> crate::Result<S3Service> {
254-
s3_common::config::create_service(&self.region, &self.auth, proxy, &self.tls).await
261+
s3_common::config::create_service(
262+
&self.region,
263+
&self.auth,
264+
proxy,
265+
&self.tls,
266+
self.force_path_style,
267+
)
268+
.await
255269
}
256270
}
257271

src/sinks/aws_s3/integration_tests.rs

+7
Original file line numberDiff line numberDiff line change
@@ -438,6 +438,7 @@ async fn s3_flush_on_exhaustion() {
438438
auth: Default::default(),
439439
acknowledgements: Default::default(),
440440
timezone: Default::default(),
441+
force_path_style: true,
441442
}
442443
};
443444
let prefix = config.key_prefix.clone();
@@ -489,7 +490,12 @@ async fn client() -> S3Client {
489490
let region = RegionOrEndpoint::with_both("us-east-1", s3_address());
490491
let proxy = ProxyConfig::default();
491492
let tls_options = None;
493+
let force_path_style_value: bool = true;
494+
492495
create_client::<S3ClientBuilder>(
496+
&S3ClientBuilder {
497+
force_path_style: Some(force_path_style_value),
498+
},
493499
&auth,
494500
region.region(),
495501
region.endpoint(),
@@ -522,6 +528,7 @@ fn config(bucket: &str, batch_size: usize) -> S3SinkConfig {
522528
auth: Default::default(),
523529
acknowledgements: Default::default(),
524530
timezone: Default::default(),
531+
force_path_style: true,
525532
}
526533
}
527534

src/sinks/aws_s_s/sns/config.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ impl GenerateConfig for SnsSinkConfig {
4848
impl SnsSinkConfig {
4949
pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SnsClient> {
5050
create_client::<SnsClientBuilder>(
51+
&SnsClientBuilder {},
5152
&self.base_config.auth,
5253
self.region.region(),
5354
self.region.endpoint(),
@@ -108,7 +109,7 @@ pub(super) struct SnsClientBuilder;
108109
impl ClientBuilder for SnsClientBuilder {
109110
type Client = aws_sdk_sns::client::Client;
110111

111-
fn build(config: &aws_types::SdkConfig) -> Self::Client {
112+
fn build(&self, config: &aws_types::SdkConfig) -> Self::Client {
112113
aws_sdk_sns::client::Client::new(config)
113114
}
114115
}

src/sinks/aws_s_s/sns/integration_tests.rs

+2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ async fn create_sns_test_client() -> SnsClient {
3232
let endpoint = sns_address();
3333
let proxy = ProxyConfig::default();
3434
create_client::<SnsClientBuilder>(
35+
&SnsClientBuilder {},
3536
&auth,
3637
Some(Region::new("us-east-1")),
3738
Some(endpoint),
@@ -53,6 +54,7 @@ async fn create_sqs_test_client() -> SqsClient {
5354
let endpoint = sqs_address();
5455
let proxy = ProxyConfig::default();
5556
create_client::<SqsClientBuilder>(
57+
&SqsClientBuilder {},
5658
&auth,
5759
Some(Region::new("us-east-1")),
5860
Some(endpoint),

src/sinks/aws_s_s/sqs/config.rs

+1
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ impl GenerateConfig for SqsSinkConfig {
4949
impl SqsSinkConfig {
5050
pub(super) async fn create_client(&self, proxy: &ProxyConfig) -> crate::Result<SqsClient> {
5151
create_client::<SqsClientBuilder>(
52+
&SqsClientBuilder {},
5253
&self.base_config.auth,
5354
self.region.region(),
5455
self.region.endpoint(),

src/sinks/aws_s_s/sqs/integration_tests.rs

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ async fn create_test_client() -> SqsClient {
3030
let endpoint = sqs_address();
3131
let proxy = ProxyConfig::default();
3232
create_client::<SqsClientBuilder>(
33+
&SqsClientBuilder {},
3334
&auth,
3435
Some(Region::new("us-east-1")),
3536
Some(endpoint),

src/sinks/s3_common/config.rs

+15-3
Original file line numberDiff line numberDiff line change
@@ -363,12 +363,24 @@ pub async fn create_service(
363363
auth: &AwsAuthentication,
364364
proxy: &ProxyConfig,
365365
tls_options: &Option<TlsConfig>,
366+
force_path_style: impl Into<bool>,
366367
) -> crate::Result<S3Service> {
367368
let endpoint = region.endpoint();
368369
let region = region.region();
369-
let client =
370-
create_client::<S3ClientBuilder>(auth, region.clone(), endpoint, proxy, tls_options, &None)
371-
.await?;
370+
let force_path_style_value: bool = force_path_style.into();
371+
372+
let client = create_client::<S3ClientBuilder>(
373+
&S3ClientBuilder {
374+
force_path_style: Some(force_path_style_value),
375+
},
376+
auth,
377+
region.clone(),
378+
endpoint,
379+
proxy,
380+
tls_options,
381+
&None,
382+
)
383+
.await?;
372384
Ok(S3Service::new(client))
373385
}
374386

src/sources/aws_s3/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -230,8 +230,12 @@ impl AwsS3Config {
230230
) -> crate::Result<sqs::Ingestor> {
231231
let region = self.region.region();
232232
let endpoint = self.region.endpoint();
233+
let force_path_style_value: bool = true;
233234

234235
let s3_client = create_client::<S3ClientBuilder>(
236+
&S3ClientBuilder {
237+
force_path_style: Some(force_path_style_value),
238+
},
235239
&self.auth,
236240
region.clone(),
237241
endpoint.clone(),
@@ -248,6 +252,7 @@ impl AwsS3Config {
248252
match self.sqs {
249253
Some(ref sqs) => {
250254
let (sqs_client, region) = create_client_and_region::<SqsClientBuilder>(
255+
&SqsClientBuilder {},
251256
&self.auth,
252257
region.clone(),
253258
endpoint,
@@ -1016,7 +1021,11 @@ mod integration_tests {
10161021
endpoint: Some(s3_address()),
10171022
};
10181023
let proxy_config = ProxyConfig::default();
1024+
let force_path_style_value: bool = true;
10191025
create_client::<S3ClientBuilder>(
1026+
&S3ClientBuilder {
1027+
force_path_style: Some(force_path_style_value),
1028+
},
10201029
&auth,
10211030
region_endpoint.region(),
10221031
region_endpoint.endpoint(),
@@ -1036,6 +1045,7 @@ mod integration_tests {
10361045
};
10371046
let proxy_config = ProxyConfig::default();
10381047
create_client::<SqsClientBuilder>(
1048+
&SqsClientBuilder {},
10391049
&auth,
10401050
region_endpoint.region(),
10411051
region_endpoint.endpoint(),

0 commit comments

Comments
 (0)