From 9d5fe443a46ad773d978a9f1e2aa3ab30c351f2c Mon Sep 17 00:00:00 2001 From: Rahul Madan Date: Thu, 28 Nov 2024 12:04:44 +0530 Subject: [PATCH 1/3] fix errors --- .../apache/atlas/repository/Constants.java | 6 + .../atlas/discovery/EntityLineageService.java | 86 ++- .../store/graph/v2/AtlasEntityStoreV2.java | 8 + .../lineage/LineagePreProcessor.java | 560 ++++++++++++++++++ 4 files changed, 636 insertions(+), 24 deletions(-) create mode 100644 repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java diff --git a/common/src/main/java/org/apache/atlas/repository/Constants.java b/common/src/main/java/org/apache/atlas/repository/Constants.java index 4fc62e0e6e..7460115fc8 100644 --- a/common/src/main/java/org/apache/atlas/repository/Constants.java +++ b/common/src/main/java/org/apache/atlas/repository/Constants.java @@ -207,6 +207,11 @@ public final class Constants { public static String[] PROCESS_EDGE_LABELS = {PROCESS_OUTPUTS, PROCESS_INPUTS}; + public static final String PROCESS_ENTITY_TYPE = "Process"; + + public static final String CONNECTION_PROCESS_ENTITY_TYPE = "ConnectionProcess"; + public static final String PARENT_CONNECTION_PROCESS_QUALIFIED_NAME = "parentConnectionProcessQualifiedName"; + /** * The homeId field is used when saving into Atlas a copy of an object that is being imported from another * repository. The homeId will be set to a String that identifies the other repository. The specific format @@ -269,6 +274,7 @@ public final class Constants { public static final String NAME = "name"; public static final String QUALIFIED_NAME = "qualifiedName"; + public static final String CONNECTION_QUALIFIED_NAME = "connectionQualifiedName"; public static final String TYPE_NAME_PROPERTY_KEY = INTERNAL_PROPERTY_KEY_PREFIX + "typeName"; public static final String INDEX_SEARCH_MAX_RESULT_SET_SIZE = "atlas.graph.index.search.max-result-set-size"; public static final String INDEX_SEARCH_TYPES_MAX_QUERY_STR_LENGTH = "atlas.graph.index.search.types.max-query-str-length"; diff --git a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java index a873105a6d..ff1b76a390 100644 --- a/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java +++ b/repository/src/main/java/org/apache/atlas/discovery/EntityLineageService.java @@ -84,6 +84,8 @@ public class EntityLineageService implements AtlasLineageService { private static final String PROCESS_INPUTS_EDGE = "__Process.inputs"; private static final String PROCESS_OUTPUTS_EDGE = "__Process.outputs"; + private static final String CONNECTION_PROCESS_INPUTS_EDGE = "__ConnectionProcess.inputs"; + private static final String CONNECTION_PROCESS_OUTPUTS_EDGE = "__ConnectionProcess.outputs"; private static final String COLUMNS = "columns"; private static final boolean LINEAGE_USING_GREMLIN = AtlasConfiguration.LINEAGE_USING_GREMLIN.getBoolean(); private static final Integer DEFAULT_LINEAGE_MAX_NODE_COUNT = 9000; @@ -177,8 +179,8 @@ public AtlasLineageOnDemandInfo getAtlasLineageInfo(String guid, LineageOnDemand RequestContext.get().setRelationAttrsForSearch(lineageOnDemandRequest.getRelationAttributes()); AtlasLineageOnDemandContext atlasLineageOnDemandContext = new AtlasLineageOnDemandContext(lineageOnDemandRequest, atlasTypeRegistry); - boolean isDataSet = validateEntityTypeAndCheckIfDataSet(guid); - AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, isDataSet); + EntityValidationResult entityValidationResult = validateEntityTypeAndCheckIfDataSet(guid); + AtlasLineageOnDemandInfo ret = getLineageInfoOnDemand(guid, atlasLineageOnDemandContext, entityValidationResult); appendLineageOnDemandPayload(ret, lineageOnDemandRequest); // filtering out on-demand relations which has input & output nodes within the limit cleanupRelationsOnDemand(ret); @@ -203,20 +205,44 @@ public AtlasLineageListInfo getLineageListInfoOnDemand(String guid, LineageListR return ret; } - private boolean validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { + public class EntityValidationResult { + public final boolean isProcess; + public final boolean isDataSet; + public final boolean isConnection; + public final boolean isConnectionProcess; + + public EntityValidationResult(boolean isProcess, boolean isDataSet, boolean isConnection, boolean isConnectionProcess) { + this.isProcess = isProcess; + this.isDataSet = isDataSet; + this.isConnection = isConnection; + this.isConnectionProcess = isConnectionProcess; + } + } + + + private EntityValidationResult validateEntityTypeAndCheckIfDataSet(String guid) throws AtlasBaseException { String typeName = entityRetriever.getEntityVertex(guid).getProperty(Constants.TYPE_NAME_PROPERTY_KEY, String.class); AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); if (entityType == null) { throw new AtlasBaseException(AtlasErrorCode.TYPE_NAME_NOT_FOUND, typeName); } boolean isProcess = entityType.getTypeAndAllSuperTypes().contains(PROCESS_SUPER_TYPE); + boolean isConnectionProcess = false; + boolean isDataSet = false; + boolean isConnection = false; if (!isProcess) { - boolean isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); - if (!isDataSet) { - throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + isConnectionProcess = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_PROCESS_ENTITY_TYPE); + if(!isConnectionProcess){ + isDataSet = entityType.getTypeAndAllSuperTypes().contains(DATA_SET_SUPER_TYPE); + if (!isDataSet) { + isConnection = entityType.getTypeAndAllSuperTypes().contains(CONNECTION_ENTITY_TYPE); + if(!isConnection){ + throw new AtlasBaseException(AtlasErrorCode.INVALID_LINEAGE_ENTITY_TYPE, guid, typeName); + } + } } } - return !isProcess; + return new EntityValidationResult(isProcess, isDataSet, isConnection, isConnectionProcess); } private LineageOnDemandConstraints getLineageConstraints(String guid, LineageOnDemandBaseParams defaultParams) { @@ -281,7 +307,7 @@ private void cleanupRelationsOnDemand(AtlasLineageOnDemandInfo lineageInfo) { } } - private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, boolean isDataSet) throws AtlasBaseException { + private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineageOnDemandContext atlasLineageOnDemandContext, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("getLineageInfoOnDemand"); LineageOnDemandConstraints lineageConstraintsByGuid = getAndValidateLineageConstraintsByGuid(guid, atlasLineageOnDemandContext); @@ -298,12 +324,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtomicInteger inputEntitiesTraversed = new AtomicInteger(0); AtomicInteger outputEntitiesTraversed = new AtomicInteger(0); AtomicInteger traversalOrder = new AtomicInteger(1); - if (isDataSet) { + if (entityValidationResult.isConnection || entityValidationResult.isDataSet) { AtlasVertex datasetVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, true, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) - traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder); + traverseEdgesOnDemand(datasetVertex, false, depth, level, new HashSet<>(), atlasLineageOnDemandContext, ret, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); AtlasEntityHeader baseEntityHeader = entityRetriever.toAtlasEntityHeader(datasetVertex, atlasLineageOnDemandContext.getAttributes()); setGraphTraversalMetadata(level, traversalOrder, baseEntityHeader); ret.getGuidEntityMap().put(guid, baseEntityHeader); @@ -311,12 +337,12 @@ private AtlasLineageOnDemandInfo getLineageInfoOnDemand(String guid, AtlasLineag AtlasVertex processVertex = AtlasGraphUtilsV2.findByGuid(this.graph, guid); // make one hop to the next dataset vertices from process vertex and traverse with 'depth = depth - 1' if (direction == AtlasLineageOnDemandInfo.LineageDirection.INPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_INPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_INPUTS_EDGE:CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, true, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, inputEntitiesTraversed, traversalOrder, entityValidationResult); } if (direction == AtlasLineageOnDemandInfo.LineageDirection.OUTPUT || direction == AtlasLineageOnDemandInfo.LineageDirection.BOTH) { - Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, PROCESS_OUTPUTS_EDGE).iterator(); - traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder); + Iterator processEdges = processVertex.getEdges(AtlasEdgeDirection.OUT, entityValidationResult.isProcess? PROCESS_OUTPUTS_EDGE:CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + traverseEdgesOnDemand(processEdges, false, depth, level, atlasLineageOnDemandContext, ret, processVertex, guid, outputEntitiesTraversed, traversalOrder, entityValidationResult); } } RequestContext.get().endMetricRecord(metricRecorder); @@ -329,7 +355,7 @@ private static void setGraphTraversalMetadata(int level, AtomicInteger traversal baseEntityHeader.setFinishTime(traversalOrder.get()); } - private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(Iterator processEdges, boolean isInput, int depth, int level, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, AtlasVertex processVertex, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { AtlasLineageOnDemandInfo.LineageDirection direction = isInput ? AtlasLineageOnDemandInfo.LineageDirection.INPUT : AtlasLineageOnDemandInfo.LineageDirection.OUTPUT; int nextLevel = isInput ? level - 1: level + 1; @@ -360,11 +386,12 @@ private void traverseEdgesOnDemand(Iterator processEdges, boolean isI ret.getRelationsOnDemand().put(inGuid, new LineageInfoOnDemand(inGuidLineageConstrains)); } - traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(inGuid); + traverseEdgesOnDemand(datasetVertex, isInput, depth - 1, nextLevel, new HashSet<>(), atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); } } - private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder) throws AtlasBaseException { + private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, int depth, int level, Set visitedVertices, AtlasLineageOnDemandContext atlasLineageOnDemandContext, AtlasLineageOnDemandInfo ret, String baseGuid, AtomicInteger entitiesTraversed, AtomicInteger traversalOrder, EntityValidationResult entityValidationResult) throws AtlasBaseException { if (isEntityTraversalLimitReached(entitiesTraversed)) return; if (depth != 0) { // base condition of recursion for depth @@ -374,8 +401,13 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i // keep track of visited vertices to avoid circular loop visitedVertices.add(getId(datasetVertex)); + Iterator incomingEdges = null; AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesIn = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesIn"); - Iterator incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + if(entityValidationResult.isDataSet){ + incomingEdges = datasetVertex.getEdges(IN, isInput ? PROCESS_OUTPUTS_EDGE : PROCESS_INPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + incomingEdges = datasetVertex.getEdges(IN, isInput ? CONNECTION_PROCESS_OUTPUTS_EDGE : CONNECTION_PROCESS_INPUTS_EDGE).iterator(); + } RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesIn); while (incomingEdges.hasNext()) { @@ -403,7 +435,12 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i } AtlasPerfMetrics.MetricRecorder traverseEdgesOnDemandGetEdgesOut = RequestContext.get().startMetricRecord("traverseEdgesOnDemandGetEdgesOut"); - Iterator outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + Iterator outgoingEdges = null; + if(entityValidationResult.isDataSet){ + outgoingEdges = processVertex.getEdges(OUT, isInput ? PROCESS_INPUTS_EDGE : PROCESS_OUTPUTS_EDGE).iterator(); + } else if (entityValidationResult.isConnection) { + outgoingEdges = processVertex.getEdges(OUT, isInput ? CONNECTION_PROCESS_INPUTS_EDGE : CONNECTION_PROCESS_OUTPUTS_EDGE).iterator(); + } RequestContext.get().endMetricRecord(traverseEdgesOnDemandGetEdgesOut); while (outgoingEdges.hasNext()) { @@ -434,7 +471,8 @@ private void traverseEdgesOnDemand(AtlasVertex datasetVertex, boolean isInput, i setEntityLimitReachedFlag(isInput, ret); } if (entityVertex != null && !visitedVertices.contains(getId(entityVertex))) { - traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder); // execute inner depth + EntityValidationResult entityValidationResult1 = validateEntityTypeAndCheckIfDataSet(getGuid(entityVertex)); + traverseEdgesOnDemand(entityVertex, isInput, depth - 1, nextLevel, visitedVertices, atlasLineageOnDemandContext, ret, baseGuid, entitiesTraversed, traversalOrder, entityValidationResult1); // execute inner depth AtlasEntityHeader traversedEntity = ret.getGuidEntityMap().get(AtlasGraphUtilsV2.getIdFromVertex(entityVertex)); traversedEntity.setFinishTime(traversalOrder.get()); } @@ -465,7 +503,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line AtlasVertex baseVertex = AtlasGraphUtilsV2.findByGuid(this.graph, baseGuid); - boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid); + boolean isBaseNodeDataset = validateEntityTypeAndCheckIfDataSet(baseGuid).isDataSet; // Get the neighbors for the current node enqueueNeighbours(baseVertex, isBaseNodeDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); int currentDepth = 0; @@ -493,7 +531,7 @@ private void traverseEdgesUsingBFS(String baseGuid, AtlasLineageListContext line if (Objects.isNull(currentVertex)) throw new AtlasBaseException("Found null vertex during lineage graph traversal for guid: " + currentGUID); - boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID); + boolean isDataset = validateEntityTypeAndCheckIfDataSet(currentGUID).isDataSet; if (!lineageListContext.evaluateVertexFilter(currentVertex)) { enqueueNeighbours(currentVertex, isDataset, lineageListContext, traversalQueue, visitedVertices, skippedVertices, lineageParentsForEntityMap, lineageChildrenForEntityMap); continue; @@ -1538,7 +1576,7 @@ private void processEdge(final AtlasEdge edge, final Map getPreProcessor(String typeName) { case STAKEHOLDER_TITLE_ENTITY_TYPE: preProcessors.add(new StakeholderTitlePreProcessor(graph, typeRegistry, entityRetriever)); break; + + case PROCESS_ENTITY_TYPE: + preProcessors.add(new LineagePreProcessor(typeRegistry, entityRetriever, graph, this)); } // The default global pre-processor for all AssetTypes diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java new file mode 100644 index 0000000000..f7e5b2edfb --- /dev/null +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java @@ -0,0 +1,560 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.atlas.repository.store.graph.v2.preprocessor.lineage; + + +import org.apache.atlas.AtlasErrorCode; +import org.apache.atlas.AtlasException; +import org.apache.atlas.DeleteType; +import org.apache.atlas.RequestContext; +import org.apache.atlas.discovery.EntityDiscoveryService; +import org.apache.atlas.exception.AtlasBaseException; +import org.apache.atlas.model.instance.*; +import org.apache.atlas.repository.graphdb.AtlasEdge; +import org.apache.atlas.repository.graphdb.AtlasEdgeDirection; +import org.apache.atlas.repository.graphdb.AtlasGraph; +import org.apache.atlas.repository.graphdb.AtlasVertex; +import org.apache.atlas.repository.store.graph.AtlasEntityStore; +import org.apache.atlas.repository.store.graph.v2.AtlasEntityStream; +import org.apache.atlas.repository.store.graph.v2.EntityGraphRetriever; +import org.apache.atlas.repository.store.graph.v2.EntityMutationContext; +import org.apache.atlas.repository.store.graph.v2.EntityStream; +import org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessor; +import org.apache.atlas.repository.util.AtlasEntityUtils; +import org.apache.atlas.type.AtlasTypeRegistry; +import org.apache.atlas.utils.AtlasPerfMetrics; +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +import static org.apache.atlas.model.instance.AtlasEntity.Status.ACTIVE; +import static org.apache.atlas.repository.Constants.*; +import static org.apache.atlas.repository.Constants.NAME; +import static org.apache.atlas.repository.graph.GraphHelper.*; +import static org.apache.atlas.repository.store.graph.v2.preprocessor.PreProcessorUtils.indexSearchPaginated; +import static org.apache.atlas.repository.util.AtlasEntityUtils.mapOf; + +public class LineagePreProcessor implements PreProcessor { + private static final Logger LOG = LoggerFactory.getLogger(LineagePreProcessor.class); + private static final List FETCH_ENTITY_ATTRIBUTES = Arrays.asList(CONNECTION_QUALIFIED_NAME); + private final AtlasTypeRegistry typeRegistry; + private final EntityGraphRetriever entityRetriever; + private AtlasEntityStore entityStore; + protected EntityDiscoveryService discovery; + private static final String HAS_LINEAGE = "__hasLineage"; + + public LineagePreProcessor(AtlasTypeRegistry typeRegistry, EntityGraphRetriever entityRetriever, AtlasGraph graph, AtlasEntityStore entityStore) { + this.entityRetriever = entityRetriever; + this.typeRegistry = typeRegistry; + this.entityStore = entityStore; + try { + this.discovery = new EntityDiscoveryService(typeRegistry, graph, null, null, null, null); + } catch (AtlasException e) { + e.printStackTrace(); + } + } + + @Override + public void processAttributes(AtlasStruct entityStruct, EntityMutationContext context, + EntityMutations.EntityOperation operation) throws AtlasBaseException { + + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processAttributesForLineagePreprocessor"); + + try { + if (LOG.isDebugEnabled()) { + LOG.debug("LineageProcessPreProcessor.processAttributes: pre processing {}, {}", entityStruct.getAttribute(QUALIFIED_NAME), operation); + } + + AtlasEntity entity = (AtlasEntity) entityStruct; + AtlasVertex vertex = context.getVertex(entity.getGuid()); + ArrayList connectionProcessQNs = getConnectionProcessQNsForTheGivenInputOutputs(entity); + + switch (operation) { + case CREATE: + processCreateLineageProcess(entity, connectionProcessQNs); + break; + case UPDATE: + processUpdateLineageProcess(entity, vertex, context, connectionProcessQNs); + break; + } + }catch(Exception exp){ + if (LOG.isDebugEnabled()) { + LOG.debug("Lineage preprocessor: " + exp); + } + }finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + + } + + private void processCreateLineageProcess(AtlasEntity entity, ArrayList connectionProcessList) { + // if not exist create lineage process + // add owner connection process + if(!connectionProcessList.isEmpty()){ + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessList); + } + } + + private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, EntityMutationContext context, ArrayList newConnectionProcessList) throws AtlasBaseException { + // Get the old parentConnectionProcessQualifiedName from the existing vertex + List oldConnectionProcessList = null; + try { + Object propertyValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + if (propertyValue instanceof String) { + oldConnectionProcessList = Arrays.asList((String) propertyValue); + } else if (propertyValue instanceof List) { + oldConnectionProcessList = (List) propertyValue; + } else if (propertyValue != null) { + oldConnectionProcessList = Collections.singletonList(propertyValue.toString()); + } else { + oldConnectionProcessList = Collections.emptyList(); + } + } catch (Exception e) { + oldConnectionProcessList = Collections.emptyList(); + } + + // Identify ConnectionProcesses to remove (present in old list but not in new list) + Set connectionProcessesToRemove = new HashSet<>(oldConnectionProcessList); + connectionProcessesToRemove.removeAll(newConnectionProcessList); + + // Identify ConnectionProcesses to add (present in new list but not in old list) + Set connectionProcessesToAdd = new HashSet<>(newConnectionProcessList); + connectionProcessesToAdd.removeAll(oldConnectionProcessList); + + // For each ConnectionProcess to remove + for (String connectionProcessQn : connectionProcessesToRemove) { + // Check if more child Processes exist for this ConnectionProcess + if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { + // Delete the ConnectionProcess + deleteConnectionProcess(connectionProcessQn); + } + // Update __hasLineage for involved Connections + updateConnectionsHasLineageForConnectionProcess(connectionProcessQn); + } + + // For new ConnectionProcesses, we've already created or retrieved them in getConnectionProcessQNsForTheGivenInputOutputs + + // Update the Process entity's parentConnectionProcessQualifiedName attribute + entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, newConnectionProcessList); + } + + private AtlasEntity createConnectionProcessEntity(Map connectionProcessInfo) throws AtlasBaseException { + AtlasEntity processEntity = new AtlasEntity(); + processEntity.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + processEntity.setAttribute(NAME, connectionProcessInfo.get("connectionProcessName")); + processEntity.setAttribute(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName")); + + // Set up relationship attributes for input and output connections + AtlasObjectId inputConnection = new AtlasObjectId(); + inputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + inputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("input"))); + + AtlasObjectId outputConnection = new AtlasObjectId(); + outputConnection.setTypeName(CONNECTION_ENTITY_TYPE); + outputConnection.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("output"))); + + Map relationshipAttributes = new HashMap<>(); + relationshipAttributes.put("inputs", Collections.singletonList(inputConnection)); + relationshipAttributes.put("outputs", Collections.singletonList(outputConnection)); + processEntity.setRelationshipAttributes(relationshipAttributes); + + try { + RequestContext.get().setSkipAuthorizationCheck(true); + AtlasEntity.AtlasEntitiesWithExtInfo processExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + processExtInfo.addEntity(processEntity); + EntityStream entityStream = new AtlasEntityStream(processExtInfo); + entityStore.createOrUpdate(entityStream, false); + + // Update hasLineage for both connections + updateConnectionLineageFlag((String) connectionProcessInfo.get("input"), true); + updateConnectionLineageFlag((String) connectionProcessInfo.get("output"), true); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + + return processEntity; + } + + private void updateConnectionLineageFlag(String connectionQualifiedName, boolean hasLineage) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasLineage); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private void checkAndUpdateConnectionLineage(String connectionQualifiedName) throws AtlasBaseException { + AtlasObjectId connectionId = new AtlasObjectId(); + connectionId.setTypeName(CONNECTION_ENTITY_TYPE); + connectionId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionQualifiedName)); + + try { + AtlasVertex connectionVertex = entityRetriever.getEntityVertex(connectionId); + + // Check if this connection has any active connection processes + boolean hasActiveConnectionProcess = hasActiveConnectionProcesses(connectionVertex); + + // Only update if the hasLineage status needs to change + boolean currentHasLineage = getEntityHasLineage(connectionVertex); + if (currentHasLineage != hasActiveConnectionProcess) { + if (LOG.isDebugEnabled()) { + LOG.debug("Updating hasLineage for connection {} from {} to {}", + connectionQualifiedName, currentHasLineage, hasActiveConnectionProcess); + } + + AtlasEntity connection = entityRetriever.toAtlasEntity(connectionVertex); + connection.setAttribute(HAS_LINEAGE, hasActiveConnectionProcess); + + AtlasEntity.AtlasEntitiesWithExtInfo connectionExtInfo = new AtlasEntity.AtlasEntitiesWithExtInfo(); + connectionExtInfo.addEntity(connection); + EntityStream entityStream = new AtlasEntityStream(connectionExtInfo); + + RequestContext.get().setSkipAuthorizationCheck(true); + try { + entityStore.createOrUpdate(entityStream, false); + } finally { + RequestContext.get().setSkipAuthorizationCheck(false); + } + } + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + } + + private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) { + // Iterate over both input and output edges connected to this connection + Iterator edges = connectionVertex.getEdges(AtlasEdgeDirection.BOTH, + new String[]{"__ConnectionProcess.inputs", "__ConnectionProcess.outputs"}).iterator(); + + while (edges.hasNext()) { + AtlasEdge edge = edges.next(); + + // Check if the edge is ACTIVE + if (getStatus(edge) == ACTIVE) { + // Get the connected process vertex (the other vertex of the edge) + AtlasVertex processVertex = edge.getOutVertex().equals(connectionVertex) ? + edge.getInVertex() : edge.getOutVertex(); + + // Check if the connected vertex is an ACTIVE ConnectionProcess + if (getStatus(processVertex) == ACTIVE && + getTypeName(processVertex).equals(CONNECTION_PROCESS_ENTITY_TYPE)) { + return true; + } + } + } + return false; + } + + private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{ + + // check connection lineage exists or not + // check if connection lineage exists + Map entityAttrValues = processEntity.getRelationshipAttributes(); + + ArrayList inputsAssets = (ArrayList) entityAttrValues.get("inputs"); + ArrayList outputsAssets = (ArrayList) entityAttrValues.get("outputs"); + + // get connection process + Set> uniquesSetOfConnectionProcess = new HashSet<>(); + + for (AtlasObjectId input : inputsAssets){ + AtlasVertex inputVertex = entityRetriever.getEntityVertex(input); + Map inputVertexConnectionQualifiedName = fetchAttributes(inputVertex, FETCH_ENTITY_ATTRIBUTES); + for (AtlasObjectId output : outputsAssets){ + AtlasVertex outputVertex = entityRetriever.getEntityVertex(output); + Map outputVertexConnectionQualifiedName = fetchAttributes(outputVertex, FETCH_ENTITY_ATTRIBUTES); + + if(inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) == outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)){ + continue; + } + + String connectionProcessName = "(" + inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")->(" + outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")"; + String connectionProcessQualifiedName = outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + "/" + connectionProcessName; + // Create a map to store both connectionProcessName and connectionProcessQualifiedName + Map connectionProcessMap = new HashMap<>(); + connectionProcessMap.put("input", inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("output", outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("connectionProcessName", connectionProcessName); + connectionProcessMap.put("connectionProcessQualifiedName", connectionProcessQualifiedName); + + // Add the map to the set + uniquesSetOfConnectionProcess.add(connectionProcessMap); + } + } + + ArrayList connectionProcessList = new ArrayList<>(); + + // check if connection process exists + for (Map connectionProcessInfo : uniquesSetOfConnectionProcess){ + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(mapOf(QUALIFIED_NAME, connectionProcessInfo.get("connectionProcessQualifiedName"))); + AtlasVertex connectionProcessVertex = null; + try { + // TODO add caching here + connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + } + catch(AtlasBaseException exp){ + if(!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)){ + throw exp; + } + } + + AtlasEntity connectionProcess; + if (connectionProcessVertex == null) { + connectionProcess = createConnectionProcessEntity(connectionProcessInfo); + } else { + // exist so retrieve and perform any update so below statement to retrieve + // TODO add caching here + connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + } + // only add in list if created + connectionProcessList.add(connectionProcess.getAttribute(QUALIFIED_NAME)); + } + + return connectionProcessList; + } + + public boolean checkIfMoreChildProcessExistForConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("checkIfMoreChileProcessExistForConnectionProcess"); + boolean ret = false; + + try { + List mustClauseList = new ArrayList(); + mustClauseList.add(mapOf("term", mapOf("__typeName.keyword", PROCESS_ENTITY_TYPE))); + mustClauseList.add(mapOf("term", mapOf("__state", "ACTIVE"))); + mustClauseList.add(mapOf("term", mapOf(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, connectionProcessQn))); + + Map dsl = mapOf("query", mapOf("bool", mapOf("must", mustClauseList))); + + List process = indexSearchPaginated(dsl, new HashSet<>(Arrays.asList(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME)) , this.discovery); + + if (CollectionUtils.isNotEmpty(process) && process.size()>1) { + ret = true; + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + return ret; + } + + // handle process delete logic + @Override + public void processDelete(AtlasVertex vertex) throws AtlasBaseException { + AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess"); + + try { + // Collect all connections involved in the process being deleted + AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); + + Set involvedConnections = new HashSet<>(); + + // Retrieve inputs and outputs from the process + List inputs = (List) processEntity.getRelationshipAttribute("inputs"); + List outputs = (List) processEntity.getRelationshipAttribute("outputs"); + + if (inputs == null) inputs = Collections.emptyList(); + if (outputs == null) outputs = Collections.emptyList(); + + List allAssets = new ArrayList<>(); + allAssets.addAll(inputs); + allAssets.addAll(outputs); + + // For each asset, get its connection and add to involvedConnections + for (AtlasObjectId assetId : allAssets) { + try { + AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); + Map assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); + if (assetConnectionAttributes != null) { + String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME); + if (StringUtils.isNotEmpty(connectionQN)) { + involvedConnections.add(connectionQN); + } + } + } catch (AtlasBaseException e) { + LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); + } + } + + // Collect affected connections from connection processes to be deleted + Set connectionProcessQNs = new HashSet<>(); + + // Handle both single value and multi-value cases + try { + // Try getting as a list first + Iterable propertyValues = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + if (propertyValues != null) { + for (Object value : propertyValues) { + if (value != null) { + connectionProcessQNs.add(value.toString()); + } + } + } + } catch (Exception e) { + // If getting as list fails, try getting as single value + try { + String singleValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class); + if (StringUtils.isNotEmpty(singleValue)) { + connectionProcessQNs.add(singleValue); + } + } catch (Exception ex) { + LOG.warn("Error getting parentConnectionProcessQualifiedName property: {}", ex.getMessage()); + } + } + + if (connectionProcessQNs.isEmpty()) { + return; + } + + Set affectedConnections = new HashSet<>(); + + // Process each connection process + for (String connectionProcessQn : connectionProcessQNs) { + if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + // Get connection process before deletion to track affected connections + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + // Get all input and output connections + List inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs"); + List outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs"); + + // Add all connections to affected set + affectedConnections.addAll(inputConnQNs); + affectedConnections.addAll(outputConnQNs); + + // Delete the connection process + entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); + } catch (AtlasBaseException exp) { + if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw exp; + } + } + } + } + + // Combine involved and affected connections + Set connectionsToCheck = new HashSet<>(); + connectionsToCheck.addAll(involvedConnections); + connectionsToCheck.addAll(affectedConnections); + + // Check and update hasLineage for all connections involved + for (String connectionQN : connectionsToCheck) { + checkAndUpdateConnectionLineage(connectionQN); + } + } finally { + RequestContext.get().endMetricRecord(metricRecorder); + } + } + + private List getConnectionQualifiedNames(AtlasEntity connectionProcess, String attributeName) { + List connectionQualifiedNames = new ArrayList<>(); + try { + Object relationshipAttr = connectionProcess.getRelationshipAttribute(attributeName); + if (relationshipAttr instanceof List) { + List connObjectIds = (List) relationshipAttr; + for (AtlasObjectId connObjectId : connObjectIds) { + Map uniqueAttributes = connObjectId.getUniqueAttributes(); + if (uniqueAttributes != null) { + String qualifiedName = (String) uniqueAttributes.get(QUALIFIED_NAME); + if (StringUtils.isNotEmpty(qualifiedName)) { + connectionQualifiedNames.add(qualifiedName); + } + } + } + } + } catch (Exception e) { + LOG.warn("Error getting {} qualified name for connection process {}: {}", + attributeName, connectionProcess.getGuid(), e.getMessage()); + } + return connectionQualifiedNames; + } + + private void deleteConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); + } catch (AtlasBaseException exp) { + if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw exp; + } + } + } + + private void updateConnectionsHasLineageForConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + // Get the ConnectionProcess entity + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + // Get input and output connections + List inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs"); + List outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs"); + + // For each connection, check and update __hasLineage + Set connectionsToUpdate = new HashSet<>(); + connectionsToUpdate.addAll(inputConnQNs); + connectionsToUpdate.addAll(outputConnQNs); + + for (String connectionQN : connectionsToUpdate) { + checkAndUpdateConnectionLineage(connectionQN); + } + } catch (AtlasBaseException exp) { + if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw exp; + } + } + } +} From d2c96ec9ba486b32ee1754eb835c1f9ba93a5287 Mon Sep 17 00:00:00 2001 From: Rahul Madan Date: Thu, 28 Nov 2024 21:06:46 +0530 Subject: [PATCH 2/3] corrected qualified name --- .../lineage/LineagePreProcessor.java | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java index f7e5b2edfb..ea0d3d2b6b 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java @@ -284,8 +284,7 @@ private boolean hasActiveConnectionProcesses(AtlasVertex connectionVertex) { return false; } - private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException{ - + private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEntity processEntity) throws AtlasBaseException { // check connection lineage exists or not // check if connection lineage exists Map entityAttrValues = processEntity.getRelationshipAttributes(); @@ -293,30 +292,47 @@ private ArrayList getConnectionProcessQNsForTheGivenInputOutputs(AtlasEn ArrayList inputsAssets = (ArrayList) entityAttrValues.get("inputs"); ArrayList outputsAssets = (ArrayList) entityAttrValues.get("outputs"); - // get connection process - Set> uniquesSetOfConnectionProcess = new HashSet<>(); - - for (AtlasObjectId input : inputsAssets){ + // Get unique input connections + Set inputConnectionQNs = new HashSet<>(); + for (AtlasObjectId input : inputsAssets) { AtlasVertex inputVertex = entityRetriever.getEntityVertex(input); Map inputVertexConnectionQualifiedName = fetchAttributes(inputVertex, FETCH_ENTITY_ATTRIBUTES); - for (AtlasObjectId output : outputsAssets){ - AtlasVertex outputVertex = entityRetriever.getEntityVertex(output); - Map outputVertexConnectionQualifiedName = fetchAttributes(outputVertex, FETCH_ENTITY_ATTRIBUTES); + String inputConnQN = inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME); + if (inputConnQN != null) { + inputConnectionQNs.add(inputConnQN); + } + } - if(inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) == outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)){ + // Get unique output connections + Set outputConnectionQNs = new HashSet<>(); + for (AtlasObjectId output : outputsAssets) { + AtlasVertex outputVertex = entityRetriever.getEntityVertex(output); + Map outputVertexConnectionQualifiedName = fetchAttributes(outputVertex, FETCH_ENTITY_ATTRIBUTES); + String outputConnQN = outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME); + if (outputConnQN != null) { + outputConnectionQNs.add(outputConnQN); + } + } + + // Create connection processes for each input-output connection pair + Set> uniquesSetOfConnectionProcess = new HashSet<>(); + for (String inputConnectionQN : inputConnectionQNs) { + for (String outputConnectionQN : outputConnectionQNs) { + // Skip if input and output connections are the same + if (inputConnectionQN.equals(outputConnectionQN)) { continue; } - String connectionProcessName = "(" + inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")->(" + outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + ")"; - String connectionProcessQualifiedName = outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME) + "/" + connectionProcessName; - // Create a map to store both connectionProcessName and connectionProcessQualifiedName + String connectionProcessName = "(" + inputConnectionQN + ")->(" + outputConnectionQN + ")"; + // Use the connectionProcessName as the qualifiedName directly + String connectionProcessQualifiedName = connectionProcessName; + Map connectionProcessMap = new HashMap<>(); - connectionProcessMap.put("input", inputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); - connectionProcessMap.put("output", outputVertexConnectionQualifiedName.get(CONNECTION_QUALIFIED_NAME)); + connectionProcessMap.put("input", inputConnectionQN); + connectionProcessMap.put("output", outputConnectionQN); connectionProcessMap.put("connectionProcessName", connectionProcessName); connectionProcessMap.put("connectionProcessQualifiedName", connectionProcessQualifiedName); - // Add the map to the set uniquesSetOfConnectionProcess.add(connectionProcessMap); } } From ece1435219aba4622e2df25348e1a2e5feac254d Mon Sep 17 00:00:00 2001 From: Rahul Madan Date: Sun, 1 Dec 2024 07:03:31 +0530 Subject: [PATCH 3/3] updated delete logic simplified --- .../lineage/LineagePreProcessor.java | 181 +++++++++--------- 1 file changed, 92 insertions(+), 89 deletions(-) diff --git a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java index ea0d3d2b6b..13390461e8 100644 --- a/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java +++ b/repository/src/main/java/org/apache/atlas/repository/store/graph/v2/preprocessor/lineage/LineagePreProcessor.java @@ -154,6 +154,11 @@ private void processUpdateLineageProcess(AtlasEntity entity, AtlasVertex vertex, // Update the Process entity's parentConnectionProcessQualifiedName attribute entity.setAttribute(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, newConnectionProcessList); + + // Update __hasLineage for all connections involved in the new connection processes + for (String connectionProcessQn : newConnectionProcessList) { + updateConnectionsHasLineageForConnectionProcess(connectionProcessQn); + } } private AtlasEntity createConnectionProcessEntity(Map connectionProcessInfo) throws AtlasBaseException { @@ -399,106 +404,31 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException { AtlasPerfMetrics.MetricRecorder metricRecorder = RequestContext.get().startMetricRecord("processDeleteLineageProcess"); try { - // Collect all connections involved in the process being deleted - AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); - - Set involvedConnections = new HashSet<>(); - - // Retrieve inputs and outputs from the process - List inputs = (List) processEntity.getRelationshipAttribute("inputs"); - List outputs = (List) processEntity.getRelationshipAttribute("outputs"); - - if (inputs == null) inputs = Collections.emptyList(); - if (outputs == null) outputs = Collections.emptyList(); - - List allAssets = new ArrayList<>(); - allAssets.addAll(inputs); - allAssets.addAll(outputs); - - // For each asset, get its connection and add to involvedConnections - for (AtlasObjectId assetId : allAssets) { - try { - AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); - Map assetConnectionAttributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); - if (assetConnectionAttributes != null) { - String connectionQN = assetConnectionAttributes.get(CONNECTION_QUALIFIED_NAME); - if (StringUtils.isNotEmpty(connectionQN)) { - involvedConnections.add(connectionQN); - } - } - } catch (AtlasBaseException e) { - LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); - } - } - - // Collect affected connections from connection processes to be deleted - Set connectionProcessQNs = new HashSet<>(); - - // Handle both single value and multi-value cases - try { - // Try getting as a list first - Iterable propertyValues = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); - if (propertyValues != null) { - for (Object value : propertyValues) { - if (value != null) { - connectionProcessQNs.add(value.toString()); - } - } - } - } catch (Exception e) { - // If getting as list fails, try getting as single value - try { - String singleValue = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class); - if (StringUtils.isNotEmpty(singleValue)) { - connectionProcessQNs.add(singleValue); - } - } catch (Exception ex) { - LOG.warn("Error getting parentConnectionProcessQualifiedName property: {}", ex.getMessage()); - } - } - + // Get all connection process QNs associated with this process + Set connectionProcessQNs = getConnectionProcessQNs(vertex); if (connectionProcessQNs.isEmpty()) { return; } - Set affectedConnections = new HashSet<>(); + // Track all connections that need their hasLineage status checked + Set connectionsToCheck = new HashSet<>(); - // Process each connection process + // For each connection process for (String connectionProcessQn : connectionProcessQNs) { + // Only delete connection process if this is the last child process if (!checkIfMoreChildProcessExistForConnectionProcess(connectionProcessQn)) { - AtlasObjectId atlasObjectId = new AtlasObjectId(); - atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); - atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); - - try { - // Get connection process before deletion to track affected connections - AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); - AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); - - // Get all input and output connections - List inputConnQNs = getConnectionQualifiedNames(connectionProcess, "inputs"); - List outputConnQNs = getConnectionQualifiedNames(connectionProcess, "outputs"); - - // Add all connections to affected set - affectedConnections.addAll(inputConnQNs); - affectedConnections.addAll(outputConnQNs); - - // Delete the connection process - entityStore.deleteById(connectionProcessVertex.getProperty("__guid", String.class)); - } catch (AtlasBaseException exp) { - if (!exp.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { - throw exp; - } - } + // Get affected connections before deleting the connection process + connectionsToCheck.addAll(getConnectionsFromConnectionProcess(connectionProcessQn)); + + // Delete the connection process + deleteConnectionProcess(connectionProcessQn); } } - // Combine involved and affected connections - Set connectionsToCheck = new HashSet<>(); - connectionsToCheck.addAll(involvedConnections); - connectionsToCheck.addAll(affectedConnections); + // Add connections from the process being deleted + connectionsToCheck.addAll(getConnectionsFromProcess(vertex)); - // Check and update hasLineage for all connections involved + // Update hasLineage for all affected connections for (String connectionQN : connectionsToCheck) { checkAndUpdateConnectionLineage(connectionQN); } @@ -507,6 +437,79 @@ public void processDelete(AtlasVertex vertex) throws AtlasBaseException { } } + private Set getConnectionProcessQNs(AtlasVertex vertex) { + Set connectionProcessQNs = new HashSet<>(); + try { + Iterable values = vertex.getPropertyValues(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, Object.class); + if (values != null) { + for (Object value : values) { + if (value != null) { + connectionProcessQNs.add(value.toString()); + } + } + } + } catch (Exception e) { + // Try single value if list fails + try { + String value = vertex.getProperty(PARENT_CONNECTION_PROCESS_QUALIFIED_NAME, String.class); + if (StringUtils.isNotEmpty(value)) { + connectionProcessQNs.add(value); + } + } catch (Exception ex) { + LOG.warn("Error getting parentConnectionProcessQualifiedName property", ex); + } + } + return connectionProcessQNs; + } + + // New helper method to get connections from a connection process + private Set getConnectionsFromConnectionProcess(String connectionProcessQn) throws AtlasBaseException { + Set connections = new HashSet<>(); + AtlasObjectId atlasObjectId = new AtlasObjectId(); + atlasObjectId.setTypeName(CONNECTION_PROCESS_ENTITY_TYPE); + atlasObjectId.setUniqueAttributes(AtlasEntityUtils.mapOf(QUALIFIED_NAME, connectionProcessQn)); + + try { + AtlasVertex connectionProcessVertex = entityRetriever.getEntityVertex(atlasObjectId); + AtlasEntity connectionProcess = entityRetriever.toAtlasEntity(connectionProcessVertex); + + connections.addAll(getConnectionQualifiedNames(connectionProcess, "inputs")); + connections.addAll(getConnectionQualifiedNames(connectionProcess, "outputs")); + } catch (AtlasBaseException e) { + if (!e.getAtlasErrorCode().equals(AtlasErrorCode.INSTANCE_BY_UNIQUE_ATTRIBUTE_NOT_FOUND)) { + throw e; + } + } + return connections; + } + + // New helper method to get connections from a process + private Set getConnectionsFromProcess(AtlasVertex vertex) throws AtlasBaseException { + Set connections = new HashSet<>(); + AtlasEntity processEntity = entityRetriever.toAtlasEntity(vertex); + + List inputs = (List) processEntity.getRelationshipAttribute("inputs"); + List outputs = (List) processEntity.getRelationshipAttribute("outputs"); + + List allAssets = new ArrayList<>(); + if (inputs != null) allAssets.addAll(inputs); + if (outputs != null) allAssets.addAll(outputs); + + for (AtlasObjectId assetId : allAssets) { + try { + AtlasVertex assetVertex = entityRetriever.getEntityVertex(assetId); + Map attributes = fetchAttributes(assetVertex, FETCH_ENTITY_ATTRIBUTES); + String connectionQN = attributes.get(CONNECTION_QUALIFIED_NAME); + if (StringUtils.isNotEmpty(connectionQN)) { + connections.add(connectionQN); + } + } catch (AtlasBaseException e) { + LOG.warn("Failed to retrieve connection for asset {}: {}", assetId.getGuid(), e.getMessage()); + } + } + return connections; + } + private List getConnectionQualifiedNames(AtlasEntity connectionProcess, String attributeName) { List connectionQualifiedNames = new ArrayList<>(); try {