From 0888937b68fe480e58db71c059fcca6aaf116156 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 23 Nov 2021 15:31:08 -0800 Subject: [PATCH] add more info to mongobag --- Cargo.toml | 5 ++++- examples/mongobag.rs | 5 +++-- src/index.rs | 2 +- src/message.rs | 14 +++++++------- 4 files changed, 15 insertions(+), 11 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3560805..1793537 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,7 +4,7 @@ version = "0.1.0" edition = "2018" [features] -default = ["mmap", "rayon"] +default = ["mmap", "rayon", "bson"] mmap = ["memmap"] [dependencies] @@ -23,11 +23,14 @@ smallvec = "1.6.1" thiserror = "1.0.28" smol_str = { version = "0.1.17", default-features = false } indexmap = "1.7.0" +bson = { version = "1.2.3", optional = true } [dev-dependencies] color-eyre = "0.5.11" env_logger = "0.9.0" eyre = "0.6.5" +indicatif = { version = "0.16.2", features = ["rayon"] } +mongodb = { version = "2.0.2", default-features = false, features = ["sync"] } [profile.release] debug = true diff --git a/examples/mongobag.rs b/examples/mongobag.rs index e48230c..acfc0a8 100644 --- a/examples/mongobag.rs +++ b/examples/mongobag.rs @@ -39,9 +39,10 @@ fn main() -> Result<()> { let mut documents = Vec::new(); for msg in it { let msg = msg.unwrap(); - let (topic, msg) = layouts.decode_bson(&msg).unwrap(); + let (info, msg) = layouts.decode_bson(&msg).unwrap(); documents.push(bson::doc! { - "topic": topic, + "topic": info.topic.clone(), + "msgtype": info.datatype.clone(), "msg": msg, }); } diff --git a/src/index.rs b/src/index.rs index cae0979..ac4fcd7 100644 --- a/src/index.rs +++ b/src/index.rs @@ -11,7 +11,7 @@ use crate::{ Result, Time, }; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct ConnInfo { pub id: u32, pub topic: String, diff --git a/src/message.rs b/src/message.rs index b730ec3..538e1ce 100644 --- a/src/message.rs +++ b/src/message.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, convert::TryFrom}; +use std::{collections::HashMap, convert::TryFrom, sync::Arc}; use eyre::eyre; use lazy_static::lazy_static; @@ -53,7 +53,7 @@ pub fn compute_layout(conn: &ConnInfo) -> Result { } struct ConnData { - topic: String, + info: Arc, layout: MessageLayout, } @@ -61,7 +61,7 @@ impl ConnData { fn new(conn: &ConnInfo) -> Result { let layout = compute_layout(conn)?; Ok(ConnData { - topic: conn.topic.clone(), + info: Arc::new(conn.clone()), layout, }) } @@ -79,7 +79,7 @@ impl MessageLayouts { Ok(MessageLayouts { conns }) } - pub fn decode(&self, message: &MessageData) -> Result<(String, MessageValue)> { + pub fn decode(&self, message: &MessageData) -> Result<(Arc, MessageValue)> { let conn = self .conns .get(&message.header.conn_id) @@ -87,11 +87,11 @@ impl MessageLayouts { let value = parse_message(&conn.layout, message.data.as_ref())?; - Ok((conn.topic.clone(), value)) + Ok((conn.info.clone(), value)) } #[cfg(feature = "bson")] - pub fn decode_bson(&self, message: &MessageData) -> Result<(String, bson::Document)> { + pub fn decode_bson(&self, message: &MessageData) -> Result<(Arc, bson::Document)> { use crate::parse::parse_message_bson; let conn = self @@ -101,6 +101,6 @@ impl MessageLayouts { let value = parse_message_bson(&conn.layout, message.data.as_ref())?; - Ok((conn.topic.clone(), value)) + Ok((conn.info.clone(), value)) } }