Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): optimize Hummock version delta deletion #20114

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,6 +532,12 @@ pub struct MetaDeveloperConfig {
#[serde(default = "default::developer::hummock_time_travel_sst_info_insert_batch_size")]
/// Max number of SSTs inserted into meta store per INSERT, during time travel metadata writing.
pub hummock_time_travel_sst_info_insert_batch_size: usize,

#[serde(default = "default::developer::hummock_delta_log_delete_batch_size")]
pub hummock_delta_log_delete_batch_size: usize,

#[serde(default = "default::developer::time_travel_vacuum_interval_sec")]
pub time_travel_vacuum_interval_sec: u64,
}

/// The section `[server]` in `risingwave.toml`.
Expand Down Expand Up @@ -1475,7 +1481,7 @@ pub mod default {
}

pub fn vacuum_spin_interval_ms() -> u64 {
200
100
}

pub fn hummock_version_checkpoint_interval_sec() -> u64 {
Expand Down Expand Up @@ -2054,6 +2060,14 @@ pub mod default {
100
}

pub fn hummock_delta_log_delete_batch_size() -> usize {
512
}

pub fn time_travel_vacuum_interval_sec() -> u64 {
30
}

pub fn memory_controller_threshold_aggressive() -> f64 {
0.9
}
Expand Down
2 changes: 1 addition & 1 deletion src/config/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ This page is automatically generated by `./risedev generate-example-config`
| table_stat_throuput_window_seconds_for_merge | The window seconds of table throughput statistic history for merge compaction group. | 240 |
| table_stat_throuput_window_seconds_for_split | The window seconds of table throughput statistic history for split compaction group. | 60 |
| vacuum_interval_sec | Interval of invoking a vacuum job, to remove stale metadata from meta store and objects from object store. | 30 |
| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 200 |
| vacuum_spin_interval_ms | The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of meta node. | 100 |

## meta.compaction_config

Expand Down
4 changes: 3 additions & 1 deletion src/config/example.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ gc_history_retention_time_sec = 21600
max_inflight_time_travel_query = 1000
periodic_compaction_interval_sec = 60
vacuum_interval_sec = 30
vacuum_spin_interval_ms = 200
vacuum_spin_interval_ms = 100
hummock_version_checkpoint_interval_sec = 30
enable_hummock_data_archive = false
hummock_time_travel_snapshot_interval = 100
Expand Down Expand Up @@ -93,6 +93,8 @@ meta_actor_cnt_per_worker_parallelism_soft_limit = 100
meta_actor_cnt_per_worker_parallelism_hard_limit = 400
meta_hummock_time_travel_sst_info_fetch_batch_size = 10000
meta_hummock_time_travel_sst_info_insert_batch_size = 100
meta_hummock_delta_log_delete_batch_size = 512
meta_time_travel_vacuum_interval_sec = 30

[meta.meta_store_config]
max_connections = 10
Expand Down
8 changes: 8 additions & 0 deletions src/meta/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,10 @@ pub fn start(
compaction_deterministic_test: config.meta.enable_compaction_deterministic,
default_parallelism: config.meta.default_parallelism,
vacuum_interval_sec: config.meta.vacuum_interval_sec,
time_travel_vacuum_interval_sec: config
.meta
.developer
.time_travel_vacuum_interval_sec,
vacuum_spin_interval_ms: config.meta.vacuum_spin_interval_ms,
hummock_version_checkpoint_interval_sec: config
.meta
Expand All @@ -370,6 +374,10 @@ pub fn start(
.meta
.developer
.hummock_time_travel_sst_info_insert_batch_size,
hummock_delta_log_delete_batch_size: config
.meta
.developer
.hummock_delta_log_delete_batch_size,
min_delta_log_num_for_hummock_version_checkpoint: config
.meta
.min_delta_log_num_for_hummock_version_checkpoint,
Expand Down
24 changes: 13 additions & 11 deletions src/meta/src/hummock/manager/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,23 +183,23 @@ impl HummockManager {
let mut versioning_guard = self.versioning.write().await;
let versioning = versioning_guard.deref_mut();
let context_info = self.context_info.read().await;
let deltas_to_delete = versioning
let deltas_to_delete_count = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.count();
let batch = versioning
.hummock_version_deltas
.range(..=versioning.checkpoint.version.id)
.map(|(k, _)| *k)
.take(batch_size)
.collect_vec();
// If there is any safe point, skip this to ensure meta backup has required delta logs to
// replay version.
if !context_info.version_safe_points.is_empty() {
return Ok((0, deltas_to_delete.len()));
return Ok((0, deltas_to_delete_count));
}
let mut hummock_version_deltas =
BTreeMapTransaction::new(&mut versioning.hummock_version_deltas);
let batch = deltas_to_delete
.iter()
.take(batch_size)
.cloned()
.collect_vec();
if batch.is_empty() {
return Ok((0, 0));
}
Expand All @@ -213,7 +213,7 @@ impl HummockManager {
drop(versioning_guard);
self.check_state_consistency().await;
}
Ok((batch.len(), deltas_to_delete.len() - batch.len()))
Ok((batch.len(), deltas_to_delete_count - batch.len()))
}

/// Filters by Hummock version and Writes GC history.
Expand Down Expand Up @@ -487,7 +487,7 @@ impl HummockManager {
///
/// Returns number of deleted deltas
pub async fn delete_metadata(&self) -> MetaResult<usize> {
let batch_size = 64usize;
let batch_size = self.env.opts.hummock_delta_log_delete_batch_size;
let mut total_deleted = 0;
loop {
if total_deleted != 0 && self.env.opts.vacuum_spin_interval_ms != 0 {
Expand All @@ -500,7 +500,10 @@ impl HummockManager {
break;
}
}
Ok(total_deleted)
}

pub async fn delete_time_travel_metadata(&self) -> MetaResult<()> {
let current_epoch_time = Epoch::now().physical_time();
let epoch_watermark = Epoch::from_physical_time(
current_epoch_time.saturating_sub(
Expand All @@ -512,8 +515,7 @@ impl HummockManager {
)
.0;
self.truncate_time_travel_metadata(epoch_watermark).await?;

Ok(total_deleted)
Ok(())
}

/// Deletes stale SST objects from object store.
Expand Down
24 changes: 21 additions & 3 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,11 +392,29 @@ impl HummockManager {
self.write_checkpoint(&versioning_guard.checkpoint).await?;
checkpoint_version
};
for version_delta in hummock_version_deltas.values() {
if version_delta.prev_id == redo_state.id {
redo_state.apply_version_delta(version_delta);
let mut applied_delta_count = 0;
let total_to_apply = hummock_version_deltas.range(redo_state.id + 1..).count();
tracing::info!(
total_delta = hummock_version_deltas.len(),
total_to_apply,
"Start redo Hummock version."
);
for version_delta in hummock_version_deltas
.range(redo_state.id + 1..)
.map(|(_, v)| v)
{
assert_eq!(
version_delta.prev_id, redo_state.id,
"delta prev_id {}, redo state id {}",
version_delta.prev_id, redo_state.id
);
redo_state.apply_version_delta(version_delta);
applied_delta_count += 1;
if applied_delta_count % 1000 == 0 {
tracing::info!("Redo progress {applied_delta_count}/{total_to_apply}.");
}
}
tracing::info!("Finish redo Hummock version.");
versioning_guard.version_stats = hummock_version_stats::Entity::find()
.one(&meta_store.conn)
.await
Expand Down
30 changes: 30 additions & 0 deletions src/meta/src/hummock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ pub fn start_hummock_workers(
hummock_manager.clone(),
Duration::from_secs(meta_opts.vacuum_interval_sec),
),
start_vacuum_time_travel_metadata_loop(
hummock_manager.clone(),
Duration::from_secs(meta_opts.time_travel_vacuum_interval_sec),
),
];
workers
}
Expand Down Expand Up @@ -87,6 +91,32 @@ pub fn start_vacuum_metadata_loop(
(join_handle, shutdown_tx)
}

pub fn start_vacuum_time_travel_metadata_loop(
hummock_manager: HummockManagerRef,
interval: Duration,
) -> (JoinHandle<()>, Sender<()>) {
let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
let join_handle = tokio::spawn(async move {
let mut min_trigger_interval = tokio::time::interval(interval);
min_trigger_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
tokio::select! {
// Wait for interval
_ = min_trigger_interval.tick() => {},
// Shutdown vacuum
_ = &mut shutdown_rx => {
tracing::info!("Vacuum time travel metadata loop is stopped");
return;
}
}
if let Err(err) = hummock_manager.delete_time_travel_metadata().await {
tracing::warn!(error = %err.as_report(), "Vacuum time travel metadata error");
}
}
});
(join_handle, shutdown_tx)
}

pub fn start_checkpoint_loop(
hummock_manager: HummockManagerRef,
backup_manager: BackupManagerRef,
Expand Down
4 changes: 4 additions & 0 deletions src/meta/src/manager/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,14 @@ pub struct MetaOpts {
/// The spin interval inside a vacuum job. It avoids the vacuum job monopolizing resources of
/// meta node.
pub vacuum_spin_interval_ms: u64,
pub time_travel_vacuum_interval_sec: u64,
/// Interval of hummock version checkpoint.
pub hummock_version_checkpoint_interval_sec: u64,
pub enable_hummock_data_archive: bool,
pub hummock_time_travel_snapshot_interval: u64,
pub hummock_time_travel_sst_info_fetch_batch_size: usize,
pub hummock_time_travel_sst_info_insert_batch_size: usize,
pub hummock_delta_log_delete_batch_size: usize,
/// The minimum delta log number a new checkpoint should compact, otherwise the checkpoint
/// attempt is rejected. Greater value reduces object store IO, meanwhile it results in
/// more loss of in memory `HummockVersionCheckpoint::stale_objects` state when meta node is
Expand Down Expand Up @@ -268,12 +270,14 @@ impl MetaOpts {
compaction_deterministic_test: false,
default_parallelism: DefaultParallelism::Full,
vacuum_interval_sec: 30,
time_travel_vacuum_interval_sec: 30,
vacuum_spin_interval_ms: 0,
hummock_version_checkpoint_interval_sec: 30,
enable_hummock_data_archive: false,
hummock_time_travel_snapshot_interval: 0,
hummock_time_travel_sst_info_fetch_batch_size: 10_000,
hummock_time_travel_sst_info_insert_batch_size: 10,
hummock_delta_log_delete_batch_size: 1000,
min_delta_log_num_for_hummock_version_checkpoint: 1,
min_sst_retention_time_sec: 3600 * 24 * 7,
full_gc_interval_sec: 3600 * 24 * 7,
Expand Down
Loading