Skip to content

Commit

Permalink
Merge pull request #3897 from atlanhq/taskdg1924deleteprop
Browse files Browse the repository at this point in the history
DG-1924 | Update task vertex with impacted-vertices counts for remaining types of propagation
  • Loading branch information
abhijeet-atlan authored Jan 13, 2025
2 parents 5d0068e + 10f9bde commit dc614df
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 8 deletions.
1 change: 1 addition & 0 deletions .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ on:
- development
- master
- taskdg1924
- taskdg1924deleteprop

jobs:
build:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ public AtlasTask(String type, String createdBy, Map<String, Object> parameters,
this.assetsCountPropagated = 0L;
}


/**
 * Returns the unique GUID assigned to this task.
 *
 * @return the task GUID, or {@code null} if one has not been assigned yet
 */
public String getGuid() {
    return this.guid;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1215,23 +1215,50 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship
}
}

// update the 'assetsCountToPropagate' on the in-memory Java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) addPropagationsMap.size() + removePropagationsMap.size() - 1);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

int propagatedCount = 0;
for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) {
List<AtlasVertex> entitiesToAddPropagation = addPropagationsMap.get(classificationVertex);

addTagPropagation(classificationVertex, entitiesToAddPropagation);
propagatedCount++;
if (propagatedCount == 100){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount - 1);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
propagatedCount = 0;
}
}

for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) {
List<AtlasVertex> entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex);

removeTagPropagation(classificationVertex, entitiesToRemovePropagation);
propagatedCount++;
if (propagatedCount == 100){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
propagatedCount = 0;
}
}
if (propagatedCount != 0){
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
}
} else {
// update blocked propagated classifications only if there is no change in tag propagation (don't update both)
handleBlockedClassifications(edge, relationship.getBlockedPropagatedClassifications());
}
}


public void handleBlockedClassifications(AtlasEdge edge, Set<AtlasClassification> blockedClassifications) throws AtlasBaseException {
if (blockedClassifications != null) {
List<AtlasVertex> propagatableClassifications = getPropagatableClassifications(edge);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Iterators;
import org.apache.atlas.*;
import org.apache.atlas.annotation.GraphTransaction;
import org.apache.atlas.authorize.AtlasAuthorizationUtils;
Expand Down Expand Up @@ -78,7 +79,6 @@

import javax.inject.Inject;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -3139,9 +3139,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
long classificationEdgeCount = 0;
long classificationEdgeInMemoryCount = 0;
Iterator<AtlasVertex> tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE);

List<AtlasVertex> tagVerticesProcessed = new ArrayList<>(0);
List<AtlasVertex> currentAssetVerticesBatch = new ArrayList<>(0);

int totalCount = 0;
while (tagVertices != null && tagVertices.hasNext()) {
if (cleanedUpCount >= CLEANUP_MAX){
return;
Expand All @@ -3160,6 +3161,8 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
}

int currentAssetsBatchSize = currentAssetVerticesBatch.size();
totalCount += currentAssetsBatchSize;

if (currentAssetsBatchSize > 0) {
LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize);
int offset = 0;
Expand Down Expand Up @@ -3191,17 +3194,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
classificationEdgeInMemoryCount = 0;
}
}

try {
AtlasEntity entity = repairClassificationMappings(vertex);
entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications);
} catch (IllegalStateException | AtlasBaseException e) {
e.printStackTrace();
}

}

transactionInterceptHelper.intercept();

offset += CHUNK_SIZE;


} finally {
LOG.info("For offset {} , classificationEdge were : {}", offset, classificationEdgeCount);
classificationEdgeCount = 0;
Expand All @@ -3217,6 +3223,17 @@ public void cleanUpClassificationPropagation(String classificationName, int batc
e.printStackTrace();
}
}
// update the 'assetsCountToPropagate' on the in-memory Java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) totalCount);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + totalCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
transactionInterceptHelper.intercept();

cleanedUpCount += currentAssetsBatchSize;
Expand Down Expand Up @@ -3481,7 +3498,7 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v

// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size());
currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size() - 1);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
Expand Down Expand Up @@ -3519,7 +3536,7 @@ public List<String> processClassificationPropagationAddition(List<AtlasVertex> v
propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids);

transactionInterceptHelper.intercept();
int finishedTaskCount = toIndex - offset;
int finishedTaskCount = toIndex - offset - 1;

offset += CHUNK_SIZE;
currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount);
Expand Down Expand Up @@ -4067,6 +4084,16 @@ public void updateClassificationTextPropagation(String classificationVertexId) t
AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex);
LOG.info("Fetched classification : {} ", classification.toString());
List<AtlasVertex> impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex);

// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) impactedVertices.size() - 1);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

LOG.info("impactedVertices : {}", impactedVertices.size());
int batchSize = 100;
for (int i = 0; i < impactedVertices.size(); i += batchSize) {
Expand All @@ -4081,6 +4108,10 @@ public void updateClassificationTextPropagation(String classificationVertexId) t
entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification));
}
}

currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + batch.size() - 1);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());

transactionInterceptHelper.intercept();
LOG.info("Updated classificationText from {} for {}", i, batchSize);
}
Expand Down Expand Up @@ -4271,6 +4302,15 @@ public void classificationRefreshPropagation(String classificationId) throws Atl
.filter(vertex -> vertex != null)
.collect(Collectors.toList());

// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) verticesToRemove.size() + verticesToAddClassification.size() - 1);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

//Remove classifications from unreachable vertices
processPropagatedClassificationDeletionFromVertices(verticesToRemove, currentClassificationVertex, classification);

Expand Down Expand Up @@ -4332,6 +4372,9 @@ private void processPropagatedClassificationDeletionFromVertices(List<AtlasVerte
int toIndex;
int offset = 0;

AtlasTask currentTask = RequestContext.get().getCurrentTask();
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();

LOG.info("To delete classification of vertex id {} from {} entity vertices", classificationVertex.getIdForDisplay(), propagatedVerticesSize);

try {
Expand All @@ -4348,8 +4391,10 @@ private void processPropagatedClassificationDeletionFromVertices(List<AtlasVerte
List<AtlasEntity> updatedEntities = updateClassificationText(classification, updatedVertices);
entityChangeNotifier.onClassificationsDeletedFromEntities(updatedEntities, Collections.singletonList(classification));

int finishedTaskCount = toIndex - offset;
offset += CHUNK_SIZE;

currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
transactionInterceptHelper.intercept();

} while (offset < propagatedVerticesSize);
Expand All @@ -4367,6 +4412,15 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi
int toIndex;
int offset = 0;

// update the 'assetsCountToPropagate' on in memory java object.
AtlasTask currentTask = RequestContext.get().getCurrentTask();
currentTask.setAssetsCountToPropagate((long) propagatedEdgesSize);

//update the 'assetsCountToPropagate' in the current task vertex.
AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next();
currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate());
graph.commit();

do {
toIndex = ((offset + CHUNK_SIZE > propagatedEdgesSize) ? propagatedEdgesSize : (offset + CHUNK_SIZE));

Expand All @@ -4382,8 +4436,12 @@ List<String> processClassificationEdgeDeletionInChunk(AtlasClassification classi
deletedPropagationsGuid.addAll(propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList()));
}

int finishedTaskCount = toIndex - offset;

offset += CHUNK_SIZE;

currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount);
currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated());
transactionInterceptHelper.intercept();

} while (offset < propagatedEdgesSize);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public interface TaskFactory {
* @param atlasTask
* @return
*/
AbstractTask create(AtlasTask atlasTask) throws AtlasException;
AbstractTask create(AtlasTask atlasTask);

List<String> getSupportedTypes();
}

0 comments on commit dc614df

Please sign in to comment.