export to mongodb
This commit is contained in:
parent
f6695008fb
commit
48e90f36c2
@ -16,10 +16,7 @@ fn main() -> Result<()> {
|
|||||||
let bag_path = &args[1];
|
let bag_path = &args[1];
|
||||||
let mut bag = Bag::open(bag_path)?;
|
let mut bag = Bag::open(bag_path)?;
|
||||||
|
|
||||||
bag.compute_message_layouts()?;
|
|
||||||
|
|
||||||
let info = bag.compute_info()?;
|
let info = bag.compute_info()?;
|
||||||
|
|
||||||
info!("bag info: {:#?}", info);
|
info!("bag info: {:#?}", info);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
65
examples/mongobag.rs
Normal file
65
examples/mongobag.rs
Normal file
@ -0,0 +1,65 @@
|
|||||||
|
use std::env::args;
|
||||||
|
|
||||||
|
use bson::Document;
|
||||||
|
use indicatif::ParallelProgressIterator;
|
||||||
|
use log::info;
|
||||||
|
use mongodb::sync::Client;
|
||||||
|
use rayon::iter::ParallelIterator;
|
||||||
|
use rsbag::{chunk::ChunkMessageIterator, Bag, Result};
|
||||||
|
|
||||||
|
fn main() -> Result<()> {
|
||||||
|
color_eyre::install()?;
|
||||||
|
env_logger::init();
|
||||||
|
|
||||||
|
let args: Vec<_> = args().collect();
|
||||||
|
if args.len() != 2 {
|
||||||
|
eprintln!("Usage: {} <bag path>", args[0]);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let bag_path = &args[1];
|
||||||
|
let mut bag = Bag::open(bag_path)?;
|
||||||
|
|
||||||
|
let layouts = bag.compute_message_layouts()?;
|
||||||
|
|
||||||
|
let info = bag.compute_info()?;
|
||||||
|
let total_messages: u64 = info.per_connection.values().sum();
|
||||||
|
info!("exporting {} messages", total_messages);
|
||||||
|
|
||||||
|
let client = Client::with_uri_str("mongodb://localhost:27017")?;
|
||||||
|
let database = client.database("rsbag");
|
||||||
|
let collection = database.collection::<Document>("messages");
|
||||||
|
|
||||||
|
let num_chunks = bag.index().chunks.len();
|
||||||
|
|
||||||
|
bag.read_chunks()
|
||||||
|
.progress_count(num_chunks as u64)
|
||||||
|
.for_each_with(collection, |collection, chunk| {
|
||||||
|
let it = ChunkMessageIterator::new(chunk.unwrap());
|
||||||
|
let mut documents = Vec::new();
|
||||||
|
for msg in it {
|
||||||
|
let msg = msg.unwrap();
|
||||||
|
let (topic, msg) = layouts.decode_bson(&msg).unwrap();
|
||||||
|
documents.push(bson::doc! {
|
||||||
|
"topic": topic,
|
||||||
|
"msg": msg,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
collection.insert_many(documents, None).unwrap();
|
||||||
|
});
|
||||||
|
|
||||||
|
// bag.read_messages()
|
||||||
|
// .progress_count(total_messages)
|
||||||
|
// .map(|msg| {
|
||||||
|
// let msg = msg.unwrap();
|
||||||
|
// layouts.decode_bson(&msg).unwrap()
|
||||||
|
// })
|
||||||
|
// .for_each_with(collection, |collection, (topic, msg)| {
|
||||||
|
// collection.insert_one(bson::doc! {
|
||||||
|
// "topic": topic,
|
||||||
|
// "msg": msg,
|
||||||
|
// }, None).unwrap();
|
||||||
|
// });
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
27
src/bag.rs
27
src/bag.rs
@ -1,14 +1,13 @@
|
|||||||
use std::{fs::File, path::Path};
|
use std::{fs::File, path::Path};
|
||||||
|
|
||||||
use eyre::Context;
|
use eyre::Context;
|
||||||
use log::debug;
|
|
||||||
use rayon::iter::ParallelIterator;
|
use rayon::iter::ParallelIterator;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
chunk::{read_chunks_messages, MessageData},
|
chunk::{read_chunks_data, read_chunks_messages, MessageData},
|
||||||
index::BagIndex,
|
index::BagIndex,
|
||||||
info::BagInfo,
|
info::BagInfo,
|
||||||
message::compute_layout,
|
message::MessageLayouts,
|
||||||
reader::MmapReader,
|
reader::MmapReader,
|
||||||
Result,
|
Result,
|
||||||
};
|
};
|
||||||
@ -28,21 +27,23 @@ impl Bag {
|
|||||||
Ok(Bag { reader, index })
|
Ok(Bag { reader, index })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn compute_message_layouts(&mut self) -> Result<()> {
|
pub fn index(&self) -> &BagIndex {
|
||||||
for conn in &self.index.connections {
|
&self.index
|
||||||
let layout = compute_layout(conn)?;
|
}
|
||||||
debug!("message layout: {:#?}", layout);
|
|
||||||
}
|
pub fn compute_message_layouts(&mut self) -> Result<MessageLayouts> {
|
||||||
Ok(())
|
MessageLayouts::new(&self.index.connections)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn compute_info(&mut self) -> Result<BagInfo> {
|
pub fn compute_info(&mut self) -> Result<BagInfo> {
|
||||||
let reader = self.reader.clone();
|
BagInfo::compute(self.reader.clone(), &self.index)
|
||||||
BagInfo::compute(|| reader.clone(), &self.index)
|
}
|
||||||
|
|
||||||
|
pub fn read_chunks(&mut self) -> impl ParallelIterator<Item = Result<Vec<u8>>> + '_ {
|
||||||
|
read_chunks_data(self.reader.clone(), &self.index.chunks)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_messages(&mut self) -> impl ParallelIterator<Item = Result<MessageData>> + '_ {
|
pub fn read_messages(&mut self) -> impl ParallelIterator<Item = Result<MessageData>> + '_ {
|
||||||
let reader = self.reader.clone();
|
read_chunks_messages(self.reader.clone(), &self.index.chunks)
|
||||||
read_chunks_messages(move || reader.clone(), &self.index.chunks)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
31
src/chunk.rs
31
src/chunk.rs
@ -91,19 +91,19 @@ pub fn read_chunk_data_at<R: BagReader + io::Seek>(
|
|||||||
read_chunk_data(bag_reader).wrap_err_with(|| eyre!("failed to read chunk at offset {}", pos))
|
read_chunk_data(bag_reader).wrap_err_with(|| eyre!("failed to read chunk at offset {}", pos))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_chunks_data<'a, R, F, C>(
|
pub fn read_chunks_data<'a, R, C>(
|
||||||
make_reader: F,
|
reader: R,
|
||||||
chunks: C,
|
chunks: C,
|
||||||
) -> impl ParallelIterator<Item = Result<Vec<u8>>> + 'a
|
) -> impl ParallelIterator<Item = Result<Vec<u8>>> + 'a
|
||||||
where
|
where
|
||||||
R: BagReader + io::Seek,
|
R: BagReader + io::Seek + Clone + Send + 'a,
|
||||||
F: Fn() -> R + Send + Sync + 'a,
|
|
||||||
C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
|
C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
|
||||||
{
|
{
|
||||||
chunks.into_par_iter().map(move |chunk| {
|
chunks
|
||||||
let mut reader = make_reader();
|
.into_par_iter()
|
||||||
read_chunk_data_at(&mut reader, chunk.pos)
|
.map_with(reader, move |reader, chunk| {
|
||||||
})
|
read_chunk_data_at(reader, chunk.pos)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
@ -132,16 +132,15 @@ pub struct MessageData {
|
|||||||
pub data: Bytes,
|
pub data: Bytes,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_chunks_messages<'a, R, F, C>(
|
pub fn read_chunks_messages<'a, R, C>(
|
||||||
make_reader: F,
|
reader: R,
|
||||||
chunks: C,
|
chunks: C,
|
||||||
) -> impl ParallelIterator<Item = Result<MessageData>> + 'a
|
) -> impl ParallelIterator<Item = Result<MessageData>> + 'a
|
||||||
where
|
where
|
||||||
R: BagReader + io::Seek + 'a,
|
R: BagReader + io::Seek + Clone + Send + 'a,
|
||||||
F: Fn() -> R + Send + Sync + 'a,
|
|
||||||
C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
|
C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
|
||||||
{
|
{
|
||||||
read_chunks_data(make_reader, chunks).flat_map_iter(move |data| ChunkMessageIterator {
|
read_chunks_data(reader, chunks).flat_map_iter(move |data| ChunkMessageIterator {
|
||||||
reader: data.map(|data| BytesReader::from(Bytes::from(data))),
|
reader: data.map(|data| BytesReader::from(Bytes::from(data))),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -151,6 +150,12 @@ pub struct ChunkMessageIterator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl ChunkMessageIterator {
|
impl ChunkMessageIterator {
|
||||||
|
pub fn new<B: Into<Bytes>>(b: B) -> Self {
|
||||||
|
Self {
|
||||||
|
reader: Ok(BytesReader::from(b.into())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn next_impl(&mut self) -> Result<Option<MessageData>> {
|
fn next_impl(&mut self) -> Result<Option<MessageData>> {
|
||||||
let reader = match &mut self.reader {
|
let reader = match &mut self.reader {
|
||||||
Ok(reader) => reader,
|
Ok(reader) => reader,
|
||||||
|
14
src/info.rs
14
src/info.rs
@ -24,16 +24,15 @@ impl BagInfo {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn compute<R, F>(make_reader: F, index: &BagIndex) -> Result<BagInfo>
|
pub fn compute<R>(reader: R, index: &BagIndex) -> Result<BagInfo>
|
||||||
where
|
where
|
||||||
R: BagReader + io::Seek,
|
R: BagReader + io::Seek + Clone + Send + Sync,
|
||||||
F: Fn() -> R + Send + Sync,
|
|
||||||
{
|
{
|
||||||
index
|
index
|
||||||
.chunks
|
.chunks
|
||||||
.par_iter()
|
.par_iter()
|
||||||
.try_fold(BagInfo::default, move |mut info, chunk| -> Result<_> {
|
.try_fold(BagInfo::default, move |mut info, chunk| -> Result<_> {
|
||||||
let mut reader = make_reader();
|
let mut reader = reader.clone();
|
||||||
reader.seek(io::SeekFrom::Start(chunk.pos))?;
|
reader.seek(io::SeekFrom::Start(chunk.pos))?;
|
||||||
let chunk_header = ChunkHeader::read(&mut reader)?;
|
let chunk_header = ChunkHeader::read(&mut reader)?;
|
||||||
info.total_uncompressed += chunk_header.uncompressed_size as u64;
|
info.total_uncompressed += chunk_header.uncompressed_size as u64;
|
||||||
@ -48,12 +47,11 @@ impl BagInfo {
|
|||||||
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
|
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn compute_without_index<R, F>(make_reader: F, index: &BagIndex) -> Result<BagInfo>
|
pub fn compute_without_index<R>(reader: R, index: &BagIndex) -> Result<BagInfo>
|
||||||
where
|
where
|
||||||
R: BagReader + io::Seek,
|
R: BagReader + io::Seek + Clone + Send,
|
||||||
F: Fn() -> R + Send + Sync,
|
|
||||||
{
|
{
|
||||||
read_chunks_messages(make_reader, &index.chunks)
|
read_chunks_messages(reader, &index.chunks)
|
||||||
.try_fold(BagInfo::default, |mut info, chunk| -> Result<_> {
|
.try_fold(BagInfo::default, |mut info, chunk| -> Result<_> {
|
||||||
let data = chunk?;
|
let data = chunk?;
|
||||||
info.total_uncompressed += data.data.len() as u64;
|
info.total_uncompressed += data.data.len() as u64;
|
||||||
|
@ -129,6 +129,10 @@ pub struct MessageLayout {
|
|||||||
pub type MessageLayoutRef = std::sync::Arc<MessageLayout>;
|
pub type MessageLayoutRef = std::sync::Arc<MessageLayout>;
|
||||||
|
|
||||||
impl MessageLayout {
|
impl MessageLayout {
|
||||||
|
pub fn fields(&self) -> &[FieldLayout] {
|
||||||
|
&self.fields
|
||||||
|
}
|
||||||
|
|
||||||
pub fn from_msgs<'a, M, I>(msgs: M) -> Result<MessageLayout>
|
pub fn from_msgs<'a, M, I>(msgs: M) -> Result<MessageLayout>
|
||||||
where
|
where
|
||||||
M: IntoIterator<IntoIter = I>,
|
M: IntoIterator<IntoIter = I>,
|
||||||
|
@ -1,11 +1,14 @@
|
|||||||
use std::convert::TryFrom;
|
use std::{collections::HashMap, convert::TryFrom};
|
||||||
|
|
||||||
|
use eyre::eyre;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use log::trace;
|
use log::trace;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use ros_message::{MessagePath, Msg};
|
use ros_message::{MessagePath, MessageValue, Msg};
|
||||||
|
|
||||||
use crate::{index::ConnInfo, layout::MessageLayout, Result};
|
use crate::{
|
||||||
|
chunk::MessageData, index::ConnInfo, layout::MessageLayout, parse::parse_message, Result,
|
||||||
|
};
|
||||||
|
|
||||||
fn parse_msg(message_name: &str, msgdef: &str) -> Result<Msg> {
|
fn parse_msg(message_name: &str, msgdef: &str) -> Result<Msg> {
|
||||||
trace!("message definition {}: {}", message_name, msgdef);
|
trace!("message definition {}: {}", message_name, msgdef);
|
||||||
@ -48,3 +51,56 @@ pub fn compute_layout(conn: &ConnInfo) -> Result<MessageLayout> {
|
|||||||
let msgs = parse_msgs(conn)?;
|
let msgs = parse_msgs(conn)?;
|
||||||
MessageLayout::from_msgs(&msgs)
|
MessageLayout::from_msgs(&msgs)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ConnData {
|
||||||
|
topic: String,
|
||||||
|
layout: MessageLayout,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ConnData {
|
||||||
|
fn new(conn: &ConnInfo) -> Result<ConnData> {
|
||||||
|
let layout = compute_layout(conn)?;
|
||||||
|
Ok(ConnData {
|
||||||
|
topic: conn.topic.clone(),
|
||||||
|
layout,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub struct MessageLayouts {
|
||||||
|
conns: HashMap<u32, ConnData>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MessageLayouts {
|
||||||
|
pub fn new<'a, I: IntoIterator<Item = &'a ConnInfo>>(connections: I) -> Result<MessageLayouts> {
|
||||||
|
let mut conns = HashMap::new();
|
||||||
|
for conn in connections {
|
||||||
|
conns.insert(conn.id, ConnData::new(conn)?);
|
||||||
|
}
|
||||||
|
Ok(MessageLayouts { conns })
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn decode(&self, message: &MessageData) -> Result<(String, MessageValue)> {
|
||||||
|
let conn = self
|
||||||
|
.conns
|
||||||
|
.get(&message.header.conn_id)
|
||||||
|
.ok_or_else(|| eyre!("missing connection: {}", message.header.conn_id))?;
|
||||||
|
|
||||||
|
let value = parse_message(&conn.layout, message.data.as_ref())?;
|
||||||
|
|
||||||
|
Ok((conn.topic.clone(), value))
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "bson")]
|
||||||
|
pub fn decode_bson(&self, message: &MessageData) -> Result<(String, bson::Document)> {
|
||||||
|
use crate::parse::parse_message_bson;
|
||||||
|
|
||||||
|
let conn = self
|
||||||
|
.conns
|
||||||
|
.get(&message.header.conn_id)
|
||||||
|
.ok_or_else(|| eyre!("missing connection: {}", message.header.conn_id))?;
|
||||||
|
|
||||||
|
let value = parse_message_bson(&conn.layout, message.data.as_ref())?;
|
||||||
|
|
||||||
|
Ok((conn.topic.clone(), value))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1,9 +1,15 @@
|
|||||||
mod error;
|
mod error;
|
||||||
pub mod header;
|
pub mod header;
|
||||||
|
mod message;
|
||||||
|
#[cfg(feature = "bson")]
|
||||||
|
mod message_bson;
|
||||||
mod version;
|
mod version;
|
||||||
|
|
||||||
pub use error::{Error, ErrorKind};
|
pub use error::{Error, ErrorKind};
|
||||||
pub use header::{Header, Op};
|
pub use header::{Header, Op};
|
||||||
|
pub use message::parse_message;
|
||||||
|
#[cfg(feature = "bson")]
|
||||||
|
pub use message_bson::parse_message_bson;
|
||||||
pub use version::Version;
|
pub use version::Version;
|
||||||
|
|
||||||
pub type Input<'a> = &'a [u8];
|
pub type Input<'a> = &'a [u8];
|
||||||
|
@ -7,7 +7,7 @@ use nom::{
|
|||||||
},
|
},
|
||||||
InputLength,
|
InputLength,
|
||||||
};
|
};
|
||||||
use std::fmt::Write;
|
use std::{fmt::Write, string::FromUtf8Error};
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
#[derive(Clone, Debug, PartialEq, Error)]
|
#[derive(Clone, Debug, PartialEq, Error)]
|
||||||
@ -20,6 +20,8 @@ pub enum ErrorKind {
|
|||||||
Context(&'static str),
|
Context(&'static str),
|
||||||
#[error("invalid integer: {0}")]
|
#[error("invalid integer: {0}")]
|
||||||
InvalidInteger(#[from] ParseIntError),
|
InvalidInteger(#[from] ParseIntError),
|
||||||
|
#[error("invalid utf8: {0}")]
|
||||||
|
InvalidUtf8(#[from] FromUtf8Error),
|
||||||
#[error("expected identifier (like /[a-zA-Z][A-Za-z0-9_]*/)")]
|
#[error("expected identifier (like /[a-zA-Z][A-Za-z0-9_]*/)")]
|
||||||
ExpectedIdentifier,
|
ExpectedIdentifier,
|
||||||
}
|
}
|
||||||
|
86
src/parse/message.rs
Normal file
86
src/parse/message.rs
Normal file
@ -0,0 +1,86 @@
|
|||||||
|
use eyre::bail;
|
||||||
|
use nom::{
|
||||||
|
combinator::map_res,
|
||||||
|
multi::{length_count, length_data, many_m_n},
|
||||||
|
number::complete::{
|
||||||
|
le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8,
|
||||||
|
},
|
||||||
|
Parser,
|
||||||
|
};
|
||||||
|
use ros_message::{MessageValue, Value};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
layout::{FieldLayout, FieldType, MessageLayout, Multiplicity},
|
||||||
|
parse::{header::fields::parse_time, ErrorKind},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{Error, IResult, Input};
|
||||||
|
use crate::Result;
|
||||||
|
|
||||||
|
fn type_parser<'a, 'b: 'a>(
|
||||||
|
typ: &'a FieldType,
|
||||||
|
) -> impl Parser<Input<'b>, Value, Error<Input<'b>>> + 'a {
|
||||||
|
move |input| -> IResult<Value> {
|
||||||
|
match typ {
|
||||||
|
FieldType::Bool => le_u8.map(|v| Value::Bool(v != 0)).parse(input),
|
||||||
|
FieldType::I8(_) => le_i8.map(Value::I8).parse(input),
|
||||||
|
FieldType::I16 => le_i16.map(Value::I16).parse(input),
|
||||||
|
FieldType::I32 => le_i32.map(Value::I32).parse(input),
|
||||||
|
FieldType::I64 => le_i64.map(Value::I64).parse(input),
|
||||||
|
FieldType::U8(_) => le_u8.map(Value::U8).parse(input),
|
||||||
|
FieldType::U16 => le_u16.map(Value::U16).parse(input),
|
||||||
|
FieldType::U32 => le_u32.map(Value::U32).parse(input),
|
||||||
|
FieldType::U64 => le_u64.map(Value::U64).parse(input),
|
||||||
|
FieldType::F32 => le_f32.map(Value::F32).parse(input),
|
||||||
|
FieldType::F64 => le_f64.map(Value::F64).parse(input),
|
||||||
|
FieldType::String => map_res(length_data(le_u32), |s| {
|
||||||
|
String::from_utf8(Vec::from(s))
|
||||||
|
.map_err(ErrorKind::from)
|
||||||
|
.map(Value::String)
|
||||||
|
})
|
||||||
|
.parse(input),
|
||||||
|
FieldType::Time => parse_time.map(Value::Time).parse(input),
|
||||||
|
FieldType::Duration => todo!(),
|
||||||
|
FieldType::Message(layout) => message_parser(layout.as_ref())
|
||||||
|
.map(Value::Message)
|
||||||
|
.parse(input),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn field_parser<'a, 'b: 'a>(
|
||||||
|
field: &'a FieldLayout,
|
||||||
|
) -> impl Parser<Input<'b>, (String, Value), Error<Input<'b>>> + 'a {
|
||||||
|
move |input| -> IResult<(String, Value)> {
|
||||||
|
let mut parser = type_parser(&field.typ);
|
||||||
|
let (rest, value) = match field.multiplicity {
|
||||||
|
Multiplicity::Unit => parser.parse(input),
|
||||||
|
Multiplicity::Fixed(n) => many_m_n(n, n, parser).map(Value::Array).parse(input),
|
||||||
|
Multiplicity::Dynamic => length_count(le_u32, parser).map(Value::Array).parse(input),
|
||||||
|
}?;
|
||||||
|
Ok((rest, (String::from(field.name.as_str()), value)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn message_parser<'a, 'b: 'a>(
|
||||||
|
layout: &'a MessageLayout,
|
||||||
|
) -> impl Parser<Input<'b>, MessageValue, Error<Input<'b>>> + 'a {
|
||||||
|
move |mut input| -> IResult<_> {
|
||||||
|
let mut message = MessageValue::with_capacity(layout.fields().len());
|
||||||
|
for field in layout.fields() {
|
||||||
|
let (rest, (name, value)) = field_parser(field).parse(input)?;
|
||||||
|
input = rest;
|
||||||
|
message.insert(name, value);
|
||||||
|
}
|
||||||
|
Ok((input, message))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_message(layout: &MessageLayout, input: &[u8]) -> Result<MessageValue> {
|
||||||
|
match message_parser(layout).parse(input) {
|
||||||
|
Ok((&[], message)) => Ok(message),
|
||||||
|
Ok(_) => bail!("extra data after message"),
|
||||||
|
Err(nom::Err::Incomplete(_)) => unreachable!(),
|
||||||
|
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
|
||||||
|
}
|
||||||
|
}
|
107
src/parse/message_bson.rs
Normal file
107
src/parse/message_bson.rs
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
use bson::{Bson, Document};
|
||||||
|
use eyre::bail;
|
||||||
|
use nom::{
|
||||||
|
bytes::complete::take,
|
||||||
|
combinator::map_res,
|
||||||
|
multi::{length_count, length_data, many_m_n},
|
||||||
|
number::complete::{
|
||||||
|
le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8,
|
||||||
|
},
|
||||||
|
Parser,
|
||||||
|
};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
layout::{FieldLayout, FieldType, MessageLayout, Multiplicity},
|
||||||
|
parse::{header::fields::parse_time, ErrorKind},
|
||||||
|
};
|
||||||
|
|
||||||
|
use super::{Error, IResult, Input};
|
||||||
|
use crate::Result;
|
||||||
|
|
||||||
|
fn type_parser<'a, 'b: 'a>(
|
||||||
|
typ: &'a FieldType,
|
||||||
|
) -> impl Parser<Input<'b>, Bson, Error<Input<'b>>> + 'a {
|
||||||
|
move |input| -> IResult<Bson> {
|
||||||
|
match typ {
|
||||||
|
FieldType::Bool => le_u8.map(|v| Bson::Boolean(v != 0)).parse(input),
|
||||||
|
FieldType::I8(_) => le_i8.map(|v| Bson::Int32(v as i32)).parse(input),
|
||||||
|
FieldType::I16 => le_i16.map(|v| Bson::Int32(v as i32)).parse(input),
|
||||||
|
FieldType::I32 => le_i32.map(Bson::Int32).parse(input),
|
||||||
|
FieldType::I64 => le_i64.map(Bson::Int64).parse(input),
|
||||||
|
FieldType::U8(_) => le_u8.map(|v| Bson::Int32(v as i32)).parse(input),
|
||||||
|
FieldType::U16 => le_u16.map(|v| Bson::Int32(v as i32)).parse(input),
|
||||||
|
FieldType::U32 => le_u32.map(|v| Bson::Int64(v as i64)).parse(input),
|
||||||
|
FieldType::U64 => le_u64.map(|v| Bson::Int64(v as i64)).parse(input),
|
||||||
|
FieldType::F32 => le_f32.map(|v| Bson::Double(v as f64)).parse(input),
|
||||||
|
FieldType::F64 => le_f64.map(Bson::Double).parse(input),
|
||||||
|
FieldType::String => map_res(length_data(le_u32), |s| {
|
||||||
|
String::from_utf8(Vec::from(s))
|
||||||
|
.map_err(ErrorKind::from)
|
||||||
|
.map(Bson::String)
|
||||||
|
})
|
||||||
|
.parse(input),
|
||||||
|
FieldType::Time => parse_time
|
||||||
|
.map(|t| bson::bson!({"sec": (t.sec as i64), "nsec": (t.nsec as i64)}))
|
||||||
|
.parse(input),
|
||||||
|
FieldType::Duration => todo!(),
|
||||||
|
FieldType::Message(layout) => message_parser(layout.as_ref())
|
||||||
|
.map(Bson::Document)
|
||||||
|
.parse(input),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn field_parser<'a, 'b: 'a>(
|
||||||
|
field: &'a FieldLayout,
|
||||||
|
) -> impl Parser<Input<'b>, (String, Bson), Error<Input<'b>>> + 'a {
|
||||||
|
move |input| -> IResult<(String, Bson)> {
|
||||||
|
let mut parser = type_parser(&field.typ);
|
||||||
|
let (rest, value) = match field.multiplicity {
|
||||||
|
Multiplicity::Unit => parser.parse(input),
|
||||||
|
Multiplicity::Fixed(n) => match field.typ {
|
||||||
|
FieldType::U8(_) => take(n)
|
||||||
|
.map(|b| bson::Binary {
|
||||||
|
subtype: bson::spec::BinarySubtype::Generic,
|
||||||
|
bytes: Vec::from(b),
|
||||||
|
})
|
||||||
|
.map(Bson::from)
|
||||||
|
.parse(input),
|
||||||
|
_ => many_m_n(n, n, parser).map(Bson::Array).parse(input),
|
||||||
|
},
|
||||||
|
Multiplicity::Dynamic => match field.typ {
|
||||||
|
FieldType::U8(_) => length_data(le_u32)
|
||||||
|
.map(|b| bson::Binary {
|
||||||
|
subtype: bson::spec::BinarySubtype::Generic,
|
||||||
|
bytes: Vec::from(b),
|
||||||
|
})
|
||||||
|
.map(Bson::from)
|
||||||
|
.parse(input),
|
||||||
|
_ => length_count(le_u32, parser).map(Bson::Array).parse(input),
|
||||||
|
},
|
||||||
|
}?;
|
||||||
|
Ok((rest, (String::from(field.name.as_str()), value)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn message_parser<'a, 'b: 'a>(
|
||||||
|
layout: &'a MessageLayout,
|
||||||
|
) -> impl Parser<Input<'b>, Document, Error<Input<'b>>> + 'a {
|
||||||
|
move |mut input| -> IResult<_> {
|
||||||
|
let mut message = Document::new();
|
||||||
|
for field in layout.fields() {
|
||||||
|
let (rest, (name, value)) = field_parser(field).parse(input)?;
|
||||||
|
input = rest;
|
||||||
|
message.insert(name, value);
|
||||||
|
}
|
||||||
|
Ok((input, message))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn parse_message_bson(layout: &MessageLayout, input: &[u8]) -> Result<Document> {
|
||||||
|
match message_parser(layout).parse(input) {
|
||||||
|
Ok((&[], message)) => Ok(message),
|
||||||
|
Ok(_) => bail!("extra data after message"),
|
||||||
|
Err(nom::Err::Incomplete(_)) => unreachable!(),
|
||||||
|
Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user