diff --git a/examples/bag_info.rs b/examples/bag_info.rs index 323a224..4d739d2 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,4 +1,4 @@ -use std::{convert::TryFrom, env::args, fs::File, io}; +use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io}; use eyre::Context; use log::{error, info, trace}; @@ -7,7 +7,7 @@ use regex::Regex; use ros_message::{MessagePath, Msg}; use rsbag::{ chunk::ChunkHeader, - index::{BagIndex, ConnInfo}, + index::{BagIndex, ConnInfo, IndexData}, reader::{BagReader, MmapReader}, Result, }; @@ -57,6 +57,23 @@ fn read_chunk(bag_reader: &mut R, pos: u64) -> Result> { Ok(data) } +#[derive(Default, Debug)] +struct BagInfo { + total_uncompressed: u64, + per_connection: HashMap, +} + +impl BagInfo { + fn combine(mut self, other: BagInfo) -> BagInfo { + self.total_uncompressed += other.total_uncompressed; + for (conn, count) in other.per_connection { + *self.per_connection.entry(conn).or_insert(0) += count; + } + self + } + +} + fn main() -> Result<()> { color_eyre::install()?; env_logger::init(); @@ -91,28 +108,30 @@ fn main() -> Result<()> { } } - let total_size = index + let data = index .chunks .par_iter() - .try_fold( - || 0u64, - |total_size, chunk| -> rsbag::Result<_> { - let mut reader = bag_reader.clone(); - let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; - reader.skip_data()?; + .try_fold(BagInfo::default, |mut data, chunk| -> rsbag::Result<_> { + let mut reader = bag_reader.clone(); + let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; + data.total_uncompressed += chunk_header.uncompressed_size as u64; + reader.skip_data()?; - // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; - // chunks.push(data); - Ok(total_size + chunk_header.uncompressed_size as u64) - }, - ) - .reduce( - // || Ok(Vec::new()), - || Ok(0), - |a, b| a.and_then(|a| b.map(|b| a + b)), - ) + for _ in &chunk.connections { + let index = IndexData::read(&mut reader)?; + *data.per_connection.entry(index.conn_id).or_insert(0) += + index.entries.len() as u64; + } + + // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; + // chunks.push(data); + Ok(data) + }) + .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) .unwrap(); + info!("bag data: {:#?}", data); + // let total_size = index // .chunks // .par_iter() @@ -129,7 +148,5 @@ fn main() -> Result<()> { // |a, b| a.and_then(|a| b.map(|b| a + b)), // )?; - info!("total uncompressed size: {}", total_size); - Ok(()) } diff --git a/src/index.rs b/src/index.rs index 66a0789..89d8712 100644 --- a/src/index.rs +++ b/src/index.rs @@ -6,7 +6,7 @@ use nom::{number::streaming::le_u32, sequence::tuple, Parser}; use crate::{ error, - parse::{self, Header, Op}, + parse::{self, header::Time, Header, Op}, reader::BagReader, Result, }; @@ -41,8 +41,8 @@ impl ConnInfo { #[derive(Debug)] pub struct ChunkInfo { pub pos: u64, - pub start_time: u64, // TODO: unpack time - pub end_time: u64, + pub start_time: Time, + pub end_time: Time, pub connections: Vec, } @@ -63,8 +63,8 @@ impl ChunkInfo { } Ok(ChunkInfo { pos: header.read_u64(b"chunk_pos")?, - start_time: header.read_u64(b"start_time")?, - end_time: header.read_u64(b"end_time")?, + start_time: header.read_time(b"start_time")?, + end_time: header.read_time(b"end_time")?, connections, }) } @@ -151,3 +151,49 @@ impl BagIndex { } } } + +#[derive(Debug)] +pub struct IndexData { + pub conn_id: u32, + pub entries: Vec, +} + +impl IndexData { + pub fn read(reader: &mut R) -> Result { + let header = reader.read_header_op(Op::IndexData)?; + if header.read_u32(b"ver")? != 1 { + bail!("unsupported IndexData version"); + } + + reader.read_data_length()?; // Data length not needed + + let entry_count = header.read_u32(b"count")?; + let mut entries = Vec::with_capacity(entry_count as usize); + for _ in 0..entry_count { + let conn = IndexEntry::read(reader)?; + entries.push(conn); + } + Ok(IndexData { + conn_id: header.read_u32(b"conn")?, + entries, + }) + } +} + +#[derive(Debug)] +pub struct IndexEntry { + pub time: Time, + pub offset: u32, +} + +impl IndexEntry { + pub fn parse(input: parse::Input) -> parse::IResult { + tuple((Time::parse, le_u32)) + .map(|(time, offset)| IndexEntry { time, offset }) + .parse(input) + } + + pub fn read(reader: &mut R) -> Result { + reader.read_parser(Self::parse) + } +} diff --git a/src/parse/header.rs b/src/parse/header.rs index a9005b6..96b0eaa 100644 --- a/src/parse/header.rs +++ b/src/parse/header.rs @@ -19,7 +19,7 @@ mod fields; pub use self::{ error::{FieldDataError, MissingFieldError}, - fields::Op, + fields::{Op, Time}, }; #[derive(Clone, Debug)] @@ -87,19 +87,23 @@ impl Header { } pub fn read_op(&self) -> Result { - self.find_field(b"op").and_then(fields::Op::parse) + self.find_field(b"op").and_then(fields::Op::read) } pub fn read_u64(&self, field: &[u8]) -> Result { - self.find_field(field).and_then(fields::parse_u64) + self.find_field(field).and_then(fields::read_u64) + } + + pub fn read_time(&self, field: &[u8]) -> Result