diff --git a/Cargo.toml b/Cargo.toml index 2c65e32..932d31b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ mmap = ["memmap"] [dependencies] bytes = "1.1.0" eyre = "0.6.5" +lazy_static = "1.4.0" log = "0.4.14" lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] } memmap = { version = "0.7.0", optional = true } @@ -28,4 +29,4 @@ eyre = "0.6.5" [profile.release] debug = true -lto = true \ No newline at end of file +# lto = true diff --git a/examples/bag_info.rs b/examples/bag_info.rs index 7bd46a0..a41b55b 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,66 +1,7 @@ -use std::{collections::HashMap, convert::TryFrom, env::args, fs::File}; +use std::env::args; -use eyre::{bail, Context}; -use log::{error, info, trace}; -use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; -use regex::Regex; -use ros_message::{MessagePath, Msg}; -use rsbag::{ - chunk::{read_chunk, MessageDataHeader}, - index::{BagIndex, ConnInfo}, - reader::{BagReader, MmapReader, SliceReader}, - Result, -}; - -fn parse_msgdef(message_name: &str, msgdef: &str) -> Result { - trace!("message definition: {}", msgdef); - let path = MessagePath::try_from(message_name)?; - let msgtype = Msg::new(path, msgdef)?; - Ok(msgtype) -} - -fn parse_message_definitions(conn: &ConnInfo) -> Result> { - let msgdefs = &conn.message_definition; - let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n")?; - let mut name = conn.datatype.clone(); - let mut begin = 0usize; - let mut msgs = Vec::new(); - - for cap in boundary_re.captures_iter(msgdefs) { - let boundary_range = cap.get(0).unwrap(); - let end = boundary_range.start(); - - let msgdef = &msgdefs[begin..end]; - let msgtype = parse_msgdef(&name, msgdef)?; - msgs.push(msgtype); - - name = cap[1].to_string(); - begin = boundary_range.end(); - } - - let msgdef = &msgdefs[begin..]; - - let msg = parse_msgdef(&name, msgdef)?; - msgs.push(msg); - - Ok(msgs) -} - -#[derive(Default, Debug)] -struct BagInfo { - total_uncompressed: u64, - per_connection: HashMap, -} - -impl BagInfo { - fn combine(mut self, other: BagInfo) -> BagInfo { - self.total_uncompressed += other.total_uncompressed; - for (conn, count) in other.per_connection { - *self.per_connection.entry(conn).or_insert(0) += count; - } - self - } -} +use log::info; +use rsbag::{Bag, Result}; fn main() -> Result<()> { color_eyre::install()?; @@ -73,69 +14,11 @@ fn main() -> Result<()> { } let bag_path = &args[1]; - let bag_file = File::open(bag_path).expect("Could not open bag file"); - let mut bag_reader = MmapReader::map(bag_file)?; + let mut bag = Bag::open(bag_path)?; - let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?; + bag.compute_message_layouts()?; - for conn in &index.connections { - match parse_message_definitions(conn) { - Ok(msgs) => { - for msg in &msgs { - trace!( - "message definition parsed: {:#?}", - msg.fields() - .iter() - .filter(|field| !field.is_constant()) - .map(ToString::to_string) - .collect::>() - ); - } - } - Err(err) => error!("could not parse message definition: {}", err), - } - } - - let info = index - .chunks - .par_iter() - // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { - // let mut reader = bag_reader.clone(); - // reader.seek(io::SeekFrom::Start(chunk.pos))?; - // let chunk_header = ChunkHeader::read(&mut reader)?; - // info.total_uncompressed += chunk_header.uncompressed_size as u64; - // reader.skip_data()?; - // for _ in &chunk.connections { - // let index = IndexData::read(&mut reader)?; - // *info.per_connection.entry(index.conn_id).or_insert(0) += - // index.entries.len() as u64; - // } - // Ok(info) - // }) - .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { - let mut reader = bag_reader.clone(); - let data = read_chunk(&mut reader, chunk.pos) - .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?; - info.total_uncompressed += data.len() as u64; - let mut chunk_reader = SliceReader::from(data); - while chunk_reader.remaining() > 0 { - let header = chunk_reader.read_header()?; - let op = header.read_op()?; - match op { - rsbag::parse::Op::MsgData => { - let header = MessageDataHeader::from_header(header)?; - let count = info.per_connection.entry(header.conn_id).or_insert(0); - *count += 1; - chunk_reader.skip_data()?; - } - rsbag::parse::Op::Connection => chunk_reader.skip_data()?, - _ => bail!("unexpected op in chunk: {:?}", op), - } - } - Ok(info) - }) - .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) - .unwrap(); + let info = bag.compute_info()?; info!("bag info: {:#?}", info); diff --git a/src/bag.rs b/src/bag.rs new file mode 100644 index 0000000..fe16934 --- /dev/null +++ b/src/bag.rs @@ -0,0 +1,46 @@ +use std::{fs::File, path::Path}; + +use eyre::Context; +use log::{error, trace}; + +use crate::{Result, index::BagIndex, info::BagInfo, message::parse_msgs, reader::MmapReader}; + +pub struct Bag { + reader: MmapReader, + index: BagIndex, +} + +impl Bag { + pub fn open>(path: P) -> Result { + let bag_file = File::open(path).wrap_err("could not open bag file")?; + let mut reader = MmapReader::map(bag_file).wrap_err("could not map bag file memory")?; + let index = BagIndex::read(&mut reader).wrap_err("error reading bag index")?; + Ok(Bag { reader, index }) + } + + pub fn compute_message_layouts(&mut self) -> Result<()> { + for conn in &self.index.connections { + match parse_msgs(conn) { + Ok(msgs) => { + for msg in &msgs { + trace!( + "message definition parsed: {:#?}", + msg.fields() + .iter() + .filter(|field| !field.is_constant()) + .map(ToString::to_string) + .collect::>() + ); + } + } + Err(err) => error!("could not parse message definition: {}", err), + } + } + Ok(()) + } + + pub fn compute_info(&mut self) -> Result { + BagInfo::compute(&mut self.reader, &self.index) + } +} + diff --git a/src/index.rs b/src/index.rs index b789ee1..5025d8d 100644 --- a/src/index.rs +++ b/src/index.rs @@ -141,7 +141,7 @@ impl BagIndex { }) } - pub fn read_all(reader: &mut R) -> Result { + pub fn read(reader: &mut R) -> Result { let version = reader.read_version()?; trace!("bag version: {}", version); if (version.major, version.minor) == (2, 0) { diff --git a/src/info.rs b/src/info.rs new file mode 100644 index 0000000..8a1398d --- /dev/null +++ b/src/info.rs @@ -0,0 +1,70 @@ +use std::collections::HashMap; + +use eyre::{bail, Context}; +use rayon::prelude::*; + +use crate::{ + chunk::{read_chunk, MessageDataHeader}, + index::BagIndex, + parse::Op, + reader::{BagReader, SliceReader}, + Result, +}; + +#[derive(Default, Debug)] +pub struct BagInfo { + pub total_uncompressed: u64, + pub per_connection: HashMap, +} + +impl BagInfo { + fn combine(mut self, other: BagInfo) -> BagInfo { + self.total_uncompressed += other.total_uncompressed; + for (conn, count) in other.per_connection { + *self.per_connection.entry(conn).or_insert(0) += count; + } + self + } + + pub fn compute(reader: &mut R, index: &BagIndex) -> Result { + index + .chunks + .par_iter() + // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { + // let mut reader = reader.clone(); + // reader.seek(io::SeekFrom::Start(chunk.pos))?; + // let chunk_header = ChunkHeader::read(&mut reader)?; + // info.total_uncompressed += chunk_header.uncompressed_size as u64; + // reader.skip_data()?; + // for _ in &chunk.connections { + // let index = IndexData::read(&mut reader)?; + // *info.per_connection.entry(index.conn_id).or_insert(0) += + // index.entries.len() as u64; + // } + // Ok(info) + // }) + .try_fold(BagInfo::default, |mut info, chunk| -> Result<_> { + let mut reader = reader.clone(); + let data = read_chunk(&mut reader, chunk.pos) + .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?; + info.total_uncompressed += data.len() as u64; + let mut chunk_reader = SliceReader::from(data); + while chunk_reader.remaining() > 0 { + let header = chunk_reader.read_header()?; + let op = header.read_op()?; + match op { + Op::MsgData => { + let header = MessageDataHeader::from_header(header)?; + let count = info.per_connection.entry(header.conn_id).or_insert(0); + *count += 1; + chunk_reader.skip_data()?; + } + Op::Connection => chunk_reader.skip_data()?, + _ => bail!("unexpected op in chunk: {:?}", op), + } + } + Ok(info) + }) + .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) + } +} diff --git a/src/layout.rs b/src/layout.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/lib.rs b/src/lib.rs index 50a6cb2..1210678 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,11 @@ +pub mod bag; +pub mod chunk; mod error; pub mod index; +pub mod message; pub mod parse; pub mod reader; -pub mod chunk; +pub mod info; pub use error::{Error, Result}; +pub use bag::Bag; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..350d692 --- /dev/null +++ b/src/message.rs @@ -0,0 +1,45 @@ +use std::convert::TryFrom; + +use lazy_static::lazy_static; +use log::trace; +use regex::Regex; +use ros_message::{MessagePath, Msg}; + +use crate::{index::ConnInfo, Result}; + +fn parse_msg(message_name: &str, msgdef: &str) -> Result { + trace!("message definition: {}", msgdef); + let path = MessagePath::try_from(message_name)?; + let msgtype = Msg::new(path, msgdef)?; + Ok(msgtype) +} + +lazy_static! { + static ref BOUNDARY_RE: Regex = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n").unwrap(); +} + +pub fn parse_msgs(conn: &ConnInfo) -> Result> { + let msgdefs = &conn.message_definition; + let mut name = conn.datatype.clone(); + let mut begin = 0usize; + let mut msgs = Vec::new(); + + for cap in BOUNDARY_RE.captures_iter(msgdefs) { + let boundary_range = cap.get(0).unwrap(); + let end = boundary_range.start(); + + let msgdef = &msgdefs[begin..end]; + let msgtype = parse_msg(&name, msgdef)?; + msgs.push(msgtype); + + name = cap[1].to_string(); + begin = boundary_range.end(); + } + + let msgdef = &msgdefs[begin..]; + + let msg = parse_msg(&name, msgdef)?; + msgs.push(msg); + + Ok(msgs) +}