Skip to content

Commit

Permalink
log reduction
Browse files Browse the repository at this point in the history
  • Loading branch information
hr2904 committed Jan 7, 2025
1 parent 59aa9f5 commit bfc9fea
Showing 1 changed file with 8 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4732,7 +4732,7 @@ public void removePendingTaskFromEdge(String edgeId, String taskGuid) throws Atl
public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEntity) {
AtlasPerfMetrics.MetricRecorder methodRecorder = RequestContext.get().startMetricRecord("addHasLineage");
try {
LOG.info("Starting addHasLineage with {} edges. isRestoreEntity={}", inputOutputEdges.size(), isRestoreEntity);
LOG.info("addHasLineage.Starting addHasLineage with {} edges. isRestoreEntity={}", inputOutputEdges.size(), isRestoreEntity);

for (AtlasEdge atlasEdge : inputOutputEdges) {
AtlasPerfMetrics.MetricRecorder iterationRecorder = RequestContext.get().startMetricRecord("addHasLineage.loopIteration");
Expand All @@ -4741,37 +4741,31 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti

AtlasVertex processVertex = atlasEdge.getOutVertex();
AtlasVertex assetVertex = atlasEdge.getInVertex();
String edgeLabel = atlasEdge.getLabel();
String processVertexId = AtlasGraphUtilsV2.getIdFromVertex(processVertex);
String assetVertexId = AtlasGraphUtilsV2.getIdFromVertex(assetVertex);

LOG.info("Looking for edge label={} between processVertex={} and assetVertex={}",
edgeLabel, processVertexId, assetVertexId);

AtlasPerfMetrics.MetricRecorder checkLineageRecorder = RequestContext.get().startMetricRecord("addHasLineage.checkLineage");
try {
boolean processHasLineage = getEntityHasLineage(processVertex);
if (processHasLineage) {
LOG.info("Process vertex {} already has lineage. Setting lineage on asset vertex {} and skipping further processing.",
LOG.info("addHasLineage.Process vertex {} already has lineage. Setting lineage on asset vertex {} and skipping further processing.",
processVertexId, assetVertexId);

AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, true);
continue;
} else {
LOG.info("Process vertex {} does not have lineage yet.", processVertexId);
LOG.info("addHasLineage.Process vertex {} does not have lineage yet.", processVertexId);
}
} finally {
RequestContext.get().endMetricRecord(checkLineageRecorder);
}

String oppositeEdgeLabel = isOutputEdge ? PROCESS_INPUTS : PROCESS_OUTPUTS;
String oppositeEdgeLabel = isOutputEdge ? PROCESS_INPUTS : PROCESS_OUTPUTS;

AtlasPerfMetrics.MetricRecorder oppEdgesRecorder = RequestContext.get().startMetricRecord("addHasLineage.oppositeEdges");
Iterator<AtlasEdge> oppositeEdges;
try {
LOG.info("Looking for opposite edges with label={} for processVertex={}",
oppositeEdgeLabel, processVertexId);

oppositeEdges = processVertex.getEdges(AtlasEdgeDirection.BOTH, oppositeEdgeLabel).iterator();
} finally {
RequestContext.get().endMetricRecord(oppEdgesRecorder);
Expand All @@ -4788,28 +4782,22 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
AtlasEntity.Status oppositeEdgeStatus = getStatus(oppositeEdge);
AtlasEntity.Status oppositeEdgeAssetStatus = getStatus(oppositeEdgeAssetVertex);

LOG.info("Looking for lineage on oppositeEdgeAssetVertex={}, edgeStatus={}, vertexStatus={}",
oppositeEdgeAssetVertexId, oppositeEdgeStatus, oppositeEdgeAssetStatus);

if (oppositeEdgeStatus == ACTIVE && oppositeEdgeAssetStatus == ACTIVE) {
if (!isHasLineageSet) {
LOG.info("Setting lineage on asset={}, process={}", assetVertexId, processVertexId);
AtlasGraphUtilsV2.setEncodedProperty(assetVertex, HAS_LINEAGE, true);
AtlasGraphUtilsV2.setEncodedProperty(processVertex, HAS_LINEAGE, true);
isHasLineageSet = true;
}

if (isRestoreEntity) {
LOG.info("isRestoreEntity=true, setting lineage on oppositeEdgeAssetVertex={}",
oppositeEdgeAssetVertexId);
LOG.info("addHasLineage.isRestoreEntity=true, setting lineage");
AtlasGraphUtilsV2.setEncodedProperty(oppositeEdgeAssetVertex, HAS_LINEAGE, true);
} else {
LOG.info("isRestoreEntity=false, breaking out from opposite edges loop for processVertex={}",
processVertexId);
LOG.info("addHasLineage.isRestoreEntity=false, breaking out from opposite edges loop");
break;
}
} else {
LOG.info("Skipping lineage for oppositeEdgeAssetVertex={} because edgeStatus={} or vertexStatus={} is not ACTIVE.",
LOG.info("addHasLineage.Skipping lineage for oppositeEdgeAssetVertex={} because edgeStatus={} or vertexStatus={} is not ACTIVE.",
oppositeEdgeAssetVertexId, oppositeEdgeStatus, oppositeEdgeAssetStatus);
}
}
Expand All @@ -4822,7 +4810,7 @@ public void addHasLineage(Set<AtlasEdge> inputOutputEdges, boolean isRestoreEnti
}
}

LOG.info("Finished processing all edges in addHasLineage.");
LOG.info("addHasLineage.Finished processing all edges in addHasLineage.");

} finally {
RequestContext.get().endMetricRecord(methodRecorder);
Expand Down

0 comments on commit bfc9fea

Please sign in to comment.