Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DG-1988 Minor improvements on append/remove/replace for Tags #3957

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,7 @@ public class AtlasEntityHeader extends AtlasStruct implements Serializable {
private String displayText = null;
private List<String> classificationNames = null;
private List<AtlasClassification> classifications = null;
private List<AtlasClassification> appendClassifications = null;


private List<AtlasClassification> updateClassifications = null;
private List<AtlasClassification> addOrUpdateClassifications = null;
private List<AtlasClassification> removeClassifications = null;
private List<String> meaningNames = null;
private List<AtlasTermAssignmentHeader> meanings = null;
Expand Down Expand Up @@ -106,20 +103,12 @@ public AtlasEntityHeader(String typeName, Map<String, Object> attributes) {
setLabels(null);
}

public List<AtlasClassification> getAppendClassifications() {
return appendClassifications;
}

public void setAppendClassifications(List<AtlasClassification> appendClassifications) {
this.appendClassifications = appendClassifications;
}

public List<AtlasClassification> getUpdateClassifications() {
return updateClassifications;
public List<AtlasClassification> getAddOrUpdateClassifications() {
return addOrUpdateClassifications;
}

public void setUpdateClassifications(List<AtlasClassification> updateClassifications) {
this.updateClassifications = updateClassifications;
public void setAddOrUpdateClassifications(List<AtlasClassification> addOrUpdateClassifications) {
this.addOrUpdateClassifications = addOrUpdateClassifications;
}

public List<AtlasClassification> getRemoveClassifications() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.atlas.AtlasConfiguration.ENTITY_CHANGE_NOTIFY_IGNORE_RELATIONSHIP_ATTRIBUTES;

Expand Down Expand Up @@ -245,39 +244,44 @@ public void setClassifications(Map<String, AtlasEntityHeader> map, boolean overr
private Map<String, List<AtlasClassification>> validateAndTransfer(AtlasEntityHeader incomingEntityHeader, AtlasEntityHeader entityToBeChanged) throws AtlasBaseException {
Map<String, List<AtlasClassification>> operationListMap = new HashMap<>();

Set<String> requiredClassificationKeys = Stream.concat(
Optional.ofNullable(incomingEntityHeader.getRemoveClassifications()).orElse(Collections.emptyList()).stream(),
Optional.ofNullable(incomingEntityHeader.getUpdateClassifications()).orElse(Collections.emptyList()).stream()
).filter(classification -> classification.getEntityGuid().equals(entityToBeChanged.getGuid()))
.map(this::generateClassificationComparisonKey)
.collect(Collectors.toSet());
Set<String> preExistingClassificationKeys = new HashSet<>();
List<AtlasClassification> filteredRemoveClassifications = new ArrayList<>();

Set<String> preExistingClassificationKeys = Optional.ofNullable(entityToBeChanged.getClassifications())
.orElse(Collections.emptyList())
.stream()
.filter(classification -> classification.getEntityGuid().equals(entityToBeChanged.getGuid()))
.map(this::generateClassificationComparisonKey)
.collect(Collectors.toSet());
ListOps<AtlasClassification> listOps = new ListOps<>();

Set<String> diff = requiredClassificationKeys.stream()
.filter(key -> !preExistingClassificationKeys.contains(key))
.collect(Collectors.toSet());
for (AtlasClassification classification : Optional.ofNullable(entityToBeChanged.getClassifications()).orElse(Collections.emptyList())) {
if (entityToBeChanged.getGuid().equals(classification.getEntityGuid())) {
String key = generateClassificationComparisonKey(classification);
preExistingClassificationKeys.add(key); // Track pre-existing keys
}
}

if (!diff.isEmpty()) {
String firstTypeName = diff.iterator().next().split("\\|")[1];
throw new AtlasBaseException(AtlasErrorCode.CLASSIFICATION_NOT_ASSOCIATED_WITH_ENTITY, firstTypeName);
for (AtlasClassification classification : Optional.ofNullable(incomingEntityHeader.getRemoveClassifications()).orElse(Collections.emptyList())) {
if (entityToBeChanged.getGuid().equals(classification.getEntityGuid())) {
String key = generateClassificationComparisonKey(classification);
// If the classification doesn't exist in pre-existing keys, log it
if (!preExistingClassificationKeys.contains(key)) {
String typeName = key.split("\\|")[1];
LOG.info("Classification {} is not associated with entity {}", typeName, entityToBeChanged.getGuid());
} else {
filteredRemoveClassifications.add(classification);
}
}
}

List<AtlasClassification> filteredClassifications = Optional.ofNullable(incomingEntityHeader.getAppendClassifications())
List<AtlasClassification> filteredClassifications = Optional.ofNullable(incomingEntityHeader.getAddOrUpdateClassifications())
.orElse(Collections.emptyList())
.stream()
.filter(classification -> classification.getEntityGuid().equals(entityToBeChanged.getGuid()))
.filter(appendClassification -> !preExistingClassificationKeys.contains(generateClassificationComparisonKey(appendClassification)))
.collect(Collectors.toList());

bucket(PROCESS_DELETE, operationListMap, incomingEntityHeader.getRemoveClassifications());
bucket(PROCESS_UPDATE, operationListMap, incomingEntityHeader.getUpdateClassifications());
bucket(PROCESS_ADD, operationListMap, filteredClassifications);
List<AtlasClassification> incomingClassifications = listOps.filter(incomingEntityHeader.getGuid(), filteredClassifications);
List<AtlasClassification> entityClassifications = listOps.filter(entityToBeChanged.getGuid(), entityToBeChanged.getClassifications());

bucket(PROCESS_DELETE, operationListMap, filteredRemoveClassifications);
bucket(PROCESS_UPDATE, operationListMap, listOps.intersect(incomingClassifications, entityClassifications));
bucket(PROCESS_ADD, operationListMap, listOps.subtract(incomingClassifications, entityClassifications));

return operationListMap;
}

Expand Down
Loading