From 153402755f92853ecfc96498914e70cf04ee4224 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Thu, 18 Nov 2021 16:16:32 -0800 Subject: [PATCH] add optional mmap reader --- /* NOTE(review): MmapReader::seek in src/reader/mmap.rs converts offsets with plain as-casts and never compares the result with the mapping length, so a negative target wraps when cast to usize and the call still returns Ok(()); the problem only surfaces on the next read_parser call, where mmap.get(pos..) yields None and is reported as Error::Eof. MmapReader::read_parser also reports nom Incomplete as Error::Eof, whereas IoReader::read_parser reads more data and retries. NOTE(review): generic parameter lists (for example on IoReader and on the bare Result return types) appear to be missing from this copy of the patch; confirm against the original commit. */ Cargo.toml | 5 ++- src/index.rs | 2 +- src/parse/error.rs | 1 + src/reader.rs | 96 +++++----------------------------------------- src/reader/io.rs | 91 +++++++++++++++++++++++++++++++++++++++++++ src/reader/mmap.rs | 51 ++++++++++++++++++++++++ 6 files changed, 157 insertions(+), 89 deletions(-) create mode 100644 src/reader/io.rs create mode 100644 src/reader/mmap.rs diff --git a/Cargo.toml b/Cargo.toml index 8d71552..3c7686f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,11 +3,14 @@ name = "rsbag" version = "0.1.0" edition = "2018" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html +[features] +default = ["mmap"] +mmap = ["memmap"] [dependencies] bytes = "1.1.0" log = "0.4.14" +memmap = { version = "0.7.0", optional = true } nom = "7.0.0" num_enum = "0.5.4" regex = "1.5.4" diff --git a/src/index.rs b/src/index.rs index 8c1ab92..eb29c3f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -112,7 +112,7 @@ impl BagIndex { }) } - pub fn read_all(mut reader: &mut R) -> Result { + pub fn read_all(reader: &mut R) -> Result { let version = reader.read_version()?; trace!("bag version: {}", version); if (version.major, version.minor) == (2, 0) { diff --git a/src/parse/error.rs b/src/parse/error.rs index c17ca61..38b1609 100644 --- a/src/parse/error.rs +++ b/src/parse/error.rs @@ -78,6 +78,7 @@ impl Error { } } +#[allow(dead_code)] pub fn error_context( mut f: F, mut p: P, diff --git a/src/reader.rs b/src/reader.rs index d749069..ebc0065 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,13 +1,18 @@ -use std::io::{self, SeekFrom}; +use std::io::SeekFrom; -use bytes::{Buf, BytesMut}; use nom::number::streaming::le_u32; use crate::error::{Error, Result}; use crate::index::{ChunkInfo, ConnInfo}; use crate::parse::{self, Header, Op, Version}; -const READ_SIZE: 
usize = 4096; +mod io; +#[cfg(feature = "mmap")] +mod mmap; + +pub use io::IoReader; +#[cfg(feature = "mmap")] +pub use mmap::MmapReader; pub trait BagReader { fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result @@ -38,7 +43,7 @@ pub trait BagReader { fn skip_data(&mut self) -> Result<()> { let data_length = self.read_data_length()?; - self.seek(io::SeekFrom::Current(data_length as i64))?; + self.seek(SeekFrom::Current(data_length as i64))?; Ok(()) } @@ -55,86 +60,3 @@ pub trait BagReader { ChunkInfo::from_header(header) } } - -pub struct IoReader { - read: R, - buffer: BytesMut, - consumed: usize, -} - -impl IoReader { - pub fn new(read: R) -> Self { - Self { - read, - buffer: BytesMut::with_capacity(READ_SIZE), - consumed: 0, - } - } - - fn read_more(&mut self, n: usize) -> Result<()> { - if n == 0 { - return Ok(()); - } - let old_size = self.buffer.len(); - self.buffer.resize(old_size + n, 0); - let read = self.read.read(&mut self.buffer[old_size..])?; - self.buffer.truncate(old_size + read); - if read == 0 { - return Err(Error::Eof); - } - Ok(()) - } -} - -impl BagReader for IoReader { - fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result - where - P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, - { - self.buffer.advance(self.consumed); - self.consumed = 0; - - let this = self as *mut Self; - - loop { - match parser.parse(&self.buffer) { - Ok((rest, output)) => { - self.consumed += self.buffer.len() - rest.len(); - return Ok(output); - } - Err(nom::Err::Incomplete(needed)) => { - let needed = match needed { - nom::Needed::Unknown => 0, - nom::Needed::Size(n) => n.get(), - }; - // Safety: this.buffer is only borrowed in the Ok case above, which - // immediately returns. 
- unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?; - } - Err(nom::Err::Error(e) | nom::Err::Failure(e)) => { - return Err(e.into()); - } - } - } - } - - fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> { - if let io::SeekFrom::Current(pos) = &mut pos { - // If seeking relative to current position, subtract data - // read from the file but not yet consumed. - let remaining = (self.buffer.len() - self.consumed) as i64; - let new_pos = *pos - remaining; - if *pos >= 0 && new_pos < 0 { - // The new position is within the already read data, just consume more data - self.consumed += *pos as usize; - return Ok(()); - } - *pos = new_pos; - } - - self.buffer.clear(); - self.consumed = 0; - self.read.seek(pos)?; - Ok(()) - } -} diff --git a/src/reader/io.rs b/src/reader/io.rs new file mode 100644 index 0000000..a7aadf7 --- /dev/null +++ b/src/reader/io.rs @@ -0,0 +1,91 @@ +use std::io; + +use bytes::{Buf, BytesMut}; + +use super::BagReader; +use crate::{parse, Error, Result}; + +const READ_SIZE: usize = 4096; + +pub struct IoReader { + read: R, + buffer: BytesMut, + consumed: usize, +} + +impl IoReader { + pub fn new(read: R) -> Self { + Self { + read, + buffer: BytesMut::with_capacity(READ_SIZE), + consumed: 0, + } + } + + fn read_more(&mut self, n: usize) -> Result<()> { + if n == 0 { + return Ok(()); + } + let old_size = self.buffer.len(); + self.buffer.resize(old_size + n, 0); + let read = self.read.read(&mut self.buffer[old_size..])?; + self.buffer.truncate(old_size + read); + if read == 0 { + return Err(Error::Eof); + } + Ok(()) + } +} + +impl BagReader for IoReader { + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result + where + P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, + { + self.buffer.advance(self.consumed); + self.consumed = 0; + + let this = self as *mut Self; + + loop { + match parser.parse(&self.buffer) { + Ok((rest, output)) => { + self.consumed += self.buffer.len() - rest.len(); + return Ok(output); + } + 
Err(nom::Err::Incomplete(needed)) => { + let needed = match needed { + nom::Needed::Unknown => 0, + nom::Needed::Size(n) => n.get(), + }; + // Safety: this.buffer is only borrowed in the Ok case above, which + // immediately returns. + unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?; + } + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => { + return Err(e.into()); + } + } + } + } + + fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> { + if let io::SeekFrom::Current(pos) = &mut pos { + // If seeking relative to current position, subtract data + // read from the file but not yet consumed. + let remaining = (self.buffer.len() - self.consumed) as i64; + let new_pos = *pos - remaining; + if *pos >= 0 && new_pos < 0 { + // The new position is within the already read data, just consume more data + self.consumed += *pos as usize; + return Ok(()); + } + *pos = new_pos; + } + + self.buffer.clear(); + self.consumed = 0; + self.read.seek(pos)?; + Ok(()) + } +} diff --git a/src/reader/mmap.rs b/src/reader/mmap.rs new file mode 100644 index 0000000..24c64a8 --- /dev/null +++ b/src/reader/mmap.rs @@ -0,0 +1,51 @@ +use std::{fs::File, io}; + +use memmap::Mmap; + +use super::BagReader; +use crate::{parse, Error, Result}; + +pub struct MmapReader { + mmap: Mmap, + pos: usize, +} + +impl MmapReader { + pub fn new(file: File) -> Result { + // SAFETY: not established here. The mapping is only sound while the underlying file is not modified or truncated for as long as the Mmap is alive, and nothing in this constructor enforces that. TODO(review): document this requirement for callers or enforce it. 
+ let mmap = unsafe { Mmap::map(&file) }?; + Ok(Self { mmap, pos: 0 }) + } +} + +impl BagReader for MmapReader { + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result + where + P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, + { + let input = self.mmap.get(self.pos..).ok_or(Error::Eof)?; + match parser.parse(input) { + Ok((rest, output)) => { + self.pos += input.len() - rest.len(); + Ok(output) + } + Err(nom::Err::Incomplete(_)) => Err(Error::Eof), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into()), + } + } + + fn seek(&mut self, pos: io::SeekFrom) -> Result<()> { + match pos { + 
io::SeekFrom::Start(pos) => { + self.pos = pos as usize; + } + io::SeekFrom::End(pos) => { + self.pos = (self.mmap.len() as i64 + pos) as usize; + } + io::SeekFrom::Current(pos) => { + self.pos = ((self.pos as i64) + pos) as usize; + } + } + Ok(()) + } +}