diff --git a/src/common/src/config.rs b/src/common/src/config.rs index 7de485a096362..416467c4e868a 100644 --- a/src/common/src/config.rs +++ b/src/common/src/config.rs @@ -532,6 +532,12 @@ pub struct MetaDeveloperConfig { #[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")] /// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing. pub hummock_time_travel_sst_info_insert_batch_size: usize, + + #[serde(default = "default::developer::hummock_delta_log_delete_batch_size")] + pub hummock_delta_log_delete_batch_size: usize, + + #[serde(default = "default::developer::time_travel_vacuum_interval_sec")] + pub time_travel_vacuum_interval_sec: u64, } /// The section `[server]` in `risingwave.toml`. @@ -1475,7 +1481,7 @@ pub mod default { } pub fn vacuum_spin_interval_ms() -> u64 { - 200 + 100 } pub fn hummock_version_checkpoint_interval_sec() -> u64 { @@ -2054,6 +2060,14 @@ pub mod default { 100 } + pub fn hummock_delta_log_delete_batch_size() -> usize { + 512 + } + + pub fn time_travel_vacuum_interval_sec() -> u64 { + 30 + } + pub fn memory_controller_threshold_aggressive() -> f64 { 0.9 } diff --git a/src/config/docs.md b/src/config/docs.md index 43ab00e12cab6..f055d53abce6f 100644 --- a/src/config/docs.md +++ b/src/config/docs.md @@ -69,7 +69,7 @@ This page is automatically generated by `./risedev generate-example-config` | table_stat_throuput_window_seconds_for_merge | The window seconds of table throughput statistic history for merge compaction group. | 240 | | table_stat_throuput_window_seconds_for_split | The window seconds of table throughput statistic history for split compaction group. | 60 | | vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 | -| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 | +| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 100 | ## meta.compaction_config diff --git a/src/config/example.toml b/src/config/example.toml index 0cc848b073093..8644d8c303f1d 100644 --- a/src/config/example.toml +++ b/src/config/example.toml @@ -21,7 +21,7 @@ gc_history_retention_time_sec = 21600 max_inflight_time_travel_query = 1000 periodic_compaction_interval_sec = 60 vacuum_interval_sec = 30 -vacuum_spin_interval_ms = 200 +vacuum_spin_interval_ms = 100 hummock_version_checkpoint_interval_sec = 30 enable_hummock_data_archive = false hummock_time_travel_snapshot_interval = 100 @@ -93,6 +93,8 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100 meta_actor_cnt_per_worker_parallelism_hard_limit = 400 meta_hummock_time_travel_sst_info_fetch_batch_size = 10000 meta_hummock_time_travel_sst_info_insert_batch_size = 100 +meta_hummock_delta_log_delete_batch_size = 512 +meta_time_travel_vacuum_interval_sec = 30 [meta.meta_store_config] max_connections = 10 diff --git a/src/meta/node/src/lib.rs b/src/meta/node/src/lib.rs index ce26b9395457c..9370a2e09b7a0 100644 --- a/src/meta/node/src/lib.rs +++ b/src/meta/node/src/lib.rs @@ -354,6 +354,10 @@ pub fn start( compaction_deterministic_test: config.meta.enable_compaction_deterministic, default_parallelism: config.meta.default_parallelism, vacuum_interval_sec: config.meta.vacuum_interval_sec, + time_travel_vacuum_interval_sec: config + .meta + .developer + .time_travel_vacuum_interval_sec, vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms, hummock_version_checkpoint_interval_sec: config .meta @@ -370,6 +374,10 @@ pub fn start( .meta .developer .hummock_time_travel_sst_info_insert_batch_size, + hummock_delta_log_delete_batch_size: config + .meta + .developer + .hummock_delta_log_delete_batch_size, min_delta_log_num_for_hummock_version_checkpoint: config .meta .min_delta_log_num_for_hummock_version_checkpoint, diff --git a/src/meta/src/hummock/manager/gc.rs b/src/meta/src/hummock/manager/gc.rs index eba07b7002244..064e0b951690b 100644 --- a/src/meta/src/hummock/manager/gc.rs +++ b/src/meta/src/hummock/manager/gc.rs @@ -183,23 +183,23 @@ impl HummockManager { let mut versioning_guard = self.versioning.write().await; let versioning = versioning_guard.deref_mut(); let context_info = self.context_info.read().await; - let deltas_to_delete = versioning + let deltas_to_delete_count = versioning .hummock_version_deltas .range(..=versioning.checkpoint.version.id) - .map(|(k, _)| *k) - .collect_vec(); + .count(); // If there is any safe point, skip this to ensure meta backup has required delta logs to // replay version. if !context_info.version_safe_points.is_empty() { - return Ok((0, deltas_to_delete.len())); + return Ok((0, deltas_to_delete_count)); } - let mut hummock_version_deltas = - BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); - let batch = deltas_to_delete - .iter() + let batch = versioning + .hummock_version_deltas + .range(..=versioning.checkpoint.version.id) + .map(|(k, _)| *k) .take(batch_size) - .cloned() .collect_vec(); + let mut hummock_version_deltas = + BTreeMapTransaction::new(&mut versioning.hummock_version_deltas); if batch.is_empty() { return Ok((0, 0)); } @@ -213,7 +213,7 @@ impl HummockManager { drop(versioning_guard); self.check_state_consistency().await; } - Ok((batch.len(), deltas_to_delete.len() - batch.len())) + Ok((batch.len(), deltas_to_delete_count - batch.len())) } /// Filters by Hummock version and Writes GC history. @@ -487,7 +487,7 @@ impl HummockManager { /// /// Returns number of deleted deltas pub async fn delete_metadata(&self) -> MetaResult { - let batch_size = 64usize; + let batch_size = self.env.opts.hummock_delta_log_delete_batch_size; let mut total_deleted = 0; loop { if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 { @@ -500,7 +500,10 @@ impl HummockManager { break; } } + Ok(total_deleted) + } + pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> { let current_epoch_time = Epoch::now().physical_time(); let epoch_watermark = Epoch::from_physical_time( current_epoch_time.saturating_sub( @@ -512,8 +515,7 @@ impl HummockManager { ) .0; self.truncate_time_travel_metadata(epoch_watermark).await?; - - Ok(total_deleted) + Ok(()) } /// Deletes stale SST objects from object store. diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index 5166acd6fcad7..f25405ef1ad91 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -392,11 +392,29 @@ impl HummockManager { self.write_checkpoint(&versioning_guard.checkpoint).await?; checkpoint_version }; - for version_delta in hummock_version_deltas.values() { - if version_delta.prev_id == redo_state.id { - redo_state.apply_version_delta(version_delta); + let mut applied_delta_count = 0; + let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count(); + tracing::info!( + total_delta = hummock_version_deltas.len(), + total_to_apply, + "Start redo Hummock version." + ); + for version_delta in hummock_version_deltas + .range(redo_state.id + 1..) + .map(|(_, v)| v) + { + assert_eq!( + version_delta.prev_id, redo_state.id, + "delta prev_id {}, redo state id {}", + version_delta.prev_id, redo_state.id + ); + redo_state.apply_version_delta(version_delta); + applied_delta_count += 1; + if applied_delta_count % 1000 == 0 { + tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}."); } } + tracing::info!("Finish redo Hummock version."); versioning_guard.version_stats = hummock_version_stats::Entity::find() .one(&meta_store.conn) .await diff --git a/src/meta/src/hummock/mod.rs b/src/meta/src/hummock/mod.rs index bcc7229e267c7..db6c9e87d38e4 100644 --- a/src/meta/src/hummock/mod.rs +++ b/src/meta/src/hummock/mod.rs @@ -56,6 +56,10 @@ pub fn start_hummock_workers( hummock_manager.clone(), Duration::from_secs(meta_opts.vacuum_interval_sec), ), + start_vacuum_time_travel_metadata_loop( + hummock_manager.clone(), + Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec), + ), ]; workers } @@ -87,6 +91,32 @@ pub fn start_vacuum_metadata_loop( (join_handle, shutdown_tx) } +pub fn start_vacuum_time_travel_metadata_loop( + hummock_manager: HummockManagerRef, + interval: Duration, +) -> (JoinHandle<()>, Sender<()>) { + let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel(); + let join_handle = tokio::spawn(async move { + let mut min_trigger_interval = tokio::time::interval(interval); + min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + tokio::select! { + // Wait for interval + _ = min_trigger_interval.tick() => {}, + // Shutdown vacuum + _ = &mut shutdown_rx => { + tracing::info!("Vacuum time travel metadata loop is stopped"); + return; + } + } + if let Err(err) = hummock_manager.delete_time_travel_metadata().await { + tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error"); + } + } + }); + (join_handle, shutdown_tx) +} + pub fn start_checkpoint_loop( hummock_manager: HummockManagerRef, backup_manager: BackupManagerRef, diff --git a/src/meta/src/manager/env.rs b/src/meta/src/manager/env.rs index ddfb49a3c581b..8e75af9be00fa 100644 --- a/src/meta/src/manager/env.rs +++ b/src/meta/src/manager/env.rs @@ -109,12 +109,14 @@ pub struct MetaOpts { /// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of /// meta node. pub vacuum_spin_interval_ms: u64, + pub time_travel_vacuum_interval_sec: u64, /// Interval of hummock version checkpoint. pub hummock_version_checkpoint_interval_sec: u64, pub enable_hummock_data_archive: bool, pub hummock_time_travel_snapshot_interval: u64, pub hummock_time_travel_sst_info_fetch_batch_size: usize, pub hummock_time_travel_sst_info_insert_batch_size: usize, + pub hummock_delta_log_delete_batch_size: usize, /// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint /// attempt is rejected. Greater value reduces object store IO, meanwhile it results in /// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is @@ -268,12 +270,14 @@ impl MetaOpts { compaction_deterministic_test: false, default_parallelism: DefaultParallelism::Full, vacuum_interval_sec: 30, + time_travel_vacuum_interval_sec: 30, vacuum_spin_interval_ms: 0, hummock_version_checkpoint_interval_sec: 30, enable_hummock_data_archive: false, hummock_time_travel_snapshot_interval: 0, hummock_time_travel_sst_info_fetch_batch_size: 10_000, hummock_time_travel_sst_info_insert_batch_size: 10, + hummock_delta_log_delete_batch_size: 1000, min_delta_log_num_for_hummock_version_checkpoint: 1, min_sst_retention_time_sec: 3600 * 24 * 7, full_gc_interval_sec: 3600 * 24 * 7,