diff --git a/src/postgres.rs b/src/postgres.rs index 06b402d..c24d669 100644 --- a/src/postgres.rs +++ b/src/postgres.rs @@ -1,4 +1,5 @@ use std::mem::{offset_of, MaybeUninit}; +use std::ops::{Deref, DerefMut}; use std::ptr::NonNull; const _: () = assert!( @@ -46,6 +47,14 @@ impl Page { assert_eq!(offset_of!(Self, opaque), this.header.pd_special as usize); this } + #[allow(dead_code)] + pub fn clone_into_boxed(&self) -> Box { + let mut result = Box::new_uninit(); + unsafe { + std::ptr::copy(self as *const Self, result.as_mut_ptr(), 1); + result.assume_init() + } + } pub fn get_opaque(&self) -> &Opaque { &self.opaque } @@ -170,15 +179,20 @@ pub struct BufferReadGuard { } impl BufferReadGuard { - pub fn get(&self) -> &Page { - unsafe { self.page.as_ref() } - } #[allow(dead_code)] pub fn id(&self) -> u32 { self.id } } +impl Deref for BufferReadGuard { + type Target = Page; + + fn deref(&self) -> &Page { + unsafe { self.page.as_ref() } + } +} + impl Drop for BufferReadGuard { fn drop(&mut self) { unsafe { @@ -197,15 +211,23 @@ pub struct BufferWriteGuard { } impl BufferWriteGuard { - pub fn get(&self) -> &Page { + pub fn id(&self) -> u32 { + self.id + } +} + +impl Deref for BufferWriteGuard { + type Target = Page; + + fn deref(&self) -> &Page { unsafe { self.page.as_ref() } } - pub fn get_mut(&mut self) -> &mut Page { +} + +impl DerefMut for BufferWriteGuard { + fn deref_mut(&mut self) -> &mut Page { unsafe { self.page.as_mut() } } - pub fn id(&self) -> u32 { - self.id - } } impl Drop for BufferWriteGuard { @@ -215,11 +237,7 @@ impl Drop for BufferWriteGuard { pgrx::pg_sys::GenericXLogAbort(self.state); } else { if self.tracking_freespace { - pgrx::pg_sys::RecordPageWithFreeSpace( - self.raw, - self.id, - self.get().freespace() as _, - ); + pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, self.id, self.freespace() as _); pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, self.id, self.id + 1); } pgrx::pg_sys::GenericXLogFinish(self.state); @@ -329,13 +347,9 @@ impl Relation { return None; } let write = self.write(id, true); - if write.get().freespace() < freespace as _ { + if write.freespace() < freespace as _ { // the free space is recorded incorrectly - pgrx::pg_sys::RecordPageWithFreeSpace( - self.raw, - id, - write.get().freespace() as _, - ); + pgrx::pg_sys::RecordPageWithFreeSpace(self.raw, id, write.freespace() as _); pgrx::pg_sys::FreeSpaceMapVacuumRange(self.raw, id, id + 1); continue; } @@ -343,4 +357,13 @@ impl Relation { } } } + #[allow(dead_code)] + pub fn len(&self) -> u32 { + unsafe { + pgrx::pg_sys::RelationGetNumberOfBlocksInFork( + self.raw, + pgrx::pg_sys::ForkNumber::MAIN_FORKNUM, + ) + } + } } diff --git a/src/vchordrq/algorithm/build.rs b/src/vchordrq/algorithm/build.rs index 06f835e..6554bfc 100644 --- a/src/vchordrq/algorithm/build.rs +++ b/src/vchordrq/algorithm/build.rs @@ -1,7 +1,7 @@ -use crate::postgres::BufferWriteGuard; -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::rabitq; use crate::vchordrq::algorithm::tuples::*; +use crate::vchordrq::algorithm::PageGuard; use crate::vchordrq::index::am_options::Opfamily; use crate::vchordrq::types::VchordrqBuildOptions; use crate::vchordrq::types::VchordrqExternalBuildOptions; @@ -32,7 +32,7 @@ pub fn build, R: Reporter>( vector_options: VectorOptions, vchordrq_options: VchordrqIndexingOptions, heap_relation: T, - relation: Relation, + relation: impl RelationWrite, mut reporter: R, ) { let dims = vector_options.dims; @@ -75,7 +75,7 @@ pub fn build, R: Reporter>( }; let mut meta = Tape::create(&relation, false); assert_eq!(meta.first(), 0); - let mut vectors = Tape::>::create(&relation, true); + let mut vectors = Tape::, _>::create(&relation, true); let mut pointer_of_means = Vec::>::new(); for i in 0..structures.len() { let mut level = Vec::new(); @@ -99,10 +99,10 @@ pub fn build, R: Reporter>( let mut level = Vec::new(); for j in 0..structures[i].len() { if i == 0 { - let tape = Tape::::create(&relation, false); + let tape = Tape::::create(&relation, false); level.push(tape.first()); } else { - let mut tape = Tape::::create(&relation, false); + let mut tape = Tape::::create(&relation, false); let h2_mean = &structures[i].means[j]; let h2_children = &structures[i].children[j]; for child in h2_children.iter().copied() { @@ -361,18 +361,18 @@ impl Structure { } } -struct Tape<'a, T> { - relation: &'a Relation, - head: BufferWriteGuard, +struct Tape<'a: 'b, 'b, T, R: 'b + RelationWrite> { + relation: &'a R, + head: R::WriteGuard<'b>, first: u32, tracking_freespace: bool, _phantom: PhantomData T>, } -impl<'a, T> Tape<'a, T> { - fn create(relation: &'a Relation, tracking_freespace: bool) -> Self { +impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> { + fn create(relation: &'a R, tracking_freespace: bool) -> Self { let mut head = relation.extend(tracking_freespace); - head.get_mut().get_opaque_mut().skip = head.id(); + head.get_opaque_mut().skip = head.id(); let first = head.id(); Self { relation, @@ -387,19 +387,19 @@ impl<'a, T> Tape<'a, T> { } } -impl Tape<'_, T> +impl<'a: 'b, 'b, T, R: 'b + RelationWrite> Tape<'a, 'b, T, R> where T: rkyv::Serialize>, { fn push(&mut self, x: &T) -> (u32, u16) { let bytes = rkyv::to_bytes(x).expect("failed to serialize"); - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { let next = self.relation.extend(self.tracking_freespace); - self.head.get_mut().get_opaque_mut().next = next.id(); + self.head.get_opaque_mut().next = next.id(); self.head = next; - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { panic!("tuple is too large to fit in a fresh page") diff --git a/src/vchordrq/algorithm/insert.rs b/src/vchordrq/algorithm/insert.rs index ee47e9a..323fa24 100644 --- a/src/vchordrq/algorithm/insert.rs +++ b/src/vchordrq/algorithm/insert.rs @@ -1,7 +1,8 @@ -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::rabitq::fscan_process_lowerbound; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; +use crate::vchordrq::algorithm::PageGuard; use base::always_equal::AlwaysEqual; use base::distance::Distance; use base::distance::DistanceKind; @@ -11,7 +12,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; pub fn insert( - relation: Relation, + relation: impl RelationWrite + Clone, payload: Pointer, vector: V, distance_kind: DistanceKind, @@ -20,7 +21,6 @@ pub fn insert( let vector = vector.as_borrowed(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -84,9 +84,8 @@ pub fn insert( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -110,7 +109,7 @@ pub fn insert( AlwaysEqual(h1_tuple.first), )); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -155,11 +154,18 @@ pub fn insert( t: code.t(), }) .unwrap(); - append(relation, list.0, &tuple, false, in_building, in_building); + append( + relation.clone(), + list.0, + &tuple, + false, + in_building, + in_building, + ); } fn append( - relation: Relation, + relation: impl RelationWrite, first: u32, tuple: &[u8], tracking_freespace: bool, @@ -168,7 +174,7 @@ fn append( ) -> (u32, u16) { if tracking_freespace { if let Some(mut write) = relation.search(tuple.len()) { - let i = write.get_mut().alloc(tuple).unwrap(); + let i = write.alloc(tuple).unwrap(); return (write.id(), i); } } @@ -176,24 +182,22 @@ fn append( let mut current = first; loop { let read = relation.read(current); - if read.get().freespace() as usize >= tuple.len() - || read.get().get_opaque().next == u32::MAX - { + if read.freespace() as usize >= tuple.len() || read.get_opaque().next == u32::MAX { drop(read); let mut write = relation.write(current, tracking_freespace); - if let Some(i) = write.get_mut().alloc(tuple) { + if let Some(i) = write.alloc(tuple) { return (current, i); } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { let mut extend = relation.extend(tracking_freespace); - write.get_mut().get_opaque_mut().next = extend.id(); + write.get_opaque_mut().next = extend.id(); drop(write); - if let Some(i) = extend.get_mut().alloc(tuple) { + if let Some(i) = extend.alloc(tuple) { let result = (extend.id(), i); drop(extend); if updating_skip { let mut past = relation.write(first, tracking_freespace); - let skip = &mut past.get_mut().get_opaque_mut().skip; + let skip = &mut past.get_opaque_mut().skip; assert!(*skip != u32::MAX); *skip = std::cmp::max(*skip, result.0); } @@ -202,16 +206,16 @@ fn append( panic!("a tuple cannot even be fit in a fresh page"); } } - if skipping_traversal && current == first && write.get().get_opaque().skip != first { - current = write.get().get_opaque().skip; + if skipping_traversal && current == first && write.get_opaque().skip != first { + current = write.get_opaque().skip; } else { - current = write.get().get_opaque().next; + current = write.get_opaque().next; } } else { - if skipping_traversal && current == first && read.get().get_opaque().skip != first { - current = read.get().get_opaque().skip; + if skipping_traversal && current == first && read.get_opaque().skip != first { + current = read.get_opaque().skip; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrq/algorithm/mod.rs b/src/vchordrq/algorithm/mod.rs index 88239a8..41744d7 100644 --- a/src/vchordrq/algorithm/mod.rs +++ b/src/vchordrq/algorithm/mod.rs @@ -6,3 +6,62 @@ pub mod scan; pub mod tuples; pub mod vacuum; pub mod vectors; + +use crate::postgres::Page; +use std::ops::{Deref, DerefMut}; + +pub trait PageGuard { + fn id(&self) -> u32; +} + +pub trait RelationRead { + type ReadGuard<'a>: PageGuard + Deref + where + Self: 'a; + fn read(&self, id: u32) -> Self::ReadGuard<'_>; +} + +pub trait RelationWrite: RelationRead { + type WriteGuard<'a>: PageGuard + DerefMut + where + Self: 'a; + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_>; + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_>; + fn search(&self, freespace: usize) -> Option>; +} + +impl PageGuard for crate::postgres::BufferReadGuard { + fn id(&self) -> u32 { + self.id() + } +} + +impl PageGuard for crate::postgres::BufferWriteGuard { + fn id(&self) -> u32 { + self.id() + } +} + +impl RelationRead for crate::postgres::Relation { + type ReadGuard<'a> = crate::postgres::BufferReadGuard; + + fn read(&self, id: u32) -> Self::ReadGuard<'_> { + self.read(id) + } +} + +impl RelationWrite for crate::postgres::Relation { + type WriteGuard<'a> = crate::postgres::BufferWriteGuard; + + fn write(&self, id: u32, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.write(id, tracking_freespace) + } + + fn extend(&self, tracking_freespace: bool) -> Self::WriteGuard<'_> { + self.extend(tracking_freespace) + } + + fn search(&self, freespace: usize) -> Option> { + self.search(freespace) + } +} diff --git a/src/vchordrq/algorithm/prewarm.rs b/src/vchordrq/algorithm/prewarm.rs index 26bb986..6d01f7c 100644 --- a/src/vchordrq/algorithm/prewarm.rs +++ b/src/vchordrq/algorithm/prewarm.rs @@ -1,13 +1,12 @@ -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; use std::fmt::Write; -pub fn prewarm(relation: Relation, height: i32) -> String { +pub fn prewarm(relation: impl RelationRead + Clone, height: i32) -> String { let mut message = String::new(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -37,9 +36,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -47,7 +45,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { vectors::vector_warm::(relation.clone(), h1_tuple.mean); results.push(h1_tuple.first); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); @@ -66,16 +64,15 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let _h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") .expect("data corruption"); results.push(()); } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); diff --git a/src/vchordrq/algorithm/scan.rs b/src/vchordrq/algorithm/scan.rs index df4d93d..42357c0 100644 --- a/src/vchordrq/algorithm/scan.rs +++ b/src/vchordrq/algorithm/scan.rs @@ -1,4 +1,4 @@ -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::rabitq::fscan_process_lowerbound; use crate::vchordrq::algorithm::tuples::*; use crate::vchordrq::algorithm::vectors; @@ -11,7 +11,7 @@ use std::cmp::Reverse; use std::collections::BinaryHeap; pub fn scan( - relation: Relation, + relation: impl RelationRead + Clone, vector: V, distance_kind: DistanceKind, probes: Vec, @@ -20,7 +20,6 @@ pub fn scan( let vector = vector.as_borrowed(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -66,9 +65,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -92,7 +90,7 @@ pub fn scan( AlwaysEqual(h1_tuple.first), )); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -121,6 +119,7 @@ pub fn scan( for i in (1..meta_tuple.height_of_root).rev() { lists = make_lists(lists, probes[i as usize - 1]); } + drop(meta_guard); { let mut results = Vec::new(); for list in lists { @@ -138,9 +137,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -164,7 +162,7 @@ pub fn scan( AlwaysEqual(h0_tuple.payload), )); } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); diff --git a/src/vchordrq/algorithm/vacuum.rs b/src/vchordrq/algorithm/vacuum.rs index 2b219c4..c77bd21 100644 --- a/src/vchordrq/algorithm/vacuum.rs +++ b/src/vchordrq/algorithm/vacuum.rs @@ -1,13 +1,16 @@ -use crate::postgres::Relation; +use super::RelationWrite; use crate::vchordrq::algorithm::tuples::*; use base::search::Pointer; -pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) -> bool) { +pub fn vacuum( + relation: impl RelationWrite, + delay: impl Fn(), + callback: impl Fn(Pointer) -> bool, +) { // step 1: vacuum height_0_tuple { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -19,16 +22,15 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn let mut current = first; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") .expect("data corruption"); results.push(h1_tuple.first); } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } results @@ -42,9 +44,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn delay(); let mut h0_guard = relation.write(current, false); let mut reconstruct_removes = Vec::new(); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -53,8 +54,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn reconstruct_removes.push(i); } } - h0_guard.get_mut().reconstruct(&reconstruct_removes); - current = h0_guard.get().get_opaque().next; + h0_guard.reconstruct(&reconstruct_removes); + current = h0_guard.get_opaque().next; } } } @@ -63,7 +64,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn let mut current = { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -74,8 +74,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn delay(); let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { - let Some(vector_tuple) = read.get().get(i) else { + for i in 1..=read.len() { + let Some(vector_tuple) = read.get(i) else { continue; }; let vector_tuple = @@ -91,21 +91,21 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn if flag { drop(read); let mut write = relation.write(current, true); - for i in 1..=write.get().len() { - let Some(vector_tuple) = write.get().get(i) else { + for i in 1..=write.len() { + let Some(vector_tuple) = write.get(i) else { continue; }; let vector_tuple = unsafe { rkyv::archived_root::>(vector_tuple) }; if let Some(payload) = vector_tuple.payload.as_ref().copied() { if callback(Pointer::new(payload)) { - write.get_mut().free(i); + write.free(i); } } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrq/algorithm/vectors.rs b/src/vchordrq/algorithm/vectors.rs index 6a23f74..06075d3 100644 --- a/src/vchordrq/algorithm/vectors.rs +++ b/src/vchordrq/algorithm/vectors.rs @@ -1,11 +1,11 @@ use super::tuples::Vector; -use crate::postgres::Relation; +use super::RelationRead; use crate::vchordrq::algorithm::tuples::VectorTuple; use base::distance::Distance; use base::distance::DistanceKind; pub fn vector_dist( - relation: Relation, + relation: impl RelationRead, vector: V::Borrowed<'_>, mean: (u32, u16), payload: Option, @@ -25,7 +25,7 @@ pub fn vector_dist( return None; }; let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check return None; }; @@ -54,11 +54,11 @@ pub fn vector_dist( )) } -pub fn vector_warm(relation: Relation, mean: (u32, u16)) { +pub fn vector_warm(relation: impl RelationRead, mean: (u32, u16)) { let mut cursor = Ok(mean); while let Ok(mean) = cursor { let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check return; }; diff --git a/src/vchordrq/index/am.rs b/src/vchordrq/index/am.rs index 8f22db6..1dc7cdc 100644 --- a/src/vchordrq/index/am.rs +++ b/src/vchordrq/index/am.rs @@ -289,6 +289,7 @@ pub unsafe extern "C" fn ambuild( } else { let mut indtuples = 0; reporter.tuples_done(indtuples); + let relation = unsafe { Relation::new(index) }; match opfamily.vector_kind() { VectorKind::Vecf32 => { HeapRelation::>::traverse( @@ -296,7 +297,7 @@ pub unsafe extern "C" fn ambuild( true, |(pointer, vector)| { algorithm::insert::insert::>( - unsafe { Relation::new(index) }, + relation.clone(), pointer, vector, opfamily.distance_kind(), @@ -313,7 +314,7 @@ pub unsafe extern "C" fn ambuild( true, |(pointer, vector)| { algorithm::insert::insert::>( - unsafe { Relation::new(index) }, + relation.clone(), pointer, vector, opfamily.distance_kind(), @@ -627,6 +628,7 @@ unsafe fn parallel_build( } let index_relation = unsafe { Relation::new(index) }; + let scan = unsafe { pgrx::pg_sys::table_beginscan_parallel(heap, tablescandesc) }; let opfamily = unsafe { am_options::opfamily(index) }; let heap_relation = Heap { diff --git a/src/vchordrqfscan/algorithm/build.rs b/src/vchordrqfscan/algorithm/build.rs index 25a6611..9a0daa6 100644 --- a/src/vchordrqfscan/algorithm/build.rs +++ b/src/vchordrqfscan/algorithm/build.rs @@ -167,7 +167,7 @@ pub fn build( } pointer_of_firsts.push(level); } - forwards.head.get_mut().get_opaque_mut().skip = vectors.first(); + forwards.head.get_opaque_mut().skip = vectors.first(); meta.push(&MetaTuple { dims, height_of_root: structures.len() as u32, @@ -399,13 +399,13 @@ where { fn push(&mut self, x: &T) -> (u32, u16) { let bytes = rkyv::to_bytes(x).expect("failed to serialize"); - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { let next = self.relation.extend(self.tracking_freespace); - self.head.get_mut().get_opaque_mut().next = next.id(); + self.head.get_opaque_mut().next = next.id(); self.head = next; - if let Some(i) = self.head.get_mut().alloc(&bytes) { + if let Some(i) = self.head.alloc(&bytes) { (self.head.id(), i) } else { panic!("tuple is too large to fit in a fresh page") diff --git a/src/vchordrqfscan/algorithm/insert.rs b/src/vchordrqfscan/algorithm/insert.rs index 4dfd432..1a0ab77 100644 --- a/src/vchordrqfscan/algorithm/insert.rs +++ b/src/vchordrqfscan/algorithm/insert.rs @@ -14,7 +14,6 @@ use std::collections::BinaryHeap; pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_kind: DistanceKind) { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -35,22 +34,18 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k }) .unwrap(); if let Some(mut write) = relation.search(tuple.len()) { - let i = write.get_mut().alloc(&tuple).unwrap(); + let i = write.alloc(&tuple).unwrap(); break 'h0_vector (write.id(), i); } - let mut current = relation - .read(meta_tuple.forwards_first) - .get() - .get_opaque() - .skip; + let mut current = relation.read(meta_tuple.forwards_first).get_opaque().skip; let mut changed = false; loop { let read = relation.read(current); let flag = 'flag: { - if read.get().freespace() as usize >= tuple.len() { + if read.freespace() as usize >= tuple.len() { break 'flag true; } - if read.get().get_opaque().next == u32::MAX { + if read.get_opaque().next == u32::MAX { break 'flag true; } false @@ -58,28 +53,27 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if flag { drop(read); let mut write = relation.write(current, true); - if let Some(i) = write.get_mut().alloc(&tuple) { + if let Some(i) = write.alloc(&tuple) { break (current, i); } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { if changed { relation .write(meta_tuple.forwards_first, false) - .get_mut() .get_opaque_mut() .skip = write.id(); } let mut extend = relation.extend(true); - write.get_mut().get_opaque_mut().next = extend.id(); - if let Some(i) = extend.get_mut().alloc(&tuple) { + write.get_opaque_mut().next = extend.id(); + if let Some(i) = extend.alloc(&tuple) { break (extend.id(), i); } else { panic!("a tuple cannot even be fit in a fresh page"); } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } changed = true; } @@ -90,7 +84,6 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if is_residual { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -111,9 +104,8 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -141,7 +133,7 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -151,7 +143,6 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -196,9 +187,8 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k loop { let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { + for i in 1..=read.len() { let h0_tuple = read - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -207,10 +197,10 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k break 'flag true; } } - if read.get().freespace() as usize >= dummy.len() { + if read.freespace() as usize >= dummy.len() { break 'flag true; } - if read.get().get_opaque().next == u32::MAX { + if read.get_opaque().next == u32::MAX { break 'flag true; } false @@ -218,9 +208,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k if flag { drop(read); let mut write = relation.write(current, false); - for i in 1..=write.get().len() { + for i in 1..=write.len() { let flag = put( - write.get_mut().get_mut(i).expect("data corruption"), + write.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -230,9 +220,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k return; } } - if let Some(i) = write.get_mut().alloc(&dummy) { + if let Some(i) = write.alloc(&dummy) { let flag = put( - write.get_mut().get_mut(i).expect("data corruption"), + write.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -241,12 +231,12 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k assert!(flag, "a put fails even on a fresh tuple"); return; } - if write.get().get_opaque().next == u32::MAX { + if write.get_opaque().next == u32::MAX { let mut extend = relation.extend(false); - write.get_mut().get_opaque_mut().next = extend.id(); - if let Some(i) = extend.get_mut().alloc(&dummy) { + write.get_opaque_mut().next = extend.id(); + if let Some(i) = extend.alloc(&dummy) { let flag = put( - extend.get_mut().get_mut(i).expect("data corruption"), + extend.get_mut(i).expect("data corruption"), dims, &code, h0_vector, @@ -258,9 +248,9 @@ pub fn insert(relation: Relation, payload: Pointer, vector: Vec, distance_k panic!("a tuple cannot even be fit in a fresh page"); } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } } diff --git a/src/vchordrqfscan/algorithm/prewarm.rs b/src/vchordrqfscan/algorithm/prewarm.rs index ec7642a..ec8976a 100644 --- a/src/vchordrqfscan/algorithm/prewarm.rs +++ b/src/vchordrqfscan/algorithm/prewarm.rs @@ -6,7 +6,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { let mut message = String::new(); let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -22,7 +21,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -43,9 +41,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -56,7 +53,6 @@ pub fn prewarm(relation: Relation, height: i32) -> String { let mean = h1_tuple.mean[j]; let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -65,7 +61,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); @@ -84,9 +80,8 @@ pub fn prewarm(relation: Relation, height: i32) -> String { counter += 1; pgrx::check_for_interrupts!(); let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -97,7 +92,7 @@ pub fn prewarm(relation: Relation, height: i32) -> String { } } } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } writeln!(message, "number of tuples: {}", results.len()).unwrap(); diff --git a/src/vchordrqfscan/algorithm/scan.rs b/src/vchordrqfscan/algorithm/scan.rs index 202949e..a691b6c 100644 --- a/src/vchordrqfscan/algorithm/scan.rs +++ b/src/vchordrqfscan/algorithm/scan.rs @@ -20,7 +20,6 @@ pub fn scan( ) -> impl Iterator { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -41,7 +40,6 @@ pub fn scan( if is_residual { let vector_guard = relation.read(meta_tuple.mean.0); let vector_tuple = vector_guard - .get() .get(meta_tuple.mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -62,9 +60,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -92,7 +89,7 @@ pub fn scan( } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -102,7 +99,6 @@ pub fn scan( let (_, AlwaysEqual(mean), AlwaysEqual(first)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); let vector_tuple = vector_guard - .get() .get(mean.1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -138,9 +134,8 @@ pub fn scan( let mut current = list.0; while current != u32::MAX { let h0_guard = relation.read(current); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -168,7 +163,7 @@ pub fn scan( } } } - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } let mut heap = BinaryHeap::from(results); @@ -177,7 +172,7 @@ pub fn scan( while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) { let (_, AlwaysEqual(mean), AlwaysEqual(pay_u)) = heap.pop().unwrap(); let vector_guard = relation.read(mean.0); - let Some(vector_tuple) = vector_guard.get().get(mean.1) else { + let Some(vector_tuple) = vector_guard.get(mean.1) else { // fails consistency check continue; }; diff --git a/src/vchordrqfscan/algorithm/vacuum.rs b/src/vchordrqfscan/algorithm/vacuum.rs index 7bb5179..8ede4f6 100644 --- a/src/vchordrqfscan/algorithm/vacuum.rs +++ b/src/vchordrqfscan/algorithm/vacuum.rs @@ -8,7 +8,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -20,9 +19,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - let mut current = first; while current != u32::MAX { let h1_guard = relation.read(current); - for i in 1..=h1_guard.get().len() { + for i in 1..=h1_guard.len() { let h1_tuple = h1_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -33,7 +31,7 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - } } } - current = h1_guard.get().get_opaque().next; + current = h1_guard.get_opaque().next; } } results @@ -46,9 +44,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - while current != u32::MAX { delay(); let mut h0_guard = relation.write(current, false); - for i in 1..=h0_guard.get().len() { + for i in 1..=h0_guard.len() { let h0_tuple = h0_guard - .get() .get(i) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -64,7 +61,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - if flag { // todo: use mutable API let mut temp = h0_guard - .get() .get(i) .map(rkyv::from_bytes::) .expect("data corruption") @@ -76,14 +72,13 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - } let temp = rkyv::to_bytes::<_, 8192>(&temp).expect("failed to serialize"); h0_guard - .get_mut() .get_mut(i) .expect("data corruption") .copy_from_slice(&temp); } } // todo: cross-tuple vacuum so that we can skip a tuple - current = h0_guard.get().get_opaque().next; + current = h0_guard.get_opaque().next; } } } @@ -92,7 +87,6 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - let mut current = { let meta_guard = relation.read(0); let meta_tuple = meta_guard - .get() .get(1) .map(rkyv::check_archived_root::) .expect("data corruption") @@ -103,8 +97,8 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - delay(); let read = relation.read(current); let flag = 'flag: { - for i in 1..=read.get().len() { - let Some(vector_tuple) = read.get().get(i) else { + for i in 1..=read.len() { + let Some(vector_tuple) = read.get(i) else { continue; }; let vector_tuple = rkyv::check_archived_root::(vector_tuple) @@ -120,21 +114,21 @@ pub fn vacuum(relation: Relation, delay: impl Fn(), callback: impl Fn(Pointer) - if flag { drop(read); let mut write = relation.write(current, true); - for i in 1..=write.get().len() { - let Some(vector_tuple) = write.get().get(i) else { + for i in 1..=write.len() { + let Some(vector_tuple) = write.get(i) else { continue; }; let vector_tuple = rkyv::check_archived_root::(vector_tuple) .expect("data corruption"); if let Some(payload) = vector_tuple.payload.as_ref().copied() { if callback(Pointer::new(payload)) { - write.get_mut().free(i); + write.free(i); } } } - current = write.get().get_opaque().next; + current = write.get_opaque().next; } else { - current = read.get().get_opaque().next; + current = read.get_opaque().next; } } }