diff --git a/examples/bag_info.rs b/examples/bag_info.rs index cd9549a..7bd46a0 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io}; +use std::{collections::HashMap, convert::TryFrom, env::args, fs::File}; use eyre::{bail, Context}; use log::{error, info, trace}; @@ -6,8 +6,8 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use ros_message::{MessagePath, Msg}; use rsbag::{ - chunk::{ChunkHeader, MessageDataHeader}, - index::{BagIndex, ConnInfo, IndexData}, + chunk::{read_chunk, MessageDataHeader}, + index::{BagIndex, ConnInfo}, reader::{BagReader, MmapReader, SliceReader}, Result, }; @@ -46,19 +46,6 @@ fn parse_message_definitions(conn: &ConnInfo) -> Result> { Ok(msgs) } -fn read_chunk(bag_reader: &mut R, pos: u64) -> Result> { - bag_reader.seek(io::SeekFrom::Start(pos))?; - - let chunk_header = ChunkHeader::read(bag_reader)?; - - let compressed_data = bag_reader.read_data()?; - let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); - let mut decompresor = chunk_header.compression.decompress_stream(compressed_data); - io::copy(&mut decompresor, &mut data)?; - - Ok(data) -} - #[derive(Default, Debug)] struct BagInfo { total_uncompressed: u64, @@ -114,7 +101,8 @@ fn main() -> Result<()> { .par_iter() // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { // let mut reader = bag_reader.clone(); - // let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; + // reader.seek(io::SeekFrom::Start(chunk.pos))?; + // let chunk_header = ChunkHeader::read(&mut reader)?; // info.total_uncompressed += chunk_header.uncompressed_size as u64; // reader.skip_data()?; // for _ in &chunk.connections { @@ -126,17 +114,11 @@ fn main() -> Result<()> { // }) .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { let mut reader = bag_reader.clone(); - let data = read_chunk(&mut reader, chunk.pos) .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?; info.total_uncompressed += data.len() as u64; - let mut chunk_reader = SliceReader::from(data); - - loop { - if chunk_reader.pos() == chunk_reader.as_ref().len() { - break; - } + while chunk_reader.remaining() > 0 { let header = chunk_reader.read_header()?; let op = header.read_op()?; match op { @@ -150,7 +132,6 @@ fn main() -> Result<()> { _ => bail!("unexpected op in chunk: {:?}", op), } } - Ok(info) }) .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) diff --git a/src/chunk.rs b/src/chunk.rs index c9a6478..dddd09b 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,7 +1,4 @@ -use std::{ - io::{self, SeekFrom}, - str::FromStr, -}; +use std::{io, str::FromStr}; use eyre::bail; @@ -62,6 +59,19 @@ impl ChunkHeader { } } +pub fn read_chunk(bag_reader: &mut R, pos: u64) -> Result> { + bag_reader.seek(io::SeekFrom::Start(pos))?; + + let chunk_header = ChunkHeader::read(bag_reader)?; + + let compressed_data = bag_reader.read_data()?; + let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); + let mut decompresor = chunk_header.compression.decompress_stream(compressed_data); + io::copy(&mut decompresor, &mut data)?; + + Ok(data) +} + pub struct MessageDataHeader { pub conn_id: u32, pub time: Time, diff --git a/src/reader/slice.rs b/src/reader/slice.rs index 1fca9b0..26e9af4 100644 --- a/src/reader/slice.rs +++ b/src/reader/slice.rs @@ -11,12 +11,6 @@ pub struct SliceReader { pos: usize, } -impl AsRef for SliceReader { - fn as_ref(&self) -> &T { - &self.slice - } -} - impl SliceReader { pub fn into_inner(self) -> T { self.slice @@ -27,6 +21,22 @@ impl SliceReader { } } +impl SliceReader +where + T: Deref, + U: AsRef<[u8]> + ?Sized + 'static, +{ + pub fn remaining(&self) -> usize { + self.slice.deref().as_ref().len() - self.pos() + } +} + +impl AsRef for SliceReader { + fn as_ref(&self) -> &T { + &self.slice + } +} + impl From for SliceReader { fn from(slice: T) -> Self { Self { slice, pos: 0 }