Skip to content

Commit

Permalink
Removed memmap dependency
Browse files Browse the repository at this point in the history
  • Loading branch information
Sven Cattell committed Dec 4, 2023
1 parent 8da39d8 commit ee2c1a4
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 32 deletions.
4 changes: 1 addition & 3 deletions arrow-ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ arrow-schema = { workspace = true }
flatbuffers = { version = "23.1.21", default-features = false }
lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true }
zstd = { version = "0.13.0", default-features = false, optional = true }
memmap2 = { version = "0.9.0", optional = true }

[features]
default = ["memmap"]
default = []
lz4 = ["lz4_flex"]
memmap = ["dep:memmap2"]

[dev-dependencies]
tempfile = "3.3"
53 changes: 24 additions & 29 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@
//! The `FileReader` and `StreamReader` have similar interfaces,
//! however the `FileReader` expects a reader that supports `Seek`ing
use arrow_buffer::alloc::Allocation;
use flatbuffers::VectorIter;
use std::collections::HashMap;
use std::fmt;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::ops::Deref;
use std::sync::Arc;

#[cfg(feature = "memmap")]
use memmap2::Mmap;

use arrow_array::*;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayData;
Expand Down Expand Up @@ -1032,9 +1031,9 @@ impl<R: Read> RecordBatchReader for StreamReader<R> {

/// Arrow Memmap reader. It does not copy the underlying buffers.
#[cfg(feature = "memmap")]
pub struct MemmapReader {
pub struct BufferReader<A> {
/// The memmap we're reading from
memmap: Arc<Mmap>,
buffer: Arc<A>,

/// The schema that is read from the file header
schema: SchemaRef,
Expand Down Expand Up @@ -1066,7 +1065,7 @@ pub struct MemmapReader {
}

#[cfg(feature = "memmap")]
impl fmt::Debug for MemmapReader {
impl<A> fmt::Debug for BufferReader<A> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("reader", &"BufReader<..>")
Expand All @@ -1082,14 +1081,13 @@ impl fmt::Debug for MemmapReader {
}

#[cfg(feature = "memmap")]
impl MemmapReader {
impl<A: Deref<Target = [u8]> + Allocation + 'static> BufferReader<A> {
/// Try to create a new file reader
///
/// Returns errors if the file does not meet the Arrow Format header and footer
/// requirements
pub fn try_new(memmap: Mmap, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let memmap = Arc::new(memmap);
let mut reader = std::io::Cursor::new(&memmap[..]);
pub fn try_new(buffer: Arc<A>, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let mut reader = std::io::Cursor::new(&buffer[..]);

let mut magic_buffer: [u8; 6] = [0; 6];
reader.read_exact(&mut magic_buffer)?;
Expand Down Expand Up @@ -1169,13 +1167,13 @@ impl MemmapReader {
let len = message.bodyLength() as usize;
assert_ne!(len, 0);

let buf_slice = &memmap[start..(start + len)];
let buf_slice = &buffer[start..(start + len)];
// Safety: The memmap is allocated and held. We've just checked that it's of the correct lenght.
let buf = unsafe {
Buffer::from_custom_allocation(
buf_slice.get(0).expect("Length is non-zero").into(),
len,
memmap.clone(),
buffer.clone(),
)
};

Expand Down Expand Up @@ -1204,7 +1202,7 @@ impl MemmapReader {
};

Ok(Self {
memmap,
buffer,
schema: Arc::new(schema),
blocks: blocks.iter().copied().collect(),
current_block: 0,
Expand Down Expand Up @@ -1250,7 +1248,7 @@ impl MemmapReader {
let block = self.blocks[self.current_block];
self.current_block += 1;

let mut reader = std::io::Cursor::new(&self.memmap[..]);
let mut reader = std::io::Cursor::new(&self.buffer[..]);
// read length
reader.seek(SeekFrom::Start(block.offset() as u64))?;
let mut meta_buf = [0; 4];
Expand Down Expand Up @@ -1286,7 +1284,7 @@ impl MemmapReader {
})?;
// read the block that makes up the record batch into a buffer
let mut buf_slice =
&self.memmap[block.offset() as usize + block.metaDataLength() as usize..];
&self.buffer[block.offset() as usize + block.metaDataLength() as usize..];
let len = message.bodyLength() as usize;
buf_slice = &buf_slice[..len];
assert_ne!(0, len);
Expand All @@ -1296,7 +1294,7 @@ impl MemmapReader {
Buffer::from_custom_allocation(
buf_slice.get(0).expect("Length is non-zero").into(),
len,
self.memmap.clone(),
self.buffer.clone(),
)
};

Expand Down Expand Up @@ -1327,16 +1325,16 @@ impl MemmapReader {
}
}

/// Gets a reference to the underlying reader.
/// Gets a reference to the underlying buffer.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &Mmap {
&self.memmap
pub fn get_ref(&self) -> &Arc<A> {
&self.buffer
}
}

#[cfg(feature = "memmap")]
impl Iterator for MemmapReader {
impl<A: Deref<Target = [u8]> + Allocation + 'static> Iterator for BufferReader<A> {
type Item = Result<RecordBatch, ArrowError>;

fn next(&mut self) -> Option<Self::Item> {
Expand All @@ -1350,7 +1348,7 @@ impl Iterator for MemmapReader {
}

#[cfg(feature = "memmap")]
impl RecordBatchReader for MemmapReader {
impl<A: Deref<Target = [u8]> + Allocation + 'static> RecordBatchReader for BufferReader<A> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
Expand Down Expand Up @@ -1549,19 +1547,17 @@ mod tests {
let batch = create_test_projection_batch_data(&schema);

// write record batch in IPC format
let mut file = tempfile::tempfile().unwrap();
let mut buf = Vec::new();
{
let mut writer = crate::writer::FileWriter::try_new(&mut file, &schema).unwrap();
let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}

file.rewind().unwrap();
let memmap = Arc::new(buf);
// read record batch with projection
for index in 0..12 {
let projection = vec![index];
let reader =
MemmapReader::try_new(unsafe { Mmap::map(&file).unwrap() }, Some(projection));
let reader = BufferReader::try_new(memmap.clone(), Some(projection));
let read_batch = reader.unwrap().next().unwrap().unwrap();
let projected_column = read_batch.column(0);
let expected_column = batch.column(index);
Expand All @@ -1572,8 +1568,7 @@ mod tests {

{
// read record batch with reversed projection
let reader =
MemmapReader::try_new(unsafe { Mmap::map(&file).unwrap() }, Some(vec![3, 2, 1]));
let reader = BufferReader::try_new(memmap, Some(vec![3, 2, 1]));

let read_batch = reader.unwrap().next().unwrap().unwrap();
let expected_batch = batch.project(&[3, 2, 1]).unwrap();
Expand Down

0 comments on commit ee2c1a4

Please sign in to comment.