Skip to content

Commit

Permalink
TEZ-4521: Partition stats should be always uncompressed size (#317) (…
Browse files Browse the repository at this point in the history
…okumin reviewed by Laszlo Bodor)
  • Loading branch information
okumin authored Nov 28, 2023
1 parent 51d6f53 commit 43562ad
Show file tree
Hide file tree
Showing 5 changed files with 12 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ static class SourceVertexInfo {
final BitSet finishedTaskSet;
int numTasks;
int numVMEventsReceived;
// The total uncompressed size
long outputSize;
// The uncompressed size of each partition. The size might not be precise
int[] statsInMB;
EdgeManagerPluginDescriptor newDescriptor;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ public void progress() {
protected final boolean cleanup;

protected OutputStatisticsReporter statsReporter;
// uncompressed size for each partition
protected final long[] partitionStats;
protected final boolean finalMergeEnabled;
protected final boolean sendEmptyPartitionDetails;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,7 @@ public boolean spill(boolean ignoreEmptySpills) throws IOException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats()) {
partitionStats[i] += partLength;
partitionStats[i] += rawLength;
}
}

Expand Down Expand Up @@ -747,7 +747,7 @@ public void flush() throws IOException {
TezSpillRecord spillRecord = new TezSpillRecord(finalIndexFile, localFs);
if (reportPartitionStats()) {
for (int i = 0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
Expand Down Expand Up @@ -832,7 +832,7 @@ public void flush() throws IOException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
partitionStats[parts] += rawLength;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -956,7 +956,7 @@ protected void spill(int mstart, int mend, long sameKeyCount, long totalKeysCoun
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, i);
if (!isFinalMergeEnabled() && reportPartitionStats() && writer != null) {
partitionStats[i] += partLength;
partitionStats[i] += rawLength;
}
writer = null;
} finally {
Expand Down Expand Up @@ -1244,7 +1244,7 @@ private void mergeParts() throws IOException, InterruptedException {
}
if (spillRecord != null && reportPartitionStats()) {
for(int i=0; i < spillRecord.size(); i++) {
partitionStats[i] += spillRecord.getIndex(i).getPartLength();
partitionStats[i] += spillRecord.getIndex(i).getRawLength();
}
}
numShuffleChunks.setValue(numSpills);
Expand Down Expand Up @@ -1388,7 +1388,7 @@ private void mergeParts() throws IOException, InterruptedException {
new TezIndexRecord(segmentStart, rawLength, partLength);
spillRec.putIndex(rec, parts);
if (reportPartitionStats()) {
partitionStats[parts] += partLength;
partitionStats[parts] += rawLength;
}
}
numShuffleChunks.setValue(1); //final merge has happened
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@


import com.google.protobuf.ByteString;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
Expand Down Expand Up @@ -125,10 +126,10 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes,
long uncompressedTotalSize, String vertexName, boolean reportDetailedStats)
throws IOException {
ByteBuffer payload;
long totalSize = 0;
final long totalSize;
// Use partition sizes to compute the total size.
if (partitionSizes != null) {
totalSize = estimatedUncompressedSum(partitionSizes);
totalSize = Arrays.stream(partitionSizes).sum();
} else {
totalSize = uncompressedTotalSize;
}
Expand Down Expand Up @@ -169,16 +170,6 @@ VertexManagerEvent getVertexManagerEvent(long[] partitionSizes,
return vmEvent;
}

// Assume 3 : 1 compression ratio to estimate the total size
// of all partitions.
long estimatedUncompressedSum(long[] partitionStats) {
long sum = 0;
for (long partition : partitionStats) {
sum += partition;
}
return sum * 3;
}

public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);
when(mockVertex.getName()).thenReturn(vName);
Expand Down

0 comments on commit 43562ad

Please sign in to comment.