use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io};

use eyre::Context;
use log::{error, info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex;
use ros_message::{MessagePath, Msg};
use rsbag::{
    chunk::ChunkHeader,
    index::{BagIndex, ConnInfo, IndexData},
    reader::{BagReader, MmapReader},
    Result,
};

/// Parses a single message definition body into a `Msg` for the given type name.
fn parse_msgdef(message_name: &str, msgdef: &str) -> Result<Msg> {
    trace!("message definition: {}", msgdef);
    let path = MessagePath::try_from(message_name)?;
    let msgtype = Msg::new(path, msgdef)?;
    Ok(msgtype)
}

/// Splits a connection's concatenated message definition (the main type plus all
/// of its dependencies, separated by `===` boundaries) and parses each part.
fn parse_message_definitions(conn: &ConnInfo) -> Result<Vec<Msg>> {
    let msgdefs = &conn.message_definition;

    let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n")?;

    let mut name = conn.datatype.clone();
    let mut begin = 0usize;
    let mut msgs = Vec::new();
    for cap in boundary_re.captures_iter(msgdefs) {
        let boundary_range = cap.get(0).unwrap();
        let end = boundary_range.start();

        let msgdef = &msgdefs[begin..end];
        let msgtype = parse_msgdef(&name, msgdef)?;
        msgs.push(msgtype);

        // The boundary line names the type of the *next* definition.
        name = cap[1].to_string();
        begin = boundary_range.end();
    }

    // The remainder after the last boundary is the final definition.
    let msgdef = &msgdefs[begin..];
    let msg = parse_msgdef(&name, msgdef)?;
    msgs.push(msg);

    Ok(msgs)
}

/// Reads the chunk record at `pos` and returns its decompressed payload.
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> {
    let chunk_header = ChunkHeader::read(bag_reader, pos)?;

    let compressed_data = bag_reader.read_data()?;
    let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
    let mut decompressor = chunk_header.compression.decompress(compressed_data);
    io::copy(&mut decompressor, &mut data)?;

    Ok(data)
}

/// Summary statistics accumulated over all chunks of a bag.
#[derive(Default, Debug)]
struct BagInfo {
    total_uncompressed: u64,
    per_connection: HashMap<u32, u64>,
}

impl BagInfo {
    /// Merges two partial summaries produced by parallel folds.
    fn combine(mut self, other: BagInfo) -> BagInfo {
        self.total_uncompressed += other.total_uncompressed;
        for (conn, count) in other.per_connection {
            *self.per_connection.entry(conn).or_insert(0) += count;
        }
        self
    }
}

fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let args: Vec<_> = args().collect();
    if args.len() != 2 {
        eprintln!("Usage: {} <bag file>", args[0]);
        return Ok(());
    }

    let bag_path = &args[1];
    let bag_file = File::open(bag_path).expect("Could not open bag file");
    let mut bag_reader = MmapReader::new(bag_file)?;
    let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?;

    // Parse and log the message definitions recorded for each connection.
    for conn in &index.connections {
        match parse_message_definitions(conn) {
            Ok(msgs) => {
                for msg in &msgs {
                    trace!(
                        "message definition parsed: {:#?}",
                        msg.fields()
                            .iter()
                            .filter(|field| !field.is_constant())
                            .map(ToString::to_string)
                            .collect::<Vec<_>>()
                    );
                }
            }
            Err(err) => error!("could not parse message definition: {}", err),
        }
    }

    // Walk the chunks in parallel, summing uncompressed sizes and per-connection
    // message counts, then combine the partial results.
    let data = index
        .chunks
        .par_iter()
        .try_fold(BagInfo::default, |mut data, chunk| -> rsbag::Result<_> {
            let mut reader = bag_reader.clone();

            let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?;
            data.total_uncompressed += chunk_header.uncompressed_size as u64;
            reader.skip_data()?;

            for _ in &chunk.connections {
                let index = IndexData::read(&mut reader)?;
                *data.per_connection.entry(index.conn_id).or_insert(0) +=
                    index.entries.len() as u64;
            }

            // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
            // chunks.push(data);

            Ok(data)
        })
        .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
        .unwrap();

    info!("bag data: {:#?}", data);

    // let total_size = index
    //     .chunks
    //     .par_iter()
    //     .try_fold(
    //         || 0u64,
    //         |total_size, chunk| -> Result<_> {
    //             let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
    //             Ok(total_size + data.len() as u64)
    //         },
    //     )
    //     .reduce(
    //         // || Ok(Vec::new()),
    //         || Ok(0),
    //         |a, b| a.and_then(|a| b.map(|b| a + b)),
    //     )?;

    Ok(())
}
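
// A sketch of how this might be run, assuming it is built as a binary target in
// the rsbag workspace (the target name and bag path below are hypothetical):
//
//     RUST_LOG=info cargo run --release -- recording.bag
//
// `env_logger` reads the RUST_LOG environment variable, so `info` prints the
// final "bag data" summary, while `trace` additionally dumps every parsed
// message definition field.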