compute some more info

This commit is contained in:
Alex Mikhalev 2022-06-08 20:18:55 -07:00
parent 0888937b68
commit 443c3ebcdc
4 changed files with 46 additions and 10 deletions

View File

@ -24,6 +24,7 @@ thiserror = "1.0.28"
smol_str = { version = "0.1.17", default-features = false }
indexmap = "1.7.0"
bson = { version = "1.2.3", optional = true }
nohash-hasher = "0.2.0"
[dev-dependencies]
color-eyre = "0.5.11"
@ -33,5 +34,5 @@ indicatif = { version = "0.16.2", features = ["rayon"] }
mongodb = { version = "2.0.2", default-features = false, features = ["sync"] }
[profile.release]
debug = true
# debug = false
# lto = true

View File

@ -15,6 +15,25 @@ fn main() -> Result<()> {
let bag_path = &args[1];
let mut bag = Bag::open(bag_path)?;
let chunk_positions = bag
.index()
.chunks
.iter()
.map(|chunk| chunk.pos)
.collect::<Vec<_>>();
let chunk_sizes = chunk_positions
.windows(2)
.map(|window| {
if let &[last, next] = window {
next - last
} else {
unreachable!();
}
})
.collect::<Vec<_>>();
let mean_chunk_size =
chunk_sizes.iter().map(|s| *s as f64).sum::<f64>() / (chunk_sizes.len() as f64);
info!("average chunk size: {}", mean_chunk_size);
let info = bag.compute_info()?;
info!("bag info: {:#?}", info);

View File

@ -23,7 +23,7 @@ fn main() -> Result<()> {
let layouts = bag.compute_message_layouts()?;
let info = bag.compute_info()?;
let total_messages: u64 = info.per_connection.values().sum();
let total_messages: u64 = info.per_connection.values().map(|con| con.count).sum();
info!("exporting {} messages", total_messages);
let client = Client::with_uri_str("mongodb://localhost:27017")?;

View File

@ -1,4 +1,5 @@
use std::{collections::HashMap, io};
use nohash_hasher::IntMap;
use std::io;
use rayon::prelude::*;
@ -9,17 +10,26 @@ use crate::{
Result,
};
/// Statistics accumulated for a single connection, stored as the values of
/// `BagInfo::per_connection`.
#[derive(Default, Debug)]
pub struct ConnectionStats {
/// Number of messages counted for this connection. It grows by the number
/// of entries in each index record read for the connection, or by one per
/// message record when the chunk data itself is scanned.
pub count: u64,
/// Incremented once for every index record read for this connection.
/// NOTE(review): this equals the number of distinct chunks only if a
/// connection appears at most once per chunk, which is not verified yet;
/// the path that scans chunk data does not populate this field at all.
pub unique_chunk_count: u64,
}
/// Aggregate statistics computed over a bag: one overall size counter plus
/// per-connection statistics.
#[derive(Default, Debug)]
pub struct BagInfo {
/// Running sum of `data.data.len()` over every scanned message record
/// (presumably the uncompressed payload size in bytes — TODO confirm).
pub total_uncompressed: u64,
// NOTE(review): `per_connection` is declared twice below because this span
// comes from a diff view; the `HashMap<u32, u64>` line looks like the
// pre-change declaration and the `IntMap` line its replacement — confirm.
pub per_connection: HashMap<u32, u64>,
/// Statistics keyed by connection id (`conn_id`).
pub per_connection: IntMap<u32, ConnectionStats>,
}
impl BagInfo {
// Merges `other` into `self` and returns the merged value.
//
// Only info from disjoint chunks can be combined: every counter is simply
// added, so data that contributed to both operands would be counted twice.
//
// NOTE(review): this span comes from a diff view, so both the older
// `(conn, count)` loop header/body and the newer `(conn, other_stats)` loop
// appear here back to back — confirm which lines are live before editing.
fn combine(mut self, other: BagInfo) -> BagInfo {
// The overall size counter is additive.
self.total_uncompressed += other.total_uncompressed;
for (conn, count) in other.per_connection {
*self.per_connection.entry(conn).or_insert(0) += count;
// Iterate `other` by reference and add its per-connection counters to ours.
for (conn, other_stats) in &other.per_connection {
// A connection seen only in `other` starts from default (zeroed) stats.
let stats = self.per_connection.entry(*conn).or_default();
stats.count += other_stats.count;
stats.unique_chunk_count += other_stats.unique_chunk_count;
}
self
}
@ -39,8 +49,10 @@ impl BagInfo {
reader.skip_data()?;
for _ in &chunk.connections {
let index = IndexData::read(&mut reader)?;
*info.per_connection.entry(index.conn_id).or_insert(0) +=
index.entries.len() as u64;
let stats = info.per_connection.entry(index.conn_id).or_default();
stats.count += index.entries.len() as u64;
// TODO: verify that each connection appears once
stats.unique_chunk_count += 1;
}
Ok(info)
})
@ -55,8 +67,12 @@ impl BagInfo {
.try_fold(BagInfo::default, |mut info, chunk| -> Result<_> {
let data = chunk?;
info.total_uncompressed += data.data.len() as u64;
let count = info.per_connection.entry(data.header.conn_id).or_insert(0);
*count += 1;
let stats = info
.per_connection
.entry(data.header.conn_id)
.or_default();
stats.count += 1;
// TODO: stats.unique_chunk_count
Ok(info)
})
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))