replace errors with eyre

This commit is contained in:
Alex Mikhalev 2021-11-19 14:15:15 -08:00
parent f71048b185
commit 4e5898c0c0
14 changed files with 222 additions and 168 deletions

View File

@ -9,6 +9,7 @@ mmap = ["memmap"]
[dependencies]
bytes = "1.1.0"
eyre = "0.6.5"
log = "0.4.14"
lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] }
memmap = { version = "0.7.0", optional = true }
@ -21,5 +22,6 @@ smallvec = "1.6.1"
thiserror = "1.0.28"
[dev-dependencies]
color-eyre = "0.5.11"
env_logger = "0.9.0"
eyre = "0.6.5"

View File

@ -1,29 +1,32 @@
use std::{convert::TryFrom, env::args, fs::File, io};
use eyre::Context;
use log::{error, info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex;
use ros_message::{MessagePath, Msg};
use rsbag::{
chunk::ChunkHeader,
index::{BagIndex, ConnInfo},
reader::{BagReader, MmapReader},
Result,
};
fn parse_msgdef(message_name: &str, msgdef: &str) -> rsbag::Result<Msg> {
fn parse_msgdef(message_name: &str, msgdef: &str) -> Result<Msg> {
trace!("message definition: {}", msgdef);
let path = MessagePath::try_from(message_name).map_err(rsbag::Error::other)?;
let msgtype = Msg::new(path, msgdef).map_err(rsbag::Error::other)?;
let path = MessagePath::try_from(message_name)?;
let msgtype = Msg::new(path, msgdef)?;
Ok(msgtype)
}
fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result<Vec<Msg>> {
let msgdefs = conn.message_definition()?;
let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n").unwrap();
let mut name = conn.datatype()?;
fn parse_message_definitions(conn: &ConnInfo) -> Result<Vec<Msg>> {
let msgdefs = &conn.message_definition;
let boundary_re = Regex::new(r"\r?\n==+\r?\nMSG: ([^\r\n]+)\r?\n")?;
let mut name = conn.datatype.clone();
let mut begin = 0usize;
let mut msgs = Vec::new();
for cap in boundary_re.captures_iter(&msgdefs) {
for cap in boundary_re.captures_iter(msgdefs) {
let boundary_range = cap.get(0).unwrap();
let end = boundary_range.start();
@ -43,37 +46,32 @@ fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result<Vec<Msg>> {
Ok(msgs)
}
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> rsbag::Result<Vec<u8>> {
let chunk_header = bag_reader.read_chunk_header(pos)?;
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> {
let chunk_header = ChunkHeader::read(bag_reader, pos)?;
let compressed_data = bag_reader.read_data().unwrap();
let compressed_data = bag_reader.read_data()?;
let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
let mut decompresor = chunk_header.compression.decompress(compressed_data);
io::copy(&mut decompresor, &mut data).unwrap();
io::copy(&mut decompresor, &mut data)?;
Ok(data)
}
fn main() {
fn main() -> Result<()> {
color_eyre::install()?;
env_logger::init();
let args: Vec<_> = args().collect();
if args.len() != 2 {
eprintln!("Usage: {} <bag path>", args[0]);
return;
return Ok(());
}
let bag_path = &args[1];
let bag_file = File::open(bag_path).expect("Could not open bag file");
let mut bag_reader = MmapReader::new(bag_file).unwrap();
let mut bag_reader = MmapReader::new(bag_file)?;
let index = match BagIndex::read_all(&mut bag_reader) {
Ok(index) => index,
Err(err) => {
error!("bag parse error: {}", err);
return;
}
};
let index = BagIndex::read_all(&mut bag_reader).wrap_err("bag parse error")?;
for conn in &index.connections {
match parse_message_definitions(conn) {
@ -93,42 +91,45 @@ fn main() {
}
}
// let mut total_size = 0;
// let total_size = index
// .chunks
// .par_iter()
// .try_fold(
// || 0u64,
// |total_size, chunk| -> rsbag::Result<_> {
// let chunk_header = bag_reader.clone().read_chunk_header(chunk.pos)?;
// // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
// // chunks.push(data);
// Ok(total_size + chunk_header.uncompressed_size as u64)
// },
// )
// .reduce(
// // || Ok(Vec::new()),
// || Ok(0),
// |a, b| a.and_then(|a| b.map(|b| a + b)),
// ).unwrap();
let total_size = index
.chunks
.par_iter()
.try_fold(
|| 0u64,
|total_size, chunk| -> rsbag::Result<_> {
let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
Ok(total_size + data.len() as u64)
let mut reader = bag_reader.clone();
let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?;
reader.skip_data()?;
// let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
// chunks.push(data);
Ok(total_size + chunk_header.uncompressed_size as u64)
},
)
.reduce(
// || Ok(Vec::new()),
|| Ok(0),
|a, b| a.and_then(|a| b.map(|b| a + b)),
).unwrap();
)
.unwrap();
// let total_size = index
// .chunks
// .par_iter()
// .try_fold(
// || 0u64,
// |total_size, chunk| -> Result<_> {
// let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
// Ok(total_size + data.len() as u64)
// },
// )
// .reduce(
// // || Ok(Vec::new()),
// || Ok(0),
// |a, b| a.and_then(|a| b.map(|b| a + b)),
// )?;
info!("total uncompressed size: {}", total_size);
Ok(())
}

View File

@ -1,6 +1,16 @@
use std::{io, str::FromStr};
use std::{
io::{self, SeekFrom},
str::FromStr,
};
use crate::{parse::Header, Error, Result};
use eyre::bail;
use crate::{
error,
parse::{Header, Op},
reader::BagReader,
Error, Result,
};
#[derive(Clone, Copy, Debug)]
pub enum Compression {
@ -17,7 +27,7 @@ impl FromStr for Compression {
"none" => Compression::None,
"bz2" => Compression::Bz2,
"lz4" => Compression::Lz4,
_ => return Err(Error::UnsupportedCompression(s.to_string())),
_ => bail!(error::UnsupportedCompression(s.to_string())),
})
}
}
@ -39,8 +49,14 @@ pub struct ChunkHeader {
}
impl ChunkHeader {
pub fn read<R: BagReader>(reader: &mut R, pos: u64) -> Result<ChunkHeader> {
reader.seek(SeekFrom::Start(pos))?;
let header = reader.read_header_op(Op::Chunk)?;
ChunkHeader::from_header(header)
}
pub fn from_header(header: Header) -> Result<Self> {
Ok(Self {
Ok(ChunkHeader {
compression: header.read_string(b"compression")?.parse()?,
uncompressed_size: header.read_u32(b"size")?,
})

View File

@ -1,51 +1,21 @@
use num_enum::TryFromPrimitiveError;
use thiserror::Error;
use crate::parse::{self, Op};
use crate::parse::{self};
#[derive(Debug, Error)]
pub enum Error {
#[error("i/o error: {0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
Parse(parse::Error<parse::InputOwned>),
#[error("unsupported version: {0}")]
UnsupportedVersion(parse::Version),
#[error("unsupported encryptor: {0}")]
UnsupportedEncryptor(String),
#[error("unsupported compression: {0}")]
UnsupportedCompression(String),
#[error("unexpected EOF")]
Eof,
#[error("invalid header op: {0}")]
InvalidOp(#[from] TryFromPrimitiveError<Op>),
#[error("missing field: {0:?}")]
MissingField(String),
#[error("bag is unindexed")]
Unindexed,
#[error("{0}")]
Other(String),
}
#[error("bag is unindexed")]
pub struct Unindexed;
impl<'a> From<parse::Error<parse::Input<'a>>> for Error {
fn from(err: parse::Error<parse::Input>) -> Self {
Error::Parse(err.into_owned())
}
}
#[derive(Debug, Error)]
#[error("unsupported encryptor: {0}")]
pub struct UnsupportedEncryptor(pub String);
impl<'a> From<nom::Err<parse::Error<parse::Input<'a>>>> for Error {
fn from(err: nom::Err<parse::Error<parse::Input<'a>>>) -> Self {
match err {
nom::Err::Error(e) | nom::Err::Failure(e) => e.into(),
nom::Err::Incomplete(_) => panic!("incomplete error"),
}
}
}
#[derive(Debug, Error)]
#[error("unsupported bag version: {0}")]
pub struct UnsupportedVersion(pub parse::Version);
impl Error {
pub fn other<S: ToString>(message: S) -> Self {
Error::Other(message.to_string())
}
}
#[derive(Debug, Error)]
#[error("unsupported compression: {0}")]
pub struct UnsupportedCompression(pub String);
pub type Result<T, E = Error> = core::result::Result<T, E>;
pub use eyre::{Report as Error, Result};

View File

@ -1,40 +1,41 @@
use std::io::SeekFrom;
use eyre::bail;
use log::trace;
use nom::{number::streaming::le_u32, sequence::tuple, Parser};
use crate::{
parse::{Header, Op},
error,
parse::{self, Header, Op},
reader::BagReader,
Error, Result,
Result,
};
#[derive(Debug)]
pub struct ConnInfo {
pub id: u32,
pub topic: String,
pub conn_header: Header,
pub message_definition: String,
pub datatype: String,
pub md5sum: String,
}
impl ConnInfo {
pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
let record_header = reader.read_header_op(Op::Connection)?;
let conn_header = reader.read_header()?;
Self::from_headers(record_header, conn_header)
}
pub fn from_headers(record_header: Header, conn_header: Header) -> Result<Self> {
Ok(ConnInfo {
id: record_header.read_u32(b"conn")?,
topic: record_header.read_string(b"topic")?,
conn_header,
message_definition: conn_header.read_string(b"message_definition")?,
datatype: conn_header.read_string(b"type")?,
md5sum: conn_header.read_string(b"md5sum")?,
})
}
pub fn message_definition(&self) -> Result<String> {
self.conn_header.read_string(b"message_definition")
}
pub fn datatype(&self) -> Result<String> {
self.conn_header.read_string(b"type")
}
pub fn md5sum(&self) -> Result<String> {
self.conn_header.read_string(b"md5sum")
}
}
#[derive(Debug)]
@ -42,23 +43,51 @@ pub struct ChunkInfo {
pub pos: u64,
pub start_time: u64, // TODO: unpack time
pub end_time: u64,
pub conn_count: u32,
pub connections: Vec<ChunkConnection>,
}
impl ChunkInfo {
pub fn from_header(header: Header) -> Result<Self> {
pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
let header = reader.read_header_op(Op::ChunkInfo)?;
if header.read_u32(b"ver")? != 1 {
return Err(Error::other("unsupported ChunkInfo version"));
bail!("unsupported ChunkInfo version");
}
reader.read_data_length()?; // Data length not needed
let conn_count = header.read_u32(b"count")?;
let mut connections = Vec::with_capacity(conn_count as usize);
for _ in 0..conn_count {
let conn = ChunkConnection::read(reader)?;
connections.push(conn);
}
Ok(ChunkInfo {
pos: header.read_u64(b"chunk_pos")?,
start_time: header.read_u64(b"start_time")?,
end_time: header.read_u64(b"end_time")?,
conn_count: header.read_u32(b"count")?,
connections,
})
}
}
#[derive(Debug)]
pub struct ChunkConnection {
pub conn_id: u32,
pub count: u32,
}
impl ChunkConnection {
pub fn parse(input: parse::Input) -> parse::IResult<Self> {
tuple((le_u32, le_u32))
.map(|(conn_id, count)| ChunkConnection { conn_id, count })
.parse(input)
}
pub fn read<R: BagReader>(reader: &mut R) -> Result<Self> {
reader.read_parser(Self::parse)
}
}
#[derive(Debug)]
pub struct BagIndex {
pub connections: Vec<ConnInfo>,
@ -76,11 +105,11 @@ impl BagIndex {
trace!("index pos: {}", index_pos);
if index_pos == 0 {
return Err(Error::Unindexed);
bail!(error::Unindexed);
}
if let Ok(encryptor) = file_header.read_string(b"encryptor") {
return Err(Error::UnsupportedEncryptor(encryptor));
bail!(error::UnsupportedEncryptor(encryptor));
}
reader.seek(SeekFrom::Start(index_pos))?;
@ -91,7 +120,7 @@ impl BagIndex {
let mut connections = Vec::with_capacity(conn_count as usize);
for _ in 0..conn_count {
let conn = reader.read_conn_info()?;
let conn = ConnInfo::read(reader)?;
trace!("connection: id={}, topic={}", conn.id, conn.topic);
connections.push(conn);
}
@ -102,7 +131,7 @@ impl BagIndex {
let mut chunks = Vec::with_capacity(chunk_count as usize);
for _ in 0..chunk_count {
let chunk = reader.read_chunk_info()?;
let chunk = ChunkInfo::read(reader)?;
chunks.push(chunk);
}
@ -118,7 +147,7 @@ impl BagIndex {
if (version.major, version.minor) == (2, 0) {
Self::read_v2(reader)
} else {
Err(Error::UnsupportedVersion(version))
bail!(error::UnsupportedVersion(version));
}
}
}

View File

@ -1,11 +1,9 @@
mod error;
mod fields;
mod header;
pub mod header;
mod version;
pub use error::{Error, ErrorKind};
pub use fields::Op;
pub use header::Header;
pub use header::{Header, Op};
pub use version::Version;
pub type Input<'a> = &'a [u8];

View File

@ -149,6 +149,12 @@ impl<I: ToOwned + ?Sized> Error<&I> {
}
}
impl<'a> From<Error<&'a [u8]>> for Error<Vec<u8>> {
fn from(e: Error<&'a [u8]>) -> Self {
e.into_owned()
}
}
// adapted from https://fasterthanli.me/series/making-our-own-ping/part-9
impl<I: ErrorInputDisplay> fmt::Display for Error<I> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {

View File

@ -11,8 +11,16 @@ use nom::{
};
use smallvec::SmallVec;
use super::{fields, IResult, Input, Op};
use crate::{Error, Result};
use super::{IResult, Input};
use crate::Result;
mod error;
mod fields;
pub use self::{
error::{FieldDataError, MissingFieldError},
fields::Op,
};
#[derive(Clone, Debug)]
pub struct HeaderField {
@ -70,15 +78,15 @@ impl Header {
.parse(input)
}
pub fn find_field(&self, name: &[u8]) -> Result<&[u8]> {
fn find_field(&self, name: &[u8]) -> Result<&[u8]> {
self.0
.iter()
.find(|field| field.name() == name)
.map(|field| field.value())
.ok_or_else(|| Error::MissingField(String::from_utf8_lossy(name).into_owned()))
.ok_or_else(|| MissingFieldError(String::from_utf8_lossy(name).into_owned()).into())
}
pub fn read_op(&self) -> Result<Op> {
pub fn read_op(&self) -> Result<fields::Op> {
self.find_field(b"op").and_then(fields::Op::parse)
}

26
src/parse/header/error.rs Normal file
View File

@ -0,0 +1,26 @@
use thiserror::Error;
use crate::parse;
#[derive(Debug, Error)]
#[error("missing field {0:?}")]
pub struct MissingFieldError(pub String);
#[derive(Debug, Error)]
#[error("bad field data: {0}")]
pub struct FieldDataError(#[source] pub parse::Error<parse::InputOwned>);
impl<'a> From<parse::Error<parse::Input<'a>>> for FieldDataError {
fn from(err: parse::Error<parse::Input<'a>>) -> Self {
FieldDataError(err.into_owned())
}
}
impl<'a> From<nom::Err<parse::Error<parse::Input<'a>>>> for FieldDataError {
fn from(err: nom::Err<parse::Error<parse::Input<'a>>>) -> Self {
match err {
nom::Err::Error(e) | nom::Err::Failure(e) => e.into(),
nom::Err::Incomplete(_) => unreachable!("incomplete error"),
}
}
}

View File

@ -5,8 +5,7 @@ use nom::{
};
use num_enum::TryFromPrimitive;
use super::Input;
use crate::{Error, Result};
use super::{FieldDataError, Input, Result};
#[derive(Clone, Copy, Debug, TryFromPrimitive, PartialEq)]
#[repr(u8)]
@ -21,21 +20,27 @@ pub enum Op {
impl Op {
pub fn parse(input: Input) -> Result<Self> {
let (_, op) = all_consuming(le_u8).parse(input)?;
let (_, op) = all_consuming(le_u8)
.parse(input)
.map_err(FieldDataError::from)?;
Op::try_from_primitive(op).map_err(Into::into)
}
}
pub fn parse_u32(input: Input) -> Result<u32> {
let (_, x) = all_consuming(le_u32).parse(input)?;
let (_, x) = all_consuming(le_u32)
.parse(input)
.map_err(FieldDataError::from)?;
Ok(x)
}
pub fn parse_u64(input: Input) -> Result<u64> {
let (_, x) = all_consuming(le_u64).parse(input)?;
let (_, x) = all_consuming(le_u64)
.parse(input)
.map_err(FieldDataError::from)?;
Ok(x)
}
pub fn parse_string(input: Input) -> Result<String> {
String::from_utf8(input.to_owned()).map_err(|_| Error::other("invalid utf8"))
Ok(String::from_utf8(input.to_owned())?)
}

View File

@ -3,18 +3,17 @@ use std::io::SeekFrom;
use nom::multi::length_data;
use nom::number::streaming::le_u32;
use crate::chunk::ChunkHeader;
use crate::error::{Error, Result};
use crate::index::{ChunkInfo, ConnInfo};
use crate::error::Result;
use crate::parse::{self, Header, Op, Version};
pub mod error;
mod io;
#[cfg(feature = "mmap")]
mod mmap;
pub use io::IoReader;
pub use self::io::IoReader;
#[cfg(feature = "mmap")]
pub use mmap::MmapReader;
pub use self::mmap::MmapReader;
pub trait BagReader {
type Read: std::io::Read;
@ -37,8 +36,9 @@ pub trait BagReader {
fn read_header_op(&mut self, op: Op) -> Result<Header> {
let header = self.read_header()?;
if header.read_op()? != op {
return Err(Error::other(format!("Expected {:?} op", op)));
let actual_op = header.read_op()?;
if actual_op != op {
return Err(error::UnexpectedOp(op, actual_op).into());
}
Ok(header)
}
@ -56,23 +56,4 @@ pub trait BagReader {
fn read_data(&mut self) -> Result<&[u8]> {
self.read_parser(length_data(le_u32))
}
fn read_conn_info(&mut self) -> Result<ConnInfo> {
let record_header = self.read_header_op(Op::Connection)?;
let conn_header = self.read_header()?;
ConnInfo::from_headers(record_header, conn_header)
}
fn read_chunk_info(&mut self) -> Result<ChunkInfo> {
let header = self.read_header_op(Op::ChunkInfo)?;
// TODO: read connection message counts
self.skip_data()?;
ChunkInfo::from_header(header)
}
fn read_chunk_header(&mut self, pos: u64) -> Result<ChunkHeader> {
self.seek(SeekFrom::Start(pos))?;
let header = self.read_header_op(Op::Chunk)?;
ChunkHeader::from_header(header)
}
}

11
src/reader/error.rs Normal file
View File

@ -0,0 +1,11 @@
use thiserror::Error;
use crate::parse::Op;
#[derive(Debug, Error)]
#[error("unexpected EOF")]
pub struct UnexpectedEof;
#[derive(Debug, Error)]
#[error("expected op {0:?}, got {1:?}")]
pub struct UnexpectedOp(pub Op, pub Op);

View File

@ -2,8 +2,8 @@ use std::io;
use bytes::{Buf, BytesMut};
use super::BagReader;
use crate::{parse, Error, Result};
use super::{error::UnexpectedEof, BagReader};
use crate::{parse, Result};
const READ_SIZE: usize = 4096;
@ -31,7 +31,7 @@ impl<R: io::Read + io::Seek> IoReader<R> {
let read = self.read.read(&mut self.buffer[old_size..])?;
self.buffer.truncate(old_size + read);
if read == 0 {
return Err(Error::Eof);
return Err(UnexpectedEof.into());
}
Ok(())
}
@ -69,7 +69,7 @@ impl<R: io::Read + io::Seek> BagReader for IoReader<R> {
unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?;
}
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => {
return Err(e.into());
return Err(e.into_owned().into());
}
}
}

View File

@ -1,9 +1,10 @@
use std::{fs::File, io, sync::Arc};
use eyre::bail;
use memmap::Mmap;
use super::BagReader;
use crate::{parse, Error, Result};
use super::{error::UnexpectedEof, BagReader};
use crate::{parse, Result};
#[derive(Clone)]
pub struct MmapReader {
@ -46,14 +47,14 @@ impl BagReader for MmapReader {
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,
{
let input = self.mmap.get(self.pos..).ok_or(Error::Eof)?;
let input = self.mmap.get(self.pos..).ok_or(UnexpectedEof)?;
match parser.parse(input) {
Ok((rest, output)) => {
self.pos += input.len() - rest.len();
Ok(output)
}
Err(nom::Err::Incomplete(_)) => Err(Error::Eof),
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into()),
Err(nom::Err::Incomplete(_)) => bail!(UnexpectedEof),
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
}
}