diff --git a/Cargo.toml b/Cargo.toml index 3c7686f..52cbe8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,15 +4,17 @@ version = "0.1.0" edition = "2018" [features] -default = ["mmap"] +default = ["mmap", "rayon"] mmap = ["memmap"] [dependencies] bytes = "1.1.0" log = "0.4.14" +lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] } memmap = { version = "0.7.0", optional = true } nom = "7.0.0" num_enum = "0.5.4" +rayon = { version = "1.5.1", optional = true } regex = "1.5.4" ros_message = "0.1.0" smallvec = "1.6.1" diff --git a/examples/bag_info.rs b/examples/bag_info.rs index b757cfe..6fd1af1 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,6 +1,7 @@ -use std::{convert::TryFrom, env::args, fs::File}; +use std::{convert::TryFrom, env::args, fs::File, io}; use log::{error, info, trace}; +use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use ros_message::{MessagePath, Msg}; use rsbag::{ @@ -42,6 +43,17 @@ fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result> { Ok(msgs) } +fn read_chunk(bag_reader: &mut R, pos: u64) -> rsbag::Result> { + let chunk_header = bag_reader.read_chunk_header(pos)?; + + let compressed_data = bag_reader.read_data().unwrap(); + let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); + let mut decompresor = chunk_header.compression.decompress(compressed_data); + io::copy(&mut decompresor, &mut data).unwrap(); + + Ok(data) +} + fn main() { env_logger::init(); @@ -67,7 +79,7 @@ fn main() { match parse_message_definitions(conn) { Ok(msgs) => { for msg in &msgs { - info!( + trace!( "message definition parsed: {:#?}", msg.fields() .iter() @@ -81,12 +93,42 @@ fn main() { } } - let mut total_size = 0; + // let mut total_size = 0; - for chunk in &index.chunks { - let chunk_header = bag_reader.read_chunk_header(chunk.pos).unwrap(); - total_size += chunk_header.uncompressed_size; - } + // let total_size = index + // .chunks + // .par_iter() + // .try_fold( + // || 0u64, + // |total_size, chunk| -> rsbag::Result<_> { + // let chunk_header = bag_reader.clone().read_chunk_header(chunk.pos)?; + + // // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; + // // chunks.push(data); + // Ok(total_size + chunk_header.uncompressed_size as u64) + // }, + // ) + // .reduce( + // // || Ok(Vec::new()), + // || Ok(0), + // |a, b| a.and_then(|a| b.map(|b| a + b)), + // ).unwrap(); + + let total_size = index + .chunks + .par_iter() + .try_fold( + || 0u64, + |total_size, chunk| -> rsbag::Result<_> { + let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; + Ok(total_size + data.len() as u64) + }, + ) + .reduce( + // || Ok(Vec::new()), + || Ok(0), + |a, b| a.and_then(|a| b.map(|b| a + b)), + ).unwrap(); info!("total uncompressed size: {}", total_size); } diff --git a/src/chunk.rs b/src/chunk.rs index c141226..7891614 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,4 +1,4 @@ -use std::str::FromStr; +use std::{io, str::FromStr}; use crate::{parse::Header, Error, Result}; @@ -22,6 +22,16 @@ impl FromStr for Compression { } } +impl Compression { + pub fn decompress<'a, R: io::Read + 'a>(self, read: R) -> Box { + match self { + Compression::None => Box::new(read), + Compression::Bz2 => todo!("bz2 decompression"), + Compression::Lz4 => Box::new(lz4_flex::frame::FrameDecoder::new(read)), + } + } +} + #[derive(Debug)] pub struct ChunkHeader { pub compression: Compression, diff --git a/src/reader.rs b/src/reader.rs index 5652fe1..cc3faf8 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -1,5 +1,6 @@ use std::io::SeekFrom; +use nom::multi::length_data; use nom::number::streaming::le_u32; use crate::chunk::ChunkHeader; @@ -16,6 +17,10 @@ pub use io::IoReader; pub use mmap::MmapReader; pub trait BagReader { + type Read: std::io::Read; + + fn as_read(&mut self) -> &mut Self::Read; + fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>; @@ -48,6 +53,10 @@ pub trait BagReader { Ok(()) } + fn read_data(&mut self) -> Result<&[u8]> { + self.read_parser(length_data(le_u32)) + } + fn read_conn_info(&mut self) -> Result { let record_header = self.read_header_op(Op::Connection)?; let conn_header = self.read_header()?; diff --git a/src/reader/io.rs b/src/reader/io.rs index a7aadf7..5095aec 100644 --- a/src/reader/io.rs +++ b/src/reader/io.rs @@ -38,6 +38,12 @@ impl IoReader { } impl BagReader for IoReader { + type Read = R; + + fn as_read(&mut self) -> &mut Self::Read { + &mut self.read + } + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, diff --git a/src/reader/mmap.rs b/src/reader/mmap.rs index 24c64a8..49f0c50 100644 --- a/src/reader/mmap.rs +++ b/src/reader/mmap.rs @@ -1,12 +1,13 @@ -use std::{fs::File, io}; +use std::{fs::File, io, sync::Arc}; use memmap::Mmap; use super::BagReader; use crate::{parse, Error, Result}; +#[derive(Clone)] pub struct MmapReader { - mmap: Mmap, + mmap: Arc, pos: usize, } @@ -14,11 +15,33 @@ impl MmapReader { pub fn new(file: File) -> Result { // Safety: ¯\_(ツ)_/¯ let mmap = unsafe { Mmap::map(&file) }?; + let mmap = Arc::new(mmap); Ok(Self { mmap, pos: 0 }) } } +impl io::Read for MmapReader { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if let Some(slice) = self + .mmap + .get(self.pos..) + .and_then(|unread| unread.get(..buf.len())) + { + buf.copy_from_slice(slice); + Ok(slice.len()) + } else { + Ok(0) + } + } +} + impl BagReader for MmapReader { + type Read = Self; + + fn as_read(&mut self) -> &mut Self::Read { + self + } + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,