add more info to mongobag
This commit is contained in:
parent
48e90f36c2
commit
0888937b68
@ -4,7 +4,7 @@ version = "0.1.0"
|
|||||||
edition = "2018"
|
edition = "2018"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["mmap", "rayon"]
|
default = ["mmap", "rayon", "bson"]
|
||||||
mmap = ["memmap"]
|
mmap = ["memmap"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
@ -23,11 +23,14 @@ smallvec = "1.6.1"
|
|||||||
thiserror = "1.0.28"
|
thiserror = "1.0.28"
|
||||||
smol_str = { version = "0.1.17", default-features = false }
|
smol_str = { version = "0.1.17", default-features = false }
|
||||||
indexmap = "1.7.0"
|
indexmap = "1.7.0"
|
||||||
|
bson = { version = "1.2.3", optional = true }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
color-eyre = "0.5.11"
|
color-eyre = "0.5.11"
|
||||||
env_logger = "0.9.0"
|
env_logger = "0.9.0"
|
||||||
eyre = "0.6.5"
|
eyre = "0.6.5"
|
||||||
|
indicatif = { version = "0.16.2", features = ["rayon"] }
|
||||||
|
mongodb = { version = "2.0.2", default-features = false, features = ["sync"] }
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
debug = true
|
debug = true
|
||||||
|
@ -39,9 +39,10 @@ fn main() -> Result<()> {
|
|||||||
let mut documents = Vec::new();
|
let mut documents = Vec::new();
|
||||||
for msg in it {
|
for msg in it {
|
||||||
let msg = msg.unwrap();
|
let msg = msg.unwrap();
|
||||||
let (topic, msg) = layouts.decode_bson(&msg).unwrap();
|
let (info, msg) = layouts.decode_bson(&msg).unwrap();
|
||||||
documents.push(bson::doc! {
|
documents.push(bson::doc! {
|
||||||
"topic": topic,
|
"topic": info.topic.clone(),
|
||||||
|
"msgtype": info.datatype.clone(),
|
||||||
"msg": msg,
|
"msg": msg,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -11,7 +11,7 @@ use crate::{
|
|||||||
Result, Time,
|
Result, Time,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct ConnInfo {
|
pub struct ConnInfo {
|
||||||
pub id: u32,
|
pub id: u32,
|
||||||
pub topic: String,
|
pub topic: String,
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use std::{collections::HashMap, convert::TryFrom};
|
use std::{collections::HashMap, convert::TryFrom, sync::Arc};
|
||||||
|
|
||||||
use eyre::eyre;
|
use eyre::eyre;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
@ -53,7 +53,7 @@ pub fn compute_layout(conn: &ConnInfo) -> Result<MessageLayout> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
struct ConnData {
|
struct ConnData {
|
||||||
topic: String,
|
info: Arc<ConnInfo>,
|
||||||
layout: MessageLayout,
|
layout: MessageLayout,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,7 +61,7 @@ impl ConnData {
|
|||||||
fn new(conn: &ConnInfo) -> Result<ConnData> {
|
fn new(conn: &ConnInfo) -> Result<ConnData> {
|
||||||
let layout = compute_layout(conn)?;
|
let layout = compute_layout(conn)?;
|
||||||
Ok(ConnData {
|
Ok(ConnData {
|
||||||
topic: conn.topic.clone(),
|
info: Arc::new(conn.clone()),
|
||||||
layout,
|
layout,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -79,7 +79,7 @@ impl MessageLayouts {
|
|||||||
Ok(MessageLayouts { conns })
|
Ok(MessageLayouts { conns })
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn decode(&self, message: &MessageData) -> Result<(String, MessageValue)> {
|
pub fn decode(&self, message: &MessageData) -> Result<(Arc<ConnInfo>, MessageValue)> {
|
||||||
let conn = self
|
let conn = self
|
||||||
.conns
|
.conns
|
||||||
.get(&message.header.conn_id)
|
.get(&message.header.conn_id)
|
||||||
@ -87,11 +87,11 @@ impl MessageLayouts {
|
|||||||
|
|
||||||
let value = parse_message(&conn.layout, message.data.as_ref())?;
|
let value = parse_message(&conn.layout, message.data.as_ref())?;
|
||||||
|
|
||||||
Ok((conn.topic.clone(), value))
|
Ok((conn.info.clone(), value))
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(feature = "bson")]
|
#[cfg(feature = "bson")]
|
||||||
pub fn decode_bson(&self, message: &MessageData) -> Result<(String, bson::Document)> {
|
pub fn decode_bson(&self, message: &MessageData) -> Result<(Arc<ConnInfo>, bson::Document)> {
|
||||||
use crate::parse::parse_message_bson;
|
use crate::parse::parse_message_bson;
|
||||||
|
|
||||||
let conn = self
|
let conn = self
|
||||||
@ -101,6 +101,6 @@ impl MessageLayouts {
|
|||||||
|
|
||||||
let value = parse_message_bson(&conn.layout, message.data.as_ref())?;
|
let value = parse_message_bson(&conn.layout, message.data.as_ref())?;
|
||||||
|
|
||||||
Ok((conn.topic.clone(), value))
|
Ok((conn.info.clone(), value))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user