message data header parsing

This commit is contained in:
Alex Mikhalev 2021-11-19 18:01:25 -08:00
parent 733d2d61a3
commit bea68dfa87
9 changed files with 156 additions and 120 deletions

View File

@ -25,3 +25,7 @@ thiserror = "1.0.28"
color-eyre = "0.5.11" color-eyre = "0.5.11"
env_logger = "0.9.0" env_logger = "0.9.0"
eyre = "0.6.5" eyre = "0.6.5"
[profile.release]
debug = true
lto = true

View File

@ -1,14 +1,14 @@
use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io}; use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io};
use eyre::Context; use eyre::{bail, Context};
use log::{error, info, trace}; use log::{error, info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex; use regex::Regex;
use ros_message::{MessagePath, Msg}; use ros_message::{MessagePath, Msg};
use rsbag::{ use rsbag::{
chunk::ChunkHeader, chunk::{ChunkHeader, MessageDataHeader},
index::{BagIndex, ConnInfo, IndexData}, index::{BagIndex, ConnInfo, IndexData},
reader::{BagReader, MmapReader}, reader::{BagReader, MmapReader, SliceReader},
Result, Result,
}; };
@ -47,11 +47,13 @@ fn parse_message_definitions(conn: &ConnInfo) -> Result<Vec<Msg>> {
} }
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> { fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> {
let chunk_header = ChunkHeader::read(bag_reader, pos)?; bag_reader.seek(io::SeekFrom::Start(pos))?;
let chunk_header = ChunkHeader::read(bag_reader)?;
let compressed_data = bag_reader.read_data()?; let compressed_data = bag_reader.read_data()?;
let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
let mut decompresor = chunk_header.compression.decompress(compressed_data); let mut decompresor = chunk_header.compression.decompress_stream(compressed_data);
io::copy(&mut decompresor, &mut data)?; io::copy(&mut decompresor, &mut data)?;
Ok(data) Ok(data)
@ -71,7 +73,6 @@ impl BagInfo {
} }
self self
} }
} }
fn main() -> Result<()> { fn main() -> Result<()> {
@ -86,7 +87,7 @@ fn main() -> Result<()> {
let bag_path = &args[1]; let bag_path = &args[1];
let bag_file = File::open(bag_path).expect("Could not open bag file"); let bag_file = File::open(bag_path).expect("Could not open bag file");
let mut bag_reader = MmapReader::new(bag_file)?; let mut bag_reader = MmapReader::map(bag_file)?;
let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?; let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?;
@ -108,45 +109,54 @@ fn main() -> Result<()> {
} }
} }
let data = index let info = index
.chunks .chunks
.par_iter() .par_iter()
.try_fold(BagInfo::default, |mut data, chunk| -> rsbag::Result<_> { // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
// let mut reader = bag_reader.clone();
// let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?;
// info.total_uncompressed += chunk_header.uncompressed_size as u64;
// reader.skip_data()?;
// for _ in &chunk.connections {
// let index = IndexData::read(&mut reader)?;
// *info.per_connection.entry(index.conn_id).or_insert(0) +=
// index.entries.len() as u64;
// }
// Ok(info)
// })
.try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
let mut reader = bag_reader.clone(); let mut reader = bag_reader.clone();
let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?;
data.total_uncompressed += chunk_header.uncompressed_size as u64;
reader.skip_data()?;
for _ in &chunk.connections { let data = read_chunk(&mut reader, chunk.pos)
let index = IndexData::read(&mut reader)?; .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?;
*data.per_connection.entry(index.conn_id).or_insert(0) += info.total_uncompressed += data.len() as u64;
index.entries.len() as u64;
let mut chunk_reader = SliceReader::from(data);
loop {
if chunk_reader.pos() == chunk_reader.as_ref().len() {
break;
}
let header = chunk_reader.read_header()?;
let op = header.read_op()?;
match op {
rsbag::parse::Op::MsgData => {
let header = MessageDataHeader::from_header(header)?;
let count = info.per_connection.entry(header.conn_id).or_insert(0);
*count += 1;
chunk_reader.skip_data()?;
}
rsbag::parse::Op::Connection => chunk_reader.skip_data()?,
_ => bail!("unexpected op in chunk: {:?}", op),
}
} }
// let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; Ok(info)
// chunks.push(data);
Ok(data)
}) })
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
.unwrap(); .unwrap();
info!("bag data: {:#?}", data); info!("bag info: {:#?}", info);
// let total_size = index
// .chunks
// .par_iter()
// .try_fold(
// || 0u64,
// |total_size, chunk| -> Result<_> {
// let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
// Ok(total_size + data.len() as u64)
// },
// )
// .reduce(
// // || Ok(Vec::new()),
// || Ok(0),
// |a, b| a.and_then(|a| b.map(|b| a + b)),
// )?;
Ok(()) Ok(())
} }

View File

@ -7,7 +7,7 @@ use eyre::bail;
use crate::{ use crate::{
error, error,
parse::{Header, Op}, parse::{Header, Op, Time},
reader::BagReader, reader::BagReader,
Error, Result, Error, Result,
}; };
@ -33,7 +33,7 @@ impl FromStr for Compression {
} }
impl Compression { impl Compression {
pub fn decompress<'a, R: io::Read + 'a>(self, read: R) -> Box<dyn io::Read + 'a> { pub fn decompress_stream<'a, R: io::Read + 'a>(self, read: R) -> Box<dyn io::Read + 'a> {
match self { match self {
Compression::None => Box::new(read), Compression::None => Box::new(read),
Compression::Bz2 => todo!("bz2 decompression"), Compression::Bz2 => todo!("bz2 decompression"),
@ -49,16 +49,34 @@ pub struct ChunkHeader {
} }
impl ChunkHeader { impl ChunkHeader {
pub fn read<R: BagReader>(reader: &mut R, pos: u64) -> Result<ChunkHeader> { pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
reader.seek(SeekFrom::Start(pos))?;
let header = reader.read_header_op(Op::Chunk)?; let header = reader.read_header_op(Op::Chunk)?;
ChunkHeader::from_header(header) Self::from_header(header)
} }
pub fn from_header(header: Header) -> Result<Self> { pub fn from_header(header: Header) -> Result<Self> {
Ok(ChunkHeader { Ok(Self {
compression: header.read_string(b"compression")?.parse()?, compression: header.read_string(b"compression")?.parse()?,
uncompressed_size: header.read_u32(b"size")?, uncompressed_size: header.read_u32(b"size")?,
}) })
} }
} }
pub struct MessageDataHeader {
pub conn_id: u32,
pub time: Time,
}
impl MessageDataHeader {
pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
let header = reader.read_header_op(Op::MsgData)?;
Self::from_header(header)
}
pub fn from_header(header: Header) -> Result<Self> {
Ok(Self {
conn_id: header.read_u32(b"conn")?,
time: header.read_time(b"time")?,
})
}
}

View File

@ -6,7 +6,7 @@ use nom::{number::streaming::le_u32, sequence::tuple, Parser};
use crate::{ use crate::{
error, error,
parse::{self, header::Time, Header, Op}, parse::{self, Header, Op, Time},
reader::BagReader, reader::BagReader,
Result, Result,
}; };

View File

@ -3,7 +3,7 @@ pub mod header;
mod version; mod version;
pub use error::{Error, ErrorKind}; pub use error::{Error, ErrorKind};
pub use header::{Header, Op}; pub use header::{Header, Op, Time};
pub use version::Version; pub use version::Version;
pub type Input<'a> = &'a [u8]; pub type Input<'a> = &'a [u8];

View File

@ -10,16 +10,13 @@ pub mod error;
mod io; mod io;
#[cfg(feature = "mmap")] #[cfg(feature = "mmap")]
mod mmap; mod mmap;
mod slice;
pub use self::io::IoReader;
#[cfg(feature = "mmap")] #[cfg(feature = "mmap")]
pub use self::mmap::MmapReader; pub use self::mmap::MmapReader;
pub use self::{io::IoReader, slice::SliceReader};
pub trait BagReader { pub trait BagReader {
type Read: std::io::Read;
fn as_read(&mut self) -> &mut Self::Read;
fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result<O> fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result<O>
where where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>; P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>;

View File

@ -38,12 +38,6 @@ impl<R: io::Read + io::Seek> IoReader<R> {
} }
impl<R: io::Read + io::Seek> BagReader for IoReader<R> { impl<R: io::Read + io::Seek> BagReader for IoReader<R> {
type Read = R;
fn as_read(&mut self) -> &mut Self::Read {
&mut self.read
}
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O> fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,

View File

@ -1,75 +1,17 @@
use std::{fs::File, io, sync::Arc}; use std::{fs::File, sync::Arc};
use eyre::bail;
use memmap::Mmap; use memmap::Mmap;
use super::{error::UnexpectedEof, BagReader}; use super::SliceReader;
use crate::{parse, Result}; use crate::Result;
#[derive(Clone)] pub type MmapReader = SliceReader<Arc<Mmap>>;
pub struct MmapReader {
mmap: Arc<Mmap>,
pos: usize,
}
impl MmapReader { impl MmapReader {
pub fn new(file: File) -> Result<Self> { pub fn map(file: File) -> Result<Self> {
// Safety: ¯\_(ツ)_/¯ // Safety: ¯\_(ツ)_/¯
let mmap = unsafe { Mmap::map(&file) }?; let mmap = unsafe { Mmap::map(&file) }?;
let mmap = Arc::new(mmap); let mmap = Arc::new(mmap);
Ok(Self { mmap, pos: 0 }) Ok(SliceReader::from(mmap))
}
}
impl io::Read for MmapReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Some(slice) = self
.mmap
.get(self.pos..)
.and_then(|unread| unread.get(..buf.len()))
{
buf.copy_from_slice(slice);
Ok(slice.len())
} else {
Ok(0)
}
}
}
impl BagReader for MmapReader {
type Read = Self;
fn as_read(&mut self) -> &mut Self::Read {
self
}
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
let input = self.mmap.get(self.pos..).ok_or(UnexpectedEof)?;
match parser.parse(input) {
Ok((rest, output)) => {
self.pos += input.len() - rest.len();
Ok(output)
}
Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof),
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
}
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<()> {
match pos {
io::SeekFrom::Start(pos) => {
self.pos = pos as usize;
}
io::SeekFrom::End(pos) => {
self.pos = (self.mmap.len() as i64 + pos) as usize;
}
io::SeekFrom::Current(pos) => {
self.pos = ((self.pos as i64) + pos) as usize;
}
}
Ok(())
} }
} }

71
src/reader/slice.rs Normal file
View File

@ -0,0 +1,71 @@
use std::{io, ops::Deref};
use eyre::bail;
use super::{error::UnexpectedEof, BagReader};
use crate::{parse, Result};
#[derive(Clone)]
pub struct SliceReader<T> {
slice: T,
pos: usize,
}
impl<T> AsRef<T> for SliceReader<T> {
fn as_ref(&self) -> &T {
&self.slice
}
}
impl<T> SliceReader<T> {
pub fn into_inner(self) -> T {
self.slice
}
pub fn pos(&self) -> usize {
self.pos
}
}
impl<T> From<T> for SliceReader<T> {
fn from(slice: T) -> Self {
Self { slice, pos: 0 }
}
}
impl<T, U> BagReader for SliceReader<T>
where
T: Deref<Target = U>,
U: AsRef<[u8]> + ?Sized + 'static,
{
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
let slice = self.slice.deref().as_ref();
let input = slice.get(self.pos..).ok_or(UnexpectedEof)?;
match parser.parse(input) {
Ok((rest, output)) => {
self.pos += input.len() - rest.len();
Ok(output)
}
Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof),
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
}
}
fn seek(&mut self, pos: io::SeekFrom) -> Result<()> {
match pos {
io::SeekFrom::Start(pos) => {
self.pos = pos as usize;
}
io::SeekFrom::End(pos) => {
self.pos = (self.slice.as_ref().len() as i64 + pos) as usize;
}
io::SeekFrom::Current(pos) => {
self.pos = ((self.pos as i64) + pos) as usize;
}
}
Ok(())
}
}