diff --git a/Cargo.toml b/Cargo.toml index 1793537..316b529 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,7 @@ thiserror = "1.0.28" smol_str = { version = "0.1.17", default-features = false } indexmap = "1.7.0" bson = { version = "1.2.3", optional = true } +nohash-hasher = "0.2.0" [dev-dependencies] color-eyre = "0.5.11" @@ -33,5 +34,5 @@ indicatif = { version = "0.16.2", features = ["rayon"] } mongodb = { version = "2.0.2", default-features = false, features = ["sync"] } [profile.release] -debug = true +# debug = false # lto = true diff --git a/examples/bag_info.rs b/examples/bag_info.rs index 41f470d..bb534c3 100644 --- a/examples/bag_info.rs +++ b/examples/bag_info.rs @@ -15,6 +15,25 @@ fn main() -> Result<()> { let bag_path = &args[1]; let mut bag = Bag::open(bag_path)?; + let chunk_positions = bag + .index() + .chunks + .iter() + .map(|chunk| chunk.pos) + .collect::>(); + let chunk_sizes = chunk_positions + .windows(2) + .map(|window| { + if let &[last, next] = window { + next - last + } else { + unreachable!(); + } + }) + .collect::>(); + let mean_chunk_size = + chunk_sizes.iter().map(|s| *s as f64).sum::() / (chunk_sizes.len() as f64); + info!("average chunk size: {}", mean_chunk_size); let info = bag.compute_info()?; info!("bag info: {:#?}", info); diff --git a/examples/mongobag.rs b/examples/mongobag.rs index acfc0a8..ee0f1d0 100644 --- a/examples/mongobag.rs +++ b/examples/mongobag.rs @@ -23,7 +23,7 @@ fn main() -> Result<()> { let layouts = bag.compute_message_layouts()?; let info = bag.compute_info()?; - let total_messages: u64 = info.per_connection.values().sum(); + let total_messages: u64 = info.per_connection.values().map(|con| con.count).sum(); info!("exporting {} messages", total_messages); let client = Client::with_uri_str("mongodb://localhost:27017")?; diff --git a/src/info.rs b/src/info.rs index a3e3d87..0b7ea8b 100644 --- a/src/info.rs +++ b/src/info.rs @@ -1,4 +1,5 @@ -use std::{collections::HashMap, io}; +use nohash_hasher::IntMap; +use std::io; use rayon::prelude::*; @@ -9,17 +10,26 @@ use crate::{ Result, }; +#[derive(Default, Debug)] +pub struct ConnectionStats { + pub count: u64, + pub unique_chunk_count: u64, +} + #[derive(Default, Debug)] pub struct BagInfo { pub total_uncompressed: u64, - pub per_connection: HashMap, + pub per_connection: IntMap, } impl BagInfo { + // only info from disjoint chunks can be combined fn combine(mut self, other: BagInfo) -> BagInfo { self.total_uncompressed += other.total_uncompressed; - for (conn, count) in other.per_connection { - *self.per_connection.entry(conn).or_insert(0) += count; + for (conn, other_stats) in &other.per_connection { + let stats = self.per_connection.entry(*conn).or_default(); + stats.count += other_stats.count; + stats.unique_chunk_count += other_stats.unique_chunk_count; } self } @@ -39,8 +49,10 @@ impl BagInfo { reader.skip_data()?; for _ in &chunk.connections { let index = IndexData::read(&mut reader)?; - *info.per_connection.entry(index.conn_id).or_insert(0) += - index.entries.len() as u64; + let stats = info.per_connection.entry(index.conn_id).or_default(); + stats.count += index.entries.len() as u64; + // TODO: verify that each connection appears once + stats.unique_chunk_count += 1; } Ok(info) }) @@ -55,8 +67,12 @@ impl BagInfo { .try_fold(BagInfo::default, |mut info, chunk| -> Result<_> { let data = chunk?; info.total_uncompressed += data.data.len() as u64; - let count = info.per_connection.entry(data.header.conn_id).or_insert(0); - *count += 1; + let stats = info + .per_connection + .entry(data.header.conn_id) + .or_default(); + stats.count += 1; + // TODO: stats.unique_chunk_count Ok(info) }) .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))