commit eef7484866d327128f120fd1ba6d21a81a88a3cc Author: Alex Mikhalev Date: Thu Nov 18 15:11:56 2021 -0800 Initial commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..96ef6c0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/target +Cargo.lock diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e83060e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,23 @@ +[package] +name = "rsbag" +version = "0.1.0" +edition = "2018" +build = "build.rs" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +bytes = "1.1.0" +lalrpop-util = "0.19.6" +log = "0.4.14" +nom = "7.0.0" +num_enum = "0.5.4" +smallvec = "1.6.1" +thiserror = "1.0.28" + +[dev-dependencies] +env_logger = "0.9.0" +eyre = "0.6.5" + +[build-dependencies] +lalrpop = "0.19.6" diff --git a/build.rs b/build.rs new file mode 100644 index 0000000..228a429 --- /dev/null +++ b/build.rs @@ -0,0 +1,6 @@ +fn main() { + lalrpop::Configuration::new() + .generate_in_source_tree() + .process() + .unwrap(); +} diff --git a/examples/bag_info.rs b/examples/bag_info.rs new file mode 100644 index 0000000..7c86692 --- /dev/null +++ b/examples/bag_info.rs @@ -0,0 +1,34 @@ +use std::{env::args, fs::File}; + +use log::{error, info, trace}; +use nom::Parser; +use rsbag::{index::BagIndex, reader::IoReader}; + +fn main() { + env_logger::init(); + + let args: Vec<_> = args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + return; + } + + let bag_path = &args[1]; + let bag_file = File::open(bag_path).expect("Could not open bag file"); + let mut bag_reader = IoReader::new(bag_file); + + match BagIndex::read_all(&mut bag_reader) { + Ok(index) => { + for conn in &index.connections { + let msgdef = conn.message_definition().unwrap(); + trace!("message definition: {}", msgdef); + let parser = rsbag::message_definition::grammar::MessageDefinitionParser::new(); + match parser.parse(&msgdef) { + Ok(def) => info!("message definition parsed: {}", def), + Err(err) => error!("message definition parse error: {}", err), + } + } + }, + Err(err) => error!("bag parse error: {}", err), + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..d2b7855 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,49 @@ +use num_enum::TryFromPrimitiveError; +use thiserror::Error; + +use crate::parse::{self, Op}; + +#[derive(Debug, Error)] +pub enum Error { + #[error("i/o error: {0}")] + Io(#[from] std::io::Error), + #[error("{0}")] + Parse(parse::Error), + #[error("unsupported version: {0}")] + UnsupportedVersion(parse::Version), + #[error("unsupported encryptor: {0}")] + UnsupportedEncryptor(String), + #[error("unexpected EOF")] + Eof, + #[error("invalid header op: {0}")] + InvalidOp(#[from] TryFromPrimitiveError), + #[error("missing field: {0:?}")] + MissingField(String), + #[error("bag is unindexed")] + Unindexed, + #[error("{0}")] + Other(String), +} + +impl<'a> From>> for Error { + fn from(err: parse::Error) -> Self { + Error::Parse(err.into_owned()) + } +} + +impl<'a> From>>> for Error { + fn from(err: nom::Err>>) -> Self { + match err { + nom::Err::Error(e) | nom::Err::Failure(e) => e.into(), + nom::Err::Incomplete(_) => panic!("incomplete error"), + } + } +} + +impl Error { + pub fn other>(message: S) -> Self { + Error::Other(message.into()) + } +} + +pub type Result = core::result::Result; diff --git a/src/index.rs b/src/index.rs new file mode 100644 index 0000000..8c1ab92 --- /dev/null +++ b/src/index.rs @@ -0,0 +1,124 @@ +use std::io::SeekFrom; + +use log::trace; + +use crate::{ + parse::{Header, Op}, + reader::BagReader, + Error, Result, +}; + +#[derive(Debug)] +pub struct ConnInfo { + pub id: u32, + pub topic: String, + pub conn_header: Header, +} + +impl ConnInfo { + pub fn from_headers(record_header: Header, conn_header: Header) -> Result { + Ok(ConnInfo { + id: record_header.read_u32(b"conn")?, + topic: record_header.read_string(b"topic")?, + conn_header, + }) + } + + pub fn message_definition(&self) -> Result { + self.conn_header.read_string(b"message_definition") + } + + pub fn datatype(&self) -> Result { + self.conn_header.read_string(b"type") + } + + pub fn md5sum(&self) -> Result { + self.conn_header.read_string(b"md5sum") + } +} + +#[derive(Debug)] +pub struct ChunkInfo { + pub pos: u64, + pub start_time: u64, // TODO: unpack time + pub end_time: u64, + pub conn_count: u32, +} + +impl ChunkInfo { + pub fn from_header(header: Header) -> Result { + if header.read_u32(b"ver")? != 1 { + return Err(Error::other("unsupported ChunkInfo version")); + } + Ok(ChunkInfo { + pos: header.read_u64(b"chunk_pos")?, + start_time: header.read_u64(b"start_time")?, + end_time: header.read_u64(b"end_time")?, + conn_count: header.read_u32(b"count")?, + }) + } +} + +#[derive(Debug)] +pub struct BagIndex { + pub connections: Vec, + pub chunks: Vec, +} + +impl BagIndex { + fn read_v2(reader: &mut R) -> Result { + let file_header = reader.read_header_op(Op::FileHeader)?; + + let data_length = reader.read_data_length()?; + trace!("data length: {}", data_length); + + let index_pos = file_header.read_u64(b"index_pos")?; + trace!("index pos: {}", index_pos); + + if index_pos == 0 { + return Err(Error::Unindexed); + } + + if let Ok(encryptor) = file_header.read_string(b"encryptor") { + return Err(Error::UnsupportedEncryptor(encryptor)); + } + + reader.seek(SeekFrom::Start(index_pos))?; + + let conn_count = file_header.read_u32(b"conn_count")?; + trace!("connection count: {}", conn_count); + + let mut connections = Vec::with_capacity(conn_count as usize); + + for _ in 0..conn_count { + let conn = reader.read_conn_info()?; + trace!("connection: id={}, topic={}", conn.id, conn.topic); + connections.push(conn); + } + + let chunk_count = file_header.read_u32(b"chunk_count")?; + trace!("chunk count: {}", chunk_count); + + let mut chunks = Vec::with_capacity(chunk_count as usize); + + for _ in 0..chunk_count { + let chunk = reader.read_chunk_info()?; + chunks.push(chunk); + } + + Ok(BagIndex { + connections, + chunks, + }) + } + + pub fn read_all(mut reader: &mut R) -> Result { + let version = reader.read_version()?; + trace!("bag version: {}", version); + if (version.major, version.minor) == (2, 0) { + Self::read_v2(reader) + } else { + Err(Error::UnsupportedVersion(version)) + } + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..e342684 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,7 @@ +mod error; +pub mod index; +pub mod parse; +pub mod reader; +pub mod message_definition; + +pub use error::{Error, Result}; diff --git a/src/message_definition.rs b/src/message_definition.rs new file mode 100644 index 0000000..b47f0ce --- /dev/null +++ b/src/message_definition.rs @@ -0,0 +1,2 @@ +pub mod ast; +pub mod grammar; diff --git a/src/message_definition/.gitignore b/src/message_definition/.gitignore new file mode 100644 index 0000000..39cd0aa --- /dev/null +++ b/src/message_definition/.gitignore @@ -0,0 +1 @@ +/grammar.rs \ No newline at end of file diff --git a/src/message_definition/ast.rs b/src/message_definition/ast.rs new file mode 100644 index 0000000..f5a3c53 --- /dev/null +++ b/src/message_definition/ast.rs @@ -0,0 +1,140 @@ +use core::fmt::{self, Write}; + +pub type SmallStr = String; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum PrimitiveType { + Bool, + Int8, + Uint8, + Int16, + Uint16, + Int32, + Uint32, + Int64, + Uint64, + Float32, + Float64, +} + +impl PrimitiveType { + fn as_str(self) -> &'static str { + match self { + PrimitiveType::Bool => "bool", + PrimitiveType::Int8 => "int8_t", + PrimitiveType::Uint8 => "uint8_t", + PrimitiveType::Int16 => "int16_t", + PrimitiveType::Uint16 => "uint16_t", + PrimitiveType::Int32 => "int32_t", + PrimitiveType::Uint32 => "uint32_t", + PrimitiveType::Int64 => "int64_t", + PrimitiveType::Uint64 => "uint64_t", + PrimitiveType::Float32 => "float32", + PrimitiveType::Float64 => "float64", + } + } + + pub fn size(self) -> usize { + match self { + PrimitiveType::Bool => 1, + PrimitiveType::Int8 => 1, + PrimitiveType::Uint8 => 1, + PrimitiveType::Int16 => 2, + PrimitiveType::Uint16 => 2, + PrimitiveType::Int32 => 4, + PrimitiveType::Uint32 => 4, + PrimitiveType::Int64 => 8, + PrimitiveType::Uint64 => 8, + PrimitiveType::Float32 => 4, + PrimitiveType::Float64 => 8, + } + } +} + +impl fmt::Display for PrimitiveType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(self.as_str()) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct ArrayType { + pub element_type: Box, + pub length: Option, +} + +impl ArrayType { + pub fn new(element_type: FieldType, length: Option) -> Self { + Self { + element_type: Box::new(element_type), + length, + } + } +} + +impl fmt::Display for ArrayType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.element_type.fmt(f)?; + f.write_char('[')?; + if let Some(length) = self.length { + length.fmt(f)?; + } + f.write_char(']') + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum FieldType { + Primitive(PrimitiveType), + String, + Time, + Duration, + Header, + Array(ArrayType), + Other(SmallStr), +} + +impl fmt::Display for FieldType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + FieldType::Primitive(primitive) => primitive.fmt(f), + FieldType::String => "string".fmt(f), + FieldType::Time => "time".fmt(f), + FieldType::Duration => "duration".fmt(f), + FieldType::Header => "Header".fmt(f), + FieldType::Array(array) => array.fmt(f), + FieldType::Other(other) => other.fmt(f), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct MessageField { + pub name: SmallStr, + pub typ: FieldType, +} + +impl fmt::Display for MessageField { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{} {}", &self.typ, &self.name) + } +} + +#[derive(Clone, Debug, PartialEq)] +pub struct MessageDefinition { + pub fields: Vec, +} + +impl fmt::Display for MessageDefinition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for field in &self.fields { + writeln!(f, "{}", field)?; + } + Ok(()) + } +} + +pub struct MessageDefinitions { + pub primary: MessageDefinition, + pub dependencies: Vec<(String, MessageDefinition)>, +} \ No newline at end of file diff --git a/src/message_definition/grammar.lalrpop b/src/message_definition/grammar.lalrpop new file mode 100644 index 0000000..e911577 --- /dev/null +++ b/src/message_definition/grammar.lalrpop @@ -0,0 +1,90 @@ +use super::ast::{ + PrimitiveType, ArrayType, FieldType, MessageField, MessageDefinition, MessageDefinitions +}; +use std::str::FromStr; + +grammar; + +match { + "bool", + "int8", + "uint8", + "int16", + "uint16", + "int32", + "uint32", + "int64", + "uint64", + "float32", + "float64", + "string", + "time", + "duration", + "Header", + "[", + "]", + r"==+\r?\nMSG: " => "MESSAGE_BOUNDARY", + r"[0-9]+" => "LENGTH", + r"[\t ]*" => { }, + r"#[^\n\r]*" => { }, // Skip `# comments` +} else { + r"[a-zA-Z][a-zA-Z0-9_]*" => "IDENT", +} else { + r"[a-zA-Z][a-zA-Z0-9_]*(/[a-zA-Z][a-zA-Z0-9_]*)*" => "TYPENAME", + r"\r?\n" => "CRLF", +} + +PrimitiveType: PrimitiveType = { + "bool" => PrimitiveType::Bool, + "int8" => PrimitiveType::Int8, + "uint8" => PrimitiveType::Uint8, + "int16" => PrimitiveType::Int16, + "uint16" => PrimitiveType::Uint16, + "int32" => PrimitiveType::Int32, + "uint32" => PrimitiveType::Uint32, + "int64" => PrimitiveType::Int64, + "uint64" => PrimitiveType::Uint64, + "float32" => PrimitiveType::Float32, + "float64" => PrimitiveType::Float64, +}; + +ArrayLength: Option = { + "[" <"LENGTH"> "]" => Some(usize::from_str(<>).unwrap()), + "[" "]" => None, +}; + +ArrayType: ArrayType = + FieldType ArrayLength => ArrayType::new(<>); + +Ident: String = "IDENT" => <>.to_string(); +TypeName: String = { + "TYPENAME" => <>.to_string(), + "IDENT" => <>.to_string(), +}; + +FieldType: FieldType = { + PrimitiveType => FieldType::Primitive(<>), + "string" => FieldType::String, + "time" => FieldType::Time, + "duration" => FieldType::Duration, + "Header" => FieldType::Header, + ArrayType => FieldType::Array(<>), + TypeName => FieldType::Other(<>), +} + +MessageField: MessageField = { + => MessageField { <> } +} + +MessageFields: Vec = + <( "CRLF"+)*> => <>; + +pub MessageDefinition: MessageDefinition = + "CRLF"* => MessageDefinition { fields }; + +NamedMessageDefinition: (String, MessageDefinition) = + "MESSAGE_BOUNDARY" "CRLF" => (<>); + +pub MessageDefinitions: MessageDefinitions = + + => MessageDefinitions { <> }; diff --git a/src/parse.rs b/src/parse.rs new file mode 100644 index 0000000..b458a04 --- /dev/null +++ b/src/parse.rs @@ -0,0 +1,18 @@ +mod error; +mod fields; +mod header; +mod version; + +pub use error::{Error, ErrorKind}; +pub use fields::Op; +pub use header::Header; +pub use version::Version; + +pub type Input<'a> = &'a [u8]; +pub type InputOwned = Vec; +pub type IResult<'a, T> = nom::IResult, T, Error>>; + +pub type InputStr<'a> = &'a str; +pub type IResultStr<'a, T> = nom::IResult, T, Error>>; + +pub type SmallStr = String; diff --git a/src/parse/error.rs b/src/parse/error.rs new file mode 100644 index 0000000..c17ca61 --- /dev/null +++ b/src/parse/error.rs @@ -0,0 +1,321 @@ +use super::InputOwned; +use core::{fmt, mem, num::ParseIntError, ops::Range}; +use nom::{ + error::{ + ContextError as NomContextError, ErrorKind as NomErrorKind, FromExternalError, + ParseError as NomParseError, + }, + InputLength, +}; +use std::fmt::Write; +use thiserror::Error; + +#[derive(Clone, Debug, PartialEq, Error)] +pub enum ErrorKind { + #[error("nom error {0:?}")] + Nom(NomErrorKind), + #[error("expected '{0}'")] + ExpectedChar(char), + #[error("...in {0}")] + Context(&'static str), + #[error("invalid integer: {0}")] + InvalidInteger(#[from] ParseIntError), + #[error("expected identifier (like /[a-zA-Z][A-Za-z0-9_]*/)")] + ExpectedIdentifier, +} + +#[allow(clippy::len_without_is_empty)] +pub trait ErrorInput: InputLength { + /// Offset of (start of) other within self. + /// Returns None if the start of other is not in self, + /// or if the operation is otherwise not supported + fn offset(&self, other: &Self) -> Option; +} + +#[allow(clippy::len_without_is_empty)] +pub trait ErrorInputDisplay: fmt::Debug { + fn len(&self) -> usize; + + fn write_range(&self, range: Range, f: &mut fmt::Formatter) -> fmt::Result; + + fn pointer() -> &'static str; + fn line() -> &'static str; + fn space() -> &'static str; +} + +#[derive(Clone, PartialEq, Debug)] +pub struct Error { + pub input: I, + pub errors: Vec<(Option>, ErrorKind)>, +} + +impl Error { + pub fn new(input: I, kind: ErrorKind) -> Self { + let errors = vec![(Some(0..input.input_len()), kind)]; + Self { input, errors } + } + + fn append_kind(&mut self, mut input: I, kind: ErrorKind) { + let range = if let Some(offset) = self.input.offset(&input) { + // if the new input is contained in self, just use that + Some(offset..(offset + input.input_len())) + } else if let Some(offset) = input.offset(&self.input) { + // if the new input is before self, replace the self input + // and update all range offsets + let input_len = input.input_len(); + mem::swap(&mut self.input, &mut input); + for err in &mut self.errors { + if let Some(err_range) = &mut err.0 { + err_range.start += offset; + err_range.end += offset; + } + } + Some(0..input_len) + } else { + None + }; + self.errors.push((range, kind)); + } +} + +pub fn error_context( + mut f: F, + mut p: P, +) -> impl FnMut(I) -> nom::IResult> +where + F: FnMut() -> ErrorKind, + P: nom::Parser>, +{ + move |data| { + p.parse(data.clone()).map_err(|err| { + err.map(|mut err| { + err.append_kind(data, f()); + err + }) + }) + } +} + +impl NomParseError for Error { + fn from_error_kind(input: I, kind: NomErrorKind) -> Self { + Self::new(input, ErrorKind::Nom(kind)) + } + + fn append(input: I, kind: NomErrorKind, mut other: Self) -> Self { + other.append_kind(input, ErrorKind::Nom(kind)); + other + } + + fn from_char(input: I, c: char) -> Self { + Self::new(input, ErrorKind::ExpectedChar(c)) + } +} + +impl NomContextError for Error { + fn add_context(input: I, ctx: &'static str, mut other: Self) -> Self { + other.append_kind(input, ErrorKind::Context(ctx)); + other + } +} + +impl> FromExternalError for Error { + fn from_external_error(input: I, _kind: NomErrorKind, e: E) -> Self { + Self::new(input, e.into()) + } +} + +impl Error { + pub fn map_input(self, mut f: F) -> Error + where + F: FnMut(I) -> I2, + { + Error { + input: f(self.input), + errors: self.errors, + } + } +} + +impl<'a> From> for Error<&'a [u8]> { + fn from(e: Error<&'a str>) -> Self { + e.map_input(str::as_bytes) + } +} + +impl Error<&I> { + pub fn into_owned(self) -> Error { + self.map_input(I::to_owned) + } +} + +// adapted from https://fasterthanli.me/series/making-our-own-ping/part-9 +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + writeln!(f, "parsing error")?; + + let margin_left = 4; + let margin_str = " ".repeat(margin_left); + + // maximum amount of binary data we'll dump per line + let maxlen = 60; + + // given a big slice, an offset, and a length, attempt to show + // some data before, some data after, and highlight which part + // we're talking about with tildes. + let print_slice = |f: &mut fmt::Formatter, s: &I, range: &Range| -> fmt::Result { + // decide which part of `s` we're going to show. + let (range, offset, len) = { + // see diagram further in article. + let offset = range.start; + let len = range.end - range.start; + + let avail_after = s.len() - offset; + let after = std::cmp::min(avail_after, maxlen / 2); + + let avail_before = offset; + let before = std::cmp::min(avail_before, maxlen / 2); + + let new_start = offset - before; + let new_end = offset + after; + let new_offset = before; + let new_len = std::cmp::min(new_end - new_start, len); + + (new_start..new_end, new_offset, new_len) + }; + + f.write_str(&margin_str)?; + s.write_range(range.clone(), f)?; + writeln!(f)?; + + write!(f, "{}", margin_str)?; + for i in 0..range.len() { + // each byte takes three characters, ie "FF " + if i == offset + len - 1 { + // ..except the last one + f.write_str(I::pointer())?; + } else if (offset..offset + len).contains(&i) { + f.write_str(I::line())?; + } else { + f.write_str(I::space())?; + }; + } + writeln!(f)?; + + Ok(()) + }; + + for (range, kind) in self.errors.iter().rev() { + writeln!(f, "{}", kind)?; + if let Some(range) = range { + print_slice(f, &self.input, range)?; + } + } + Ok(()) + } +} + +// // TODO: should this really be the same as Display? +// impl fmt::Debug for Error { +// fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { +// ::fmt(self, f) +// } +// } + +impl std::error::Error for Error {} + +impl<'a> ErrorInput for &'a [u8] { + fn offset(&self, other: &Self) -> Option { + (other.as_ptr() as usize) + .checked_sub(self.as_ptr() as usize) + .filter(|&offset| offset < self.len()) + } +} + +impl<'a> ErrorInputDisplay for &'a [u8] { + fn len(&self) -> usize { + <[u8]>::len(self) + } + fn write_range(&self, range: std::ops::Range, f: &mut fmt::Formatter) -> fmt::Result { + for b in &self[range] { + write!(f, "{:02X} ", b)?; + } + Ok(()) + } + fn pointer() -> &'static str { + "~~" + } + fn line() -> &'static str { + "~~~" + } + fn space() -> &'static str { + " " + } +} + +impl ErrorInputDisplay for InputOwned { + fn len(&self) -> usize { + Vec::len(self) + } + fn write_range(&self, range: std::ops::Range, f: &mut fmt::Formatter) -> fmt::Result { + self.as_slice().write_range(range, f) + } + fn pointer() -> &'static str { + <&'static [u8] as ErrorInputDisplay>::pointer() + } + fn line() -> &'static str { + <&'static [u8] as ErrorInputDisplay>::line() + } + fn space() -> &'static str { + <&'static [u8] as ErrorInputDisplay>::space() + } +} + +impl<'a> ErrorInput for &'a str { + fn offset(&self, other: &Self) -> Option { + (other.as_ptr() as usize).checked_sub(self.as_ptr() as usize) + } +} + +impl<'a> ErrorInputDisplay for &'a str { + fn len(&self) -> usize { + str::len(self) + } + fn write_range(&self, range: std::ops::Range, f: &mut fmt::Formatter) -> fmt::Result { + for c in self[range].chars() { + f.write_char(match c { + '\r' => '␍', + '\n' => '␊', + c if c.is_control() => '␣', + c => c, + })?; + } + Ok(()) + } + fn pointer() -> &'static str { + "~" + } + fn line() -> &'static str { + "~" + } + fn space() -> &'static str { + " " + } +} + +impl ErrorInputDisplay for String { + fn len(&self) -> usize { + String::len(self) + } + fn write_range(&self, range: std::ops::Range, f: &mut fmt::Formatter) -> fmt::Result { + self.as_str().write_range(range, f) + } + fn pointer() -> &'static str { + <&'static str as ErrorInputDisplay>::pointer() + } + fn line() -> &'static str { + <&'static str as ErrorInputDisplay>::line() + } + fn space() -> &'static str { + <&'static str as ErrorInputDisplay>::space() + } +} diff --git a/src/parse/fields.rs b/src/parse/fields.rs new file mode 100644 index 0000000..47efada --- /dev/null +++ b/src/parse/fields.rs @@ -0,0 +1,41 @@ +use nom::{ + combinator::all_consuming, + number::complete::{le_u32, le_u64, le_u8}, + Parser, +}; +use num_enum::TryFromPrimitive; + +use super::Input; +use crate::{Error, Result}; + +#[derive(Clone, Copy, Debug, TryFromPrimitive, PartialEq)] +#[repr(u8)] +pub enum Op { + MsgData = 0x02, + FileHeader = 0x03, + IndexData = 0x04, + Chunk = 0x05, + ChunkInfo = 0x06, + Connection = 0x07, +} + +impl Op { + pub fn parse(input: Input) -> Result { + let (_, op) = all_consuming(le_u8).parse(input)?; + Op::try_from_primitive(op).map_err(Into::into) + } +} + +pub fn parse_u32(input: Input) -> Result { + let (_, x) = all_consuming(le_u32).parse(input)?; + Ok(x) +} + +pub fn parse_u64(input: Input) -> Result { + let (_, x) = all_consuming(le_u64).parse(input)?; + Ok(x) +} + +pub fn parse_string(input: Input) -> Result { + String::from_utf8(input.to_owned()).map_err(|_| Error::other("invalid utf8")) +} diff --git a/src/parse/header.rs b/src/parse/header.rs new file mode 100644 index 0000000..f963d91 --- /dev/null +++ b/src/parse/header.rs @@ -0,0 +1,111 @@ +use core::fmt::{self, Write as _}; + +use nom::{ + bytes::complete::{tag, take_until}, + combinator::rest, + error::context, + multi::{length_value, many0}, + number::{complete::le_u32 as le_u32_complete, streaming::le_u32 as le_u32_streaming}, + sequence::separated_pair, + Parser, +}; +use smallvec::SmallVec; + +use super::{fields, IResult, Input, Op}; +use crate::{Error, Result}; + +#[derive(Clone, Debug)] +pub struct HeaderField { + name: SmallVec<[u8; 16]>, + value: SmallVec<[u8; 16]>, +} + +impl HeaderField { + pub fn name(&self) -> &[u8] { + &self.name + } + + pub fn value(&self) -> &[u8] { + &self.value + } + + pub fn parse(input: Input) -> IResult { + // Uses complete combinators because Header has length + context( + "Header Entry", + length_value( + le_u32_complete, + separated_pair(take_until("="), tag("="), rest), + ), + ) + .map(|(name, value)| HeaderField { + name: SmallVec::from_slice(name), + value: SmallVec::from_slice(value), + }) + .parse(input) + } +} + +impl fmt::Display for HeaderField { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let name = String::from_utf8_lossy(self.name.as_ref()); + write!(f, "{}=", name)?; + for byte in self.value.iter().copied() { + write!(f, "{:02x} ", byte)?; + } + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct Header(pub Vec); + +impl Header { + pub fn parse(input: Input) -> IResult { + context( + "Header", + length_value(le_u32_streaming, many0(HeaderField::parse)), + ) + .map(Header) + .parse(input) + } + + pub fn find_field(&self, name: &[u8]) -> Result<&[u8]> { + self.0 + .iter() + .find(|field| field.name() == name) + .map(|field| field.value()) + .ok_or_else(|| Error::MissingField(String::from_utf8_lossy(name).into_owned())) + } + + pub fn read_op(&self) -> Result { + self.find_field(b"op").and_then(fields::Op::parse) + } + + pub fn read_u64(&self, field: &[u8]) -> Result { + self.find_field(field).and_then(fields::parse_u64) + } + + pub fn read_u32(&self, field: &[u8]) -> Result { + self.find_field(field).and_then(fields::parse_u32) + } + + pub fn read_string(&self, field: &[u8]) -> Result { + self.find_field(field).and_then(fields::parse_string) + } +} + +impl fmt::Display for Header { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut first = true; + for header in &self.0 { + if first { + first = false; + } else { + f.write_char('\n')?; + } + header.fmt(f)?; + } + Ok(()) + } +} diff --git a/src/parse/version.rs b/src/parse/version.rs new file mode 100644 index 0000000..182f5c5 --- /dev/null +++ b/src/parse/version.rs @@ -0,0 +1,45 @@ +use core::fmt; + +use nom::{ + bytes::streaming::{is_a, tag}, + error::context, + sequence::tuple, + Parser, +}; + +use super::{IResult, Input}; + +#[derive(Clone, Debug, PartialEq)] +pub struct Version { + pub major: u16, + pub minor: u16, +} + +impl fmt::Display for Version { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}.{}", self.major, self.minor) + } +} + +fn decimal_u16(input: Input) -> IResult { + is_a("0123456789") + .map(|a| String::from_utf8_lossy(a).parse::().unwrap()) + .parse(input) +} + +impl Version { + pub fn parse(input: Input) -> IResult { + context( + "Version Header", + tuple(( + tag(b"#ROSBAG V"), + decimal_u16, + tag(b"."), + decimal_u16, + tag(b"\n"), + )) + .map(|(_, major, _, minor, _)| Version { major, minor }), + ) + .parse(input) + } +} diff --git a/src/reader.rs b/src/reader.rs new file mode 100644 index 0000000..d749069 --- /dev/null +++ b/src/reader.rs @@ -0,0 +1,140 @@ +use std::io::{self, SeekFrom}; + +use bytes::{Buf, BytesMut}; +use nom::number::streaming::le_u32; + +use crate::error::{Error, Result}; +use crate::index::{ChunkInfo, ConnInfo}; +use crate::parse::{self, Header, Op, Version}; + +const READ_SIZE: usize = 4096; + +pub trait BagReader { + fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result + where + P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>; + + fn seek(&mut self, pos: SeekFrom) -> Result<()>; + + fn read_version(&mut self) -> Result { + self.read_parser(Version::parse) + } + + fn read_header(&mut self) -> Result
{ + self.read_parser(Header::parse) + } + + fn read_header_op(&mut self, op: Op) -> Result
{ + let header = self.read_header()?; + if header.read_op()? != op { + return Err(Error::other(format!("Expected {:?} op", op))); + } + Ok(header) + } + + fn read_data_length(&mut self) -> Result { + self.read_parser(le_u32) + } + + fn skip_data(&mut self) -> Result<()> { + let data_length = self.read_data_length()?; + self.seek(io::SeekFrom::Current(data_length as i64))?; + Ok(()) + } + + fn read_conn_info(&mut self) -> Result { + let record_header = self.read_header_op(Op::Connection)?; + let conn_header = self.read_header()?; + ConnInfo::from_headers(record_header, conn_header) + } + + fn read_chunk_info(&mut self) -> Result { + let header = self.read_header_op(Op::ChunkInfo)?; + // TODO: read connection message counts + self.skip_data()?; + ChunkInfo::from_header(header) + } +} + +pub struct IoReader { + read: R, + buffer: BytesMut, + consumed: usize, +} + +impl IoReader { + pub fn new(read: R) -> Self { + Self { + read, + buffer: BytesMut::with_capacity(READ_SIZE), + consumed: 0, + } + } + + fn read_more(&mut self, n: usize) -> Result<()> { + if n == 0 { + return Ok(()); + } + let old_size = self.buffer.len(); + self.buffer.resize(old_size + n, 0); + let read = self.read.read(&mut self.buffer[old_size..])?; + self.buffer.truncate(old_size + read); + if read == 0 { + return Err(Error::Eof); + } + Ok(()) + } +} + +impl BagReader for IoReader { + fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result + where + P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, + { + self.buffer.advance(self.consumed); + self.consumed = 0; + + let this = self as *mut Self; + + loop { + match parser.parse(&self.buffer) { + Ok((rest, output)) => { + self.consumed += self.buffer.len() - rest.len(); + return Ok(output); + } + Err(nom::Err::Incomplete(needed)) => { + let needed = match needed { + nom::Needed::Unknown => 0, + nom::Needed::Size(n) => n.get(), + }; + // Safety: this.buffer is only borrowed in the Ok case above, which + // immediately returns. + unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?; + } + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => { + return Err(e.into()); + } + } + } + } + + fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> { + if let io::SeekFrom::Current(pos) = &mut pos { + // If seeking relative to current position, subtract data + // read from the file but not yet consumed. + let remaining = (self.buffer.len() - self.consumed) as i64; + let new_pos = *pos - remaining; + if *pos >= 0 && new_pos < 0 { + // The new position is within the already read data, just consume more data + self.consumed += *pos as usize; + return Ok(()); + } + *pos = new_pos; + } + + self.buffer.clear(); + self.consumed = 0; + self.read.seek(pos)?; + Ok(()) + } +}