Skip to content

Commit

Permalink
Merge branch 'develop' into release
Browse files Browse the repository at this point in the history
  • Loading branch information
vincenzo-ingenito committed Dec 20, 2024
2 parents 4eb8772 + 48f5905 commit 8b9d7c2
Show file tree
Hide file tree
Showing 11 changed files with 193 additions and 126 deletions.
13 changes: 13 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,19 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.microsoft.azure/msal4j -->
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>msal4j</artifactId>
<version>1.17.3</version>
</dependency>
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-client-authentication</artifactId>
<version>1.6.15</version>
<scope>compile</scope>
</dependency>

</dependencies>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public class KafkaHealthIndicator implements HealthIndicator {

@Autowired
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

Expand Down Expand Up @@ -74,6 +75,7 @@ public class KafkaPropertiesCFG {
private ProfileUtility profileUtility;

@Bean
@ConditionalOnProperty(name = "sasl.mechanism", havingValue = "OAUTHBEARER", matchIfMissing = false)
public AdminClient client() {
Properties configProperties = new Properties();
configProperties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, producerBootstrapServers);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
//Copyright (c) Microsoft Corporation. All rights reserved.
//Licensed under the MIT License.

package it.finanze.sanita.fse2.ms.iniclient.config.kafka.oauth2;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import javax.security.auth.callback.Callback;
import javax.security.auth.callback.UnsupportedCallbackException;
import javax.security.auth.login.AppConfigurationEntry;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;

import com.microsoft.aad.msal4j.ClientCredentialFactory;
import com.microsoft.aad.msal4j.ClientCredentialParameters;
import com.microsoft.aad.msal4j.ConfidentialClientApplication;
import com.microsoft.aad.msal4j.IAuthenticationResult;
import com.microsoft.aad.msal4j.IClientCredential;

import it.finanze.sanita.fse2.ms.iniclient.exceptions.base.BusinessException;
import it.finanze.sanita.fse2.ms.iniclient.utility.FileUtility;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class CustomAuthenticateCallbackHandler implements AuthenticateCallbackHandler {

private String tenantId;

private String appId;

private String pfxPathName;

private String pwd;

private ConfidentialClientApplication aadClient;
private ClientCredentialParameters aadParameters;

@Override
public void configure(Map<String, ?> configs, String mechanism, List<AppConfigurationEntry> jaasConfigEntries) {
String bootstrapServer = Arrays.asList(configs.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)).get(0).toString();

bootstrapServer = bootstrapServer.replaceAll("\\[|\\]", "");
URI uri = URI.create("https://" + bootstrapServer);
String sbUri = uri.getScheme() + "://" + uri.getHost();
this.aadParameters =
ClientCredentialParameters.builder(Collections.singleton(sbUri + "/.default"))
.build();
this.tenantId = "https://login.microsoftonline.com/"+ Arrays.asList(configs.get("kafka.oauth.tenantId")).get(0).toString();
this.appId = Arrays.asList(configs.get("kafka.oauth.appId")).get(0).toString();
this.pfxPathName = Arrays.asList(configs.get("kafka.oauth.pfxPathName")).get(0).toString();
this.pwd = Arrays.asList(configs.get("kafka.oauth.pwd")).get(0).toString();

}

public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
for (Callback callback: callbacks) {
if (callback instanceof OAuthBearerTokenCallback) {
try {
OAuthBearerToken token = getOAuthBearerToken();
OAuthBearerTokenCallback oauthCallback = (OAuthBearerTokenCallback) callback;
oauthCallback.token(token);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
e.printStackTrace();
}
} else {
throw new UnsupportedCallbackException(callback);
}
}
}

private OAuthBearerToken getOAuthBearerToken() throws MalformedURLException, InterruptedException, ExecutionException, TimeoutException {
if (this.aadClient == null) {
synchronized(this) {
if (this.aadClient == null) {
IClientCredential credential = null;
try (FileInputStream certificato = new FileInputStream(new File(pfxPathName))) {
credential = ClientCredentialFactory.createFromCertificate(certificato, this.pwd);
} catch(Exception ex) {
log.error("Error while try to crate credential from certificate");
throw new BusinessException(ex);
}
this.aadClient = ConfidentialClientApplication.builder(this.appId, credential)
.authority(this.tenantId)
.build();
}
}
}

IAuthenticationResult authResult = this.aadClient.acquireToken(this.aadParameters).get();
log.info("Token oauth2 acquired");
return new OAuthBearerTokenImp(authResult.accessToken(), authResult.expiresOnDate());
}

public void close() throws KafkaException {
// NOOP
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package it.finanze.sanita.fse2.ms.iniclient.config.kafka.oauth2;
import java.util.Date;
import java.util.Set;

import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;


public class OAuthBearerTokenImp implements OAuthBearerToken {
private String token;
private long lifetimeMs;

public OAuthBearerTokenImp(final String token, Date expiresOn) {
this.token = token;
this.lifetimeMs = expiresOn.getTime();
}

@Override
public String value() {
return this.token;
}

@Override
public Set<String> scope() {
return null;
}

@Override
public long lifetimeMs() {
return this.lifetimeMs;
}

@Override
public String principalName() {
return null;
}

@Override
public Long startTimeMs() {
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,7 @@ public GetMergedMetadatiDTO getMergedMetadati(final String oidToUpdate,final Mer
out.setDocumentType(CommonUtility.extractDocumentTypeFromQueryResponse(oldMetadata));

if(oldMetadata.getRegistryObjectList().getIdentifiable().isEmpty()) {
throw new MergeMetadatoNotFoundException("Attezione, metadati non trovati");
throw new MergeMetadatoNotFoundException("Attenzione, metadati non trovati");
}

ExtrinsicObjectType val = (ExtrinsicObjectType)oldMetadata.getRegistryObjectList().getIdentifiable().get(0).getValue();
Expand Down
4 changes: 4 additions & 0 deletions src/main/resources/application-dev.properties
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,8 @@ kafka.properties.sasl.mechanism=PLAINTEXT
kafka.properties.sasl.jaas.config=PLAINTEXT
kafka.properties.ssl.truststore.location=PLAINTEXT
kafka.properties.ssl.truststore.password=PLAINTEXT
kafka.oauth.tenantId=
kafka.oauth.appId=
kafka.oauth.pfxPathName=
kafka.oauth.pwd=
ms.url.gtw-crash-program-validator=http://localhost:8080
6 changes: 5 additions & 1 deletion src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ docs.info.summary=Ini client
docs.info.title=Ini client
docs.info.description=Ini client


####### LOGGING OUTPUT FORMAT ############
# Must be one of console, json
#######################################
Expand All @@ -59,6 +58,11 @@ kafka.properties.ssl.truststore.password=${TRUST_JKS_PASSWORD}
kafka.producer.retries=5
kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

kafka.oauth.tenantId=${TENANT_ID}
kafka.oauth.appId=${APP_ID}
kafka.oauth.pfxPathName=${PFX_NAME_RESOURCE_PATH}
kafka.oauth.pwd=${PFX_PASSWORD}
#######################################
# DATASOURCE DB
#######################################
Expand Down
17 changes: 12 additions & 5 deletions src/main/resources/logback-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,19 @@
<producerConfig>bootstrap.servers=${kafka.bootstrap-servers}</producerConfig>
<producerConfig>metadata.fetch.timeout.ms=99999999999</producerConfig>
<producerConfig>client.id=${kafka.producer.client-id}</producerConfig>
<producerConfig>security.protocol=${kafka.properties.security.protocol}</producerConfig>
<producerConfig>sasl.mechanism=${kafka.properties.sasl.mechanism}</producerConfig>
<producerConfig>sasl.jaas.config=${kafka.properties.sasl.jaas.config}</producerConfig>
<producerConfig>sasl.login.callback.handler.class=it.finanze.sanita.fse2.ms.iniclient.config.kafka.oauth2.CustomAuthenticateCallbackHandler</producerConfig>
<producerConfig>kafka.oauth.tenantId=${kafka.oauth.tenantId}</producerConfig>
<producerConfig>kafka.oauth.appId=${kafka.oauth.appId}</producerConfig>
<producerConfig>kafka.oauth.pfxPathName=${kafka.oauth.pfxPathName}</producerConfig>
<producerConfig>kafka.oauth.pwd=${kafka.oauth.pwd}</producerConfig>
<springProfile name="default">
<producerConfig>security.protocol=${kafka.properties.security.protocol}</producerConfig>
<producerConfig>sasl.mechanism=${kafka.properties.sasl.mechanism}</producerConfig>
<producerConfig>sasl.jaas.config=${kafka.properties.sasl.jaas.config}</producerConfig>
<producerConfig>ssl.truststore.location=${kafka.properties.ssl.truststore.location}</producerConfig>
<producerConfig>ssl.truststore.password=${kafka.properties.ssl.truststore.password}</producerConfig>
<producerConfig>ssl.truststore.location=${kafka.properties.ssl.truststore.location}
</producerConfig>
<producerConfig>ssl.truststore.password=${kafka.properties.ssl.truststore.password}
</producerConfig>
</springProfile>
</appender>
</logger>
Expand Down

0 comments on commit 8b9d7c2

Please sign in to comment.