[HUDI-8839] CdcFileGroupIterator use spillable hashmap #12592

Open · wants to merge 1 commit into base: master
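For context, here is a minimal sketch (not part of this change) of a Spark CDC read that sets the new spill threshold introduced by this PR. The incremental/CDC keys shown are the standard Hudi datasource read options; the table path and begin instant are hypothetical, and it is assumed the new option is forwarded into the CDC iterator's properties.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-cdc-read").getOrCreate()

val cdcDf = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.read.begin.instanttime", "20240101000000000")
  // New knob from this PR: in-memory budget for CDC image records before spilling to disk.
  .option("hoodie.cdc.file.group.iterator.memory.spill.bytes", (64L * 1024 * 1024).toString)
  .load("/tmp/hudi/trips") // hypothetical table path

cdcDf.show(false)
```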
HoodieWriteConfig.java
@@ -104,6 +104,7 @@
import java.util.Properties;
import java.util.stream.Collectors;

import static org.apache.hudi.common.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_DEPRECATED_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.config.HoodieReaderConfig.RECORD_MERGE_IMPL_CLASSES_WRITE_CONFIG_KEY;
import static org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1;
@@ -529,6 +530,12 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Port at which the timeline server listens for requests. When running embedded in each writer, it picks "
+ "a free port and communicates to all the executors. This should rarely be changed.");

public static final ConfigProperty<Long> CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES = ConfigProperty
.key("hoodie.cdc.file.group.iterator.memory.spill.bytes")
.defaultValue(DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES)
.markAdvanced()
.withDocumentation("Amount of memory in bytes to be used in bytes for CDCFileGroupIterator holding data in-memory, before spilling to disk.");

public static final ConfigProperty<String> EMBEDDED_TIMELINE_NUM_SERVER_THREADS = ConfigProperty
.key("hoodie.embed.timeline.server.threads")
.defaultValue("-1")
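The new property follows the usual ConfigProperty pattern: callers resolve it from TypedProperties and fall back to the declared default (DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES). A minimal sketch of that lookup, mirroring how the iterator below reads it:

```scala
import org.apache.hudi.common.config.TypedProperties
import org.apache.hudi.config.HoodieWriteConfig

val props = new TypedProperties()

// Explicit override, e.g. 128 MB; omit this line to fall back to the declared default.
props.setProperty(
  HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
  (128L * 1024 * 1024).toString)

// Same lookup the CDCFileGroupIterator performs when building its spillable map.
val spillBytes = props.getLong(
  HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
  HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue())
```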
CDCFileGroupIterator.scala
@@ -24,7 +24,10 @@ import org.apache.hudi.HoodieBaseRelation.BaseFileReader
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.HoodieDataSourceHelper.AvroDeserializerSupport
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}

import org.apache.hudi.common.config.HoodieCommonConfig.{DISK_MAP_BITCASK_COMPRESSION_ENABLED, SPILLABLE_DISK_MAP_TYPE}
import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieMetadataConfig, TypedProperties}

import org.apache.hudi.common.model.{FileSlice, HoodieAvroRecordMerger, HoodieLogFile, HoodieRecord, HoodieRecordMerger, HoodieRecordPayload}
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -33,14 +36,19 @@ import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
import org.apache.hudi.common.table.log.HoodieCDCLogRecordIterator
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.common.util.{DefaultSizeEstimator, FileIOUtils}
import org.apache.hudi.common.util.collection.ExternalSpillableMap
import org.apache.hudi.config.HoodiePayloadConfig
import org.apache.hudi.config.HoodieWriteConfig

import org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
import org.apache.hudi.storage.{StorageConfiguration, StoragePath}

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord, IndexedRecord}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.avro.HoodieAvroDeserializer
import org.apache.spark.sql.catalyst.InternalRow
@@ -50,9 +58,9 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

import java.io.Closeable
import java.util.Properties
import java.util
import java.util.{Locale, Map, Properties}
import java.util.stream.Collectors

import scala.annotation.tailrec
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -196,7 +204,16 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
* Keep the after-image data. Only one case will use this:
* the cdc infer case is [[AS_IS]] and [[cdcSupplementalLoggingMode]] is [[OP_KEY_ONLY]] or [[DATA_BEFORE]].
*/
private var afterImageRecords: mutable.Map[String, InternalRow] = mutable.Map.empty
private var afterImageRecords: util.Map[String, InternalRow] = new ExternalSpillableMap[String, InternalRow](
props.getLong(HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.key(),
HoodieWriteConfig.CDC_FILE_GROUP_ITERATOR_MEMORY_SPILL_BYTES.defaultValue()),
props.getString(HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH.key, FileIOUtils.getDefaultSpillableMapBasePath),
new DefaultSizeEstimator[String],
new DefaultSizeEstimator[InternalRow],
ExternalSpillableMap.DiskMapType.valueOf(props.getString(
SPILLABLE_DISK_MAP_TYPE.key(), SPILLABLE_DISK_MAP_TYPE.defaultValue().toString)
.toUpperCase(Locale.ROOT)),
props.getBoolean(DISK_MAP_BITCASK_COMPRESSION_ENABLED.key(), DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue()))

private var internalRowToJsonStringConverter = new InternalRowToJsonStringConverter(originTableSchema)

@@ -273,9 +290,9 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
recordToLoad.update(2, recordToJsonAsUTF8String(before))
parse(op) match {
case INSERT =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case _ =>
recordToLoad.update(3, null)
}
@@ -287,10 +304,10 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
parse(op) match {
case INSERT =>
recordToLoad.update(2, null)
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case UPDATE =>
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords(recordKey)))
recordToLoad.update(3, convertRowToJsonString(afterImageRecords.get(recordKey)))
case _ =>
recordToLoad.update(2, recordToJsonAsUTF8String(beforeImageRecords(recordKey)))
recordToLoad.update(3, null)
@@ -398,7 +415,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
// load afterFileSlice to afterImageRecords
if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
afterImageRecords = mutable.Map.empty
afterImageRecords.clear()
iter.foreach { row =>
val key = getRecordKey(row)
afterImageRecords.put(key, row.copy())
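For readers unfamiliar with ExternalSpillableMap, a standalone sketch of the behavior the iterator now relies on: entries are held on-heap until the configured byte budget is reached, then overflow to an on-disk map (BITCASK or ROCKS_DB). The map exposes the java.util.Map API, which is why the lookups above switch from Scala's apply to get, and why the map is clear()-ed and reused per file split instead of being reallocated. The spill path below is hypothetical.

```scala
import org.apache.hudi.common.util.DefaultSizeEstimator
import org.apache.hudi.common.util.collection.ExternalSpillableMap

// Same constructor shape as in the change above; a small budget forces spilling quickly.
val spillable = new ExternalSpillableMap[String, String](
  1024L * 1024,                             // in-memory budget in bytes before spilling
  "/tmp/hoodie-spillable",                  // base path for the on-disk map (hypothetical)
  new DefaultSizeEstimator[String],         // key size estimator for memory accounting
  new DefaultSizeEstimator[String],         // value size estimator for memory accounting
  ExternalSpillableMap.DiskMapType.BITCASK,
  false)                                    // bitcask compression disabled

(1 to 100000).foreach(i => spillable.put(s"key-$i", s"value-$i"))
val hit = spillable.get("key-42")           // java.util.Map semantics: get, not apply
spillable.clear()                           // reusable across file splits
spillable.close()
```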