Skip to content

Commit

Permalink
feat: prefetch
Browse files Browse the repository at this point in the history
Signed-off-by: usamoi <usamoi@outlook.com>
  • Loading branch information
usamoi committed Nov 7, 2024
1 parent b80eba5 commit b4841ba
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 1 deletion.
50 changes: 49 additions & 1 deletion src/algorithm/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pub fn scan(
distance_kind: DistanceKind,
nprobe_2: u32,
nprobe_1: u32,
prefetch: u32,
) -> impl Iterator<Item = (Distance, Pointer)> {
assert!(nprobe_1 >= 1);
let meta_guard = relation.read(0);
Expand Down Expand Up @@ -237,7 +238,16 @@ pub fn scan(
current = h0_guard.get().get_opaque().next;
}
}
let mut heap = BinaryHeap::from(results);
let mut heap = Prefetcher::new(
BinaryHeap::from(results),
{
let relation = relation.clone();
move |x| {
relation.prefetch(x.1 .0 .0);
}
},
prefetch,
);
let mut cache = BinaryHeap::<(Reverse<Distance>, _)>::new();
std::iter::from_fn(move || {
while !heap.is_empty() && heap.peek().map(|x| x.0) > cache.peek().map(|x| x.0) {
Expand All @@ -261,3 +271,41 @@ pub fn scan(
})
}
}

pub struct Prefetcher<T, F> {
heap: BinaryHeap<T>,
fetch: BinaryHeap<T>,
f: F,
prefetch: u32,
}

impl<T: Ord, F: Fn(&T)> Prefetcher<T, F> {
pub fn new(heap: BinaryHeap<T>, f: F, prefetch: u32) -> Self {
Self {
heap,
fetch: BinaryHeap::new(),
f,
prefetch,
}
}
pub fn peek(&mut self) -> Option<&T> {
if let Some(x) = self.fetch.peek() {
return Some(x);
}
if let Some(x) = self.heap.peek() {
return Some(x);
}
None
}
pub fn pop(&mut self) -> Option<T> {
while self.fetch.len() <= self.prefetch as _ && !self.heap.is_empty() {
let x = self.heap.pop().unwrap();
(self.f)(&x);
self.fetch.push(x);
}
self.fetch.pop()
}
pub fn is_empty(&self) -> bool {
self.heap.is_empty() && self.fetch.is_empty()
}
}
15 changes: 15 additions & 0 deletions src/gucs/executing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use pgrx::guc::{GucContext, GucFlags, GucRegistry, GucSetting};

static NPROBE_2: GucSetting<i32> = GucSetting::<i32>::new(1);
static NPROBE_1: GucSetting<i32> = GucSetting::<i32>::new(10);
static PREFETCH: GucSetting<i32> = GucSetting::<i32>::new(0);

pub unsafe fn init() {
GucRegistry::define_int_guc(
Expand All @@ -24,6 +25,16 @@ pub unsafe fn init() {
GucContext::Userset,
GucFlags::default(),
);
GucRegistry::define_int_guc(
"rabbithole.prefetch",
"`prefetch` argument of rabbithole.",
"`prefetch` argument of rabbithole.",
&PREFETCH,
0,
u16::MAX as _,
GucContext::Userset,
GucFlags::default(),
);
}

pub fn nprobe_2() -> u32 {
Expand All @@ -33,3 +44,7 @@ pub fn nprobe_2() -> u32 {
pub fn nprobe_1() -> u32 {
NPROBE_1.get() as u32
}

pub fn prefetch() -> u32 {
PREFETCH.get() as u32
}
2 changes: 2 additions & 0 deletions src/index/am_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::am_options::Opfamily;
use crate::algorithm::scan::scan;
use crate::gucs::executing::nprobe_1;
use crate::gucs::executing::nprobe_2;
use crate::gucs::executing::prefetch;
use crate::postgres::Relation;
use base::distance::Distance;
use base::search::*;
Expand Down Expand Up @@ -84,6 +85,7 @@ pub fn scan_next(scanner: &mut Scanner, relation: Relation) -> Option<(Pointer,
opfamily.distance_kind(),
nprobe_2(),
nprobe_1(),
prefetch(),
);
*scanner = Scanner::Vbase {
vbase: Box::new(vbase),
Expand Down
5 changes: 5 additions & 0 deletions src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,4 +326,9 @@ impl Relation {
}
}
}
pub fn prefetch(&self, id: u32) {
unsafe {
pgrx::pg_sys::PrefetchBuffer(self.raw, pgrx::pg_sys::ForkNumber::MAIN_FORKNUM, id);
}
}
}

0 comments on commit b4841ba

Please sign in to comment.