From ee2c1a452e7e338649d3a7348dc5b8c59e911262 Mon Sep 17 00:00:00 2001 From: Sven Cattell Date: Mon, 4 Dec 2023 17:06:28 -0500 Subject: [PATCH] Removed memmap dependency --- arrow-ipc/Cargo.toml | 4 +--- arrow-ipc/src/reader.rs | 53 +++++++++++++++++++---------------------- 2 files changed, 25 insertions(+), 32 deletions(-) diff --git a/arrow-ipc/Cargo.toml b/arrow-ipc/Cargo.toml index 40708e4d4818..83ad044d25e7 100644 --- a/arrow-ipc/Cargo.toml +++ b/arrow-ipc/Cargo.toml @@ -42,12 +42,10 @@ arrow-schema = { workspace = true } flatbuffers = { version = "23.1.21", default-features = false } lz4_flex = { version = "0.11", default-features = false, features = ["std", "frame"], optional = true } zstd = { version = "0.13.0", default-features = false, optional = true } -memmap2 = { version = "0.9.0", optional = true } [features] -default = ["memmap"] +default = [] lz4 = ["lz4_flex"] -memmap = ["dep:memmap2"] [dev-dependencies] tempfile = "3.3" diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs index 0d50a1ae0b55..88ab5bdc8afc 100644 --- a/arrow-ipc/src/reader.rs +++ b/arrow-ipc/src/reader.rs @@ -20,15 +20,14 @@ //! The `FileReader` and `StreamReader` have similar interfaces, //! however the `FileReader` expects a reader that supports `Seek`ing +use arrow_buffer::alloc::Allocation; use flatbuffers::VectorIter; use std::collections::HashMap; use std::fmt; use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::ops::Deref; use std::sync::Arc; -#[cfg(feature = "memmap")] -use memmap2::Mmap; - use arrow_array::*; use arrow_buffer::{Buffer, MutableBuffer}; use arrow_data::ArrayData; @@ -1032,9 +1031,9 @@ impl RecordBatchReader for StreamReader { /// Arrow Memmap reader. It does not copy the underlying buffers. #[cfg(feature = "memmap")] -pub struct MemmapReader { +pub struct BufferReader { /// The memmap we're reading from - memmap: Arc, + buffer: Arc, /// The schema that is read from the file header schema: SchemaRef, @@ -1066,7 +1065,7 @@ pub struct MemmapReader { } #[cfg(feature = "memmap")] -impl fmt::Debug for MemmapReader { +impl fmt::Debug for BufferReader { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { f.debug_struct("FileReader") .field("reader", &"BufReader<..>") @@ -1082,14 +1081,13 @@ impl fmt::Debug for MemmapReader { } #[cfg(feature = "memmap")] -impl MemmapReader { +impl + Allocation + 'static> BufferReader { /// Try to create a new file reader /// /// Returns errors if the file does not meet the Arrow Format header and footer /// requirements - pub fn try_new(memmap: Mmap, projection: Option>) -> Result { - let memmap = Arc::new(memmap); - let mut reader = std::io::Cursor::new(&memmap[..]); + pub fn try_new(buffer: Arc, projection: Option>) -> Result { + let mut reader = std::io::Cursor::new(&buffer[..]); let mut magic_buffer: [u8; 6] = [0; 6]; reader.read_exact(&mut magic_buffer)?; @@ -1169,13 +1167,13 @@ impl MemmapReader { let len = message.bodyLength() as usize; assert_ne!(len, 0); - let buf_slice = &memmap[start..(start + len)]; + let buf_slice = &buffer[start..(start + len)]; // Safety: The memmap is allocated and held. We've just checked that it's of the correct lenght. let buf = unsafe { Buffer::from_custom_allocation( buf_slice.get(0).expect("Length is non-zero").into(), len, - memmap.clone(), + buffer.clone(), ) }; @@ -1204,7 +1202,7 @@ impl MemmapReader { }; Ok(Self { - memmap, + buffer, schema: Arc::new(schema), blocks: blocks.iter().copied().collect(), current_block: 0, @@ -1250,7 +1248,7 @@ impl MemmapReader { let block = self.blocks[self.current_block]; self.current_block += 1; - let mut reader = std::io::Cursor::new(&self.memmap[..]); + let mut reader = std::io::Cursor::new(&self.buffer[..]); // read length reader.seek(SeekFrom::Start(block.offset() as u64))?; let mut meta_buf = [0; 4]; @@ -1286,7 +1284,7 @@ impl MemmapReader { })?; // read the block that makes up the record batch into a buffer let mut buf_slice = - &self.memmap[block.offset() as usize + block.metaDataLength() as usize..]; + &self.buffer[block.offset() as usize + block.metaDataLength() as usize..]; let len = message.bodyLength() as usize; buf_slice = &buf_slice[..len]; assert_ne!(0, len); @@ -1296,7 +1294,7 @@ impl MemmapReader { Buffer::from_custom_allocation( buf_slice.get(0).expect("Length is non-zero").into(), len, - self.memmap.clone(), + self.buffer.clone(), ) }; @@ -1327,16 +1325,16 @@ impl MemmapReader { } } - /// Gets a reference to the underlying reader. + /// Gets a reference to the underlying buffer. /// /// It is inadvisable to directly read from the underlying reader. - pub fn get_ref(&self) -> &Mmap { - &self.memmap + pub fn get_ref(&self) -> &Arc { + &self.buffer } } #[cfg(feature = "memmap")] -impl Iterator for MemmapReader { +impl + Allocation + 'static> Iterator for BufferReader { type Item = Result; fn next(&mut self) -> Option { @@ -1350,7 +1348,7 @@ impl Iterator for MemmapReader { } #[cfg(feature = "memmap")] -impl RecordBatchReader for MemmapReader { +impl + Allocation + 'static> RecordBatchReader for BufferReader { fn schema(&self) -> SchemaRef { self.schema.clone() } @@ -1549,19 +1547,17 @@ mod tests { let batch = create_test_projection_batch_data(&schema); // write record batch in IPC format - let mut file = tempfile::tempfile().unwrap(); + let mut buf = Vec::new(); { - let mut writer = crate::writer::FileWriter::try_new(&mut file, &schema).unwrap(); + let mut writer = crate::writer::FileWriter::try_new(&mut buf, &schema).unwrap(); writer.write(&batch).unwrap(); writer.finish().unwrap(); } - - file.rewind().unwrap(); + let memmap = Arc::new(buf); // read record batch with projection for index in 0..12 { let projection = vec![index]; - let reader = - MemmapReader::try_new(unsafe { Mmap::map(&file).unwrap() }, Some(projection)); + let reader = BufferReader::try_new(memmap.clone(), Some(projection)); let read_batch = reader.unwrap().next().unwrap().unwrap(); let projected_column = read_batch.column(0); let expected_column = batch.column(index); @@ -1572,8 +1568,7 @@ mod tests { { // read record batch with reversed projection - let reader = - MemmapReader::try_new(unsafe { Mmap::map(&file).unwrap() }, Some(vec![3, 2, 1])); + let reader = BufferReader::try_new(memmap, Some(vec![3, 2, 1])); let read_batch = reader.unwrap().next().unwrap().unwrap(); let expected_batch = batch.project(&[3, 2, 1]).unwrap();