add optional mmap reader

This commit is contained in:
Alex Mikhalev 2021-11-18 16:16:32 -08:00
parent 5bff865da4
commit 153402755f
6 changed files with 157 additions and 89 deletions

View File

@ -3,11 +3,14 @@ name = "rsbag"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["mmap"]
mmap = ["memmap"]
[dependencies]
bytes = "1.1.0"
log = "0.4.14"
memmap = { version = "0.7.0", optional = true }
nom = "7.0.0"
num_enum = "0.5.4"
regex = "1.5.4"

View File

@ -112,7 +112,7 @@ impl BagIndex {
})
}
pub fn read_all<R: BagReader>(mut reader: &mut R) -> Result<Self> {
pub fn read_all<R: BagReader>(reader: &mut R) -> Result<Self> {
let version = reader.read_version()?;
trace!("bag version: {}", version);
if (version.major, version.minor) == (2, 0) {

View File

@ -78,6 +78,7 @@ impl<I: ErrorInput> Error<I> {
}
}
#[allow(dead_code)]
pub fn error_context<I: ErrorInput + Clone, F, P, O>(
mut f: F,
mut p: P,

View File

@ -1,13 +1,18 @@
use std::io::{self, SeekFrom};
use std::io::SeekFrom;
use bytes::{Buf, BytesMut};
use nom::number::streaming::le_u32;
use crate::error::{Error, Result};
use crate::index::{ChunkInfo, ConnInfo};
use crate::parse::{self, Header, Op, Version};
const READ_SIZE: usize = 4096;
mod io;
#[cfg(feature = "mmap")]
mod mmap;
pub use io::IoReader;
#[cfg(feature = "mmap")]
pub use mmap::MmapReader;
pub trait BagReader {
fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result<O>
@ -38,7 +43,7 @@ pub trait BagReader {
fn skip_data(&mut self) -> Result<()> {
let data_length = self.read_data_length()?;
self.seek(io::SeekFrom::Current(data_length as i64))?;
self.seek(SeekFrom::Current(data_length as i64))?;
Ok(())
}
@ -55,86 +60,3 @@ pub trait BagReader {
ChunkInfo::from_header(header)
}
}
pub struct IoReader<R> {
read: R,
buffer: BytesMut,
consumed: usize,
}
impl<R: io::Read + io::Seek> IoReader<R> {
pub fn new(read: R) -> Self {
Self {
read,
buffer: BytesMut::with_capacity(READ_SIZE),
consumed: 0,
}
}
fn read_more(&mut self, n: usize) -> Result<()> {
if n == 0 {
return Ok(());
}
let old_size = self.buffer.len();
self.buffer.resize(old_size + n, 0);
let read = self.read.read(&mut self.buffer[old_size..])?;
self.buffer.truncate(old_size + read);
if read == 0 {
return Err(Error::Eof);
}
Ok(())
}
}
impl<R: io::Read + io::Seek> BagReader for IoReader<R> {
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
self.buffer.advance(self.consumed);
self.consumed = 0;
let this = self as *mut Self;
loop {
match parser.parse(&self.buffer) {
Ok((rest, output)) => {
self.consumed += self.buffer.len() - rest.len();
return Ok(output);
}
Err(nom::Err::Incomplete(needed)) => {
let needed = match needed {
nom::Needed::Unknown => 0,
nom::Needed::Size(n) => n.get(),
};
// Safety: this.buffer is only borrowed in the Ok case above, which
// immediately returns.
unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?;
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
return Err(e.into());
}
}
}
}
fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> {
if let io::SeekFrom::Current(pos) = &mut pos {
// If seeking relative to current position, subtract data
// read from the file but not yet consumed.
let remaining = (self.buffer.len() - self.consumed) as i64;
let new_pos = *pos - remaining;
if *pos >= 0 && new_pos < 0 {
// The new position is within the already read data, just consume more data
self.consumed += *pos as usize;
return Ok(());
}
*pos = new_pos;
}
self.buffer.clear();
self.consumed = 0;
self.read.seek(pos)?;
Ok(())
}
}

91
src/reader/io.rs Normal file
View File

@ -0,0 +1,91 @@
use std::io;
use bytes::{Buf, BytesMut};
use super::BagReader;
use crate::{parse, Error, Result};
const READ_SIZE: usize = 4096;
pub struct IoReader<R> {
read: R,
buffer: BytesMut,
consumed: usize,
}
impl<R: io::Read + io::Seek> IoReader<R> {
pub fn new(read: R) -> Self {
Self {
read,
buffer: BytesMut::with_capacity(READ_SIZE),
consumed: 0,
}
}
fn read_more(&mut self, n: usize) -> Result<()> {
if n == 0 {
return Ok(());
}
let old_size = self.buffer.len();
self.buffer.resize(old_size + n, 0);
let read = self.read.read(&mut self.buffer[old_size..])?;
self.buffer.truncate(old_size + read);
if read == 0 {
return Err(Error::Eof);
}
Ok(())
}
}
impl<R: io::Read + io::Seek> BagReader for IoReader<R> {
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
self.buffer.advance(self.consumed);
self.consumed = 0;
let this = self as *mut Self;
loop {
match parser.parse(&self.buffer) {
Ok((rest, output)) => {
self.consumed += self.buffer.len() - rest.len();
return Ok(output);
}
Err(nom::Err::Incomplete(needed)) => {
let needed = match needed {
nom::Needed::Unknown => 0,
nom::Needed::Size(n) => n.get(),
};
// Safety: this.buffer is only borrowed in the Ok case above, which
// immediately returns.
unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?;
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
return Err(e.into());
}
}
}
}
fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> {
if let io::SeekFrom::Current(pos) = &mut pos {
// If seeking relative to current position, subtract data
// read from the file but not yet consumed.
let remaining = (self.buffer.len() - self.consumed) as i64;
let new_pos = *pos - remaining;
if *pos >= 0 && new_pos < 0 {
// The new position is within the already read data, just consume more data
self.consumed += *pos as usize;
return Ok(());
}
*pos = new_pos;
}
self.buffer.clear();
self.consumed = 0;
self.read.seek(pos)?;
Ok(())
}
}

51
src/reader/mmap.rs Normal file
View File

@ -0,0 +1,51 @@
use std::{fs::File, io};
use memmap::Mmap;
use super::BagReader;
use crate::{parse, Error, Result};
pub struct MmapReader {
mmap: Mmap,
pos: usize,
}
impl MmapReader {
pub fn new(file: File) -> Result<Self> {
// Safety: ¯\_(ツ)_/¯
let mmap = unsafe { Mmap::map(&file) }?;
Ok(Self { mmap, pos: 0 })
}
}
impl BagReader for MmapReader {
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
let input = self.mmap.get(self.pos..).ok_or(Error::Eof)?;
match parser.parse(input) {
Ok((rest, output)) => {
self.pos += input.len() - rest.len();
Ok(output)
}
Err(nom::Err::Incomplete(_)) => Err(Error::Eof),
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into()),
}
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<()> {
match pos {
io::SeekFrom::Start(pos) => {
self.pos = pos as usize;
}
io::SeekFrom::End(pos) => {
self.pos = (self.mmap.len() as i64 + pos) as usize;
}
io::SeekFrom::Current(pos) => {
self.pos = ((self.pos as i64) + pos) as usize;
}
}
Ok(())
}
}