Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor of HttpSender implementations with Factory Pattern (1/2) #171

Merged
merged 9 commits into from
Aug 29, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.aiven.kafka.connect.http.config.HttpSinkConfig;
import io.aiven.kafka.connect.http.recordsender.RecordSender;
import io.aiven.kafka.connect.http.sender.HttpSender;
import io.aiven.kafka.connect.http.sender.HttpSenderFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -57,7 +58,7 @@ public void start(final Map<String, String> props) {
Objects.requireNonNull(props);
final var config = new HttpSinkConfig(props);
if (this.httpSender == null) {
this.httpSender = HttpSender.createHttpSender(config);
this.httpSender = HttpSenderFactory.createHttpSender(config);
}

recordSender = RecordSender.createRecordSender(httpSender, config);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2023 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.sender;

import java.io.IOException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class AbstractHttpSender {

private static final Logger log = LoggerFactory.getLogger(AbstractHttpSender.class);

protected final HttpClient httpClient;
protected final HttpSinkConfig config;
protected final HttpRequestBuilder httpRequestBuilder;

protected AbstractHttpSender(
final HttpSinkConfig config, final HttpRequestBuilder httpRequestBuilder, final HttpClient httpClient
) {
Objects.requireNonNull(config);
this.config = config;
this.httpRequestBuilder = httpRequestBuilder;
this.httpClient = httpClient;
jjaakola-aiven marked this conversation as resolved.
Show resolved Hide resolved
}

public final HttpResponse<String> send(final String body) {
final var requestBuilder =
httpRequestBuilder.build(config).POST(HttpRequest.BodyPublishers.ofString(body));
return sendWithRetries(requestBuilder, HttpResponseHandler.ON_HTTP_ERROR_RESPONSE_HANDLER,
config.maxRetries());
}

/**
* Sends an HTTP body using {@code httpSender}, respecting the configured retry policy.
*
* @return whether the sending was successful.
*/
protected HttpResponse<String> sendWithRetries(
final Builder requestBuilderWithPayload, final HttpResponseHandler httpResponseHandler,
final int retries
) {
int remainingRetries = retries;
while (remainingRetries >= 0) {
try {
try {
final var response =
httpClient.send(requestBuilderWithPayload.build(), HttpResponse.BodyHandlers.ofString());
log.debug("Server replied with status code {} and body {}", response.statusCode(), response.body());
// Handle the response
httpResponseHandler.onResponse(response, remainingRetries);
return response;
} catch (final IOException e) {
log.info("Sending failed, will retry in {} ms ({} retries remain)", config.retryBackoffMs(),
remainingRetries, e);
remainingRetries -= 1;
TimeUnit.MILLISECONDS.sleep(config.retryBackoffMs());
}
} catch (final InterruptedException e) {
log.error("Sending failed due to InterruptedException, stopping", e);
throw new ConnectException(e);
}
}
log.error("Sending failed and no retries remain, stopping");
throw new ConnectException("Sending failed and no retries remain, stopping");
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2023 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.sender;

import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Base64;
import java.util.Objects;
import java.util.StringJoiner;

import org.apache.kafka.connect.errors.ConnectException;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

import static io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode.HEADER;
import static io.aiven.kafka.connect.http.config.OAuth2AuthorizationMode.URL;

class AccessTokenHttpSender extends AbstractHttpSender implements HttpSender {
jeqo marked this conversation as resolved.
Show resolved Hide resolved

AccessTokenHttpSender(final HttpSinkConfig config, final HttpClient httpClient) {
super(config, new AccessTokenHttpRequestBuilder(), httpClient);
}

HttpResponse<String> call() {
jeqo marked this conversation as resolved.
Show resolved Hide resolved
final var accessTokenRequestBodyBuilder = new StringJoiner("&");
accessTokenRequestBodyBuilder.add(encodeNameAndValue("grant_type", "client_credentials"));
if (config.oauth2ClientScope() != null) {
accessTokenRequestBodyBuilder.add(encodeNameAndValue("scope", config.oauth2ClientScope()));
}
setClientIdAndSecret(accessTokenRequestBodyBuilder, config);
return super.send(accessTokenRequestBodyBuilder.toString());
jeqo marked this conversation as resolved.
Show resolved Hide resolved
}

private void setClientIdAndSecret(
final StringJoiner requestBodyBuilder, final HttpSinkConfig config
) {
if (config.oauth2AuthorizationMode() == URL) {
addClientIdAndSecretInRequestBody(requestBodyBuilder, config);
} else if (config.oauth2AuthorizationMode() != HEADER) {
throw new ConnectException("Unknown OAuth2 authorization mode: " + config.oauth2AuthorizationMode());
}
}

private void addClientIdAndSecretInRequestBody(final StringJoiner requestBodyBuilder,
final HttpSinkConfig config) {
requestBodyBuilder
.add(encodeNameAndValue("client_id", config.oauth2ClientId()))
.add(encodeNameAndValue("client_secret", config.oauth2ClientSecret().value()));
}

private String encodeNameAndValue(final String name, final String value) {
return String.format("%s=%s", encode(name), encode(value));
}

private String encode(final String value) {
return URLEncoder.encode(value, StandardCharsets.UTF_8);
}

private static class AccessTokenHttpRequestBuilder implements HttpRequestBuilder {

static final String HEADER_CONTENT_TYPE_FORM = "application/x-www-form-urlencoded";

protected AccessTokenHttpRequestBuilder() {
}

@Override
public HttpRequest.Builder build(final HttpSinkConfig config) {
final var builder = HttpRequest
.newBuilder(Objects.requireNonNull(config.oauth2AccessTokenUri()))
.timeout(Duration.ofSeconds(config.httpTimeout()))
.header(HEADER_CONTENT_TYPE, HEADER_CONTENT_TYPE_FORM);
if (config.oauth2AuthorizationMode() == HEADER) {
addClientIdAndSecretInRequestHeader(config, builder);
}
return builder;
}

private void addClientIdAndSecretInRequestHeader(
final HttpSinkConfig config, final HttpRequest.Builder builder
) {
final var clientAndSecretBytes = (config.oauth2ClientId() + ":" + config
.oauth2ClientSecret()
.value()).getBytes(StandardCharsets.UTF_8);
final var clientAndSecretAuthHeader = "Basic " + Base64
.getEncoder()
.encodeToString(clientAndSecretBytes);
builder.header(HEADER_AUTHORIZATION, clientAndSecretAuthHeader);
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2023 Aiven Oy and http-connector-for-apache-kafka project contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.connect.http.sender;

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpRequest.Builder;
import java.time.Duration;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

class DefaultHttpSender extends AbstractHttpSender implements HttpSender {

DefaultHttpSender(final HttpSinkConfig config, final HttpClient client) {
super(config, new DefaultHttpRequestBuilder(), client);
}

static class DefaultHttpRequestBuilder implements HttpRequestBuilder {

@Override
public Builder build(final HttpSinkConfig config) {
final var httpRequest = HttpRequest
.newBuilder(config.httpUri())
.timeout(Duration.ofSeconds(config.httpTimeout()));
config
.getAdditionalHeaders()
.forEach(httpRequest::header);
if (config.headerContentType() != null) {
httpRequest.header(HEADER_CONTENT_TYPE, config.headerContentType());
}
return httpRequest;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.aiven.kafka.connect.http.sender;

import java.net.http.HttpRequest;
import java.time.Duration;

import io.aiven.kafka.connect.http.config.HttpSinkConfig;

Expand All @@ -27,19 +26,6 @@ interface HttpRequestBuilder {

String HEADER_CONTENT_TYPE = "Content-Type";

HttpRequest.Builder build(final HttpSinkConfig config);

HttpRequestBuilder DEFAULT_HTTP_REQUEST_BUILDER = config -> {
final var httpRequest = HttpRequest.newBuilder(config.httpUri())
.timeout(Duration.ofSeconds(config.httpTimeout()));
config.getAdditionalHeaders().forEach(httpRequest::header);
if (config.headerContentType() != null) {
httpRequest.header(HEADER_CONTENT_TYPE, config.headerContentType());
}
return httpRequest;
};

HttpRequestBuilder AUTH_HTTP_REQUEST_BUILDER = config -> DEFAULT_HTTP_REQUEST_BUILDER.build(config)
.header(HEADER_AUTHORIZATION, config.headerAuthorization());
HttpRequest.Builder build(HttpSinkConfig config);

}
Loading