From 48e90f36c2dbb76958c49b89e6deb47daaed0c35 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Mon, 22 Nov 2021 00:07:58 -0800 Subject: [PATCH] export to mongodb --- examples/bag_info.rs | 3 -- examples/mongobag.rs | 65 +++++++++++++++++++++++ src/bag.rs | 27 +++++----- src/chunk.rs | 31 ++++++----- src/info.rs | 14 +++-- src/layout.rs | 4 ++ src/message.rs | 62 ++++++++++++++++++++-- src/parse.rs | 6 +++ src/parse/error.rs | 4 +- src/parse/message.rs | 86 ++++++++++++++++++++++++++++++ src/parse/message_bson.rs | 107 ++++++++++++++++++++++++++++++++++++++ 11 files changed, 368 insertions(+), 41 deletions(-) create mode 100644 examples/mongobag.rs create mode 100644 src/parse/message.rs create mode 100644 src/parse/message_bson.rs diff --git a/examples/bag_info.rs b/examples/bag_info.rs index a41b55b..41f470d 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -16,10 +16,7 @@ fn main() -> Result<()> { let bag_path = &args[1]; let mut bag = Bag::open(bag_path)?; - bag.compute_message_layouts()?; - let info = bag.compute_info()?; - info!("bag info: {:#?}", info); Ok(()) diff --git a/examples/mongobag.rs b/examples/mongobag.rs new file mode 100644 index 0000000..e48230c --- /dev/null +++ b/examples/mongobag.rs @@ -0,0 +1,65 @@ +use std::env::args; + +use bson::Document; +use indicatif::ParallelProgressIterator; +use log::info; +use mongodb::sync::Client; +use rayon::iter::ParallelIterator; +use rsbag::{chunk::ChunkMessageIterator, Bag, Result}; + +fn main() -> Result<()> { + color_eyre::install()?; + env_logger::init(); + + let args: Vec<_> = args().collect(); + if args.len() != 2 { + eprintln!("Usage: {} ", args[0]); + return Ok(()); + } + + let bag_path = &args[1]; + let mut bag = Bag::open(bag_path)?; + + let layouts = bag.compute_message_layouts()?; + + let info = bag.compute_info()?; + let total_messages: u64 = info.per_connection.values().sum(); + info!("exporting {} messages", total_messages); + + let client = Client::with_uri_str("mongodb://localhost:27017")?; + let database = client.database("rsbag"); + let collection = database.collection::("messages"); + + let num_chunks = bag.index().chunks.len(); + + bag.read_chunks() + .progress_count(num_chunks as u64) + .for_each_with(collection, |collection, chunk| { + let it = ChunkMessageIterator::new(chunk.unwrap()); + let mut documents = Vec::new(); + for msg in it { + let msg = msg.unwrap(); + let (topic, msg) = layouts.decode_bson(&msg).unwrap(); + documents.push(bson::doc! { + "topic": topic, + "msg": msg, + }); + } + collection.insert_many(documents, None).unwrap(); + }); + + // bag.read_messages() + // .progress_count(total_messages) + // .map(|msg| { + // let msg = msg.unwrap(); + // layouts.decode_bson(&msg).unwrap() + // }) + // .for_each_with(collection, |collection, (topic, msg)| { + // collection.insert_one(bson::doc! { + // "topic": topic, + // "msg": msg, + // }, None).unwrap(); + // }); + + Ok(()) +} diff --git a/src/bag.rs b/src/bag.rs index 6132351..c5b2beb 100644 --- a/src/bag.rs +++ b/src/bag.rs @@ -1,14 +1,13 @@ use std::{fs::File, path::Path}; use eyre::Context; -use log::debug; use rayon::iter::ParallelIterator; use crate::{ - chunk::{read_chunks_messages, MessageData}, + chunk::{read_chunks_data, read_chunks_messages, MessageData}, index::BagIndex, info::BagInfo, - message::compute_layout, + message::MessageLayouts, reader::MmapReader, Result, }; @@ -28,21 +27,23 @@ impl Bag { Ok(Bag { reader, index }) } - pub fn compute_message_layouts(&mut self) -> Result<()> { - for conn in &self.index.connections { - let layout = compute_layout(conn)?; - debug!("message layout: {:#?}", layout); - } - Ok(()) + pub fn index(&self) -> &BagIndex { + &self.index + } + + pub fn compute_message_layouts(&mut self) -> Result { + MessageLayouts::new(&self.index.connections) } pub fn compute_info(&mut self) -> Result { - let reader = self.reader.clone(); - BagInfo::compute(|| reader.clone(), &self.index) + BagInfo::compute(self.reader.clone(), &self.index) + } + + pub fn read_chunks(&mut self) -> impl ParallelIterator>> + '_ { + read_chunks_data(self.reader.clone(), &self.index.chunks) } pub fn read_messages(&mut self) -> impl ParallelIterator> + '_ { - let reader = self.reader.clone(); - read_chunks_messages(move || reader.clone(), &self.index.chunks) + read_chunks_messages(self.reader.clone(), &self.index.chunks) } } diff --git a/src/chunk.rs b/src/chunk.rs index 170bd7a..a8d0850 100644 --- a/src/chunk.rs +++ b/src/chunk.rs @@ -91,19 +91,19 @@ pub fn read_chunk_data_at( read_chunk_data(bag_reader).wrap_err_with(|| eyre!("failed to read chunk at offset {}", pos)) } -pub fn read_chunks_data<'a, R, F, C>( - make_reader: F, +pub fn read_chunks_data<'a, R, C>( + reader: R, chunks: C, ) -> impl ParallelIterator>> + 'a where - R: BagReader + io::Seek, - F: Fn() -> R + Send + Sync + 'a, + R: BagReader + io::Seek + Clone + Send + 'a, C: IntoParallelIterator + 'a, { - chunks.into_par_iter().map(move |chunk| { - let mut reader = make_reader(); - read_chunk_data_at(&mut reader, chunk.pos) - }) + chunks + .into_par_iter() + .map_with(reader, move |reader, chunk| { + read_chunk_data_at(reader, chunk.pos) + }) } #[derive(Debug)] @@ -132,16 +132,15 @@ pub struct MessageData { pub data: Bytes, } -pub fn read_chunks_messages<'a, R, F, C>( - make_reader: F, +pub fn read_chunks_messages<'a, R, C>( + reader: R, chunks: C, ) -> impl ParallelIterator> + 'a where - R: BagReader + io::Seek + 'a, - F: Fn() -> R + Send + Sync + 'a, + R: BagReader + io::Seek + Clone + Send + 'a, C: IntoParallelIterator + 'a, { - read_chunks_data(make_reader, chunks).flat_map_iter(move |data| ChunkMessageIterator { + read_chunks_data(reader, chunks).flat_map_iter(move |data| ChunkMessageIterator { reader: data.map(|data| BytesReader::from(Bytes::from(data))), }) } @@ -151,6 +150,12 @@ pub struct ChunkMessageIterator { } impl ChunkMessageIterator { + pub fn new>(b: B) -> Self { + Self { + reader: Ok(BytesReader::from(b.into())), + } + } + fn next_impl(&mut self) -> Result> { let reader = match &mut self.reader { Ok(reader) => reader, diff --git a/src/info.rs b/src/info.rs index 01f01ec..a3e3d87 100644 --- a/src/info.rs +++ b/src/info.rs @@ -24,16 +24,15 @@ impl BagInfo { self } - pub fn compute(make_reader: F, index: &BagIndex) -> Result + pub fn compute(reader: R, index: &BagIndex) -> Result where - R: BagReader + io::Seek, - F: Fn() -> R + Send + Sync, + R: BagReader + io::Seek + Clone + Send + Sync, { index .chunks .par_iter() .try_fold(BagInfo::default, move |mut info, chunk| -> Result<_> { - let mut reader = make_reader(); + let mut reader = reader.clone(); reader.seek(io::SeekFrom::Start(chunk.pos))?; let chunk_header = ChunkHeader::read(&mut reader)?; info.total_uncompressed += chunk_header.uncompressed_size as u64; @@ -48,12 +47,11 @@ impl BagInfo { .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) } - pub fn compute_without_index(make_reader: F, index: &BagIndex) -> Result + pub fn compute_without_index(reader: R, index: &BagIndex) -> Result where - R: BagReader + io::Seek, - F: Fn() -> R + Send + Sync, + R: BagReader + io::Seek + Clone + Send, { - read_chunks_messages(make_reader, &index.chunks) + read_chunks_messages(reader, &index.chunks) .try_fold(BagInfo::default, |mut info, chunk| -> Result<_> { let data = chunk?; info.total_uncompressed += data.data.len() as u64; diff --git a/src/layout.rs b/src/layout.rs index 9c45c22..122c915 100644 --- a/src/layout.rs +++ b/src/layout.rs @@ -129,6 +129,10 @@ pub struct MessageLayout { pub type MessageLayoutRef = std::sync::Arc; impl MessageLayout { + pub fn fields(&self) -> &[FieldLayout] { + &self.fields + } + pub fn from_msgs<'a, M, I>(msgs: M) -> Result where M: IntoIterator, diff --git a/src/message.rs b/src/message.rs index f4dd21f..b730ec3 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,11 +1,14 @@ -use std::convert::TryFrom; +use std::{collections::HashMap, convert::TryFrom}; +use eyre::eyre; use lazy_static::lazy_static; use log::trace; use regex::Regex; -use ros_message::{MessagePath, Msg}; +use ros_message::{MessagePath, MessageValue, Msg}; -use crate::{index::ConnInfo, layout::MessageLayout, Result}; +use crate::{ + chunk::MessageData, index::ConnInfo, layout::MessageLayout, parse::parse_message, Result, +}; fn parse_msg(message_name: &str, msgdef: &str) -> Result { trace!("message definition {}: {}", message_name, msgdef); @@ -48,3 +51,56 @@ pub fn compute_layout(conn: &ConnInfo) -> Result { let msgs = parse_msgs(conn)?; MessageLayout::from_msgs(&msgs) } + +struct ConnData { + topic: String, + layout: MessageLayout, +} + +impl ConnData { + fn new(conn: &ConnInfo) -> Result { + let layout = compute_layout(conn)?; + Ok(ConnData { + topic: conn.topic.clone(), + layout, + }) + } +} +pub struct MessageLayouts { + conns: HashMap, +} + +impl MessageLayouts { + pub fn new<'a, I: IntoIterator>(connections: I) -> Result { + let mut conns = HashMap::new(); + for conn in connections { + conns.insert(conn.id, ConnData::new(conn)?); + } + Ok(MessageLayouts { conns }) + } + + pub fn decode(&self, message: &MessageData) -> Result<(String, MessageValue)> { + let conn = self + .conns + .get(&message.header.conn_id) + .ok_or_else(|| eyre!("missing connection: {}", message.header.conn_id))?; + + let value = parse_message(&conn.layout, message.data.as_ref())?; + + Ok((conn.topic.clone(), value)) + } + + #[cfg(feature = "bson")] + pub fn decode_bson(&self, message: &MessageData) -> Result<(String, bson::Document)> { + use crate::parse::parse_message_bson; + + let conn = self + .conns + .get(&message.header.conn_id) + .ok_or_else(|| eyre!("missing connection: {}", message.header.conn_id))?; + + let value = parse_message_bson(&conn.layout, message.data.as_ref())?; + + Ok((conn.topic.clone(), value)) + } +} diff --git a/src/parse.rs b/src/parse.rs index f9b0273..0b01875 100644 --- a/src/parse.rs +++ b/src/parse.rs @@ -1,9 +1,15 @@ mod error; pub mod header; +mod message; +#[cfg(feature = "bson")] +mod message_bson; mod version; pub use error::{Error, ErrorKind}; pub use header::{Header, Op}; +pub use message::parse_message; +#[cfg(feature = "bson")] +pub use message_bson::parse_message_bson; pub use version::Version; pub type Input<'a> = &'a [u8]; diff --git a/src/parse/error.rs b/src/parse/error.rs index b5ac198..c949bc9 100644 --- a/src/parse/error.rs +++ b/src/parse/error.rs @@ -7,7 +7,7 @@ use nom::{ }, InputLength, }; -use std::fmt::Write; +use std::{fmt::Write, string::FromUtf8Error}; use thiserror::Error; #[derive(Clone, Debug, PartialEq, Error)] @@ -20,6 +20,8 @@ pub enum ErrorKind { Context(&'static str), #[error("invalid integer: {0}")] InvalidInteger(#[from] ParseIntError), + #[error("invalid utf8: {0}")] + InvalidUtf8(#[from] FromUtf8Error), #[error("expected identifier (like /[a-zA-Z][A-Za-z0-9_]*/)")] ExpectedIdentifier, } diff --git a/src/parse/message.rs b/src/parse/message.rs new file mode 100644 index 0000000..bb73b8a --- /dev/null +++ b/src/parse/message.rs @@ -0,0 +1,86 @@ +use eyre::bail; +use nom::{ + combinator::map_res, + multi::{length_count, length_data, many_m_n}, + number::complete::{ + le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8, + }, + Parser, +}; +use ros_message::{MessageValue, Value}; + +use crate::{ + layout::{FieldLayout, FieldType, MessageLayout, Multiplicity}, + parse::{header::fields::parse_time, ErrorKind}, +}; + +use super::{Error, IResult, Input}; +use crate::Result; + +fn type_parser<'a, 'b: 'a>( + typ: &'a FieldType, +) -> impl Parser, Value, Error>> + 'a { + move |input| -> IResult { + match typ { + FieldType::Bool => le_u8.map(|v| Value::Bool(v != 0)).parse(input), + FieldType::I8(_) => le_i8.map(Value::I8).parse(input), + FieldType::I16 => le_i16.map(Value::I16).parse(input), + FieldType::I32 => le_i32.map(Value::I32).parse(input), + FieldType::I64 => le_i64.map(Value::I64).parse(input), + FieldType::U8(_) => le_u8.map(Value::U8).parse(input), + FieldType::U16 => le_u16.map(Value::U16).parse(input), + FieldType::U32 => le_u32.map(Value::U32).parse(input), + FieldType::U64 => le_u64.map(Value::U64).parse(input), + FieldType::F32 => le_f32.map(Value::F32).parse(input), + FieldType::F64 => le_f64.map(Value::F64).parse(input), + FieldType::String => map_res(length_data(le_u32), |s| { + String::from_utf8(Vec::from(s)) + .map_err(ErrorKind::from) + .map(Value::String) + }) + .parse(input), + FieldType::Time => parse_time.map(Value::Time).parse(input), + FieldType::Duration => todo!(), + FieldType::Message(layout) => message_parser(layout.as_ref()) + .map(Value::Message) + .parse(input), + } + } +} + +fn field_parser<'a, 'b: 'a>( + field: &'a FieldLayout, +) -> impl Parser, (String, Value), Error>> + 'a { + move |input| -> IResult<(String, Value)> { + let mut parser = type_parser(&field.typ); + let (rest, value) = match field.multiplicity { + Multiplicity::Unit => parser.parse(input), + Multiplicity::Fixed(n) => many_m_n(n, n, parser).map(Value::Array).parse(input), + Multiplicity::Dynamic => length_count(le_u32, parser).map(Value::Array).parse(input), + }?; + Ok((rest, (String::from(field.name.as_str()), value))) + } +} + +fn message_parser<'a, 'b: 'a>( + layout: &'a MessageLayout, +) -> impl Parser, MessageValue, Error>> + 'a { + move |mut input| -> IResult<_> { + let mut message = MessageValue::with_capacity(layout.fields().len()); + for field in layout.fields() { + let (rest, (name, value)) = field_parser(field).parse(input)?; + input = rest; + message.insert(name, value); + } + Ok((input, message)) + } +} + +pub fn parse_message(layout: &MessageLayout, input: &[u8]) -> Result { + match message_parser(layout).parse(input) { + Ok((&[], message)) => Ok(message), + Ok(_) => bail!("extra data after message"), + Err(nom::Err::Incomplete(_)) => unreachable!(), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), + } +} diff --git a/src/parse/message_bson.rs b/src/parse/message_bson.rs new file mode 100644 index 0000000..443b251 --- /dev/null +++ b/src/parse/message_bson.rs @@ -0,0 +1,107 @@ +use bson::{Bson, Document}; +use eyre::bail; +use nom::{ + bytes::complete::take, + combinator::map_res, + multi::{length_count, length_data, many_m_n}, + number::complete::{ + le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8, + }, + Parser, +}; + +use crate::{ + layout::{FieldLayout, FieldType, MessageLayout, Multiplicity}, + parse::{header::fields::parse_time, ErrorKind}, +}; + +use super::{Error, IResult, Input}; +use crate::Result; + +fn type_parser<'a, 'b: 'a>( + typ: &'a FieldType, +) -> impl Parser, Bson, Error>> + 'a { + move |input| -> IResult { + match typ { + FieldType::Bool => le_u8.map(|v| Bson::Boolean(v != 0)).parse(input), + FieldType::I8(_) => le_i8.map(|v| Bson::Int32(v as i32)).parse(input), + FieldType::I16 => le_i16.map(|v| Bson::Int32(v as i32)).parse(input), + FieldType::I32 => le_i32.map(Bson::Int32).parse(input), + FieldType::I64 => le_i64.map(Bson::Int64).parse(input), + FieldType::U8(_) => le_u8.map(|v| Bson::Int32(v as i32)).parse(input), + FieldType::U16 => le_u16.map(|v| Bson::Int32(v as i32)).parse(input), + FieldType::U32 => le_u32.map(|v| Bson::Int64(v as i64)).parse(input), + FieldType::U64 => le_u64.map(|v| Bson::Int64(v as i64)).parse(input), + FieldType::F32 => le_f32.map(|v| Bson::Double(v as f64)).parse(input), + FieldType::F64 => le_f64.map(Bson::Double).parse(input), + FieldType::String => map_res(length_data(le_u32), |s| { + String::from_utf8(Vec::from(s)) + .map_err(ErrorKind::from) + .map(Bson::String) + }) + .parse(input), + FieldType::Time => parse_time + .map(|t| bson::bson!({"sec": (t.sec as i64), "nsec": (t.nsec as i64)})) + .parse(input), + FieldType::Duration => todo!(), + FieldType::Message(layout) => message_parser(layout.as_ref()) + .map(Bson::Document) + .parse(input), + } + } +} + +fn field_parser<'a, 'b: 'a>( + field: &'a FieldLayout, +) -> impl Parser, (String, Bson), Error>> + 'a { + move |input| -> IResult<(String, Bson)> { + let mut parser = type_parser(&field.typ); + let (rest, value) = match field.multiplicity { + Multiplicity::Unit => parser.parse(input), + Multiplicity::Fixed(n) => match field.typ { + FieldType::U8(_) => take(n) + .map(|b| bson::Binary { + subtype: bson::spec::BinarySubtype::Generic, + bytes: Vec::from(b), + }) + .map(Bson::from) + .parse(input), + _ => many_m_n(n, n, parser).map(Bson::Array).parse(input), + }, + Multiplicity::Dynamic => match field.typ { + FieldType::U8(_) => length_data(le_u32) + .map(|b| bson::Binary { + subtype: bson::spec::BinarySubtype::Generic, + bytes: Vec::from(b), + }) + .map(Bson::from) + .parse(input), + _ => length_count(le_u32, parser).map(Bson::Array).parse(input), + }, + }?; + Ok((rest, (String::from(field.name.as_str()), value))) + } +} + +fn message_parser<'a, 'b: 'a>( + layout: &'a MessageLayout, +) -> impl Parser, Document, Error>> + 'a { + move |mut input| -> IResult<_> { + let mut message = Document::new(); + for field in layout.fields() { + let (rest, (name, value)) = field_parser(field).parse(input)?; + input = rest; + message.insert(name, value); + } + Ok((input, message)) + } +} + +pub fn parse_message_bson(layout: &MessageLayout, input: &[u8]) -> Result { + match message_parser(layout).parse(input) { + Ok((&[], message)) => Ok(message), + Ok(_) => bail!("extra data after message"), + Err(nom::Err::Incomplete(_)) => unreachable!(), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), + } +}