parallel decompression

This commit is contained in:
Alex Mikhalev 2021-11-18 22:50:42 -08:00
parent c6abdac270
commit f71048b185
6 changed files with 103 additions and 11 deletions

View File

@ -4,15 +4,17 @@ version = "0.1.0"
edition = "2018"
[features]
default = ["mmap"]
default = ["mmap", "rayon"]
mmap = ["memmap"]
[dependencies]
bytes = "1.1.0"
log = "0.4.14"
lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "checked-decode", "frame"] }
memmap = { version = "0.7.0", optional = true }
nom = "7.0.0"
num_enum = "0.5.4"
rayon = { version = "1.5.1", optional = true }
regex = "1.5.4"
ros_message = "0.1.0"
smallvec = "1.6.1"

View File

@ -1,6 +1,7 @@
use std::{convert::TryFrom, env::args, fs::File};
use std::{convert::TryFrom, env::args, fs::File, io};
use log::{error, info, trace};
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
use regex::Regex;
use ros_message::{MessagePath, Msg};
use rsbag::{
@ -42,6 +43,17 @@ fn parse_message_definitions(conn: &ConnInfo) -> rsbag::Result<Vec<Msg>> {
Ok(msgs)
}
fn read_chunk<R: BagReader>(bag_reader: &mut R, pos: u64) -> rsbag::Result<Vec<u8>> {
let chunk_header = bag_reader.read_chunk_header(pos)?;
let compressed_data = bag_reader.read_data().unwrap();
let mut data = Vec::with_capacity(chunk_header.uncompressed_size as usize);
let mut decompresor = chunk_header.compression.decompress(compressed_data);
io::copy(&mut decompresor, &mut data).unwrap();
Ok(data)
}
fn main() {
env_logger::init();
@ -67,7 +79,7 @@ fn main() {
match parse_message_definitions(conn) {
Ok(msgs) => {
for msg in &msgs {
info!(
trace!(
"message definition parsed: {:#?}",
msg.fields()
.iter()
@ -81,12 +93,42 @@ fn main() {
}
}
let mut total_size = 0;
// let mut total_size = 0;
for chunk in &index.chunks {
let chunk_header = bag_reader.read_chunk_header(chunk.pos).unwrap();
total_size += chunk_header.uncompressed_size;
}
// let total_size = index
// .chunks
// .par_iter()
// .try_fold(
// || 0u64,
// |total_size, chunk| -> rsbag::Result<_> {
// let chunk_header = bag_reader.clone().read_chunk_header(chunk.pos)?;
// // let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
// // chunks.push(data);
// Ok(total_size + chunk_header.uncompressed_size as u64)
// },
// )
// .reduce(
// // || Ok(Vec::new()),
// || Ok(0),
// |a, b| a.and_then(|a| b.map(|b| a + b)),
// ).unwrap();
let total_size = index
.chunks
.par_iter()
.try_fold(
|| 0u64,
|total_size, chunk| -> rsbag::Result<_> {
let data = read_chunk(&mut bag_reader.clone(), chunk.pos)?;
Ok(total_size + data.len() as u64)
},
)
.reduce(
// || Ok(Vec::new()),
|| Ok(0),
|a, b| a.and_then(|a| b.map(|b| a + b)),
).unwrap();
info!("total uncompressed size: {}", total_size);
}

View File

@ -1,4 +1,4 @@
use std::str::FromStr;
use std::{io, str::FromStr};
use crate::{parse::Header, Error, Result};
@ -22,6 +22,16 @@ impl FromStr for Compression {
}
}
impl Compression {
pub fn decompress<'a, R: io::Read + 'a>(self, read: R) -> Box<dyn io::Read + 'a> {
match self {
Compression::None => Box::new(read),
Compression::Bz2 => todo!("bz2 decompression"),
Compression::Lz4 => Box::new(lz4_flex::frame::FrameDecoder::new(read)),
}
}
}
#[derive(Debug)]
pub struct ChunkHeader {
pub compression: Compression,

View File

@ -1,5 +1,6 @@
use std::io::SeekFrom;
use nom::multi::length_data;
use nom::number::streaming::le_u32;
use crate::chunk::ChunkHeader;
@ -16,6 +17,10 @@ pub use io::IoReader;
pub use mmap::MmapReader;
pub trait BagReader {
type Read: std::io::Read;
fn as_read(&mut self) -> &mut Self::Read;
fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>;
@ -48,6 +53,10 @@ pub trait BagReader {
Ok(())
}
fn read_data(&mut self) -> Result<&[u8]> {
self.read_parser(length_data(le_u32))
}
fn read_conn_info(&mut self) -> Result<ConnInfo> {
let record_header = self.read_header_op(Op::Connection)?;
let conn_header = self.read_header()?;

View File

@ -38,6 +38,12 @@ impl<R: io::Read + io::Seek> IoReader<R> {
}
impl<R: io::Read + io::Seek> BagReader for IoReader<R> {
type Read = R;
fn as_read(&mut self) -> &mut Self::Read {
&mut self.read
}
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,

View File

@ -1,12 +1,13 @@
use std::{fs::File, io};
use std::{fs::File, io, sync::Arc};
use memmap::Mmap;
use super::BagReader;
use crate::{parse, Error, Result};
#[derive(Clone)]
pub struct MmapReader {
mmap: Mmap,
mmap: Arc<Mmap>,
pos: usize,
}
@ -14,11 +15,33 @@ impl MmapReader {
pub fn new(file: File) -> Result<Self> {
// Safety: ¯\_(ツ)_/¯
let mmap = unsafe { Mmap::map(&file) }?;
let mmap = Arc::new(mmap);
Ok(Self { mmap, pos: 0 })
}
}
impl io::Read for MmapReader {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if let Some(slice) = self
.mmap
.get(self.pos..)
.and_then(|unread| unread.get(..buf.len()))
{
buf.copy_from_slice(slice);
Ok(slice.len())
} else {
Ok(0)
}
}
}
impl BagReader for MmapReader {
type Read = Self;
fn as_read(&mut self) -> &mut Self::Read {
self
}
fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result<O>
where
P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>,