Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add oauth2 configuration (2/2) #172

Merged
merged 5 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 35 additions & 3 deletions docs/sink-connector-config-options.rst
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,30 @@ Connection
* Default: null
* Valid Values: HTTP(S) URL
* Importance: high
* Dependents: ``oauth2.client.id``, ``oauth2.client.secret``, ``oauth2.client.authorization.mode``, ``oauth2.client.scope``, ``oauth2.response.token.property``
* Dependents: ``oauth2.request.grant.type.property``, ``oauth2.grant.type``, ``oauth2.request.client.id.property``, ``oauth2.client.id``, ``oauth2.request.client.secret.property``, ``oauth2.client.secret``, ``oauth2.client.authorization.mode``, ``oauth2.client.scope``, ``oauth2.response.token.property``

``oauth2.request.grant.type.property``
The grant type Key used for fetching an access token.

* Type: string
* Default: "grant_type"
* Importance: high
* Dependents: ``oauth2.grant.type``

``oauth2.grant.type``
The grant type used for fetching an access token.

* Type: string
* Default: "client_credentials"
* Importance: high

``oauth2.request.client.id.property``
The client id Key used for fetching an access token.

* Type: string
* Default: "client_id"
* Importance: high
* Dependents: ``oauth2.client.id``

``oauth2.client.id``
The client id used for fetching an access token.
Expand All @@ -61,6 +84,15 @@ Connection
* Importance: high
* Dependents: ``oauth2.access.token.url``, ``oauth2.client.secret``, ``oauth2.client.authorization.mode``, ``oauth2.client.scope``, ``oauth2.response.token.property``


``oauth2.request.client.secret.property``
The client secret Key used for fetching an access token.

* Type: string
* Default: "client_secret"
* Importance: high
* Dependents: ``oauth2.client.secret``

``oauth2.client.secret``
The secret used for fetching an access token.

Expand All @@ -74,9 +106,9 @@ Connection

* Type: string
* Default: HEADER
* Valid Values: HEADER,URL
* Valid Values: [HEADER, URL]
* Importance: medium
* Dependents: ``oauth2.access.token.url``, ``oauth2.client.id``, ``oauth2.client.secret``, ``oauth2.client.scope``, ``oauth2.response.token.property``
* Dependents: ``oauth2.access.token.url``, ``oauth2.request.grant.type.property``, ``oauth2.grant.type``, ``oauth2.request.client.id.property``, ``oauth2.client.id``, ``oauth2.request.client.secret.property``, ``oauth2.client.secret``, ``oauth2.client.scope``, ``oauth2.response.token.property``

``oauth2.client.scope``
The scope used for fetching an access token.
Expand Down
137 changes: 109 additions & 28 deletions src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Type;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.connect.errors.ConnectException;
Expand All @@ -47,7 +49,11 @@ public class HttpSinkConfig extends AbstractConfig {
public static final String KAFKA_RETRY_BACKOFF_MS_CONFIG = "kafka.retry.backoff.ms";

private static final String OAUTH2_ACCESS_TOKEN_URL_CONFIG = "oauth2.access.token.url";
private static final String OAUTH2_GRANT_TYPE_PROP_CONFIG = "oauth2.request.grant.type.property";
private static final String OAUTH2_GRANT_TYPE_CONFIG = "oauth2.grant.type";
private static final String OAUTH2_CLIENT_ID_PROP_CONFIG = "oauth2.request.client.id.property";
private static final String OAUTH2_CLIENT_ID_CONFIG = "oauth2.client.id";
private static final String OAUTH2_CLIENT_SECRET_PROP_CONFIG = "oauth2.request.client.secret.property";
private static final String OAUTH2_CLIENT_SECRET_CONFIG = "oauth2.client.secret";
private static final String OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG = "oauth2.client.authorization.mode";
private static final String OAUTH2_CLIENT_SCOPE_CONFIG = "oauth2.client.scope";
Expand Down Expand Up @@ -198,10 +204,60 @@ public boolean visible(final String name, final Map<String, Object> parsedConfig
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_ACCESS_TOKEN_URL_CONFIG,
List.of(OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG,
List.of(OAUTH2_GRANT_TYPE_PROP_CONFIG, OAUTH2_GRANT_TYPE_CONFIG, OAUTH2_CLIENT_ID_PROP_CONFIG,
OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_PROP_CONFIG, OAUTH2_CLIENT_SECRET_CONFIG,
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG, OAUTH2_CLIENT_SCOPE_CONFIG,
OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG)
);
configDef.define(OAUTH2_GRANT_TYPE_PROP_CONFIG,
ConfigDef.Type.STRING,
"grant_type",
new ConfigDef.NonEmptyStringWithoutControlChars() {
@Override
public String toString() {
return "OAuth2 grant type key";
}
},
ConfigDef.Importance.HIGH,
"The grant type Key used for fetching an access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG, OAUTH2_GRANT_TYPE_PROP_CONFIG,
List.of(OAUTH2_GRANT_TYPE_CONFIG)
);
configDef.define(
OAUTH2_GRANT_TYPE_CONFIG,
ConfigDef.Type.STRING,
"client_credentials",
new ConfigDef.NonEmptyStringWithoutControlChars() {
@Override
public String toString() {
return "OAuth2 grant type";
}
},
ConfigDef.Importance.HIGH,
"The grant type used for fetching an access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG,
OAUTH2_GRANT_TYPE_CONFIG
);
configDef.define(OAUTH2_CLIENT_ID_PROP_CONFIG,
ConfigDef.Type.STRING,
"client_id",
new ConfigDef.NonEmptyStringWithoutControlChars() {
@Override
public String toString() {
return "OAuth2 client id Key";
}
},
ConfigDef.Importance.HIGH,
"The client id Key used for fetching an access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG, OAUTH2_CLIENT_ID_PROP_CONFIG,
List.of(OAUTH2_CLIENT_ID_CONFIG)
);
configDef.define(
OAUTH2_CLIENT_ID_CONFIG,
ConfigDef.Type.STRING,
Expand All @@ -222,6 +278,16 @@ public String toString() {
OAUTH2_CLIENT_AUTHORIZATION_MODE_CONFIG,
OAUTH2_CLIENT_SCOPE_CONFIG, OAUTH2_RESPONSE_TOKEN_PROPERTY_CONFIG)
);
configDef.define(OAUTH2_CLIENT_SECRET_PROP_CONFIG,
Type.STRING,
"client_secret",
ConfigDef.Importance.HIGH,
"The secret Key used for fetching an access token.",
CONNECTION_GROUP,
groupCounter++,
ConfigDef.Width.LONG, OAUTH2_CLIENT_SECRET_PROP_CONFIG,
List.of(OAUTH2_CLIENT_SECRET_CONFIG)
);
configDef.define(
OAUTH2_CLIENT_SECRET_CONFIG,
ConfigDef.Type.PASSWORD,
Expand Down Expand Up @@ -491,44 +557,25 @@ public HttpSinkConfig(final Map<String, String> properties) {
}

private void validate() {
switch (authorizationType()) {
final AuthorizationType authorizationType = authorizationType();
switch (authorizationType) {
case STATIC:
if (headerAuthorization() == null || headerAuthorization().isBlank()) {
throw new ConfigException(
HTTP_HEADERS_AUTHORIZATION_CONFIG,
getPassword(HTTP_HEADERS_AUTHORIZATION_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.STATIC);
"Must be present when " + HTTP_AUTHORIZATION_TYPE_CONFIG
+ " = " + authorizationType);
}
break;
case OAUTH2:
if (oauth2AccessTokenUri() == null) {
throw new ConfigException(
OAUTH2_ACCESS_TOKEN_URL_CONFIG, getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
if (oauth2ClientId() == null || oauth2ClientId().isEmpty()) {
throw new ConfigException(
OAUTH2_CLIENT_ID_CONFIG,
getString(OAUTH2_CLIENT_ID_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
if (oauth2ClientSecret() == null || oauth2ClientSecret().value().isEmpty()) {
throw new ConfigException(
OAUTH2_CLIENT_SECRET_CONFIG,
getPassword(OAUTH2_CLIENT_SECRET_CONFIG),
"Must be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
+ " = " + AuthorizationType.OAUTH2);
}
case OAUTH2: validateOAuth2Configuration();
break;
case NONE:
if (headerAuthorization() != null && !headerAuthorization().isBlank()) {
throw new ConfigException(
HTTP_HEADERS_AUTHORIZATION_CONFIG,
getPassword(HTTP_HEADERS_AUTHORIZATION_CONFIG),
"Must not be present when " + HTTP_HEADERS_CONTENT_TYPE_CONFIG
"Must not be present when " + HTTP_AUTHORIZATION_TYPE_CONFIG
+ " != " + AuthorizationType.STATIC);
}
break;
Expand All @@ -544,6 +591,24 @@ OAUTH2_ACCESS_TOKEN_URL_CONFIG, getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG),

}

private void validateOAuth2Configuration() {
Stream.of(OAUTH2_ACCESS_TOKEN_URL_CONFIG, OAUTH2_GRANT_TYPE_PROP_CONFIG, OAUTH2_GRANT_TYPE_CONFIG,
OAUTH2_CLIENT_ID_PROP_CONFIG, OAUTH2_CLIENT_ID_CONFIG, OAUTH2_CLIENT_SECRET_PROP_CONFIG)
.filter(configKey -> getString(configKey) == null || getString(configKey).isBlank())
.findFirst()
.ifPresent(missingConfiguration -> {
throw new ConfigException(missingConfiguration, getString(missingConfiguration),
"Must be present when " + HTTP_HEADERS_AUTHORIZATION_CONFIG + " = "
+
AuthorizationType.OAUTH2);
});

if (oauth2ClientSecret() == null || oauth2ClientSecret().value().isEmpty()) {
throw new ConfigException(OAUTH2_CLIENT_SECRET_CONFIG, oauth2ClientSecret(),
"Must be present when " + HTTP_HEADERS_AUTHORIZATION_CONFIG + " = " + AuthorizationType.OAUTH2);
}
}

public final URI httpUri() {
return toURI(HTTP_URL_CONFIG);
}
Expand Down Expand Up @@ -620,21 +685,37 @@ public final String connectorName() {
}

public final URI oauth2AccessTokenUri() {
return getString(OAUTH2_ACCESS_TOKEN_URL_CONFIG) != null ? toURI(OAUTH2_ACCESS_TOKEN_URL_CONFIG) : null;
return toURI(OAUTH2_ACCESS_TOKEN_URL_CONFIG);
}

private URI toURI(final String propertyName) {
try {
return new URL(getString(propertyName)).toURI();
} catch (final MalformedURLException | URISyntaxException e) {
throw new ConnectException(e);
throw new ConnectException(String.format("Could not retrieve proper URI from %s", propertyName), e);
}
}

public final String oauth2GrantTypeProperty() {
return getString(OAUTH2_GRANT_TYPE_PROP_CONFIG);
}

public final String oauth2GrantType() {
return getString(OAUTH2_GRANT_TYPE_CONFIG);
}

public final String oauth2ClientIdProperty() {
return getString(OAUTH2_CLIENT_ID_PROP_CONFIG);
}

public final String oauth2ClientId() {
return getString(OAUTH2_CLIENT_ID_CONFIG);
}

public final String oauth2ClientSecretProperty() {
return getString(OAUTH2_CLIENT_SECRET_PROP_CONFIG);
}

public final Password oauth2ClientSecret() {
return getPassword(OAUTH2_CLIENT_SECRET_CONFIG);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,19 @@

package io.aiven.kafka.connect.http.sender;

import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.Objects;
import java.util.StringJoiner;

import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;
import io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode;
import io.aiven.kafka.connect.http.sender.request.OAuth2AccessTokenRequestForm;

import static io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode.HEADER;
import static io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode.URL;

class OAuth2AccessTokenHttpSender extends AbstractHttpSender implements HttpSender {

Expand All @@ -40,38 +37,24 @@ class OAuth2AccessTokenHttpSender extends AbstractHttpSender implements HttpSend
}

HttpResponse<String> call() {
final var accessTokenRequestBodyBuilder = new StringJoiner("&");
accessTokenRequestBodyBuilder.add(encodeNameAndValue("grant_type", "client_credentials"));
if (config.oauth2ClientScope() != null) {
accessTokenRequestBodyBuilder.add(encodeNameAndValue("scope", config.oauth2ClientScope()));
}
setClientIdAndSecret(accessTokenRequestBodyBuilder, config);
return super.send(accessTokenRequestBodyBuilder.toString());
}

private void setClientIdAndSecret(
final StringJoiner requestBodyBuilder, final HttpSinkConfig config
) {
if (config.oauth2AuthorizationMode() == URL) {
addClientIdAndSecretInRequestBody(requestBodyBuilder, config);
} else if (config.oauth2AuthorizationMode() != HEADER) {
throw new ConnectException("Unknown OAuth2 authorization mode: " + config.oauth2AuthorizationMode());
final OAuth2AccessTokenRequestForm.Builder formBuilder = OAuth2AccessTokenRequestForm
.newBuilder()
.withGrantTypeProperty(config.oauth2GrantTypeProperty())
.withGrantType(config.oauth2GrantType())
.withScope(config.oauth2ClientScope());

if (config.oauth2AuthorizationMode() == OAuth2AuthorizationMode.URL) {
formBuilder
.withClientIdProperty(config.oauth2ClientIdProperty())
.withClientId(config.oauth2ClientId())
.withClientSecretProperty(config.oauth2ClientSecretProperty())
.withClientSecret(config
.oauth2ClientSecret()
.value());
}
}

private void addClientIdAndSecretInRequestBody(final StringJoiner requestBodyBuilder,
final HttpSinkConfig config) {
requestBodyBuilder
.add(encodeNameAndValue("client_id", config.oauth2ClientId()))
.add(encodeNameAndValue("client_secret", config.oauth2ClientSecret().value()));
}

private String encodeNameAndValue(final String name, final String value) {
return String.format("%s=%s", encode(name), encode(value));
}

private String encode(final String value) {
return URLEncoder.encode(value, StandardCharsets.UTF_8);
return super.send(formBuilder
.build()
.toBodyString());
}

private static class AccessTokenHttpRequestBuilder implements HttpRequestBuilder {
Expand Down Expand Up @@ -104,6 +87,7 @@ private void addClientIdAndSecretInRequestHeader(
.encodeToString(clientAndSecretBytes);
builder.header(HEADER_AUTHORIZATION, clientAndSecretAuthHeader);
}

}

}
Loading
Loading