From 2b20469859786fa1ea639e9f2b99b8cd1914f643 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Fri, 7 Feb 2025 10:26:57 -0800 Subject: [PATCH] rsbag_arrow: converting rosbag to parqeut --- Cargo.toml | 2 + rsbag/Cargo.toml | 4 +- rsbag/src/bag.rs | 8 +- rsbag/src/chunk.rs | 9 +- rsbag/src/index.rs | 6 +- rsbag/src/layout.rs | 2 +- rsbag/src/message.rs | 12 +- rsbag/src/reader/io.rs | 1 + rsbag_arrow/Cargo.toml | 20 ++ rsbag_arrow/src/arrow.rs | 355 +++++++++++++++++++++++++++++++++ rsbag_arrow/src/main.rs | 421 +++++++++++++++++++++++++++++++++++++++ rsbagpy/Cargo.toml | 19 ++ rsbagpy/src/lib.rs | 30 +++ 13 files changed, 875 insertions(+), 14 deletions(-) create mode 100644 rsbag_arrow/Cargo.toml create mode 100644 rsbag_arrow/src/arrow.rs create mode 100644 rsbag_arrow/src/main.rs create mode 100644 rsbagpy/Cargo.toml create mode 100644 rsbagpy/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 3a93968..a39556b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,8 @@ members = [ "rsbag", + "rsbagpy", + "rsbag_arrow" ] [profile.release] diff --git a/rsbag/Cargo.toml b/rsbag/Cargo.toml index 3104af3..5769715 100644 --- a/rsbag/Cargo.toml +++ b/rsbag/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "rsbag" version = "0.1.0" -edition = "2018" +edition = "2021" [features] default = ["mmap", "rayon", "bson"] @@ -16,7 +16,7 @@ lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "ch memmap = { version = "0.7.0", optional = true } nom = "7.0.0" num_enum = "0.5.4" -rayon = { version = "1.5.1", optional = true } +rayon = { version = "1.6.1", optional = true } regex = "1.5.4" ros_message = "0.1.0" smallvec = "1.6.1" diff --git a/rsbag/src/bag.rs b/rsbag/src/bag.rs index c5b2beb..9b50fd1 100644 --- a/rsbag/src/bag.rs +++ b/rsbag/src/bag.rs @@ -1,7 +1,7 @@ use std::{fs::File, path::Path}; use eyre::Context; -use rayon::iter::ParallelIterator; +use rayon::prelude::{IndexedParallelIterator, ParallelIterator}; use crate::{ chunk::{read_chunks_data, read_chunks_messages, MessageData}, @@ -31,6 +31,10 @@ impl Bag { &self.index } + pub fn reader(&self) -> &MmapReader { + &self.reader + } + pub fn compute_message_layouts(&mut self) -> Result { MessageLayouts::new(&self.index.connections) } @@ -39,7 +43,7 @@ impl Bag { BagInfo::compute(self.reader.clone(), &self.index) } - pub fn read_chunks(&mut self) -> impl ParallelIterator>> + '_ { + pub fn read_chunks(&mut self) -> impl IndexedParallelIterator>> + '_ { read_chunks_data(self.reader.clone(), &self.index.chunks) } diff --git a/rsbag/src/chunk.rs b/rsbag/src/chunk.rs index a8d0850..7203c86 100644 --- a/rsbag/src/chunk.rs +++ b/rsbag/src/chunk.rs @@ -6,7 +6,10 @@ use std::{ use bytes::Bytes; use eyre::{bail, eyre, Context}; -use rayon::iter::{IntoParallelIterator, ParallelIterator}; +use rayon::{ + iter::{IntoParallelIterator, ParallelIterator}, + prelude::IndexedParallelIterator, +}; use crate::{ error, @@ -94,10 +97,11 @@ pub fn read_chunk_data_at( pub fn read_chunks_data<'a, R, C>( reader: R, chunks: C, -) -> impl ParallelIterator>> + 'a +) -> impl IndexedParallelIterator>> + 'a where R: BagReader + io::Seek + Clone + Send + 'a, C: IntoParallelIterator + 'a, + ::Iter: IndexedParallelIterator, { chunks .into_par_iter() @@ -139,6 +143,7 @@ pub fn read_chunks_messages<'a, R, C>( where R: BagReader + io::Seek + Clone + Send + 'a, C: IntoParallelIterator + 'a, + ::Iter: IndexedParallelIterator, { read_chunks_data(reader, chunks).flat_map_iter(move |data| ChunkMessageIterator { reader: data.map(|data| BytesReader::from(Bytes::from(data))), diff --git a/rsbag/src/index.rs b/rsbag/src/index.rs index ac4fcd7..631c52e 100644 --- a/rsbag/src/index.rs +++ b/rsbag/src/index.rs @@ -38,7 +38,7 @@ impl ConnInfo { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChunkInfo { pub pos: u64, pub start_time: Time, @@ -70,7 +70,7 @@ impl ChunkInfo { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ChunkConnection { pub conn_id: u32, pub count: u32, @@ -88,7 +88,7 @@ impl ChunkConnection { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct BagIndex { pub connections: Vec, pub chunks: Vec, diff --git a/rsbag/src/layout.rs b/rsbag/src/layout.rs index 122c915..92d2202 100644 --- a/rsbag/src/layout.rs +++ b/rsbag/src/layout.rs @@ -114,7 +114,7 @@ impl FieldLayout { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Copy, Debug, PartialEq, Eq)] pub enum Multiplicity { Unit, Fixed(usize), diff --git a/rsbag/src/message.rs b/rsbag/src/message.rs index 538e1ce..aef9ab1 100644 --- a/rsbag/src/message.rs +++ b/rsbag/src/message.rs @@ -52,9 +52,9 @@ pub fn compute_layout(conn: &ConnInfo) -> Result { MessageLayout::from_msgs(&msgs) } -struct ConnData { - info: Arc, - layout: MessageLayout, +pub struct ConnData { + pub info: Arc, + pub layout: MessageLayout, } impl ConnData { @@ -67,7 +67,7 @@ impl ConnData { } } pub struct MessageLayouts { - conns: HashMap, + pub conns: HashMap, } impl MessageLayouts { @@ -79,6 +79,10 @@ impl MessageLayouts { Ok(MessageLayouts { conns }) } + pub fn get(&self, conn_id: u32) -> Option<&MessageLayout> { + self.conns.get(&conn_id).map(|conn_data| &conn_data.layout) + } + pub fn decode(&self, message: &MessageData) -> Result<(Arc, MessageValue)> { let conn = self .conns diff --git a/rsbag/src/reader/io.rs b/rsbag/src/reader/io.rs index c594dbf..250798f 100644 --- a/rsbag/src/reader/io.rs +++ b/rsbag/src/reader/io.rs @@ -7,6 +7,7 @@ use crate::{parse, Result}; const READ_SIZE: usize = 4096; +#[derive(Clone)] pub struct IoReader { read: R, buffer: BytesMut, diff --git a/rsbag_arrow/Cargo.toml b/rsbag_arrow/Cargo.toml new file mode 100644 index 0000000..ceac285 --- /dev/null +++ b/rsbag_arrow/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "rsbag_arrow" +version = "0.1.0" +edition = "2018" +rust-version = "1.66" + +[dependencies] +arrow2 = { version = "0.15.0", features = ["io_parquet", "io_parquet_compression"] } +color-eyre = "0.5.11" +env_logger = "0.9.0" +eyre = "0.6.5" +log = "0.4.14" +indicatif = { version = "0.16.2", features = ["rayon"] } +rayon = { version = "1.6.1" } +rsbag = { path = "../rsbag" } +nom = "7.0.0" +mkdirp = "1.0.0" + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = "0.3.2" diff --git a/rsbag_arrow/src/arrow.rs b/rsbag_arrow/src/arrow.rs new file mode 100644 index 0000000..46cb53f --- /dev/null +++ b/rsbag_arrow/src/arrow.rs @@ -0,0 +1,355 @@ +use arrow2::{ + array::{ + MutableArray, MutableBinaryArray, MutableBooleanArray, MutableFixedSizeBinaryArray, + MutableListArray, MutablePrimitiveArray, MutableStructArray, MutableUtf8ValuesArray, + TryPush, + }, + datatypes::{DataType, Field, TimeUnit}, + types::NativeType, +}; +use rsbag::{ + layout::{FieldLayout, FieldType, MessageLayout, Multiplicity}, + parse::{header::fields::parse_time, Error, ErrorKind, IResult, Input}, + Result, +}; +use eyre::bail; +use nom::{ + bytes::complete::take, + combinator::map_res, + multi::{length_count, length_data, many_m_n}, + number::complete::{ + le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8, + }, + Parser, +}; + +fn make_mutable_array( + multiplicity: Multiplicity, +) -> Box { + make_mutable_array_from_values(multiplicity, A::default()) +} + +fn make_mutable_array_from_values( + multiplicity: Multiplicity, + values: A, +) -> Box { + match multiplicity { + Multiplicity::Unit => Box::new(values), + // TODO: cannot write fixed size list to parqeut + // Multiplicity::Fixed(size) => Box::new(MutableFixedSizeListArray::new_with_field( + // values, "item", false, size, + // )), + Multiplicity::Fixed(_) | Multiplicity::Dynamic => Box::new( + MutableListArray::::new_with_field(values, "item", false), + ), + } +} + +fn field_to_mutable_array(field: &FieldLayout) -> Box { + // TODO: BinaryArray for I8/U8 + match &field.typ { + FieldType::Bool => make_mutable_array::(field.multiplicity), + FieldType::I8(_) => make_mutable_array::>(field.multiplicity), + FieldType::I16 => make_mutable_array::>(field.multiplicity), + FieldType::I32 => make_mutable_array::>(field.multiplicity), + FieldType::I64 => make_mutable_array::>(field.multiplicity), + FieldType::U8(_) => match field.multiplicity { + Multiplicity::Unit => Box::new(MutablePrimitiveArray::::new()), + Multiplicity::Fixed(n) => Box::new(MutableFixedSizeBinaryArray::new(n)), + Multiplicity::Dynamic => Box::new(MutableBinaryArray::::new()), + }, + FieldType::U16 => make_mutable_array::>(field.multiplicity), + FieldType::U32 => make_mutable_array::>(field.multiplicity), + FieldType::U64 => make_mutable_array::>(field.multiplicity), + FieldType::F32 => make_mutable_array::>(field.multiplicity), + FieldType::F64 => make_mutable_array::>(field.multiplicity), + FieldType::String => make_mutable_array::>(field.multiplicity), + FieldType::Time => make_mutable_array_from_values( + field.multiplicity, + MutablePrimitiveArray::::from(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("UTC".into()), + )), + ), + FieldType::Duration => make_mutable_array_from_values( + field.multiplicity, + MutablePrimitiveArray::::from(DataType::Duration(TimeUnit::Nanosecond)), + ), + FieldType::Message(layout) => { + make_mutable_array_from_values(field.multiplicity, layout_to_mutable_array(&*layout)) + } + } +} + +fn layout_to_mutable_arrays(layout: &MessageLayout) -> Vec> { + layout.fields().iter().map(field_to_mutable_array).collect() +} + +pub fn layout_to_mutable_array(layout: &MessageLayout) -> MutableStructArray { + let values = layout_to_mutable_arrays(layout); + let fields = layout + .fields() + .iter() + .zip(&values) + .map(|(field, array)| Field::new(field.name.to_string(), array.data_type().clone(), false)) + .collect(); + let datatype = DataType::Struct(fields); + MutableStructArray::new(datatype, values) +} + +// TODO: more efficient primitive parsing using bytemuck +fn parse_push_primitive<'a, 'b: 'a, T, P>( + mut parser: P, + array: &'a mut dyn MutableArray, + multiplicity: Multiplicity, + input: Input<'b>, +) -> IResult<'b, ()> +where + T: NativeType, + P: Parser, T, Error>> + 'a, +{ + match multiplicity { + Multiplicity::Unit => { + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let (rest, value) = parser.parse(input)?; + array.push(Some(value)); + Ok((rest, ())) + } + Multiplicity::Fixed(n) => { + // TODO: FixedSizeListArray unsupported to write to parquet + // let array = array + // .as_mut_any() + // .downcast_mut::>>() + // .expect("wrong array type"); + let array = array + .as_mut_any() + .downcast_mut::>>() + .expect("wrong array type"); + let (rest, values) = many_m_n(n, n, parser).parse(input)?; + array.mut_values().extend_from_slice(values.as_ref()); + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + Multiplicity::Dynamic => { + let array = array + .as_mut_any() + .downcast_mut::>>() + .expect("wrong array type"); + let (rest, values) = length_count(le_u32, parser).parse(input)?; + array.mut_values().extend_from_slice(values.as_ref()); + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + } +} + +// TODO: more efficient primitive parsing using bytemuck +fn parse_push_u8<'a, 'b: 'a>( + array: &'a mut dyn MutableArray, + multiplicity: Multiplicity, + input: Input<'b>, +) -> IResult<'b, ()> { + // TODO: copy directly without + match multiplicity { + Multiplicity::Unit => { + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let (rest, value) = le_u8.parse(input)?; + array.push(Some(value)); + Ok((rest, ())) + } + Multiplicity::Fixed(n) => { + let array = array + .as_mut_any() + .downcast_mut::() + .expect("wrong array type"); + let (rest, values) = take(n).parse(input)?; + array.push(Some(values)); + Ok((rest, ())) + } + Multiplicity::Dynamic => { + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let (rest, values) = length_data(le_u32).parse(input)?; + array.push(Some(values)); + Ok((rest, ())) + } + } +} + +fn parse_push<'a, 'b: 'a, A, T, P>( + mut parser: P, + array: &'a mut dyn MutableArray, + multiplicity: Multiplicity, + input: Input<'b>, +) -> IResult<'b, ()> +where + P: Parser, T, Error>> + 'a, + A: MutableArray + TryPush + 'static, +{ + match multiplicity { + Multiplicity::Unit => { + let array = array + .as_mut_any() + .downcast_mut::() + .expect("wrong array type"); + let (rest, value) = parser.parse(input)?; + array.try_push(value).expect("array push failed"); + Ok((rest, ())) + } + Multiplicity::Fixed(n) => { + // TODO: FixedSizeListArray unsupported to write to parquet + // let array = array + // .as_mut_any() + // .downcast_mut::>() + // .expect("wrong array type"); + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let (rest, values) = many_m_n(n, n, parser).parse(input)?; + for value in values { + array + .mut_values() + .try_push(value) + .expect("array push failed"); + } + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + Multiplicity::Dynamic => { + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let (rest, values) = length_count(le_u32, parser).parse(input)?; + for value in values { + array + .mut_values() + .try_push(value) + .expect("array push failed"); + } + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + } +} + +fn parse_push_message<'a, 'b: 'a>( + layout: &MessageLayout, + array: &'a mut dyn MutableArray, + multiplicity: Multiplicity, + input: Input<'b>, +) -> IResult<'b, ()> { + match multiplicity { + Multiplicity::Unit => { + let array = array + .as_mut_any() + .downcast_mut::() + .expect("wrong array type"); + message_parser(layout, array).parse(input) + } + Multiplicity::Fixed(n) => { + // TODO: FixedSizeListArray unsupported to write to parquet + // let array = array + // .as_mut_any() + // .downcast_mut::>() + // .expect("wrong array type"); + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let parser = message_parser(layout, array.mut_values()); + let (rest, _) = many_m_n(n, n, parser).map(|_| ()).parse(input)?; + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + Multiplicity::Dynamic => { + let array = array + .as_mut_any() + .downcast_mut::>() + .expect("wrong array type"); + let parser = message_parser(layout, array.mut_values()); + let (rest, _) = length_count(le_u32, parser).map(|_| ()).parse(input)?; + array.try_push_valid().unwrap(); + Ok((rest, ())) + } + } +} + +fn field_parser<'a, 'b: 'a>( + field: &'a FieldLayout, + array: &'a mut dyn MutableArray, +) -> impl Parser, (), Error>> + 'a { + move |input| -> IResult<()> { + match &field.typ { + FieldType::Bool => parse_push::( + le_u8.map(|v| Some(v != 0)), + array, + field.multiplicity, + input, + ), + FieldType::I8(_) => parse_push_primitive(le_i8, array, field.multiplicity, input), + FieldType::I16 => parse_push_primitive(le_i16, array, field.multiplicity, input), + FieldType::I32 => parse_push_primitive(le_i32, array, field.multiplicity, input), + FieldType::I64 => parse_push_primitive(le_i64, array, field.multiplicity, input), + FieldType::U8(_) => parse_push_u8(array, field.multiplicity, input), + FieldType::U16 => parse_push_primitive(le_u16, array, field.multiplicity, input), + FieldType::U32 => parse_push_primitive(le_u32, array, field.multiplicity, input), + FieldType::U64 => parse_push_primitive(le_u64, array, field.multiplicity, input), + FieldType::F32 => parse_push_primitive(le_f32, array, field.multiplicity, input), + FieldType::F64 => parse_push_primitive(le_f64, array, field.multiplicity, input), + FieldType::String => parse_push::, _, _>( + map_res(length_data(le_u32), |s| { + String::from_utf8(Vec::from(s)).map_err(ErrorKind::from) + }), + array, + field.multiplicity, + input, + ), + FieldType::Time => parse_push_primitive( + parse_time.map(|time| time.nanos()), + array, + field.multiplicity, + input, + ), + FieldType::Duration => todo!(), + FieldType::Message(layout) => { + parse_push_message(layout.as_ref(), array, field.multiplicity, input) + } + } + } +} + +fn message_parser<'a, 'b: 'a>( + layout: &'a MessageLayout, + array: &'a mut MutableStructArray, +) -> impl Parser, (), Error>> + 'a { + move |mut input| -> IResult<_> { + for (field, values) in layout.fields().iter().zip(array.mut_values()) { + let (rest, _) = field_parser(field, &mut **values).parse(input)?; + input = rest; + } + Ok((input, ())) + } +} + +pub fn parse_message( + layout: &MessageLayout, + input: &[u8], + array: &mut MutableStructArray, +) -> Result<()> { + match message_parser(layout, array).parse(input) { + Ok((&[], message)) => Ok(message), + Ok(_) => bail!("extra data after message"), + Err(nom::Err::Incomplete(_)) => unreachable!(), + Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()), + } +} diff --git a/rsbag_arrow/src/main.rs b/rsbag_arrow/src/main.rs new file mode 100644 index 0000000..0db1a69 --- /dev/null +++ b/rsbag_arrow/src/main.rs @@ -0,0 +1,421 @@ +use std::{ + collections::{BTreeMap, BTreeSet, HashMap, VecDeque}, + env::args, + fs::File, + path::Path, + sync::{Arc, Mutex}, +}; + +use arrow2::{ + array::{Array, MutableArray, MutableStructArray}, + chunk::Chunk, + datatypes::{DataType, Field, PhysicalType, Schema}, + io::parquet::write::{self as pq_write, row_group_iter, CompressedPage, ZstdLevel}, + types::PrimitiveType, +}; +use indicatif::ParallelProgressIterator; +use log::{info, trace}; +use rayon::{ + iter::ParallelIterator, + prelude::{IndexedParallelIterator, ParallelBridge}, +}; +use rsbag::{ + chunk::{read_chunk_data_at, ChunkMessageIterator, MessageData}, + index::BagIndex, + layout::MessageLayout, + message::MessageLayouts, + Bag, Result, +}; + +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +mod arrow; + +struct Bla { + columns: VecDeque, + current: Option, +} + +impl Bla { + pub fn new(columns: VecDeque) -> Self { + Self { + columns, + current: None, + } + } +} + +impl pq_write::FallibleStreamingIterator for Bla { + type Item = CompressedPage; + type Error = arrow2::error::Error; + + fn advance(&mut self) -> Result<(), arrow2::error::Error> { + self.current = self.columns.pop_front(); + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + self.current.as_ref() + } +} + +type ChunkSet = BTreeSet; + +struct ConnToArrow { + layout: MessageLayout, + array: MutableStructArray, + expected_chunks: ChunkSet, + /// Messages waiting to be inserted in array in order of chunk + pending_messages: BTreeMap>, + estimated_size: usize, + ready_chunks: VecDeque>>, + max_chunk_size: usize, +} + +impl ConnToArrow { + fn try_new(layout: MessageLayout, expected_chunks: ChunkSet) -> Result { + let array = arrow::layout_to_mutable_array(&layout); + + Ok(Self { + layout: layout, + array: array, + expected_chunks, + pending_messages: Default::default(), + estimated_size: 0, + ready_chunks: Default::default(), + max_chunk_size: 128 * 1024 * 1024, + }) + } + + fn add_message(&mut self, chunk_idx: usize, message: MessageData) { + self.pending_messages + .entry(chunk_idx) + .or_default() + .push(message); + } + + fn chunk_complete(&mut self, chunk_idx: usize) -> Result<()> { + assert!(self.expected_chunks.remove(&chunk_idx)); + // Flush all pending messages from chunks before the last expected chunk. + // i.e. wait to flush a chunk until all previous chunks have been recieved. + let latest_expected_chunk = self.expected_chunks.first().copied(); + let mut flushed = 0; + for (&chunk_idx, messages) in self.pending_messages.iter_mut() { + if let Some(latest_expected_chunk) = latest_expected_chunk { + if chunk_idx >= latest_expected_chunk { + break; + } + } + flushed += 1; + for message in messages.drain(..) { + arrow::parse_message(&self.layout, message.data.as_ref(), &mut self.array)?; + self.estimated_size += message.data.len(); + } + } + trace!( + "Chunk {} complete, with {} pending chunks (latest {:?}). Flushed {} chunks", + chunk_idx, + self.pending_messages.len(), + latest_expected_chunk, + flushed + ); + // Remove all empty chunks + self.pending_messages.retain(|_, v| !v.is_empty()); + + if self.estimated_size > self.max_chunk_size { + trace!( + "connection data size exceeds {}MiB, flushing", + self.max_chunk_size / (1024 * 1024) + ); + self.flush(); + } + + Ok(()) + } + + fn flush(&mut self) { + let chunk = Chunk::new( + std::mem::take(self.array.mut_values()) + .into_iter() + .map(|mut v| v.as_box()) + .collect(), + ); + self.ready_chunks.push_back(chunk); + + self.array = arrow::layout_to_mutable_array(&self.layout); + self.estimated_size = 0; + } + + fn take_chunk(&mut self) -> Option>> { + self.ready_chunks.pop_front() + } +} + +struct ParquetWriter { + encodings: Vec>, + file_writer: pq_write::FileWriter, +} + +impl ParquetWriter { + fn try_new(fields: &[Field], file: File) -> Result { + let writer_options = pq_write::WriteOptions { + write_statistics: true, + version: pq_write::Version::V2, + compression: pq_write::CompressionOptions::Zstd(Some(ZstdLevel::try_new(10).unwrap())), + // compression: pq_write::CompressionOptions::Zstd(None), + // compression: pq_write::CompressionOptions::Snappy, + // compression: pq_write::CompressionOptions::Uncompressed, + data_pagesize_limit: Some(128 * 1024 * 1024), + // data_pagesize_limit: Some(16 * 1024 * 1024), + }; + let schema = Schema { + fields: fields.to_vec(), + metadata: Default::default(), + }; + + let encodings: Vec<_> = fields + .iter() + .map(|f| { + pq_write::transverse(&f.data_type, |dtype| match dtype.to_physical_type() { + PhysicalType::Primitive(pt) => match pt { + PrimitiveType::Float16 + | PrimitiveType::Float32 + | PrimitiveType::Float64 => pq_write::Encoding::Plain, + _ => pq_write::Encoding::DeltaBinaryPacked, + }, + PhysicalType::Binary => pq_write::Encoding::DeltaLengthByteArray, + _ => pq_write::Encoding::Plain, + }) + }) + .collect(); + + let file_writer = pq_write::FileWriter::try_new(file, schema, writer_options)?; + Ok(Self { + encodings, + file_writer, + }) + } + + fn write_chunk(&mut self, chunk: Chunk>) -> Result<()> { + let options = self.file_writer.options(); + // let columns = chunk + // .columns() + // .par_iter() + // .zip(self.file_writer.parquet_schema().fields().to_vec()) + // .zip(self.encodings.par_iter()) + // .flat_map(move |((array, type_), encoding)| { + // let encoded_columns = + // pq_write::array_to_columns(array, type_, options, encoding).unwrap(); + // encoded_columns + // .into_iter() + // .map(|encoded_pages| { + // let encoded_pages = pq_write::DynIter::new( + // encoded_pages + // .into_iter() + // .map(|x| x.map_err(arrow2::error::Error::from_external_error)), + // ); + // encoded_pages + // .map(|page| { + // pq_write::compress(page?, vec![], options.compression) + // .map_err(|x| x.into()) + // }) + // .collect::>>() + // }) + // .collect::>() + // }) + // .collect::>>>()?; + + // let rgi = pq_write::DynIter::new( + // columns + // .into_iter() + // .map(|column| Ok(pq_write::DynStreamingIterator::new(Bla::new(column)))), + // ); + + let rgi = row_group_iter( + chunk, + self.encodings.clone(), + self.file_writer.parquet_schema().fields().to_vec(), + options, + ); + self.file_writer.write(rgi)?; + + Ok(()) + } + + fn end(&mut self) -> Result<()> { + // TODO: footer metadata? + self.file_writer.end(None)?; + Ok(()) + } +} + +struct ConnToParquet { + to_arrow: Mutex, + parquet_writer: Mutex, +} + +impl ConnToParquet { + fn try_new(layout: MessageLayout, expected_chunks: ChunkSet, file: File) -> Result { + let to_arrow = ConnToArrow::try_new(layout, expected_chunks)?; + + let DataType::Struct(fields) = to_arrow.array.data_type() else { unreachable!() }; + let parquet_writer = ParquetWriter::try_new(fields, file)?; + + Ok(Self { + to_arrow: to_arrow.into(), + parquet_writer: parquet_writer.into(), + }) + } + + fn add_message(&self, chunk_idx: usize, message: MessageData) { + self.to_arrow + .lock() + .unwrap() + .add_message(chunk_idx, message); + } + + fn chunk_complete(&self, chunk_idx: usize) -> Result<()> { + self.to_arrow.lock().unwrap().chunk_complete(chunk_idx)?; + + self.write_ready()?; + + Ok(()) + } + + fn write_ready(&self) -> Result<()> { + loop { + let Some(chunk) = ({ + let mut to_arrow = self.to_arrow.lock().unwrap(); + to_arrow.take_chunk() + }) else { break }; + + self.parquet_writer.lock().unwrap().write_chunk(chunk)?; + } + Ok(()) + } + + fn end(&self) -> Result<()> { + self.to_arrow.lock().unwrap().flush(); + self.write_ready()?; + self.parquet_writer.lock().unwrap().end()?; + Ok(()) + } +} + +struct BagToParquet { + connections: HashMap, + index: BagIndex, +} + +impl BagToParquet { + fn try_new(index: &BagIndex, layouts: &MessageLayouts, base_path: &Path) -> Result { + let mut expected_chunks_per_conn: HashMap = Default::default(); + for (chunk_idx, chunk) in index.chunks.iter().enumerate() { + for conn in &chunk.connections { + let expected_chunks = expected_chunks_per_conn.entry(conn.conn_id).or_default(); + expected_chunks.insert(chunk_idx); + } + } + let connections: HashMap<_, _> = layouts + .conns + .iter() + .filter(|(_, conn)| { + !conn.info.topic.ends_with("image_rect_raw") + || conn.info.topic.starts_with("/camera3") + }) + .map(|(id, conn)| -> Result<_> { + let expected_chunks = std::mem::take(expected_chunks_per_conn.get_mut(id).unwrap()); + let conn_path = base_path.join(format!("{}.parquet", &conn.info.topic[1..])); + if let Some(parent) = conn_path.parent() { + mkdirp::mkdirp(parent)?; + } + let conn_file = File::create(conn_path)?; + let ctp = ConnToParquet::try_new(conn.layout.clone(), expected_chunks, conn_file)?; + Ok((*id, ctp)) + }) + .collect::>()?; + Ok(Self { + connections, + index: index.clone(), + }) + } + + fn add_message(&self, chunk_idx: usize, message: MessageData) { + if let Some(ctp) = self.connections.get(&message.header.conn_id) { + ctp.add_message(chunk_idx, message) + } + } + + fn chunk_complete(&self, chunk_idx: usize) -> Result<()> { + let chunk_info = self.index.chunks.get(chunk_idx).unwrap(); + for conn in &chunk_info.connections { + if let Some(ctp) = self.connections.get(&conn.conn_id) { + ctp.chunk_complete(chunk_idx)?; + } + } + Ok(()) + } + + fn end(&self) -> Result<()> { + self.connections + .values() + .par_bridge() + .try_for_each(|ctp| ctp.end())?; + Ok(()) + } +} + +fn main() -> Result<()> { + color_eyre::install()?; + env_logger::init(); + + let args: Vec<_> = args().collect(); + if args.len() != 3 { + eprintln!("Usage: {} ", args[0]); + return Ok(()); + } + + let bag_path = &args[1]; + let arrow_path = &args[2]; + let mut bag = Bag::open(bag_path)?; + + let layouts = bag.compute_message_layouts()?; + + // let info = bag.compute_info()?; + // let total_messages: u64 = info.per_connection.values().map(|con| con.count).sum(); + // info!("exporting {} messages", total_messages); + + let num_chunks = bag.index().chunks.len(); + let btp = Arc::new(BagToParquet::try_new( + bag.index(), + &layouts, + Path::new(arrow_path), + )?); + bag.index() + .chunks + .iter() + .enumerate() + // Force using par_bridge as into_par_iter distributes chunks throughout the file + .par_bridge() + .progress_count(num_chunks as u64) + .map_with(bag.reader().clone(), |reader, (idx, chunk)| { + (idx, read_chunk_data_at(reader, chunk.pos)) + }) + .for_each_with(btp.clone(), |btp, (chunk_idx, chunk)| { + let it = ChunkMessageIterator::new(chunk.unwrap()); + for msg in it { + let msg = msg.unwrap(); + btp.add_message(chunk_idx, msg); + } + btp.chunk_complete(chunk_idx).unwrap(); + }); + + btp.end()?; + + Ok(()) +} diff --git a/rsbagpy/Cargo.toml b/rsbagpy/Cargo.toml new file mode 100644 index 0000000..52e99cf --- /dev/null +++ b/rsbagpy/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "rsbagpy" +version = "0.1.0" +edition = "2018" + +[lib] +# The name of the native library. This is the name which will be used in Python to import the +# library (i.e. `import string_sum`). If you change this, you must also change the name of the +# `#[pymodule]` in `src/lib.rs`. +name = "rsbag" +# "cdylib" is necessary to produce a shared library for Python to import from. +# +# Downstream Rust code (including code in `bin/`, `examples/`, and `tests/`) will not be able +# to `use string_sum;` unless the "rlib" or "lib" crate type is also included, e.g.: +# crate-type = ["cdylib", "rlib"] +crate-type = ["cdylib"] + +[dependencies] +pyo3 = { version = "0.16.4", features = ["extension-module"] } \ No newline at end of file diff --git a/rsbagpy/src/lib.rs b/rsbagpy/src/lib.rs new file mode 100644 index 0000000..e30676e --- /dev/null +++ b/rsbagpy/src/lib.rs @@ -0,0 +1,30 @@ +use std::path::PathBuf; + +use pyo3::prelude::*; + +#[pyclass] +struct Message {} + +#[pyclass] +struct Bag {} + +#[pymethods] +impl Bag { + #[new] + fn new(path: PathBuf) -> Self { + Self {} + } + + fn read(&mut self) -> Option { + Some(Message {}) + } +} + +/// A Python module implemented in Rust. The name of this function must match +/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to +/// import the module. +#[pymodule] +fn rsbag(_py: Python<'_>, m: &PyModule) -> PyResult<()> { + m.add_class::()?; + Ok(()) +}