diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index c7e14b6b4e1b..fb938ef6d31f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -104,6 +104,7 @@
 import java.util.Properties;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
 import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
 import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
 import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
@@ -529,6 +530,12 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks "
           + "a free port and communicates to all the executors. This should rarely be changed.");
 
+  public static final ConfigProperty<Long> CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES = ConfigProperty
+      .key("hoodie.cdc.file.group.iterator.memory.spill.bytes")
+      .defaultValue(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
+      .markAdvanced()
+      .withDocumentation("Amount of memory in bytes that CDCFileGroupIterator may use to hold records in memory before spilling to disk.");
+
   public static final ConfigProperty<String> EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty
       .key("hoodie.embed.timeline.server.threads")
       .defaultValue("-1")
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index c9516ff249e3..1b0ad4557fc7 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -24,7 +24,10 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
 import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
 import org.apache.hudi.avro.HoodieAvroUtils
-import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
+
+import org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED, SPILLABLE_DISK_MAP_TYPE}
+import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieMetadataConfig, TypedProperties}
+
 import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
 import org.apache.hudi.common.table.HoodieTableMetaClient
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -33,7 +36,11 @@
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
 import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
 import org.apache.hudi.common.util.ValidationUtils.checkState
+import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils}
+import org.apache.hudi.common.util.collection.ExternalSpillableMap
 import org.apache.hudi.config.HoodiePayloadConfig
+import org.apache.hudi.config.HoodieWriteConfig
+
 import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.storage.{StorageConfiguration, StoragePath}
@@ -41,6 +48,7 @@ import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.avro.HoodieAvroDeserializer
 import org.apache.spark.sql.catalyst.InternalRow
@@ -50,9 +58,9 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.unsafe.types.UTF8String
 
 import java.io.Closeable
-import java.util.Properties
+import java.util
+import java.util.{Locale, Map, Properties}
 import java.util.stream.Collectors
-
 import scala.annotation.tailrec
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -196,7 +204,16 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    * Keep the after-image data. Only one case will use this:
    * the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[OP_KEY_ONLY]] or [[DATA_BEFORE]].
    */
-  private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
+  private var afterImageRecords: util.Map[String, InternalRow] = new ExternalSpillableMap[String, InternalRow](
+    props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
+      HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
+    props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
+    new DefaultSizeEstimator[String],
+    new DefaultSizeEstimator[InternalRow],
+    ExternalSpillableMap.DiskMapType.valueOf(props.getString(
+      SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString)
+      .toUpperCase(Locale.ROOT)),
+    props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))
 
   private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)
 
@@ -273,9 +290,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         recordToLoad.update(2, recordToJsonAsUTF8String(before))
         parse(op) match {
           case INSERT =>
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case UPDATE =>
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case _ =>
             recordToLoad.update(3, null)
         }
@@ -287,10 +304,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
         parse(op) match {
           case INSERT =>
             recordToLoad.update(2, null)
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case UPDATE =>
             recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
-            recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
+            recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
           case _ =>
             recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
             recordToLoad.update(3, null)
@@ -398,7 +415,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
       // load afterFileSlice to afterImageRecords
       if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
        val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
-        afterImageRecords = mutable.Map.empty
+        afterImageRecords.clear()
         iter.foreach { row =>
           val key = getRecordKey(row)
           afterImageRecords.put(key, row.copy())
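Note on usage: the spill threshold added above is read by CDCFileGroupIterator from the write-config props, similar to how other Hudi merge/read paths already size their ExternalSpillableMap. The sketch below shows how the new key might be set on a Spark CDC incremental query to make after-image records spill to disk earlier; it assumes an active SparkSession named spark, and the table path, begin instant time, and 64 MB budget are placeholder values, not part of this patch.

// Sketch (not part of the patch): lower the CDCFileGroupIterator in-memory budget so that
// large after-image maps spill to disk instead of pressuring executor memory.
val cdcFrame = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.read.begin.instanttime", "20240101000000") // placeholder instant
  .option("hoodie.cdc.file.group.iterator.memory.spill.bytes", 64L * 1024 * 1024)
  .load("/tmp/hudi/trips")                                              // placeholder table path
cdcFrame.show(false)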