diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index 713cba1647..13aa9efb89 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -26,6 +26,7 @@ on: - development - master - taskdg1924 + - taskdg1924deleteprop jobs: build: diff --git a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java index 0a1b633b47..0d7ebf4e4d 100644 --- a/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java +++ b/intg/src/main/java/org/apache/atlas/model/tasks/AtlasTask.java @@ -117,7 +117,6 @@ public AtlasTask(String type, String createdBy, Map parameters, this.assetsCountPropagated = 0L; } - public String getGuid() { return guid; } diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java index 68c6dacd9c..8aa88f1d23 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v1/DeleteHandlerV1.java @@ -1215,16 +1215,42 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship } } + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) addPropagationsMap.size() + removePropagationsMap.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + + int propagatedCount = 0; for (AtlasVertex classificationVertex : addPropagationsMap.keySet()) { List entitiesToAddPropagation = addPropagationsMap.get(classificationVertex); addTagPropagation(classificationVertex, entitiesToAddPropagation); + propagatedCount++; + if (propagatedCount == 100){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount - 1); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + propagatedCount = 0; + } } for (AtlasVertex classificationVertex : removePropagationsMap.keySet()) { List entitiesToRemovePropagation = removePropagationsMap.get(classificationVertex); removeTagPropagation(classificationVertex, entitiesToRemovePropagation); + propagatedCount++; + if (propagatedCount == 100){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + propagatedCount = 0; + } + } + if (propagatedCount != 0){ + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + propagatedCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); } } else { // update blocked propagated classifications only if there is no change is tag propagation (don't update both) @@ -1232,6 +1258,7 @@ public void updateTagPropagations(AtlasEdge edge, AtlasRelationship relationship } } + public void handleBlockedClassifications(AtlasEdge edge, Set blockedClassifications) throws AtlasBaseException { if (blockedClassifications != null) { List propagatableClassifications = getPropagatableClassifications(edge); diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java index 96d31bc7eb..4c32250d63 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/EntityGraphMapper.java @@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Iterators; import org.apache.atlas.*; import org.apache.atlas.annotation.GraphTransaction; import org.apache.atlas.authorize.AtlasAuthorizationUtils; @@ -78,7 +79,6 @@ import javax.inject.Inject; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -3139,9 +3139,10 @@ public void cleanUpClassificationPropagation(String classificationName, int batc long classificationEdgeCount = 0; long classificationEdgeInMemoryCount = 0; Iterator tagVertices = GraphHelper.getClassificationVertices(graph, classificationName, CLEANUP_BATCH_SIZE); + List tagVerticesProcessed = new ArrayList<>(0); List currentAssetVerticesBatch = new ArrayList<>(0); - + int totalCount = 0; while (tagVertices != null && tagVertices.hasNext()) { if (cleanedUpCount >= CLEANUP_MAX){ return; @@ -3160,6 +3161,8 @@ public void cleanUpClassificationPropagation(String classificationName, int batc } int currentAssetsBatchSize = currentAssetVerticesBatch.size(); + totalCount += currentAssetsBatchSize; + if (currentAssetsBatchSize > 0) { LOG.info("To clean up tag {} from {} entities", classificationName, currentAssetsBatchSize); int offset = 0; @@ -3191,17 +3194,20 @@ public void cleanUpClassificationPropagation(String classificationName, int batc classificationEdgeInMemoryCount = 0; } } + try { AtlasEntity entity = repairClassificationMappings(vertex); entityChangeNotifier.onClassificationDeletedFromEntity(entity, deletedClassifications); } catch (IllegalStateException | AtlasBaseException e) { e.printStackTrace(); } + } transactionInterceptHelper.intercept(); - offset += CHUNK_SIZE; + + } finally { LOG.info("For offset {} , classificationEdge were : {}", offset, classificationEdgeCount); classificationEdgeCount = 0; @@ -3217,6 +3223,17 @@ public void cleanUpClassificationPropagation(String classificationName, int batc e.printStackTrace(); } } + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) totalCount); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + totalCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); cleanedUpCount += currentAssetsBatchSize; @@ -3481,7 +3498,7 @@ public List processClassificationPropagationAddition(List v // update the 'assetsCountToPropagate' on in memory java object. AtlasTask currentTask = RequestContext.get().getCurrentTask(); - currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size()); + currentTask.setAssetsCountToPropagate((long) verticesToPropagate.size() - 1); //update the 'assetsCountToPropagate' in the current task vertex. AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); @@ -3519,7 +3536,7 @@ public List processClassificationPropagationAddition(List v propagatedEntitiesGuids.addAll(chunkedPropagatedEntitiesGuids); transactionInterceptHelper.intercept(); - int finishedTaskCount = toIndex - offset; + int finishedTaskCount = toIndex - offset - 1; offset += CHUNK_SIZE; currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); @@ -4067,6 +4084,16 @@ public void updateClassificationTextPropagation(String classificationVertexId) t AtlasClassification classification = entityRetriever.toAtlasClassification(classificationVertex); LOG.info("Fetched classification : {} ", classification.toString()); List impactedVertices = graphHelper.getAllPropagatedEntityVertices(classificationVertex); + + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) impactedVertices.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + LOG.info("impactedVertices : {}", impactedVertices.size()); int batchSize = 100; for (int i = 0; i < impactedVertices.size(); i += batchSize) { @@ -4081,6 +4108,10 @@ public void updateClassificationTextPropagation(String classificationVertexId) t entityChangeNotifier.onClassificationUpdatedToEntity(entity, Collections.singletonList(classification)); } } + + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + batch.size() - 1); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); + transactionInterceptHelper.intercept(); LOG.info("Updated classificationText from {} for {}", i, batchSize); } @@ -4271,6 +4302,15 @@ public void classificationRefreshPropagation(String classificationId) throws Atl .filter(vertex -> vertex != null) .collect(Collectors.toList()); + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) verticesToRemove.size() + verticesToAddClassification.size() - 1); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + //Remove classifications from unreachable vertices processPropagatedClassificationDeletionFromVertices(verticesToRemove, currentClassificationVertex, classification); @@ -4332,6 +4372,9 @@ private void processPropagatedClassificationDeletionFromVertices(List updatedEntities = updateClassificationText(classification, updatedVertices); entityChangeNotifier.onClassificationsDeletedFromEntities(updatedEntities, Collections.singletonList(classification)); + int finishedTaskCount = toIndex - offset; offset += CHUNK_SIZE; - + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); } while (offset < propagatedVerticesSize); @@ -4367,6 +4412,15 @@ List processClassificationEdgeDeletionInChunk(AtlasClassification classi int toIndex; int offset = 0; + // update the 'assetsCountToPropagate' on in memory java object. + AtlasTask currentTask = RequestContext.get().getCurrentTask(); + currentTask.setAssetsCountToPropagate((long) propagatedEdgesSize); + + //update the 'assetsCountToPropagate' in the current task vertex. + AtlasVertex currentTaskVertex = (AtlasVertex) graph.query().has(TASK_GUID, currentTask.getGuid()).vertices().iterator().next(); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_TO_PROPAGATE, currentTask.getAssetsCountToPropagate()); + graph.commit(); + do { toIndex = ((offset + CHUNK_SIZE > propagatedEdgesSize) ? propagatedEdgesSize : (offset + CHUNK_SIZE)); @@ -4382,8 +4436,12 @@ List processClassificationEdgeDeletionInChunk(AtlasClassification classi deletedPropagationsGuid.addAll(propagatedEntities.stream().map(x -> x.getGuid()).collect(Collectors.toList())); } + int finishedTaskCount = toIndex - offset; + offset += CHUNK_SIZE; + currentTask.setAssetsCountPropagated(currentTask.getAssetsCountPropagated() + finishedTaskCount); + currentTaskVertex.setProperty(TASK_ASSET_COUNT_PROPAGATED, currentTask.getAssetsCountPropagated()); transactionInterceptHelper.intercept(); } while (offset < propagatedEdgesSize); diff --git a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java index f7513eedf9..7c7f5d7ba8 100644 --- a/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java +++ b/repository/src/main/java/org/apache/atlas/tasks/TaskFactory.java @@ -27,7 +27,7 @@ public interface TaskFactory { * @param atlasTask * @return */ - AbstractTask create(AtlasTask atlasTask) throws AtlasException; + AbstractTask create(AtlasTask atlasTask); List getSupportedTypes(); } \ No newline at end of file