Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhancement(http sink): improve config UX #21857

2 changes: 1 addition & 1 deletion lib/vector-core/src/tls/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub struct TlsSourceConfig {
/// TLS configuration.
#[configurable_component]
#[configurable(metadata(docs::advanced))]
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, PartialEq, Eq)]
#[serde(deny_unknown_fields)]
pub struct TlsConfig {
/// Enables certificate verification. For components that create a server, this requires that the
Expand Down
112 changes: 42 additions & 70 deletions src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,14 +331,9 @@ where
pub fn new_with_auth_extension(
tls_settings: impl Into<MaybeTlsSettings>,
proxy_config: &ProxyConfig,
auth_config: Option<AuthorizationConfig>,
auth: Option<AuthorizationConfig>,
) -> Result<HttpClient<B>, HttpError> {
HttpClient::new_with_custom_client(
tls_settings,
proxy_config,
&mut Client::builder(),
auth_config,
)
HttpClient::new_with_custom_client(tls_settings, proxy_config, &mut Client::builder(), auth)
}

pub fn new_with_custom_client(
Expand Down Expand Up @@ -438,7 +433,7 @@ where
}

fn build_auth_extension<B>(
authorization_config: Option<AuthorizationConfig>,
auth: Option<AuthorizationConfig>,
proxy_config: &ProxyConfig,
client_builder: &mut client::Builder,
) -> Option<Arc<dyn AuthExtension<B>>>
Expand All @@ -447,19 +442,18 @@ where
B::Data: Send,
B::Error: Into<crate::Error> + Send,
{
if let Some(authorization_config) = authorization_config {
match authorization_config.strategy {
HttpClientAuthorizationStrategy::Basic { user, password } => {
let basic_auth_extension = BasicAuthExtension { user, password };
return Some(Arc::new(basic_auth_extension));
if let Some(auth) = auth {
match auth.strategy {
Auth::Basic { user, password } => {
return Some(Arc::new(BasicAuthExtension { user, password }));
}
HttpClientAuthorizationStrategy::OAuth2 {
Auth::OAuth2 {
token_endpoint,
client_id,
client_secret,
grace_period,
} => {
let tls_for_auth = authorization_config.tls.clone();
let tls_for_auth = auth.tls.clone();
let tls_for_auth: TlsSettings = TlsSettings::from_options(&tls_for_auth).unwrap();

let auth_proxy_connector =
Expand All @@ -475,6 +469,9 @@ where
);
return Some(Arc::new(oauth2_extension));
}
Auth::Bearer { .. } => {
unimplemented!("Bearer authentication is not supported currently.")
}
}
}

Expand Down Expand Up @@ -583,7 +580,7 @@ impl<B> fmt::Debug for HttpClient<B> {
pub struct AuthorizationConfig {
/// Define how to authorize against an upstream.
#[configurable]
strategy: HttpClientAuthorizationStrategy,
strategy: Auth,

/// The TLS settings for the http client's connection.
///
Expand All @@ -597,25 +594,35 @@ pub struct AuthorizationConfig {
/// HTTP authentication should be used with HTTPS only, as the authentication credentials are passed as an
/// HTTP header without any additional encryption beyond what is provided by the transport itself.
#[configurable_component]
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))]
pub enum HttpClientAuthorizationStrategy {
pub enum Auth {
/// Basic authentication.
///
/// The username and password are concatenated and encoded via [base64][base64].
///
/// [base64]: https://en.wikipedia.org/wiki/Base64
Basic {
/// The basic authentication username.
#[configurable(metadata(docs::examples = "${USERNAME}"))]
#[configurable(metadata(docs::examples = "username"))]
user: String,

/// The basic authentication password.
#[configurable(metadata(docs::examples = "${PASSWORD}"))]
#[configurable(metadata(docs::examples = "password"))]
password: SensitiveString,
},

/// Bearer authentication.
///
/// The bearer token value (OAuth2, JWT, etc.) is passed as-is.
Bearer {
/// The bearer authentication token.
token: SensitiveString,
},

/// Authentication based on OAuth 2.0 protocol.
///
/// This strategy allows to dynamically acquire and use token based on provided parameters.
Expand Down Expand Up @@ -674,41 +681,6 @@ const fn default_oauth2_token_grace_period() -> u32 {
300 // 5 minutes
}

/// Configuration of the authentication strategy for HTTP requests.
///
/// HTTP authentication should be used with HTTPS only, as the authentication credentials are passed as an
/// HTTP header without any additional encryption beyond what is provided by the transport itself.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "snake_case", tag = "strategy")]
#[configurable(metadata(docs::enum_tag_description = "The authentication strategy to use."))]
pub enum Auth {
/// Basic authentication.
///
/// The username and password are concatenated and encoded via [base64][base64].
///
/// [base64]: https://en.wikipedia.org/wiki/Base64
Basic {
/// The basic authentication username.
#[configurable(metadata(docs::examples = "${USERNAME}"))]
#[configurable(metadata(docs::examples = "username"))]
user: String,

/// The basic authentication password.
#[configurable(metadata(docs::examples = "${PASSWORD}"))]
#[configurable(metadata(docs::examples = "password"))]
password: SensitiveString,
},

/// Bearer authentication.
///
/// The bearer token value (OAuth2, JWT, etc.) is passed as-is.
Bearer {
/// The bearer authentication token.
token: SensitiveString,
},
}

pub trait MaybeAuth: Sized {
fn choose_one(&self, other: &Self) -> crate::Result<Self>;
}
Expand Down Expand Up @@ -745,6 +717,7 @@ impl Auth {
Ok(auth) => map.typed_insert(auth),
Err(error) => error!(message = "Invalid bearer token.", token = %token, %error),
},
Auth::OAuth2 { .. } => panic!("OAuth2 authentication is not supported currently"),
}
}
}
Expand Down Expand Up @@ -981,9 +954,9 @@ mod tests {
use tokio_rustls::TlsAcceptor;
use tower::ServiceBuilder;

use crate::test_util::next_addr;

use super::*;
use crate::test_util::next_addr;
use crate::tls::TlsConfig;

#[test]
fn test_default_request_headers_defaults() {
Expand Down Expand Up @@ -1235,9 +1208,8 @@ mod tests {
async fn test_oauth2extension_handle_errors_gently_with_hyper_server() {
let addr: SocketAddr = next_addr();
// Simplest possible configuration for oauth's client connector.
let tls: vector_lib::tls::MaybeTls<(), TlsSettings> =
MaybeTlsSettings::from_config(&None, false).unwrap();
let proxy_connector = build_proxy_connector(tls, &ProxyConfig::default()).unwrap();
let maybe_tls = MaybeTlsSettings::tls_client(&None).unwrap();
let proxy_connector = build_proxy_connector(maybe_tls, &ProxyConfig::default()).unwrap();
let auth_client = Client::builder().build(proxy_connector);

let token_endpoint = format!("http://{}", addr);
Expand Down Expand Up @@ -1353,7 +1325,7 @@ mod tests {
let client_secret = Some(SensitiveString::from(String::from("some_secret")));
let grace_period = 5;

let oauth2_strategy = HttpClientAuthorizationStrategy::OAuth2 {
let oauth2_strategy = Auth::OAuth2 {
token_endpoint,
client_id,
client_secret,
Expand Down Expand Up @@ -1489,7 +1461,15 @@ mod tests {
let client_id: String = String::from("some_client_secret");
let grace_period = 5;

let oauth2_strategy = HttpClientAuthorizationStrategy::OAuth2 {
let tls_config = TlsConfig {
verify_hostname: Some(false),
ca_file: Some("tests/data/ca/certs/ca.cert.pem".into()),
crt_file: Some("tests/data/ca/intermediate_client/certs/localhost.cert.pem".into()),
key_file: Some("tests/data/ca/intermediate_client/private/localhost.key.pem".into()),
..Default::default()
};

let oauth2_strategy = Auth::OAuth2 {
token_endpoint,
client_id,
client_secret: None,
Expand All @@ -1498,15 +1478,7 @@ mod tests {

let auth_config = AuthorizationConfig {
strategy: oauth2_strategy,
tls: Some(TlsConfig {
verify_hostname: Some(false),
ca_file: Some("tests/data/ca/certs/ca.cert.pem".into()),
crt_file: Some("tests/data/ca/intermediate_client/certs/localhost.cert.pem".into()),
key_file: Some(
"tests/data/ca/intermediate_client/private/localhost.key.pem".into(),
),
..Default::default()
}),
tls: Some(tls_config),
};

let client =
Expand Down Expand Up @@ -1549,7 +1521,7 @@ mod tests {
let user = String::from("user");
let password = SensitiveString::from(String::from("password"));

let basic_strategy = HttpClientAuthorizationStrategy::Basic { user, password };
let basic_strategy = Auth::Basic { user, password };

let auth_config = AuthorizationConfig {
strategy: basic_strategy,
Expand Down
2 changes: 1 addition & 1 deletion src/sinks/axiom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ impl SinkConfig for AxiomConfig {
}),
method: HttpMethod::Post,
tls: self.tls.clone(),
authorization_config: None,
request,
authorization_config: None,
acknowledgements: self.acknowledgements,
batch: self.batch,
headers: None,
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/databend/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ impl SinkConfig for DatabendConfig {
Some(Auth::Bearer { .. }) => {
return Err("Bearer authentication is not supported currently".into());
}
Some(Auth::OAuth2 { .. }) => {
return Err("OAuth2 authentication is not supported currently".into());
}
None => {}
}
if let Some(database) = &self.database {
Expand Down
3 changes: 1 addition & 2 deletions src/sinks/http/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,10 @@ impl From<HttpMethod> for Method {
impl HttpSinkConfig {
fn build_http_client(&self, cx: &SinkContext) -> crate::Result<HttpClient> {
let tls = TlsSettings::from_options(&self.tls)?;
let auth_strategy = self.authorization_config.clone();
Ok(HttpClient::new_with_auth_extension(
tls,
cx.proxy(),
auth_strategy,
self.authorization_config.clone(),
)?)
}

Expand Down
3 changes: 3 additions & 0 deletions src/sinks/prometheus/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,9 @@ fn authorized<T: HttpBody>(req: &Request<T>, auth: &Option<Auth>) -> bool {
Auth::Bearer { token } => {
HeaderValue::from_str(format!("Bearer {}", token.inner()).as_str())
}
Auth::OAuth2 { .. } => {
panic!("OAuth2 authentication is not supported currently")
}
};

if let Ok(encoded_credentials) = encoded_credentials {
Expand Down
3 changes: 3 additions & 0 deletions src/sinks/websocket/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,6 +586,9 @@ mod tests {
user: _user,
password: _password,
} => { /* Not needed for tests at the moment */ }
Auth::OAuth2 { .. } => {
panic!("OAuth2 authentication is not supported currently.")
}
}
}
Ok(res)
Expand Down
64 changes: 64 additions & 0 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,33 @@ base: components: sinks: clickhouse: configuration: {
"""
required: false
type: object: options: {
client_id: {
description: "The client id."
relevant_when: "strategy = \"o_auth2\""
required: true
type: string: examples: ["client_id"]
}
client_secret: {
description: "The sensitive client secret."
relevant_when: "strategy = \"o_auth2\""
required: false
type: string: examples: ["client_secret"]
}
grace_period: {
description: """
The grace period configuration for a bearer token.
To avoid random authorization failures caused by expired token exception,
we will acquire new token, some time (grace period) before current token will be expired,
because of that, we will always execute request with fresh enough token.
pront marked this conversation as resolved.
Show resolved Hide resolved
"""
relevant_when: "strategy = \"o_auth2\""
required: false
type: uint: {
default: 300
examples: [300]
unit: "seconds"
}
}
password: {
description: "The basic authentication password."
relevant_when: "strategy = \"basic\""
Expand All @@ -58,6 +85,37 @@ base: components: sinks: clickhouse: configuration: {

The bearer token value (OAuth2, JWT, etc.) is passed as-is.
"""
o_auth2: """
Authentication based on OAuth 2.0 protocol.

This strategy allows to dynamically acquire and use token based on provided parameters.
Both standard client_credentials and mTLS extension is supported, for standard client_credentials just provide both
client_id and client_secret parameters:
pront marked this conversation as resolved.
Show resolved Hide resolved

# Example

```yaml
strategy:
strategy: "o_auth2"
client_id: "client.id"
client_secret: "secret-value"
token_endpoint: "https://yourendpoint.com/oauth/token"
```
In case you want to use mTLS extension [rfc8705](https://datatracker.ietf.org/doc/html/rfc8705), provide desired key and certificate,
together with client_id (with no client_secret parameter).
pront marked this conversation as resolved.
Show resolved Hide resolved

# Example

```yaml
strategy:
strategy: "o_auth2"
client_id: "client.id"
token_endpoint: "https://yourendpoint.com/oauth/token"
tls:
crt_path: cert.pem
key_file: key.pem
```
"""
}
}
token: {
Expand All @@ -66,6 +124,12 @@ base: components: sinks: clickhouse: configuration: {
required: true
type: string: {}
}
token_endpoint: {
description: "Token endpoint location, required for token acquisition."
relevant_when: "strategy = \"o_auth2\""
required: true
type: string: examples: ["https://auth.provider/oauth/token"]
}
user: {
description: "The basic authentication username."
relevant_when: "strategy = \"basic\""
Expand Down
Loading