Skip to content

Commit

Permalink
refactor(meta): optimize Hummock delta log deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 committed Jan 12, 2025
1 parent 4c38440 commit 749eceb
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 12 deletions.
7 changes: 7 additions & 0 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,9 @@ pub struct MetaDeveloperConfig {
#[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
/// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing.
pub hummock_time_travel_sst_info_insert_batch_size: usize,

#[serde(default = "default::developer::hummock_delta_log_delete_batch_size")]
pub hummock_delta_log_delete_batch_size: usize,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -2054,6 +2057,10 @@ pub mod default {
100
}

pub fn hummock_delta_log_delete_batch_size() -> usize {
512
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
1 change: 1 addition & 0 deletions src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000
meta_hummock_time_travel_sst_info_insert_batch_size = 100
meta_hummock_delta_log_delete_batch_size = 512

[meta.meta_store_config]
max_connections = 10
Expand Down
4 changes: 4 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ pub fn start(
.meta
.developer
.hummock_time_travel_sst_info_insert_batch_size,
hummock_delta_log_delete_batch_size: config
.meta
.developer
.hummock_delta_log_delete_batch_size,
min_delta_log_num_for_hummock_version_checkpoint: config
.meta
.min_delta_log_num_for_hummock_version_checkpoint,
Expand Down
18 changes: 9 additions & 9 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,23 @@ impl HummockManager {
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
let deltas_to_delete = versioning
let deltas_to_delete_count = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.count();
let batch = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.take(batch_size)
.collect_vec();
// If there is any safe point, skip this to ensure meta backup has required delta logs to
// replay version.
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete.len()));
return Ok((0, deltas_to_delete_count));
}
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
let batch = deltas_to_delete
.iter()
.take(batch_size)
.cloned()
.collect_vec();
if batch.is_empty() {
return Ok((0, 0));
}
Expand All @@ -213,7 +213,7 @@ impl HummockManager {
drop(versioning_guard);
self.check_state_consistency().await;
}
Ok((batch.len(), deltas_to_delete.len() - batch.len()))
Ok((batch.len(), deltas_to_delete_count - batch.len()))
}

/// Filters by Hummock version and Writes GC history.
Expand Down Expand Up @@ -487,7 +487,7 @@ impl HummockManager {
///
/// Returns number of deleted deltas
pub async fn delete_metadata(&self) -> MetaResult<usize> {
let batch_size = 64usize;
let batch_size = self.env.opts.hummock_delta_log_delete_batch_size;
let mut total_deleted = 0;
loop {
if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 {
Expand Down
20 changes: 17 additions & 3 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,25 @@ impl HummockManager {
self.write_checkpoint(&versioning_guard.checkpoint).await?;
checkpoint_version
};
for version_delta in hummock_version_deltas.values() {
if version_delta.prev_id == redo_state.id {
redo_state.apply_version_delta(version_delta);
let mut applied_delta_count = 0;
let total_to_apply = hummock_version_deltas.range(redo_state.id..).count();
tracing::info!("Start redo Hummock version.");
for version_delta in hummock_version_deltas
.range(redo_state.id..)
.map(|(_, v)| v)
{
assert_eq!(
version_delta.prev_id, redo_state.id,
"delta prev_id {}, redo state id {}",
version_delta.prev_id, redo_state.id
);
redo_state.apply_version_delta(version_delta);
applied_delta_count += 1;
if applied_delta_count % 1000 == 0 {
tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
}
}
tracing::info!("Finish redo Hummock version.");
versioning_guard.version_stats = hummock_version_stats::Entity::find()
.one(&meta_store.conn)
.await
Expand Down
2 changes: 2 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ pub struct MetaOpts {
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
pub hummock_time_travel_sst_info_insert_batch_size: usize,
pub hummock_delta_log_delete_batch_size: usize,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
/// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
Expand Down Expand Up @@ -274,6 +275,7 @@ impl MetaOpts {
hummock_time_travel_snapshot_interval: 0,
hummock_time_travel_sst_info_fetch_batch_size: 10_000,
hummock_time_travel_sst_info_insert_batch_size: 10,
hummock_delta_log_delete_batch_size: 1000,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
Expand Down

0 comments on commit 749eceb

Please sign in to comment.