diff --git a/Cargo.toml b/Cargo.toml index 52cbe8f..34931cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,6 +9,7 @@ mmap = ["memmap"] [dependencies] bytes = "1.1.0" +eyre = "0.6.5" log = "0.4.14" lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] } memmap = { version = "0.7.0", optional = true } @@ -21,5 +22,6 @@ smallvec = "1.6.1" thiserror = "1.0.28" [dev-dependencies] +color-eyre = "0.5.11" env_logger = "0.9.0" eyre = "0.6.5" diff --git a/examples/bag_info.rs b/examples/bag_info.rs index 6fd1af1..323a224 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -1,29 +1,32 @@ use std::{convert::TryFrom, env::args, fs::File, io}; +use eyre::Context; use log::{error, info, trace}; use rayon::iter::{IntoParallelRefIterator, ParallelIterator}; use regex::Regex; use ros_message::{MessagePath, Msg}; use rsbag::{ + chunk::ChunkHeader, index::{BagIndex, ConnInfo}, reader::{BagReader, MmapReader}, + Result, }; -fn parse_msgdef(message_name: &str, msgdef: &str) -> rsbag::Result { +fn parse_msgdef(message_name: &str, msgdef: &str) -> Result { trace!("message definition: {}", msgdef); - let path = MessagePath::try_from(message_name).map_err(rsbag::Error::other)?; - let msgtype = Msg::new(path, msgdef).map_err(rsbag::Error::other)?; + let path = MessagePath::try_from(message_name)?; + let msgtype = Msg::new(path, msgdef)?; Ok(msgtype) } -fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result> { - let msgdefs = conn.message_definition()?; - let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n").unwrap(); - let mut name = conn.datatype()?; +fn parse_message_definitions(conn: &ConnInfo) -> Result> { + let msgdefs = &conn.message_definition; + let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n")?; + let mut name = conn.datatype.clone(); let mut begin = 0usize; let mut msgs = Vec::new(); - for cap in boundary_re.captures_iter(&msgdefs) { + for cap in boundary_re.captures_iter(msgdefs) { let boundary_range = cap.get(0).unwrap(); let end = boundary_range.start(); @@ -43,37 +46,32 @@ fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result> { Ok(msgs) } -fn read_chunk(bag_reader: &mut R, pos: u64) -> rsbag::Result> { - let chunk_header = bag_reader.read_chunk_header(pos)?; +fn read_chunk(bag_reader: &mut R, pos: u64) -> Result> { + let chunk_header = ChunkHeader::read(bag_reader, pos)?; - let compressed_data = bag_reader.read_data().unwrap(); + let compressed_data = bag_reader.read_data()?; let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize); let mut decompresor = chunk_header.compression.decompress(compressed_data); - io::copy(&mut decompresor, &mut data).unwrap(); + io::copy(&mut decompresor, &mut data)?; Ok(data) } -fn main() { +fn main() -> Result<()> { + color_eyre::install()?; env_logger::init(); let args: Vec<_> = args().collect(); if args.len() != 2 { eprintln!("Usage: {} ", args[0]); - return; + return Ok(()); } let bag_path = &args[1]; let bag_file = File::open(bag_path).expect("Could not open bag file"); - let mut bag_reader = MmapReader::new(bag_file).unwrap(); + let mut bag_reader = MmapReader::new(bag_file)?; - let index = match BagIndex::read_all(&mut bag_reader) { - Ok(index) => index, - Err(err) => { - error!("bag parse error: {}", err); - return; - } - }; + let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?; for conn in &index.connections { match parse_message_definitions(conn) { @@ -93,42 +91,45 @@ fn main() { } } - // let mut total_size = 0; - - // let total_size = index - // .chunks - // .par_iter() - // .try_fold( - // || 0u64, - // |total_size, chunk| -> rsbag::Result<_> { - // let chunk_header = bag_reader.clone().read_chunk_header(chunk.pos)?; - - // // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; - // // chunks.push(data); - // Ok(total_size + chunk_header.uncompressed_size as u64) - // }, - // ) - // .reduce( - // // || Ok(Vec::new()), - // || Ok(0), - // |a, b| a.and_then(|a| b.map(|b| a + b)), - // ).unwrap(); - let total_size = index .chunks .par_iter() .try_fold( || 0u64, |total_size, chunk| -> rsbag::Result<_> { - let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; - Ok(total_size + data.len() as u64) + let mut reader = bag_reader.clone(); + let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; + reader.skip_data()?; + + // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; + // chunks.push(data); + Ok(total_size + chunk_header.uncompressed_size as u64) }, ) .reduce( // || Ok(Vec::new()), || Ok(0), |a, b| a.and_then(|a| b.map(|b| a + b)), - ).unwrap(); + ) + .unwrap(); + + // let total_size = index + // .chunks + // .par_iter() + // .try_fold( + // || 0u64, + // |total_size, chunk| -> Result<_> { + // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?; + // Ok(total_size + data.len() as u64) + // }, + // ) + // .reduce( + // // || Ok(Vec::new()), + // || Ok(0), + // |a, b| a.and_then(|a| b.map(|b| a + b)), + // )?; info!("total uncompressed size: {}", total_size); + + Ok(()) } diff --git a/src/chunk.rs b/src/chunk.rs index 7891614..1230ee9 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -1,6 +1,16 @@ -use std::{io, str::FromStr}; +use std::{ + io::{self, SeekFrom}, + str::FromStr, +}; -use crate::{parse::Header, Error, Result}; +use eyre::bail; + +use crate::{ + error, + parse::{Header, Op}, + reader::BagReader, + Error, Result, +}; #[derive(Clone, Copy, Debug)] pub enum Compression { @@ -17,7 +27,7 @@ impl FromStr for Compression { "none" => Compression::None, "bz2" => Compression::Bz2, "lz4" => Compression::Lz4, - _ => return Err(Error::UnsupportedCompression(s.to_string())), + _ => bail!(error::UnsupportedCompression(s.to_string())), }) } } @@ -39,8 +49,14 @@ pub struct ChunkHeader { } impl ChunkHeader { + pub fn read(reader: &mut R, pos: u64) -> Result { + reader.seek(SeekFrom::Start(pos))?; + let header = reader.read_header_op(Op::Chunk)?; + ChunkHeader::from_header(header) + } + pub fn from_header(header: Header) -> Result { - Ok(Self { + Ok(ChunkHeader { compression: header.read_string(b"compression")?.parse()?, uncompressed_size: header.read_u32(b"size")?, }) diff --git a/src/error.rs b/src/error.rs index 8fbb6b0..17352fc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,51 +1,21 @@ -use num_enum::TryFromPrimitiveError; use thiserror::Error; -use crate::parse::{self, Op}; +use crate::parse::{self}; #[derive(Debug, Error)] -pub enum Error { - #[error("i/o error: {0}")] - Io(#[from] std::io::Error), - #[error("{0}")] - Parse(parse::Error), - #[error("unsupported version: {0}")] - UnsupportedVersion(parse::Version), - #[error("unsupported encryptor: {0}")] - UnsupportedEncryptor(String), - #[error("unsupported compression: {0}")] - UnsupportedCompression(String), - #[error("unexpected EOF")] - Eof, - #[error("invalid header op: {0}")] - InvalidOp(#[from] TryFromPrimitiveError), - #[error("missing field: {0:?}")] - MissingField(String), - #[error("bag is unindexed")] - Unindexed, - #[error("{0}")] - Other(String), -} +#[error("bag is unindexed")] +pub struct Unindexed; -impl<'a> From>> for Error { - fn from(err: parse::Error) -> Self { - Error::Parse(err.into_owned()) - } -} +#[derive(Debug, Error)] +#[error("unsupported encryptor: {0}")] +pub struct UnsupportedEncryptor(pub String); -impl<'a> From>>> for Error { - fn from(err: nom::Err>>) -> Self { - match err { - nom::Err::Error(e) | nom::Err::Failure(e) => e.into(), - nom::Err::Incomplete(_) => panic!("incomplete error"), - } - } -} +#[derive(Debug, Error)] +#[error("unsupported bag version: {0}")] +pub struct UnsupportedVersion(pub parse::Version); -impl Error { - pub fn other(message: S) -> Self { - Error::Other(message.to_string()) - } -} +#[derive(Debug, Error)] +#[error("unsupported compression: {0}")] +pub struct UnsupportedCompression(pub String); -pub type Result = core::result::Result; +pub use eyre::{Report as Error, Result}; diff --git a/src/index.rs b/src/index.rs index eb29c3f..66a0789 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,40 +1,41 @@ use std::io::SeekFrom; +use eyre::bail; use log::trace; +use nom::{number::streaming::le_u32, sequence::tuple, Parser}; use crate::{ - parse::{Header, Op}, + error, + parse::{self, Header, Op}, reader::BagReader, - Error, Result, + Result, }; #[derive(Debug)] pub struct ConnInfo { pub id: u32, pub topic: String, - pub conn_header: Header, + pub message_definition: String, + pub datatype: String, + pub md5sum: String, } impl ConnInfo { + pub fn read(reader: &mut R) -> Result { + let record_header = reader.read_header_op(Op::Connection)?; + let conn_header = reader.read_header()?; + Self::from_headers(record_header, conn_header) + } + pub fn from_headers(record_header: Header, conn_header: Header) -> Result { Ok(ConnInfo { id: record_header.read_u32(b"conn")?, topic: record_header.read_string(b"topic")?, - conn_header, + message_definition: conn_header.read_string(b"message_definition")?, + datatype: conn_header.read_string(b"type")?, + md5sum: conn_header.read_string(b"md5sum")?, }) } - - pub fn message_definition(&self) -> Result { - self.conn_header.read_string(b"message_definition") - } - - pub fn datatype(&self) -> Result { - self.conn_header.read_string(b"type") - } - - pub fn md5sum(&self) -> Result { - self.conn_header.read_string(b"md5sum") - } } #[derive(Debug)] @@ -42,23 +43,51 @@ pub struct ChunkInfo { pub pos: u64, pub start_time: u64, // TODO: unpack time pub end_time: u64, - pub conn_count: u32, + pub connections: Vec, } impl ChunkInfo { - pub fn from_header(header: Header) -> Result { + pub fn read(reader: &mut R) -> Result { + let header = reader.read_header_op(Op::ChunkInfo)?; if header.read_u32(b"ver")? != 1 { - return Err(Error::other("unsupported ChunkInfo version")); + bail!("unsupported ChunkInfo version"); + } + + reader.read_data_length()?; // Data length not needed + + let conn_count = header.read_u32(b"count")?; + let mut connections = Vec::with_capacity(conn_count as usize); + for _ in 0..conn_count { + let conn = ChunkConnection::read(reader)?; + connections.push(conn); } Ok(ChunkInfo { pos: header.read_u64(b"chunk_pos")?, start_time: header.read_u64(b"start_time")?, end_time: header.read_u64(b"end_time")?, - conn_count: header.read_u32(b"count")?, + connections, }) } } +#[derive(Debug)] +pub struct ChunkConnection { + pub conn_id: u32, + pub count: u32, +} + +impl ChunkConnection { + pub fn parse(input: parse::Input) -> parse::IResult { + tuple((le_u32, le_u32)) + .map(|(conn_id, count)| ChunkConnection { conn_id, count }) + .parse(input) + } + + pub fn read(reader: &mut R) -> Result { + reader.read_parser(Self::parse) + } +} + #[derive(Debug)] pub struct BagIndex { pub connections: Vec, @@ -76,11 +105,11 @@ impl BagIndex { trace!("index pos: {}", index_pos); if index_pos == 0 { - return Err(Error::Unindexed); + bail!(error::Unindexed); } if let Ok(encryptor) = file_header.read_string(b"encryptor") { - return Err(Error::UnsupportedEncryptor(encryptor)); + bail!(error::UnsupportedEncryptor(encryptor)); } reader.seek(SeekFrom::Start(index_pos))?; @@ -91,7 +120,7 @@ impl BagIndex { let mut connections = Vec::with_capacity(conn_count as usize); for _ in 0..conn_count { - let conn = reader.read_conn_info()?; + let conn = ConnInfo::read(reader)?; trace!("connection: id={}, topic={}", conn.id, conn.topic); connections.push(conn); } @@ -102,7 +131,7 @@ impl BagIndex { let mut chunks = Vec::with_capacity(chunk_count as usize); for _ in 0..chunk_count { - let chunk = reader.read_chunk_info()?; + let chunk = ChunkInfo::read(reader)?; chunks.push(chunk); } @@ -118,7 +147,7 @@ impl BagIndex { if (version.major, version.minor) == (2, 0) { Self::read_v2(reader) } else { - Err(Error::UnsupportedVersion(version)) + bail!(error::UnsupportedVersion(version)); } } } diff --git a/src/parse.rs b/src/parse.rs index b458a04..e935464 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,11 +1,9 @@ mod error; -mod fields; -mod header; +pub mod header; mod version; pub use error::{Error, ErrorKind}; -pub use fields::Op; -pub use header::Header; +pub use header::{Header, Op}; pub use version::Version; pub type Input<'a> = &'a [u8]; diff --git a/src/parse/error.rs b/src/parse/error.rs index 38b1609..b5ac198 100644 --- a/src/parse/error.rs +++ b/src/parse/error.rs @@ -149,6 +149,12 @@ impl Error<&I> { } } +impl<'a> From> for Error> { + fn from(e: Error<&'a [u8]>) -> Self { + e.into_owned() + } +} + // adapted from https://fasterthanli.me/series/making-our-own-ping/part-9 impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { diff --git a/src/parse/header.rs b/src/parse/header.rs index f963d91..a9005b6 100644 --- a/src/parse/header.rs +++ b/src/parse/header.rs @@ -11,8 +11,16 @@ use nom::{ }; use smallvec::SmallVec; -use super::{fields, IResult, Input, Op}; -use crate::{Error, Result}; +use super::{IResult, Input}; +use crate::Result; + +mod error; +mod fields; + +pub use self::{ + error::{FieldDataError, MissingFieldError}, + fields::Op, +}; #[derive(Clone, Debug)] pub struct HeaderField { @@ -70,15 +78,15 @@ impl Header { .parse(input) } - pub fn find_field(&self, name: &[u8]) -> Result<&[u8]> { + fn find_field(&self, name: &[u8]) -> Result<&[u8]> { self.0 .iter() .find(|field| field.name() == name) .map(|field| field.value()) - .ok_or_else(|| Error::MissingField(String::from_utf8_lossy(name).into_owned())) + .ok_or_else(|| MissingFieldError(String::from_utf8_lossy(name).into_owned()).into()) } - pub fn read_op(&self) -> Result { + pub fn read_op(&self) -> Result { self.find_field(b"op").and_then(fields::Op::parse) } diff --git a/src/parse/header/error.rs b/src/parse/header/error.rs new file mode 100644 index 0000000..42e61e6 --- /dev/null +++ b/src/parse/header/error.rs @@ -0,0 +1,26 @@ +use thiserror::Error; + +use crate::parse; + +#[derive(Debug, Error)] +#[error("missing field {0:?}")] +pub struct MissingFieldError(pub String); + +#[derive(Debug, Error)] +#[error("bad field data: {0}")] +pub struct FieldDataError(#[source] pub parse::Error); + +impl<'a> From>> for FieldDataError { + fn from(err: parse::Error>) -> Self { + FieldDataError(err.into_owned()) + } +} + +impl<'a> From>>> for FieldDataError { + fn from(err: nom::Err>>) -> Self { + match err { + nom::Err::Error(e) | nom::Err::Failure(e) => e.into(), + nom::Err::Incomplete(_) => unreachable!("incomplete error"), + } + } +} diff --git a/src/parse/fields.rs b/src/parse/header/fields.rs similarity index 61% rename from src/parse/fields.rs rename to src/parse/header/fields.rs index 47efada..44433cf 100644 --- a/src/parse/fields.rs +++ b/src/parse/header/fields.rs @@ -5,8 +5,7 @@ use nom::{ }; use num_enum::TryFromPrimitive; -use super::Input; -use crate::{Error, Result}; +use super::{FieldDataError, Input, Result}; #[derive(Clone, Copy, Debug, TryFromPrimitive, PartialEq)] #[repr(u8)] @@ -21,21 +20,27 @@ pub enum Op { impl Op { pub fn parse(input: Input) -> Result { - let (_, op) = all_consuming(le_u8).parse(input)?; + let (_, op) = all_consuming(le_u8) + .parse(input) + .map_err(FieldDataError::from)?; Op::try_from_primitive(op).map_err(Into::into) } } pub fn parse_u32(input: Input) -> Result { - let (_, x) = all_consuming(le_u32).parse(input)?; + let (_, x) = all_consuming(le_u32) + .parse(input) + .map_err(FieldDataError::from)?; Ok(x) } pub fn parse_u64(input: Input) -> Result { - let (_, x) = all_consuming(le_u64).parse(input)?; + let (_, x) = all_consuming(le_u64) + .parse(input) + .map_err(FieldDataError::from)?; Ok(x) } pub fn parse_string(input: Input) -> Result { - String::from_utf8(input.to_owned()).map_err(|_| Error::other("invalid utf8")) + Ok(String::from_utf8(input.to_owned())?) } diff --git a/src/reader.rs b/src/reader.rs index cc3faf8..ed016fe 100644 --- a/src/reader.rs +++ b/src/reader.rs @@ -3,18 +3,17 @@ use std::io::SeekFrom; use nom::multi::length_data; use nom::number::streaming::le_u32; -use crate::chunk::ChunkHeader; -use crate::error::{Error, Result}; -use crate::index::{ChunkInfo, ConnInfo}; +use crate::error::Result; use crate::parse::{self, Header, Op, Version}; +pub mod error; mod io; #[cfg(feature = "mmap")] mod mmap; -pub use io::IoReader; +pub use self::io::IoReader; #[cfg(feature = "mmap")] -pub use mmap::MmapReader; +pub use self::mmap::MmapReader; pub trait BagReader { type Read: std::io::Read; @@ -37,8 +36,9 @@ pub trait BagReader { fn read_header_op(&mut self, op: Op) -> Result
{ let header = self.read_header()?; - if header.read_op()? != op { - return Err(Error::other(format!("Expected {:?} op", op))); + let actual_op = header.read_op()?; + if actual_op != op { + return Err(error::UnexpectedOp(op, actual_op).into()); } Ok(header) } @@ -56,23 +56,4 @@ pub trait BagReader { fn read_data(&mut self) -> Result<&[u8]> { self.read_parser(length_data(le_u32)) } - - fn read_conn_info(&mut self) -> Result { - let record_header = self.read_header_op(Op::Connection)?; - let conn_header = self.read_header()?; - ConnInfo::from_headers(record_header, conn_header) - } - - fn read_chunk_info(&mut self) -> Result { - let header = self.read_header_op(Op::ChunkInfo)?; - // TODO: read connection message counts - self.skip_data()?; - ChunkInfo::from_header(header) - } - - fn read_chunk_header(&mut self, pos: u64) -> Result { - self.seek(SeekFrom::Start(pos))?; - let header = self.read_header_op(Op::Chunk)?; - ChunkHeader::from_header(header) - } } diff --git a/src/reader/error.rs b/src/reader/error.rs new file mode 100644 index 0000000..46e6e2f --- /dev/null +++ b/src/reader/error.rs @@ -0,0 +1,11 @@ +use thiserror::Error; + +use crate::parse::Op; + +#[derive(Debug, Error)] +#[error("unexpected EOF")] +pub struct UnexpectedEof; + +#[derive(Debug, Error)] +#[error("expected op {0:?}, got {1:?}")] +pub struct UnexpectedOp(pub Op, pub Op); diff --git a/src/reader/io.rs b/src/reader/io.rs index 5095aec..0b7900c 100644 --- a/src/reader/io.rs +++ b/src/reader/io.rs @@ -2,8 +2,8 @@ use std::io; use bytes::{Buf, BytesMut}; -use super::BagReader; -use crate::{parse, Error, Result}; +use super::{error::UnexpectedEof, BagReader}; +use crate::{parse, Result}; const READ_SIZE: usize = 4096; @@ -31,7 +31,7 @@ impl IoReader { let read = self.read.read(&mut self.buffer[old_size..])?; self.buffer.truncate(old_size + read); if read == 0 { - return Err(Error::Eof); + return Err(UnexpectedEof.into()); } Ok(()) } @@ -69,7 +69,7 @@ impl BagReader for IoReader { unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?; } Err(nom::Err::Error(e) | nom::Err::Failure(e)) => { - return Err(e.into()); + return Err(e.into_owned().into()); } } } diff --git a/src/reader/mmap.rs b/src/reader/mmap.rs index 49f0c50..b7aea93 100644 --- a/src/reader/mmap.rs +++ b/src/reader/mmap.rs @@ -1,9 +1,10 @@ use std::{fs::File, io, sync::Arc}; +use eyre::bail; use memmap::Mmap; -use super::BagReader; -use crate::{parse, Error, Result}; +use super::{error::UnexpectedEof, BagReader}; +use crate::{parse, Result}; #[derive(Clone)] pub struct MmapReader { @@ -46,14 +47,14 @@ impl BagReader for MmapReader { where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, { - let input = self.mmap.get(self.pos..).ok_or(Error::Eof)?; + let input = self.mmap.get(self.pos..).ok_or(UnexpectedEof)?; match parser.parse(input) { Ok((rest, output)) => { self.pos += input.len() - rest.len(); Ok(output) } - Err(nom::Err::Incomplete(_)) => Err(Error::Eof), - Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into()), + Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), } }