From 6dac4ca263c095ee286d153ef74ad2afce6eb4ef Mon Sep 17 00:00:00 2001 From: Linwei Zhang Date: Sun, 22 Oct 2023 00:06:38 +0800 Subject: [PATCH] Add rocksdb as new kernel --- Cargo.toml | 1 + src/bench/kernel_bench.rs | 16 +++++-- src/error.rs | 3 ++ src/kernel/lsm/compactor.rs | 4 +- src/kernel/lsm/table/scope.rs | 2 +- src/kernel/lsm/version/cleaner.rs | 2 +- src/kernel/mod.rs | 1 + src/kernel/rocksdb_storage.rs | 72 +++++++++++++++++++++++++++++++ 8 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 src/kernel/rocksdb_storage.rs diff --git a/Cargo.toml b/Cargo.toml index 0078539..d487e1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ tonic = "0.10.2" prost = "0.12" # 其他数据库内核 sled = "0.34.7" +rocksdb = "0.21.0" [dev-dependencies] assert_cmd = "0.11.0" diff --git a/src/bench/kernel_bench.rs b/src/bench/kernel_bench.rs index c8b324f..044dc34 100644 --- a/src/bench/kernel_bench.rs +++ b/src/bench/kernel_bench.rs @@ -6,6 +6,7 @@ use std::sync::atomic::AtomicU32; use std::sync::atomic::Ordering::Relaxed; use kip_db::kernel::lsm::storage::KipStorage; +use kip_db::kernel::rocksdb_storage::RocksdbStorage; use kip_db::kernel::sled_storage::SledStorage; use kip_db::kernel::Storage; @@ -109,7 +110,7 @@ fn monotonic_crud(c: &mut Criterion) { let count = AtomicU32::new(0_u32); b.iter(|| async { db.set( - Bytes::from(count.fetch_add(1, Relaxed).to_be_bytes()), + Bytes::from(count.fetch_add(1, Relaxed).to_be_bytes().to_vec()), Bytes::new(), ) .await @@ -150,9 +151,12 @@ fn random_crud(c: &mut Criterion) { c.bench_function(&format!("Store: {}, random inserts", T::name()), |b| { b.iter(|| async { - db.set(Bytes::from(random(SIZE).to_be_bytes()), Bytes::new()) - .await - .unwrap(); + db.set( + Bytes::from(random(SIZE).to_be_bytes().to_vec()), + Bytes::new(), + ) + .await + .unwrap(); }) }); @@ -191,21 +195,25 @@ fn empty_opens(c: &mut Criterion) { fn kv_bulk_load(c: &mut Criterion) { bulk_load::(c); bulk_load::(c); + bulk_load::(c); } fn kv_monotonic_crud(c: &mut Criterion) { monotonic_crud::(c); monotonic_crud::(c); + monotonic_crud::(c); } fn kv_random_crud(c: &mut Criterion) { random_crud::(c); random_crud::(c); + random_crud::(c); } fn kv_empty_opens(c: &mut Criterion) { empty_opens::(c); empty_opens::(c); + empty_opens::(c); } criterion_group!( diff --git a/src/error.rs b/src/error.rs index 0e0b7fd..9d2c9d8 100644 --- a/src/error.rs +++ b/src/error.rs @@ -45,6 +45,9 @@ pub enum KernelError { #[error(transparent)] SledErr(#[from] sled::Error), + #[error(transparent)] + RocksdbErr(#[from] rocksdb::Error), + #[error("Cache size overflow")] CacheSizeOverFlow, diff --git a/src/kernel/lsm/compactor.rs b/src/kernel/lsm/compactor.rs index 8e95f8b..dfb0ff4 100644 --- a/src/kernel/lsm/compactor.rs +++ b/src/kernel/lsm/compactor.rs @@ -26,11 +26,11 @@ pub(crate) type MergeShardingVec = Vec<(i64, Vec)>; pub(crate) type DelNode = (Vec, TableMeta); /// Major压缩时的待删除Gen封装(N为此次Major所压缩的Level),第一个为Level N级,第二个为Level N+1级 pub(crate) type DelNodeTuple = (DelNode, DelNode); -pub(crate) type SeekScope = (Scope, usize); +pub type SeekScope = (Scope, usize); /// Store与Compactor的交互信息 #[derive(Debug)] -pub(crate) enum CompactTask { +pub enum CompactTask { Seek(SeekScope), Flush(Option>), } diff --git a/src/kernel/lsm/table/scope.rs b/src/kernel/lsm/table/scope.rs index 16f1b7e..41dc59f 100644 --- a/src/kernel/lsm/table/scope.rs +++ b/src/kernel/lsm/table/scope.rs @@ -14,7 +14,7 @@ const SEEK_COMPACTION_COUNT: u32 = 100; /// 用于缓存SSTable中所有数据的第一个和最后一个数据的Key /// 标明数据的范围以做到快速区域定位 #[derive(Serialize, Deserialize, Debug, Clone)] -pub(crate) struct Scope { +pub struct Scope { pub(crate) start: Bytes, pub(crate) end: Bytes, gen: i64, diff --git a/src/kernel/lsm/version/cleaner.rs b/src/kernel/lsm/version/cleaner.rs index 4be0310..e017feb 100644 --- a/src/kernel/lsm/version/cleaner.rs +++ b/src/kernel/lsm/version/cleaner.rs @@ -5,7 +5,7 @@ use tokio::sync::mpsc::UnboundedReceiver; use tracing::error; #[derive(Debug)] -pub(crate) enum CleanTag { +pub enum CleanTag { Clean(u64), Add { version: u64, gens: Vec }, } diff --git a/src/kernel/mod.rs b/src/kernel/mod.rs index 50d7ee1..21e3cb1 100644 --- a/src/kernel/mod.rs +++ b/src/kernel/mod.rs @@ -13,6 +13,7 @@ use crate::KernelError; pub mod io; pub mod lsm; +pub mod rocksdb_storage; pub mod sled_storage; pub mod utils; diff --git a/src/kernel/rocksdb_storage.rs b/src/kernel/rocksdb_storage.rs new file mode 100644 index 0000000..9ac73a3 --- /dev/null +++ b/src/kernel/rocksdb_storage.rs @@ -0,0 +1,72 @@ +use crate::kernel::Storage; +use crate::KernelError; +use async_trait::async_trait; +use bytes::Bytes; +use core::slice::SlicePattern; +use std::path::PathBuf; + +#[derive(Debug)] +pub struct RocksdbStorage { + data_base: rocksdb::DB, +} + +#[async_trait] +impl Storage for RocksdbStorage { + #[inline] + fn name() -> &'static str + where + Self: Sized, + { + "Rocksdb" + } + + #[inline] + async fn open(path: impl Into + Send) -> crate::kernel::KernelResult { + let db = rocksdb::DB::open_default(path.into())?; + + Ok(RocksdbStorage { data_base: db }) + } + + #[inline] + async fn flush(&self) -> crate::kernel::KernelResult<()> { + let _ignore = self.data_base.flush()?; + Ok(()) + } + + #[inline] + async fn set(&self, key: Bytes, value: Bytes) -> crate::kernel::KernelResult<()> { + let _ignore = self.data_base.put(key.as_slice(), value.to_vec())?; + Ok(()) + } + + #[inline] + async fn get(&self, key: &[u8]) -> crate::kernel::KernelResult> { + match self.data_base.get(key)? { + None => Ok(None), + Some(i_vec) => Ok(Some(Bytes::from(i_vec.to_vec()))), + } + } + + #[inline] + async fn remove(&self, key: &[u8]) -> crate::kernel::KernelResult<()> { + match self.data_base.delete(key) { + Ok(_) => Ok(()), + Err(e) => Err(KernelError::RocksdbErr(e)), + } + } + + #[inline] + async fn size_of_disk(&self) -> crate::kernel::KernelResult { + unimplemented!("Rocksdb does not support size_of_disk()") + } + + #[inline] + async fn len(&self) -> crate::kernel::KernelResult { + unimplemented!("Rocksdb does not support len()") + } + + #[inline] + async fn is_empty(&self) -> bool { + unimplemented!("Rocksdb does not support is_empty()") + } +}