refactor out of bag_info
This commit is contained in:
parent
6237c25458
commit
e70f7b41dd
@ -10,6 +10,7 @@ mmap = ["memmap"]
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
eyre = "0.6.5"
|
eyre = "0.6.5"
|
||||||
|
lazy_static = "1.4.0"
|
||||||
log = "0.4.14"
|
log = "0.4.14"
|
||||||
lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] }
|
lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] }
|
||||||
memmap = { version = "0.7.0", optional = true }
|
memmap = { version = "0.7.0", optional = true }
|
||||||
@ -28,4 +29,4 @@ eyre = "0.6.5"
|
|||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
debug = true
|
debug = true
|
||||||
lto = true
|
# lto = true
|
||||||
|
@ -1,66 +1,7 @@
|
|||||||
use std::{collections::HashMap, convert::TryFrom, env::args, fs::File};
|
use std::env::args;
|
||||||
|
|
||||||
use eyre::{bail, Context};
|
use log::info;
|
||||||
use log::{error, info, trace};
|
use rsbag::{Bag, Result};
|
||||||
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
|
|
||||||
use regex::Regex;
|
|
||||||
use ros_message::{MessagePath, Msg};
|
|
||||||
use rsbag::{
|
|
||||||
chunk::{read_chunk, MessageDataHeader},
|
|
||||||
index::{BagIndex, ConnInfo},
|
|
||||||
reader::{BagReader, MmapReader, SliceReader},
|
|
||||||
Result,
|
|
||||||
};
|
|
||||||
|
|
||||||
fn parse_msgdef(message_name: &str, msgdef: &str) -> Result<Msg> {
|
|
||||||
trace!("message definition: {}", msgdef);
|
|
||||||
let path = MessagePath::try_from(message_name)?;
|
|
||||||
let msgtype = Msg::new(path, msgdef)?;
|
|
||||||
Ok(msgtype)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_message_definitions(conn: &ConnInfo) -> Result<Vec<Msg>> {
|
|
||||||
let msgdefs = &conn.message_definition;
|
|
||||||
let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n")?;
|
|
||||||
let mut name = conn.datatype.clone();
|
|
||||||
let mut begin = 0usize;
|
|
||||||
let mut msgs = Vec::new();
|
|
||||||
|
|
||||||
for cap in boundary_re.captures_iter(msgdefs) {
|
|
||||||
let boundary_range = cap.get(0).unwrap();
|
|
||||||
let end = boundary_range.start();
|
|
||||||
|
|
||||||
let msgdef = &msgdefs[begin..end];
|
|
||||||
let msgtype = parse_msgdef(&name, msgdef)?;
|
|
||||||
msgs.push(msgtype);
|
|
||||||
|
|
||||||
name = cap[1].to_string();
|
|
||||||
begin = boundary_range.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
let msgdef = &msgdefs[begin..];
|
|
||||||
|
|
||||||
let msg = parse_msgdef(&name, msgdef)?;
|
|
||||||
msgs.push(msg);
|
|
||||||
|
|
||||||
Ok(msgs)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Default, Debug)]
|
|
||||||
struct BagInfo {
|
|
||||||
total_uncompressed: u64,
|
|
||||||
per_connection: HashMap<u32, u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BagInfo {
|
|
||||||
fn combine(mut self, other: BagInfo) -> BagInfo {
|
|
||||||
self.total_uncompressed += other.total_uncompressed;
|
|
||||||
for (conn, count) in other.per_connection {
|
|
||||||
*self.per_connection.entry(conn).or_insert(0) += count;
|
|
||||||
}
|
|
||||||
self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn main() -> Result<()> {
|
fn main() -> Result<()> {
|
||||||
color_eyre::install()?;
|
color_eyre::install()?;
|
||||||
@ -73,69 +14,11 @@ fn main() -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let bag_path = &args[1];
|
let bag_path = &args[1];
|
||||||
let bag_file = File::open(bag_path).expect("Could not open bag file");
|
let mut bag = Bag::open(bag_path)?;
|
||||||
let mut bag_reader = MmapReader::map(bag_file)?;
|
|
||||||
|
|
||||||
let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?;
|
bag.compute_message_layouts()?;
|
||||||
|
|
||||||
for conn in &index.connections {
|
let info = bag.compute_info()?;
|
||||||
match parse_message_definitions(conn) {
|
|
||||||
Ok(msgs) => {
|
|
||||||
for msg in &msgs {
|
|
||||||
trace!(
|
|
||||||
"message definition parsed: {:#?}",
|
|
||||||
msg.fields()
|
|
||||||
.iter()
|
|
||||||
.filter(|field| !field.is_constant())
|
|
||||||
.map(ToString::to_string)
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(err) => error!("could not parse message definition: {}", err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let info = index
|
|
||||||
.chunks
|
|
||||||
.par_iter()
|
|
||||||
// .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
|
|
||||||
// let mut reader = bag_reader.clone();
|
|
||||||
// reader.seek(io::SeekFrom::Start(chunk.pos))?;
|
|
||||||
// let chunk_header = ChunkHeader::read(&mut reader)?;
|
|
||||||
// info.total_uncompressed += chunk_header.uncompressed_size as u64;
|
|
||||||
// reader.skip_data()?;
|
|
||||||
// for _ in &chunk.connections {
|
|
||||||
// let index = IndexData::read(&mut reader)?;
|
|
||||||
// *info.per_connection.entry(index.conn_id).or_insert(0) +=
|
|
||||||
// index.entries.len() as u64;
|
|
||||||
// }
|
|
||||||
// Ok(info)
|
|
||||||
// })
|
|
||||||
.try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
|
|
||||||
let mut reader = bag_reader.clone();
|
|
||||||
let data = read_chunk(&mut reader, chunk.pos)
|
|
||||||
.wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?;
|
|
||||||
info.total_uncompressed += data.len() as u64;
|
|
||||||
let mut chunk_reader = SliceReader::from(data);
|
|
||||||
while chunk_reader.remaining() > 0 {
|
|
||||||
let header = chunk_reader.read_header()?;
|
|
||||||
let op = header.read_op()?;
|
|
||||||
match op {
|
|
||||||
rsbag::parse::Op::MsgData => {
|
|
||||||
let header = MessageDataHeader::from_header(header)?;
|
|
||||||
let count = info.per_connection.entry(header.conn_id).or_insert(0);
|
|
||||||
*count += 1;
|
|
||||||
chunk_reader.skip_data()?;
|
|
||||||
}
|
|
||||||
rsbag::parse::Op::Connection => chunk_reader.skip_data()?,
|
|
||||||
_ => bail!("unexpected op in chunk: {:?}", op),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(info)
|
|
||||||
})
|
|
||||||
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
info!("bag info: {:#?}", info);
|
info!("bag info: {:#?}", info);
|
||||||
|
|
||||||
|
46
src/bag.rs
Normal file
46
src/bag.rs
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
use std::{fs::File, path::Path};
|
||||||
|
|
||||||
|
use eyre::Context;
|
||||||
|
use log::{error, trace};
|
||||||
|
|
||||||
|
use crate::{Result, index::BagIndex, info::BagInfo, message::parse_msgs, reader::MmapReader};
|
||||||
|
|
||||||
|
pub struct Bag {
|
||||||
|
reader: MmapReader,
|
||||||
|
index: BagIndex,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Bag {
|
||||||
|
pub fn open<P: AsRef<Path>>(path: P) -> Result<Self> {
|
||||||
|
let bag_file = File::open(path).wrap_err("could not open bag file")?;
|
||||||
|
let mut reader = MmapReader::map(bag_file).wrap_err("could not map bag file memory")?;
|
||||||
|
let index = BagIndex::read(&mut reader).wrap_err("error reading bag index")?;
|
||||||
|
Ok(Bag { reader, index })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute_message_layouts(&mut self) -> Result<()> {
|
||||||
|
for conn in &self.index.connections {
|
||||||
|
match parse_msgs(conn) {
|
||||||
|
Ok(msgs) => {
|
||||||
|
for msg in &msgs {
|
||||||
|
trace!(
|
||||||
|
"message definition parsed: {:#?}",
|
||||||
|
msg.fields()
|
||||||
|
.iter()
|
||||||
|
.filter(|field| !field.is_constant())
|
||||||
|
.map(ToString::to_string)
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => error!("could not parse message definition: {}", err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute_info(&mut self) -> Result<BagInfo> {
|
||||||
|
BagInfo::compute(&mut self.reader, &self.index)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -141,7 +141,7 @@ impl BagIndex {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_all<R: BagReader>(reader: &mut R) -> Result<Self> {
|
pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
|
||||||
let version = reader.read_version()?;
|
let version = reader.read_version()?;
|
||||||
trace!("bag version: {}", version);
|
trace!("bag version: {}", version);
|
||||||
if (version.major, version.minor) == (2, 0) {
|
if (version.major, version.minor) == (2, 0) {
|
||||||
|
70
src/info.rs
Normal file
70
src/info.rs
Normal file
@ -0,0 +1,70 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use eyre::{bail, Context};
|
||||||
|
use rayon::prelude::*;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
chunk::{read_chunk, MessageDataHeader},
|
||||||
|
index::BagIndex,
|
||||||
|
parse::Op,
|
||||||
|
reader::{BagReader, SliceReader},
|
||||||
|
Result,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Default, Debug)]
|
||||||
|
pub struct BagInfo {
|
||||||
|
pub total_uncompressed: u64,
|
||||||
|
pub per_connection: HashMap<u32, u64>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BagInfo {
|
||||||
|
fn combine(mut self, other: BagInfo) -> BagInfo {
|
||||||
|
self.total_uncompressed += other.total_uncompressed;
|
||||||
|
for (conn, count) in other.per_connection {
|
||||||
|
*self.per_connection.entry(conn).or_insert(0) += count;
|
||||||
|
}
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute<R: BagReader + Clone + Send + Sync>(reader: &mut R, index: &BagIndex) -> Result<BagInfo> {
|
||||||
|
index
|
||||||
|
.chunks
|
||||||
|
.par_iter()
|
||||||
|
// .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
|
||||||
|
// let mut reader = reader.clone();
|
||||||
|
// reader.seek(io::SeekFrom::Start(chunk.pos))?;
|
||||||
|
// let chunk_header = ChunkHeader::read(&mut reader)?;
|
||||||
|
// info.total_uncompressed += chunk_header.uncompressed_size as u64;
|
||||||
|
// reader.skip_data()?;
|
||||||
|
// for _ in &chunk.connections {
|
||||||
|
// let index = IndexData::read(&mut reader)?;
|
||||||
|
// *info.per_connection.entry(index.conn_id).or_insert(0) +=
|
||||||
|
// index.entries.len() as u64;
|
||||||
|
// }
|
||||||
|
// Ok(info)
|
||||||
|
// })
|
||||||
|
.try_fold(BagInfo::default, |mut info, chunk| -> Result<_> {
|
||||||
|
let mut reader = reader.clone();
|
||||||
|
let data = read_chunk(&mut reader, chunk.pos)
|
||||||
|
.wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?;
|
||||||
|
info.total_uncompressed += data.len() as u64;
|
||||||
|
let mut chunk_reader = SliceReader::from(data);
|
||||||
|
while chunk_reader.remaining() > 0 {
|
||||||
|
let header = chunk_reader.read_header()?;
|
||||||
|
let op = header.read_op()?;
|
||||||
|
match op {
|
||||||
|
Op::MsgData => {
|
||||||
|
let header = MessageDataHeader::from_header(header)?;
|
||||||
|
let count = info.per_connection.entry(header.conn_id).or_insert(0);
|
||||||
|
*count += 1;
|
||||||
|
chunk_reader.skip_data()?;
|
||||||
|
}
|
||||||
|
Op::Connection => chunk_reader.skip_data()?,
|
||||||
|
_ => bail!("unexpected op in chunk: {:?}", op),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(info)
|
||||||
|
})
|
||||||
|
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
|
||||||
|
}
|
||||||
|
}
|
0
src/layout.rs
Normal file
0
src/layout.rs
Normal file
@ -1,7 +1,11 @@
|
|||||||
|
pub mod bag;
|
||||||
|
pub mod chunk;
|
||||||
mod error;
|
mod error;
|
||||||
pub mod index;
|
pub mod index;
|
||||||
|
pub mod message;
|
||||||
pub mod parse;
|
pub mod parse;
|
||||||
pub mod reader;
|
pub mod reader;
|
||||||
pub mod chunk;
|
pub mod info;
|
||||||
|
|
||||||
pub use error::{Error, Result};
|
pub use error::{Error, Result};
|
||||||
|
pub use bag::Bag;
|
||||||
|
45
src/message.rs
Normal file
45
src/message.rs
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
use std::convert::TryFrom;
|
||||||
|
|
||||||
|
use lazy_static::lazy_static;
|
||||||
|
use log::trace;
|
||||||
|
use regex::Regex;
|
||||||
|
use ros_message::{MessagePath, Msg};
|
||||||
|
|
||||||
|
use crate::{index::ConnInfo, Result};
|
||||||
|
|
||||||
|
fn parse_msg(message_name: &str, msgdef: &str) -> Result<Msg> {
|
||||||
|
trace!("message definition: {}", msgdef);
|
||||||
|
let path = MessagePath::try_from(message_name)?;
|
||||||
|
let msgtype = Msg::new(path, msgdef)?;
|
||||||
|
Ok(msgtype)
|
||||||
|
}
|
||||||
|
|
||||||
|
lazy_static! {
|
||||||
|
static ref BOUNDARY_RE: Regex = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n").unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_msgs(conn: &ConnInfo) -> Result<Vec<Msg>> {
|
||||||
|
let msgdefs = &conn.message_definition;
|
||||||
|
let mut name = conn.datatype.clone();
|
||||||
|
let mut begin = 0usize;
|
||||||
|
let mut msgs = Vec::new();
|
||||||
|
|
||||||
|
for cap in BOUNDARY_RE.captures_iter(msgdefs) {
|
||||||
|
let boundary_range = cap.get(0).unwrap();
|
||||||
|
let end = boundary_range.start();
|
||||||
|
|
||||||
|
let msgdef = &msgdefs[begin..end];
|
||||||
|
let msgtype = parse_msg(&name, msgdef)?;
|
||||||
|
msgs.push(msgtype);
|
||||||
|
|
||||||
|
name = cap[1].to_string();
|
||||||
|
begin = boundary_range.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
let msgdef = &msgdefs[begin..];
|
||||||
|
|
||||||
|
let msg = parse_msg(&name, msgdef)?;
|
||||||
|
msgs.push(msg);
|
||||||
|
|
||||||
|
Ok(msgs)
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user