From bea68dfa8765a401c95bf80b1a87c8d95a2b55ef Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Fri, 19 Nov 2021 18:01:25 -0800 Subject: [PATCH] message data header parsing --- Cargo.toml | 4 +++ examples/bag_info.rs | 82 +++++++++++++++++++++++++------------------- src/chunk.rs | 30 ++++++++++++---- src/index.rs | 2 +- src/parse.rs | 2 +- src/reader.rs | 7 ++-- src/reader/io.rs | 6 ---- src/reader/mmap.rs | 72 ++++---------------------------------- src/reader/slice.rs | 71 ++++++++++++++++++++++++++++++++++++++ 9 files changed, 156 insertions(+), 120 deletions(-) create mode 100644 src/reader/slice.rs diff --git a/Cargo.toml b/Cargo.toml index 34931cb..2c65e32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,3 +25,7 @@ thiserror = "1.0.28" color-eyre = "0.5.11" env_logger = "0.9.0" eyre = "0.6.5" + +[profile.release] +debug = true +lto = true \ No newline at end of file diff --git a/examples/bag_info.rs b/examples/bag_info.rs index 4d739d2..cd9549a 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,14 +1,14 @@ use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io}; -use eyre::Context; +use eyre::{bail, Context}; use log::{error, info, trace}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use ros_message::{MessagePath, Msg}; use rsbag::{ - chunk::ChunkHeader, + chunk::{ChunkHeader, MessageDataHeader}, index::{BagIndex, ConnInfo, IndexData}, - reader::{BagReader, MmapReader}, + reader::{BagReader, MmapReader, SliceReader}, Result, }; @@ -47,11 +47,13 @@ fn parse_message_definitions(conn: &ConnInfo) -> Result> { } fn read_chunk(bag_reader: &mut R, pos: u64) -> Result> { - let chunk_header = ChunkHeader::read(bag_reader, pos)?; + bag_reader.seek(io::SeekFrom::Start(pos))?; + + let chunk_header = ChunkHeader::read(bag_reader)?; let compressed_data = bag_reader.read_data()?; let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); - let mut decompresor = chunk_header.compression.decompress(compressed_data); + let mut decompresor = chunk_header.compression.decompress_stream(compressed_data); io::copy(&mut decompresor, &mut data)?; Ok(data) @@ -71,7 +73,6 @@ impl BagInfo { } self } - } fn main() -> Result<()> { @@ -86,7 +87,7 @@ fn main() -> Result<()> { let bag_path = &args[1]; let bag_file = File::open(bag_path).expect("Could not open bag file"); - let mut bag_reader = MmapReader::new(bag_file)?; + let mut bag_reader = MmapReader::map(bag_file)?; let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?; @@ -108,45 +109,54 @@ fn main() -> Result<()> { } } - let data = index + let info = index .chunks .par_iter() - .try_fold(BagInfo::default, |mut data, chunk| -> rsbag::Result<_> { + // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { + // let mut reader = bag_reader.clone(); + // let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; + // info.total_uncompressed += chunk_header.uncompressed_size as u64; + // reader.skip_data()?; + // for _ in &chunk.connections { + // let index = IndexData::read(&mut reader)?; + // *info.per_connection.entry(index.conn_id).or_insert(0) += + // index.entries.len() as u64; + // } + // Ok(info) + // }) + .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { let mut reader = bag_reader.clone(); - let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; - data.total_uncompressed += chunk_header.uncompressed_size as u64; - reader.skip_data()?; - for _ in &chunk.connections { - let index = IndexData::read(&mut reader)?; - *data.per_connection.entry(index.conn_id).or_insert(0) += - index.entries.len() as u64; + let data = read_chunk(&mut reader, chunk.pos) + .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?; + info.total_uncompressed += data.len() as u64; + + let mut chunk_reader = SliceReader::from(data); + + loop { + if chunk_reader.pos() == chunk_reader.as_ref().len() { + break; + } + let header = chunk_reader.read_header()?; + let op = header.read_op()?; + match op { + rsbag::parse::Op::MsgData => { + let header = MessageDataHeader::from_header(header)?; + let count = info.per_connection.entry(header.conn_id).or_insert(0); + *count += 1; + chunk_reader.skip_data()?; + } + rsbag::parse::Op::Connection => chunk_reader.skip_data()?, + _ => bail!("unexpected op in chunk: {:?}", op), + } } - // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; - // chunks.push(data); - Ok(data) + Ok(info) }) .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) .unwrap(); - info!("bag data: {:#?}", data); - - // let total_size = index - // .chunks - // .par_iter() - // .try_fold( - // || 0u64, - // |total_size, chunk| -> Result<_> { - // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; - // Ok(total_size + data.len() as u64) - // }, - // ) - // .reduce( - // // || Ok(Vec::new()), - // || Ok(0), - // |a, b| a.and_then(|a| b.map(|b| a + b)), - // )?; + info!("bag info: {:#?}", info); Ok(()) } diff --git a/src/chunk.rs b/src/chunk.rs index 1230ee9..c9a6478 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -7,7 +7,7 @@ use eyre::bail; use crate::{ error, - parse::{Header, Op}, + parse::{Header, Op, Time}, reader::BagReader, Error, Result, }; @@ -33,7 +33,7 @@ impl FromStr for Compression { } impl Compression { - pub fn decompress<'a, R: io::Read + 'a>(self, read: R) -> Box { + pub fn decompress_stream<'a, R: io::Read + 'a>(self, read: R) -> Box { match self { Compression::None => Box::new(read), Compression::Bz2 => todo!("bz2 decompression"), @@ -49,16 +49,34 @@ pub struct ChunkHeader { } impl ChunkHeader { - pub fn read(reader: &mut R, pos: u64) -> Result { - reader.seek(SeekFrom::Start(pos))?; + pub fn read(reader: &mut R) -> Result { let header = reader.read_header_op(Op::Chunk)?; - ChunkHeader::from_header(header) + Self::from_header(header) } pub fn from_header(header: Header) -> Result { - Ok(ChunkHeader { + Ok(Self { compression: header.read_string(b"compression")?.parse()?, uncompressed_size: header.read_u32(b"size")?, }) } } + +pub struct MessageDataHeader { + pub conn_id: u32, + pub time: Time, +} + +impl MessageDataHeader { + pub fn read(reader: &mut R) -> Result { + let header = reader.read_header_op(Op::MsgData)?; + Self::from_header(header) + } + + pub fn from_header(header: Header) -> Result { + Ok(Self { + conn_id: header.read_u32(b"conn")?, + time: header.read_time(b"time")?, + }) + } +} diff --git a/src/index.rs b/src/index.rs index 89d8712..b789ee1 100644 --- a/src/index.rs +++ b/src/index.rs @@ -6,7 +6,7 @@ use nom::{number::streaming::le_u32, sequence::tuple, Parser}; use crate::{ error, - parse::{self, header::Time, Header, Op}, + parse::{self, Header, Op, Time}, reader::BagReader, Result, }; diff --git a/src/parse.rs b/src/parse.rs index e935464..f4a4e6b 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -3,7 +3,7 @@ pub mod header; mod version; pub use error::{Error, ErrorKind}; -pub use header::{Header, Op}; +pub use header::{Header, Op, Time}; pub use version::Version; pub type Input<'a> = &'a [u8]; diff --git a/src/reader.rs b/src/reader.rs index ed016fe..d8a7b45 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -10,16 +10,13 @@ pub mod error; mod io; #[cfg(feature = "mmap")] mod mmap; +mod slice; -pub use self::io::IoReader; #[cfg(feature = "mmap")] pub use self::mmap::MmapReader; +pub use self::{io::IoReader, slice::SliceReader}; pub trait BagReader { - type Read: std::io::Read; - - fn as_read(&mut self) -> &mut Self::Read; - fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>; diff --git a/src/reader/io.rs b/src/reader/io.rs index 0b7900c..70a1f5c 100644 --- a/src/reader/io.rs +++ b/src/reader/io.rs @@ -38,12 +38,6 @@ impl IoReader { } impl BagReader for IoReader { - type Read = R; - - fn as_read(&mut self) -> &mut Self::Read { - &mut self.read - } - fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, diff --git a/src/reader/mmap.rs b/src/reader/mmap.rs index b7aea93..28fa50b 100644 --- a/src/reader/mmap.rs +++ b/src/reader/mmap.rs @@ -1,75 +1,17 @@ -use std::{fs::File, io, sync::Arc}; +use std::{fs::File, sync::Arc}; -use eyre::bail; use memmap::Mmap; -use super::{error::UnexpectedEof, BagReader}; -use crate::{parse, Result}; +use super::SliceReader; +use crate::Result; -#[derive(Clone)] -pub struct MmapReader { - mmap: Arc, - pos: usize, -} +pub type MmapReader = SliceReader>; impl MmapReader { - pub fn new(file: File) -> Result { + pub fn map(file: File) -> Result { // Safety: ¯\_(ツ)_/¯ let mmap = unsafe { Mmap::map(&file) }?; - let mmap = Arc::new(mmap); - Ok(Self { mmap, pos: 0 }) - } -} - -impl io::Read for MmapReader { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - if let Some(slice) = self - .mmap - .get(self.pos..) - .and_then(|unread| unread.get(..buf.len())) - { - buf.copy_from_slice(slice); - Ok(slice.len()) - } else { - Ok(0) - } - } -} - -impl BagReader for MmapReader { - type Read = Self; - - fn as_read(&mut self) -> &mut Self::Read { - self - } - - fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result - where - P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, - { - let input = self.mmap.get(self.pos..).ok_or(UnexpectedEof)?; - match parser.parse(input) { - Ok((rest, output)) => { - self.pos += input.len() - rest.len(); - Ok(output) - } - Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof), - Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), - } - } - - fn seek(&mut self, pos: io::SeekFrom) -> Result<()> { - match pos { - io::SeekFrom::Start(pos) => { - self.pos = pos as usize; - } - io::SeekFrom::End(pos) => { - self.pos = (self.mmap.len() as i64 + pos) as usize; - } - io::SeekFrom::Current(pos) => { - self.pos = ((self.pos as i64) + pos) as usize; - } - } - Ok(()) + let mmap = Arc::new(mmap); + Ok(SliceReader::from(mmap)) } } diff --git a/src/reader/slice.rs b/src/reader/slice.rs new file mode 100644 index 0000000..1fca9b0 --- /dev/null +++ b/src/reader/slice.rs @@ -0,0 +1,71 @@ +use std::{io, ops::Deref}; + +use eyre::bail; + +use super::{error::UnexpectedEof, BagReader}; +use crate::{parse, Result}; + +#[derive(Clone)] +pub struct SliceReader { + slice: T, + pos: usize, +} + +impl AsRef for SliceReader { + fn as_ref(&self) -> &T { + &self.slice + } +} + +impl SliceReader { + pub fn into_inner(self) -> T { + self.slice + } + + pub fn pos(&self) -> usize { + self.pos + } +} + +impl From for SliceReader { + fn from(slice: T) -> Self { + Self { slice, pos: 0 } + } +} + +impl BagReader for SliceReader +where + T: Deref, + U: AsRef<[u8]> + ?Sized + 'static, +{ + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result + where + P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, + { + let slice = self.slice.deref().as_ref(); + let input = slice.get(self.pos..).ok_or(UnexpectedEof)?; + match parser.parse(input) { + Ok((rest, output)) => { + self.pos += input.len() - rest.len(); + Ok(output) + } + Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), + } + } + + fn seek(&mut self, pos: io::SeekFrom) -> Result<()> { + match pos { + io::SeekFrom::Start(pos) => { + self.pos = pos as usize; + } + io::SeekFrom::End(pos) => { + self.pos = (self.slice.as_ref().len() as i64 + pos) as usize; + } + io::SeekFrom::Current(pos) => { + self.pos = ((self.pos as i64) + pos) as usize; + } + } + Ok(()) + } +}