move stuff

This commit is contained in:
Alex Mikhalev 2021-11-19 18:34:58 -08:00
parent bea68dfa87
commit 6237c25458
3 changed files with 36 additions and 35 deletions

View File

@ -1,4 +1,4 @@
use std::{collections::HashMap, convert::TryFrom, env::args, fs::File, io}; use std::{collections::HashMap, convert::TryFrom, env::args, fs::File};
use eyre::{bail, Context}; use eyre::{bail, Context};
use log::{error, info, trace}; use log::{error, info, trace};
@ -6,8 +6,8 @@ use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex; use regex::Regex;
use ros_message::{MessagePath, Msg}; use ros_message::{MessagePath, Msg};
use rsbag::{ use rsbag::{
chunk::{ChunkHeader, MessageDataHeader}, chunk::{read_chunk, MessageDataHeader},
index::{BagIndex, ConnInfo, IndexData}, index::{BagIndex, ConnInfo},
reader::{BagReader, MmapReader, SliceReader}, reader::{BagReader, MmapReader, SliceReader},
Result, Result,
}; };
@ -46,19 +46,6 @@ fn parse_message_definitions(conn: &ConnInfo) -> Result<Vec<Msg>> {
Ok(msgs) Ok(msgs)
} }
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> {
bag_reader.seek(io::SeekFrom::Start(pos))?;
let chunk_header = ChunkHeader::read(bag_reader)?;
let compressed_data = bag_reader.read_data()?;
let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
let mut decompresor = chunk_header.compression.decompress_stream(compressed_data);
io::copy(&mut decompresor, &mut data)?;
Ok(data)
}
#[derive(Default, Debug)] #[derive(Default, Debug)]
struct BagInfo { struct BagInfo {
total_uncompressed: u64, total_uncompressed: u64,
@ -114,7 +101,8 @@ fn main() -> Result<()> {
.par_iter() .par_iter()
// .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { // .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
// let mut reader = bag_reader.clone(); // let mut reader = bag_reader.clone();
// let chunk_header = ChunkHeader::read(&mut reader, chunk.pos)?; // reader.seek(io::SeekFrom::Start(chunk.pos))?;
// let chunk_header = ChunkHeader::read(&mut reader)?;
// info.total_uncompressed += chunk_header.uncompressed_size as u64; // info.total_uncompressed += chunk_header.uncompressed_size as u64;
// reader.skip_data()?; // reader.skip_data()?;
// for _ in &chunk.connections { // for _ in &chunk.connections {
@ -126,17 +114,11 @@ fn main() -> Result<()> {
// }) // })
.try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> { .try_fold(BagInfo::default, |mut info, chunk| -> rsbag::Result<_> {
let mut reader = bag_reader.clone(); let mut reader = bag_reader.clone();
let data = read_chunk(&mut reader, chunk.pos) let data = read_chunk(&mut reader, chunk.pos)
.wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?; .wrap_err_with(|| format!("failed to read chunk: {:#?}", chunk))?;
info.total_uncompressed += data.len() as u64; info.total_uncompressed += data.len() as u64;
let mut chunk_reader = SliceReader::from(data); let mut chunk_reader = SliceReader::from(data);
while chunk_reader.remaining() > 0 {
loop {
if chunk_reader.pos() == chunk_reader.as_ref().len() {
break;
}
let header = chunk_reader.read_header()?; let header = chunk_reader.read_header()?;
let op = header.read_op()?; let op = header.read_op()?;
match op { match op {
@ -150,7 +132,6 @@ fn main() -> Result<()> {
_ => bail!("unexpected op in chunk: {:?}", op), _ => bail!("unexpected op in chunk: {:?}", op),
} }
} }
Ok(info) Ok(info)
}) })
.try_reduce(BagInfo::default, |a, b| Ok(a.combine(b))) .try_reduce(BagInfo::default, |a, b| Ok(a.combine(b)))

View File

@ -1,7 +1,4 @@
use std::{ use std::{io, str::FromStr};
io::{self, SeekFrom},
str::FromStr,
};
use eyre::bail; use eyre::bail;
@ -62,6 +59,19 @@ impl ChunkHeader {
} }
} }
pub fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> Result<Vec<u8>> {
bag_reader.seek(io::SeekFrom::Start(pos))?;
let chunk_header = ChunkHeader::read(bag_reader)?;
let compressed_data = bag_reader.read_data()?;
let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
let mut decompresor = chunk_header.compression.decompress_stream(compressed_data);
io::copy(&mut decompresor, &mut data)?;
Ok(data)
}
pub struct MessageDataHeader { pub struct MessageDataHeader {
pub conn_id: u32, pub conn_id: u32,
pub time: Time, pub time: Time,

View File

@ -11,12 +11,6 @@ pub struct SliceReader<T> {
pos: usize, pos: usize,
} }
impl<T> AsRef<T> for SliceReader<T> {
fn as_ref(&self) -> &T {
&self.slice
}
}
impl<T> SliceReader<T> { impl<T> SliceReader<T> {
pub fn into_inner(self) -> T { pub fn into_inner(self) -> T {
self.slice self.slice
@ -27,6 +21,22 @@ impl<T> SliceReader<T> {
} }
} }
impl<T, U> SliceReader<T>
where
T: Deref<Target = U>,
U: AsRef<[u8]> + ?Sized + 'static,
{
pub fn remaining(&self) -> usize {
self.slice.deref().as_ref().len() - self.pos()
}
}
impl<T> AsRef<T> for SliceReader<T> {
fn as_ref(&self) -> &T {
&self.slice
}
}
impl<T> From<T> for SliceReader<T> { impl<T> From<T> for SliceReader<T> {
fn from(slice: T) -> Self { fn from(slice: T) -> Self {
Self { slice, pos: 0 } Self { slice, pos: 0 }