rsbag_arrow: converting rosbag to parquet

Alex Mikhalev 2025-02-07 10:26:57 -08:00
parent 92868a954b
commit 2b20469859
13 changed files with 875 additions and 14 deletions

Cargo.toml

@@ -2,6 +2,8 @@
 members = [
     "rsbag",
+    "rsbagpy",
+    "rsbag_arrow"
 ]

 [profile.release]

rsbag/Cargo.toml

@@ -1,7 +1,7 @@
 [package]
 name = "rsbag"
 version = "0.1.0"
-edition = "2018"
+edition = "2021"

 [features]
 default = ["mmap", "rayon", "bson"]
@@ -16,7 +16,7 @@ lz4_flex = { version = "0.8.2", default-features = false, features = ["std", "ch
 memmap = { version = "0.7.0", optional = true }
 nom = "7.0.0"
 num_enum = "0.5.4"
-rayon = { version = "1.5.1", optional = true }
+rayon = { version = "1.6.1", optional = true }
 regex = "1.5.4"
 ros_message = "0.1.0"
 smallvec = "1.6.1"

rsbag/src/bag.rs

@@ -1,7 +1,7 @@
 use std::{fs::File, path::Path};

 use eyre::Context;
-use rayon::iter::ParallelIterator;
+use rayon::prelude::{IndexedParallelIterator, ParallelIterator};

 use crate::{
     chunk::{read_chunks_data, read_chunks_messages, MessageData},
@@ -31,6 +31,10 @@ impl Bag {
         &self.index
     }

+    pub fn reader(&self) -> &MmapReader {
+        &self.reader
+    }
+
     pub fn compute_message_layouts(&mut self) -> Result<MessageLayouts> {
         MessageLayouts::new(&self.index.connections)
     }
@@ -39,7 +43,7 @@ impl Bag {
         BagInfo::compute(self.reader.clone(), &self.index)
     }

-    pub fn read_chunks(&mut self) -> impl ParallelIterator<Item = Result<Vec<u8>>> + '_ {
+    pub fn read_chunks(&mut self) -> impl IndexedParallelIterator<Item = Result<Vec<u8>>> + '_ {
         read_chunks_data(self.reader.clone(), &self.index.chunks)
     }

rsbag/src/chunk.rs

@@ -6,7 +6,10 @@ use std::{
 use bytes::Bytes;
 use eyre::{bail, eyre, Context};
-use rayon::iter::{IntoParallelIterator, ParallelIterator};
+use rayon::{
+    iter::{IntoParallelIterator, ParallelIterator},
+    prelude::IndexedParallelIterator,
+};

 use crate::{
     error,
@@ -94,10 +97,11 @@ pub fn read_chunk_data_at<R: BagReader + io::Seek>(
 pub fn read_chunks_data<'a, R, C>(
     reader: R,
     chunks: C,
-) -> impl ParallelIterator<Item = Result<Vec<u8>>> + 'a
+) -> impl IndexedParallelIterator<Item = Result<Vec<u8>>> + 'a
 where
     R: BagReader + io::Seek + Clone + Send + 'a,
     C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
+    <C as IntoParallelIterator>::Iter: IndexedParallelIterator,
 {
     chunks
         .into_par_iter()
@@ -139,6 +143,7 @@ pub fn read_chunks_messages<'a, R, C>(
 where
     R: BagReader + io::Seek + Clone + Send + 'a,
     C: IntoParallelIterator<Item = &'a ChunkInfo> + 'a,
+    <C as IntoParallelIterator>::Iter: IndexedParallelIterator,
 {
     read_chunks_data(reader, chunks).flat_map_iter(move |data| ChunkMessageIterator {
         reader: data.map(|data| BytesReader::from(Bytes::from(data))),

rsbag/src/index.rs

@@ -38,7 +38,7 @@ impl ConnInfo {
     }
 }

-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct ChunkInfo {
     pub pos: u64,
     pub start_time: Time,
@@ -70,7 +70,7 @@ impl ChunkInfo {
     }
 }

-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct ChunkConnection {
     pub conn_id: u32,
     pub count: u32,
@@ -88,7 +88,7 @@ impl ChunkConnection {
     }
 }

-#[derive(Debug)]
+#[derive(Clone, Debug)]
 pub struct BagIndex {
     pub connections: Vec<ConnInfo>,
     pub chunks: Vec<ChunkInfo>,

rsbag/src/layout.rs

@@ -114,7 +114,7 @@ impl FieldLayout {
     }
 }

-#[derive(Clone, Debug, PartialEq, Eq)]
+#[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub enum Multiplicity {
     Unit,
     Fixed(usize),

rsbag/src/message.rs

@@ -52,9 +52,9 @@ pub fn compute_layout(conn: &ConnInfo) -> Result<MessageLayout> {
     MessageLayout::from_msgs(&msgs)
 }

-struct ConnData {
-    info: Arc<ConnInfo>,
-    layout: MessageLayout,
+pub struct ConnData {
+    pub info: Arc<ConnInfo>,
+    pub layout: MessageLayout,
 }

 impl ConnData {
@@ -67,7 +67,7 @@ impl ConnData {
     }
 }

 pub struct MessageLayouts {
-    conns: HashMap<u32, ConnData>,
+    pub conns: HashMap<u32, ConnData>,
 }

 impl MessageLayouts {
@@ -79,6 +79,10 @@ impl MessageLayouts {
         Ok(MessageLayouts { conns })
     }

+    pub fn get(&self, conn_id: u32) -> Option<&MessageLayout> {
+        self.conns.get(&conn_id).map(|conn_data| &conn_data.layout)
+    }
+
     pub fn decode(&self, message: &MessageData) -> Result<(Arc<ConnInfo>, MessageValue)> {
         let conn = self
             .conns

rsbag/src/reader.rs

@@ -7,6 +7,7 @@ use crate::{parse, Result};
 const READ_SIZE: usize = 4096;

+#[derive(Clone)]
 pub struct IoReader<R> {
     read: R,
     buffer: BytesMut,

rsbag_arrow/Cargo.toml (new file, 20 lines)

@@ -0,0 +1,20 @@
[package]
name = "rsbag_arrow"
version = "0.1.0"
edition = "2018"
rust-version = "1.66"

[dependencies]
arrow2 = { version = "0.15.0", features = ["io_parquet", "io_parquet_compression"] }
color-eyre = "0.5.11"
env_logger = "0.9.0"
eyre = "0.6.5"
log = "0.4.14"
indicatif = { version = "0.16.2", features = ["rayon"] }
rayon = { version = "1.6.1" }
rsbag = { path = "../rsbag" }
nom = "7.0.0"
mkdirp = "1.0.0"

[target.'cfg(not(target_env = "msvc"))'.dependencies]
jemallocator = "0.3.2"

rsbag_arrow/src/arrow.rs (new file, 355 lines)

@@ -0,0 +1,355 @@
use arrow2::{
    array::{
        MutableArray, MutableBinaryArray, MutableBooleanArray, MutableFixedSizeBinaryArray,
        MutableListArray, MutablePrimitiveArray, MutableStructArray, MutableUtf8ValuesArray,
        TryPush,
    },
    datatypes::{DataType, Field, TimeUnit},
    types::NativeType,
};
use rsbag::{
    layout::{FieldLayout, FieldType, MessageLayout, Multiplicity},
    parse::{header::fields::parse_time, Error, ErrorKind, IResult, Input},
    Result,
};

use eyre::bail;
use nom::{
    bytes::complete::take,
    combinator::map_res,
    multi::{length_count, length_data, many_m_n},
    number::complete::{
        le_f32, le_f64, le_i16, le_i32, le_i64, le_i8, le_u16, le_u32, le_u64, le_u8,
    },
    Parser,
};
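
// Helpers that build an empty arrow2 builder for one field, wrapping the
// per-value builder in a list builder when the field is an array. Fixed-size
// arrays also use plain lists, since the parquet writer does not support
// fixed-size lists (see the TODO below).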
fn make_mutable_array<A: MutableArray + Default + 'static>(
    multiplicity: Multiplicity,
) -> Box<dyn MutableArray> {
    make_mutable_array_from_values(multiplicity, A::default())
}

fn make_mutable_array_from_values<A: MutableArray + 'static>(
    multiplicity: Multiplicity,
    values: A,
) -> Box<dyn MutableArray> {
    match multiplicity {
        Multiplicity::Unit => Box::new(values),
        // TODO: cannot write fixed size list to parquet
        // Multiplicity::Fixed(size) => Box::new(MutableFixedSizeListArray::new_with_field(
        //     values, "item", false, size,
        // )),
        Multiplicity::Fixed(_) | Multiplicity::Dynamic => Box::new(
            MutableListArray::<i32, A>::new_with_field(values, "item", false),
        ),
    }
}
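
// Maps a ROS field type to an arrow2 builder. u8 arrays become binary arrays
// rather than lists of u8, and time/duration fields become 64-bit
// nanosecond timestamps and durations.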
fn field_to_mutable_array(field: &FieldLayout) -> Box<dyn MutableArray> {
    // TODO: BinaryArray for I8/U8
    match &field.typ {
        FieldType::Bool => make_mutable_array::<MutableBooleanArray>(field.multiplicity),
        FieldType::I8(_) => make_mutable_array::<MutablePrimitiveArray<i8>>(field.multiplicity),
        FieldType::I16 => make_mutable_array::<MutablePrimitiveArray<i16>>(field.multiplicity),
        FieldType::I32 => make_mutable_array::<MutablePrimitiveArray<i32>>(field.multiplicity),
        FieldType::I64 => make_mutable_array::<MutablePrimitiveArray<i64>>(field.multiplicity),
        FieldType::U8(_) => match field.multiplicity {
            Multiplicity::Unit => Box::new(MutablePrimitiveArray::<u8>::new()),
            Multiplicity::Fixed(n) => Box::new(MutableFixedSizeBinaryArray::new(n)),
            Multiplicity::Dynamic => Box::new(MutableBinaryArray::<i32>::new()),
        },
        FieldType::U16 => make_mutable_array::<MutablePrimitiveArray<u16>>(field.multiplicity),
        FieldType::U32 => make_mutable_array::<MutablePrimitiveArray<u32>>(field.multiplicity),
        FieldType::U64 => make_mutable_array::<MutablePrimitiveArray<u64>>(field.multiplicity),
        FieldType::F32 => make_mutable_array::<MutablePrimitiveArray<f32>>(field.multiplicity),
        FieldType::F64 => make_mutable_array::<MutablePrimitiveArray<f64>>(field.multiplicity),
        FieldType::String => make_mutable_array::<MutableUtf8ValuesArray<i32>>(field.multiplicity),
        FieldType::Time => make_mutable_array_from_values(
            field.multiplicity,
            MutablePrimitiveArray::<i64>::from(DataType::Timestamp(
                TimeUnit::Nanosecond,
                Some("UTC".into()),
            )),
        ),
        FieldType::Duration => make_mutable_array_from_values(
            field.multiplicity,
            MutablePrimitiveArray::<i64>::from(DataType::Duration(TimeUnit::Nanosecond)),
        ),
        FieldType::Message(layout) => {
            make_mutable_array_from_values(field.multiplicity, layout_to_mutable_array(&*layout))
        }
    }
}

fn layout_to_mutable_arrays(layout: &MessageLayout) -> Vec<Box<dyn MutableArray>> {
    layout.fields().iter().map(field_to_mutable_array).collect()
}
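
// Builds the top-level struct builder for a message layout, with one child
// builder per field; all fields are marked non-nullable.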
pub fn layout_to_mutable_array(layout: &MessageLayout) -> MutableStructArray {
    let values = layout_to_mutable_arrays(layout);
    let fields = layout
        .fields()
        .iter()
        .zip(&values)
        .map(|(field, array)| Field::new(field.name.to_string(), array.data_type().clone(), false))
        .collect();
    let datatype = DataType::Struct(fields);
    MutableStructArray::new(datatype, values)
}
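
// Parses one primitive value (or a fixed/dynamic-length array of them) and
// appends it to the builder. Builders are downcast at runtime; the layout used
// to construct them guarantees the concrete types match.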
// TODO: more efficient primitive parsing using bytemuck
fn parse_push_primitive<'a, 'b: 'a, T, P>(
    mut parser: P,
    array: &'a mut dyn MutableArray,
    multiplicity: Multiplicity,
    input: Input<'b>,
) -> IResult<'b, ()>
where
    T: NativeType,
    P: Parser<Input<'b>, T, Error<Input<'b>>> + 'a,
{
    match multiplicity {
        Multiplicity::Unit => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutablePrimitiveArray<T>>()
                .expect("wrong array type");
            let (rest, value) = parser.parse(input)?;
            array.push(Some(value));
            Ok((rest, ()))
        }
        Multiplicity::Fixed(n) => {
            // TODO: FixedSizeListArray unsupported to write to parquet
            // let array = array
            //     .as_mut_any()
            //     .downcast_mut::<MutableFixedSizeListArray<MutablePrimitiveArray<T>>>()
            //     .expect("wrong array type");
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, MutablePrimitiveArray<T>>>()
                .expect("wrong array type");
            let (rest, values) = many_m_n(n, n, parser).parse(input)?;
            array.mut_values().extend_from_slice(values.as_ref());
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
        Multiplicity::Dynamic => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, MutablePrimitiveArray<T>>>()
                .expect("wrong array type");
            let (rest, values) = length_count(le_u32, parser).parse(input)?;
            array.mut_values().extend_from_slice(values.as_ref());
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
    }
}
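
// u8 fields get special treatment: fixed arrays map to fixed-size binary and
// dynamic arrays to variable-length binary, instead of lists of u8.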
// TODO: more efficient primitive parsing using bytemuck
fn parse_push_u8<'a, 'b: 'a>(
    array: &'a mut dyn MutableArray,
    multiplicity: Multiplicity,
    input: Input<'b>,
) -> IResult<'b, ()> {
    // TODO: copy directly without
    match multiplicity {
        Multiplicity::Unit => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutablePrimitiveArray<u8>>()
                .expect("wrong array type");
            let (rest, value) = le_u8.parse(input)?;
            array.push(Some(value));
            Ok((rest, ()))
        }
        Multiplicity::Fixed(n) => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableFixedSizeBinaryArray>()
                .expect("wrong array type");
            let (rest, values) = take(n).parse(input)?;
            array.push(Some(values));
            Ok((rest, ()))
        }
        Multiplicity::Dynamic => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableBinaryArray<i32>>()
                .expect("wrong array type");
            let (rest, values) = length_data(le_u32).parse(input)?;
            array.push(Some(values));
            Ok((rest, ()))
        }
    }
}
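
// Generic fallback for builders that implement `TryPush` (booleans, strings):
// same shape as `parse_push_primitive`, but pushes values one at a time.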
fn parse_push<'a, 'b: 'a, A, T, P>(
    mut parser: P,
    array: &'a mut dyn MutableArray,
    multiplicity: Multiplicity,
    input: Input<'b>,
) -> IResult<'b, ()>
where
    P: Parser<Input<'b>, T, Error<Input<'b>>> + 'a,
    A: MutableArray + TryPush<T> + 'static,
{
    match multiplicity {
        Multiplicity::Unit => {
            let array = array
                .as_mut_any()
                .downcast_mut::<A>()
                .expect("wrong array type");
            let (rest, value) = parser.parse(input)?;
            array.try_push(value).expect("array push failed");
            Ok((rest, ()))
        }
        Multiplicity::Fixed(n) => {
            // TODO: FixedSizeListArray unsupported to write to parquet
            // let array = array
            //     .as_mut_any()
            //     .downcast_mut::<MutableFixedSizeListArray<A>>()
            //     .expect("wrong array type");
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, A>>()
                .expect("wrong array type");
            let (rest, values) = many_m_n(n, n, parser).parse(input)?;
            for value in values {
                array
                    .mut_values()
                    .try_push(value)
                    .expect("array push failed");
            }
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
        Multiplicity::Dynamic => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, A>>()
                .expect("wrong array type");
            let (rest, values) = length_count(le_u32, parser).parse(input)?;
            for value in values {
                array
                    .mut_values()
                    .try_push(value)
                    .expect("array push failed");
            }
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
    }
}
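
// Nested message fields recurse through `message_parser` into the child
// struct builder (or a list of struct builders for message arrays).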
fn parse_push_message<'a, 'b: 'a>(
    layout: &MessageLayout,
    array: &'a mut dyn MutableArray,
    multiplicity: Multiplicity,
    input: Input<'b>,
) -> IResult<'b, ()> {
    match multiplicity {
        Multiplicity::Unit => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableStructArray>()
                .expect("wrong array type");
            message_parser(layout, array).parse(input)
        }
        Multiplicity::Fixed(n) => {
            // TODO: FixedSizeListArray unsupported to write to parquet
            // let array = array
            //     .as_mut_any()
            //     .downcast_mut::<MutableFixedSizeListArray<MutableStructArray>>()
            //     .expect("wrong array type");
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, MutableStructArray>>()
                .expect("wrong array type");
            let parser = message_parser(layout, array.mut_values());
            let (rest, _) = many_m_n(n, n, parser).map(|_| ()).parse(input)?;
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
        Multiplicity::Dynamic => {
            let array = array
                .as_mut_any()
                .downcast_mut::<MutableListArray<i32, MutableStructArray>>()
                .expect("wrong array type");
            let parser = message_parser(layout, array.mut_values());
            let (rest, _) = length_count(le_u32, parser).map(|_| ()).parse(input)?;
            array.try_push_valid().unwrap();
            Ok((rest, ()))
        }
    }
}
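
// Returns a parser that decodes one field from the ROS-serialized byte stream
// and appends it to the corresponding builder, dispatching on the field type.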
fn field_parser<'a, 'b: 'a>(
    field: &'a FieldLayout,
    array: &'a mut dyn MutableArray,
) -> impl Parser<Input<'b>, (), Error<Input<'b>>> + 'a {
    move |input| -> IResult<()> {
        match &field.typ {
            FieldType::Bool => parse_push::<MutableBooleanArray, _, _>(
                le_u8.map(|v| Some(v != 0)),
                array,
                field.multiplicity,
                input,
            ),
            FieldType::I8(_) => parse_push_primitive(le_i8, array, field.multiplicity, input),
            FieldType::I16 => parse_push_primitive(le_i16, array, field.multiplicity, input),
            FieldType::I32 => parse_push_primitive(le_i32, array, field.multiplicity, input),
            FieldType::I64 => parse_push_primitive(le_i64, array, field.multiplicity, input),
            FieldType::U8(_) => parse_push_u8(array, field.multiplicity, input),
            FieldType::U16 => parse_push_primitive(le_u16, array, field.multiplicity, input),
            FieldType::U32 => parse_push_primitive(le_u32, array, field.multiplicity, input),
            FieldType::U64 => parse_push_primitive(le_u64, array, field.multiplicity, input),
            FieldType::F32 => parse_push_primitive(le_f32, array, field.multiplicity, input),
            FieldType::F64 => parse_push_primitive(le_f64, array, field.multiplicity, input),
            FieldType::String => parse_push::<MutableUtf8ValuesArray<i32>, _, _>(
                map_res(length_data(le_u32), |s| {
                    String::from_utf8(Vec::from(s)).map_err(ErrorKind::from)
                }),
                array,
                field.multiplicity,
                input,
            ),
            FieldType::Time => parse_push_primitive(
                parse_time.map(|time| time.nanos()),
                array,
                field.multiplicity,
                input,
            ),
            FieldType::Duration => todo!(),
            FieldType::Message(layout) => {
                parse_push_message(layout.as_ref(), array, field.multiplicity, input)
            }
        }
    }
}
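
// Parses the fields of one message in declaration order, appending each into
// the matching child of the struct builder.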
fn message_parser<'a, 'b: 'a>(
    layout: &'a MessageLayout,
    array: &'a mut MutableStructArray,
) -> impl Parser<Input<'b>, (), Error<Input<'b>>> + 'a {
    move |mut input| -> IResult<_> {
        for (field, values) in layout.fields().iter().zip(array.mut_values()) {
            let (rest, _) = field_parser(field, &mut **values).parse(input)?;
            input = rest;
        }
        Ok((input, ()))
    }
}
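
/// Parses one serialized message and appends it as a single row of `array`,
/// failing if any input bytes remain after the message is consumed.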
pub fn parse_message(
    layout: &MessageLayout,
    input: &[u8],
    array: &mut MutableStructArray,
) -> Result<()> {
    match message_parser(layout, array).parse(input) {
        Ok((&[], message)) => Ok(message),
        Ok(_) => bail!("extra data after message"),
        Err(nom::Err::Incomplete(_)) => unreachable!(),
        Err(nom::Err::Error(e) | nom::Err::Failure(e)) => Err(e.into_owned().into()),
    }
}

rsbag_arrow/src/main.rs (new file, 421 lines)

@@ -0,0 +1,421 @@
use std::{
    collections::{BTreeMap, BTreeSet, HashMap, VecDeque},
    env::args,
    fs::File,
    path::Path,
    sync::{Arc, Mutex},
};

use arrow2::{
    array::{Array, MutableArray, MutableStructArray},
    chunk::Chunk,
    datatypes::{DataType, Field, PhysicalType, Schema},
    io::parquet::write::{self as pq_write, row_group_iter, CompressedPage, ZstdLevel},
    types::PrimitiveType,
};
use indicatif::ParallelProgressIterator;
use log::{info, trace};
use rayon::{
    iter::ParallelIterator,
    prelude::{IndexedParallelIterator, ParallelBridge},
};
use rsbag::{
    chunk::{read_chunk_data_at, ChunkMessageIterator, MessageData},
    index::BagIndex,
    layout::MessageLayout,
    message::MessageLayouts,
    Bag, Result,
};

#[cfg(not(target_env = "msvc"))]
use jemallocator::Jemalloc;

#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

mod arrow;
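
// `Bla` adapts a queue of already-compressed pages to the
// `FallibleStreamingIterator` interface that `FileWriter::write` consumes; it
// is only used by the manually parallelized compression path that is
// currently commented out in `ParquetWriter::write_chunk`.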
struct Bla {
    columns: VecDeque<CompressedPage>,
    current: Option<CompressedPage>,
}

impl Bla {
    pub fn new(columns: VecDeque<CompressedPage>) -> Self {
        Self {
            columns,
            current: None,
        }
    }
}

impl pq_write::FallibleStreamingIterator for Bla {
    type Item = CompressedPage;
    type Error = arrow2::error::Error;

    fn advance(&mut self) -> Result<(), arrow2::error::Error> {
        self.current = self.columns.pop_front();
        Ok(())
    }

    fn get(&self) -> Option<&Self::Item> {
        self.current.as_ref()
    }
}

type ChunkSet = BTreeSet<usize>;
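
// Accumulates decoded messages for a single connection. Bag chunks are
// decompressed in parallel and may complete out of order, so raw messages are
// buffered per chunk and only appended to the arrow builder once every earlier
// chunk has arrived; this keeps row order deterministic.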
struct ConnToArrow {
    layout: MessageLayout,
    array: MutableStructArray,
    expected_chunks: ChunkSet,
    /// Messages waiting to be inserted into the array, in chunk order
    pending_messages: BTreeMap<usize, Vec<MessageData>>,
    estimated_size: usize,
    ready_chunks: VecDeque<Chunk<Box<dyn Array>>>,
    max_chunk_size: usize,
}

impl ConnToArrow {
    fn try_new(layout: MessageLayout, expected_chunks: ChunkSet) -> Result<Self> {
        let array = arrow::layout_to_mutable_array(&layout);
        Ok(Self {
            layout,
            array,
            expected_chunks,
            pending_messages: Default::default(),
            estimated_size: 0,
            ready_chunks: Default::default(),
            max_chunk_size: 128 * 1024 * 1024,
        })
    }

    fn add_message(&mut self, chunk_idx: usize, message: MessageData) {
        self.pending_messages
            .entry(chunk_idx)
            .or_default()
            .push(message);
    }

    fn chunk_complete(&mut self, chunk_idx: usize) -> Result<()> {
        assert!(self.expected_chunks.remove(&chunk_idx));

        // Flush all pending messages from chunks before the earliest chunk we
        // still expect, i.e. wait to flush a chunk until every chunk before it
        // has been received.
        let earliest_expected_chunk = self.expected_chunks.first().copied();
        let mut flushed = 0;
        for (&chunk_idx, messages) in self.pending_messages.iter_mut() {
            if let Some(earliest_expected_chunk) = earliest_expected_chunk {
                if chunk_idx >= earliest_expected_chunk {
                    break;
                }
            }
            flushed += 1;
            for message in messages.drain(..) {
                arrow::parse_message(&self.layout, message.data.as_ref(), &mut self.array)?;
                self.estimated_size += message.data.len();
            }
        }
        trace!(
            "Chunk {} complete, with {} pending chunks (earliest expected {:?}). Flushed {} chunks",
            chunk_idx,
            self.pending_messages.len(),
            earliest_expected_chunk,
            flushed
        );

        // Remove all empty chunks
        self.pending_messages.retain(|_, v| !v.is_empty());

        if self.estimated_size > self.max_chunk_size {
            trace!(
                "connection data size exceeds {}MiB, flushing",
                self.max_chunk_size / (1024 * 1024)
            );
            self.flush();
        }
        Ok(())
    }

    fn flush(&mut self) {
        let chunk = Chunk::new(
            std::mem::take(self.array.mut_values())
                .into_iter()
                .map(|mut v| v.as_box())
                .collect(),
        );
        self.ready_chunks.push_back(chunk);
        self.array = arrow::layout_to_mutable_array(&self.layout);
        self.estimated_size = 0;
    }

    fn take_chunk(&mut self) -> Option<Chunk<Box<dyn Array>>> {
        self.ready_chunks.pop_front()
    }
}
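
// Writes arrow chunks for a single connection to one parquet file. Floats are
// written Plain, other primitives delta-packed, and binary columns
// delta-length encoded, all zstd-compressed.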
struct ParquetWriter {
    encodings: Vec<Vec<pq_write::Encoding>>,
    file_writer: pq_write::FileWriter<File>,
}

impl ParquetWriter {
    fn try_new(fields: &[Field], file: File) -> Result<Self> {
        let writer_options = pq_write::WriteOptions {
            write_statistics: true,
            version: pq_write::Version::V2,
            compression: pq_write::CompressionOptions::Zstd(Some(ZstdLevel::try_new(10).unwrap())),
            // compression: pq_write::CompressionOptions::Zstd(None),
            // compression: pq_write::CompressionOptions::Snappy,
            // compression: pq_write::CompressionOptions::Uncompressed,
            data_pagesize_limit: Some(128 * 1024 * 1024),
            // data_pagesize_limit: Some(16 * 1024 * 1024),
        };
        let schema = Schema {
            fields: fields.to_vec(),
            metadata: Default::default(),
        };
        let encodings: Vec<_> = fields
            .iter()
            .map(|f| {
                pq_write::transverse(&f.data_type, |dtype| match dtype.to_physical_type() {
                    PhysicalType::Primitive(pt) => match pt {
                        PrimitiveType::Float16
                        | PrimitiveType::Float32
                        | PrimitiveType::Float64 => pq_write::Encoding::Plain,
                        _ => pq_write::Encoding::DeltaBinaryPacked,
                    },
                    PhysicalType::Binary => pq_write::Encoding::DeltaLengthByteArray,
                    _ => pq_write::Encoding::Plain,
                })
            })
            .collect();
        let file_writer = pq_write::FileWriter::try_new(file, schema, writer_options)?;
        Ok(Self {
            encodings,
            file_writer,
        })
    }

    fn write_chunk(&mut self, chunk: Chunk<Box<dyn Array>>) -> Result<()> {
        let options = self.file_writer.options();
        // let columns = chunk
        //     .columns()
        //     .par_iter()
        //     .zip(self.file_writer.parquet_schema().fields().to_vec())
        //     .zip(self.encodings.par_iter())
        //     .flat_map(move |((array, type_), encoding)| {
        //         let encoded_columns =
        //             pq_write::array_to_columns(array, type_, options, encoding).unwrap();
        //         encoded_columns
        //             .into_iter()
        //             .map(|encoded_pages| {
        //                 let encoded_pages = pq_write::DynIter::new(
        //                     encoded_pages
        //                         .into_iter()
        //                         .map(|x| x.map_err(arrow2::error::Error::from_external_error)),
        //                 );
        //                 encoded_pages
        //                     .map(|page| {
        //                         pq_write::compress(page?, vec![], options.compression)
        //                             .map_err(|x| x.into())
        //                     })
        //                     .collect::<Result<VecDeque<_>>>()
        //             })
        //             .collect::<Vec<_>>()
        //     })
        //     .collect::<Result<Vec<VecDeque<pq_write::CompressedPage>>>>()?;
        // let rgi = pq_write::DynIter::new(
        //     columns
        //         .into_iter()
        //         .map(|column| Ok(pq_write::DynStreamingIterator::new(Bla::new(column)))),
        // );
        let rgi = row_group_iter(
            chunk,
            self.encodings.clone(),
            self.file_writer.parquet_schema().fields().to_vec(),
            options,
        );
        self.file_writer.write(rgi)?;
        Ok(())
    }

    fn end(&mut self) -> Result<()> {
        // TODO: footer metadata?
        self.file_writer.end(None)?;
        Ok(())
    }
}
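
// Couples a `ConnToArrow` with a `ParquetWriter` behind mutexes so that
// multiple rayon workers can feed the same connection concurrently.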
struct ConnToParquet {
    to_arrow: Mutex<ConnToArrow>,
    parquet_writer: Mutex<ParquetWriter>,
}

impl ConnToParquet {
    fn try_new(layout: MessageLayout, expected_chunks: ChunkSet, file: File) -> Result<Self> {
        let to_arrow = ConnToArrow::try_new(layout, expected_chunks)?;
        let DataType::Struct(fields) = to_arrow.array.data_type() else { unreachable!() };
        let parquet_writer = ParquetWriter::try_new(fields, file)?;
        Ok(Self {
            to_arrow: to_arrow.into(),
            parquet_writer: parquet_writer.into(),
        })
    }

    fn add_message(&self, chunk_idx: usize, message: MessageData) {
        self.to_arrow
            .lock()
            .unwrap()
            .add_message(chunk_idx, message);
    }

    fn chunk_complete(&self, chunk_idx: usize) -> Result<()> {
        self.to_arrow.lock().unwrap().chunk_complete(chunk_idx)?;
        self.write_ready()?;
        Ok(())
    }

    fn write_ready(&self) -> Result<()> {
        loop {
            let Some(chunk) = ({
                let mut to_arrow = self.to_arrow.lock().unwrap();
                to_arrow.take_chunk()
            }) else { break };
            self.parquet_writer.lock().unwrap().write_chunk(chunk)?;
        }
        Ok(())
    }

    fn end(&self) -> Result<()> {
        self.to_arrow.lock().unwrap().flush();
        self.write_ready()?;
        self.parquet_writer.lock().unwrap().end()?;
        Ok(())
    }
}
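
// Fans messages out to one parquet writer per connection. The set of chunks
// each connection appears in is precomputed from the bag index so that
// `ConnToArrow` knows when all of a connection's earlier chunks have arrived.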
struct BagToParquet {
    connections: HashMap<u32, ConnToParquet>,
    index: BagIndex,
}

impl BagToParquet {
    fn try_new(index: &BagIndex, layouts: &MessageLayouts, base_path: &Path) -> Result<Self> {
        let mut expected_chunks_per_conn: HashMap<u32, ChunkSet> = Default::default();
        for (chunk_idx, chunk) in index.chunks.iter().enumerate() {
            for conn in &chunk.connections {
                let expected_chunks = expected_chunks_per_conn.entry(conn.conn_id).or_default();
                expected_chunks.insert(chunk_idx);
            }
        }
        let connections: HashMap<_, _> = layouts
            .conns
            .iter()
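            // Keep a connection unless its topic ends with "image_rect_raw"
            // and is not under "/camera3".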
            .filter(|(_, conn)| {
                !conn.info.topic.ends_with("image_rect_raw")
                    || conn.info.topic.starts_with("/camera3")
            })
            .map(|(id, conn)| -> Result<_> {
                let expected_chunks = std::mem::take(expected_chunks_per_conn.get_mut(id).unwrap());
                let conn_path = base_path.join(format!("{}.parquet", &conn.info.topic[1..]));
                if let Some(parent) = conn_path.parent() {
                    mkdirp::mkdirp(parent)?;
                }
                let conn_file = File::create(conn_path)?;
                let ctp = ConnToParquet::try_new(conn.layout.clone(), expected_chunks, conn_file)?;
                Ok((*id, ctp))
            })
            .collect::<Result<_>>()?;
        Ok(Self {
            connections,
            index: index.clone(),
        })
    }

    fn add_message(&self, chunk_idx: usize, message: MessageData) {
        if let Some(ctp) = self.connections.get(&message.header.conn_id) {
            ctp.add_message(chunk_idx, message)
        }
    }

    fn chunk_complete(&self, chunk_idx: usize) -> Result<()> {
        let chunk_info = self.index.chunks.get(chunk_idx).unwrap();
        for conn in &chunk_info.connections {
            if let Some(ctp) = self.connections.get(&conn.conn_id) {
                ctp.chunk_complete(chunk_idx)?;
            }
        }
        Ok(())
    }

    fn end(&self) -> Result<()> {
        self.connections
            .values()
            .par_bridge()
            .try_for_each(|ctp| ctp.end())?;
        Ok(())
    }
}
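
// Pipeline: enumerate the bag's chunks, decode them in parallel on the rayon
// pool, route each message to its per-connection writer, and close every
// parquet file once all chunks have been processed.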
fn main() -> Result<()> {
    color_eyre::install()?;
    env_logger::init();

    let args: Vec<_> = args().collect();
    if args.len() != 3 {
        eprintln!("Usage: {} <bag path> <output dir>", args[0]);
        return Ok(());
    }
    let bag_path = &args[1];
    let arrow_path = &args[2];

    let mut bag = Bag::open(bag_path)?;
    let layouts = bag.compute_message_layouts()?;
    // let info = bag.compute_info()?;
    // let total_messages: u64 = info.per_connection.values().map(|con| con.count).sum();
    // info!("exporting {} messages", total_messages);
    let num_chunks = bag.index().chunks.len();
    let btp = Arc::new(BagToParquet::try_new(
        bag.index(),
        &layouts,
        Path::new(arrow_path),
    )?);
    bag.index()
        .chunks
        .iter()
        .enumerate()
        // Force using par_bridge as into_par_iter distributes chunks throughout the file
        .par_bridge()
        .progress_count(num_chunks as u64)
        .map_with(bag.reader().clone(), |reader, (idx, chunk)| {
            (idx, read_chunk_data_at(reader, chunk.pos))
        })
        .for_each_with(btp.clone(), |btp, (chunk_idx, chunk)| {
            let it = ChunkMessageIterator::new(chunk.unwrap());
            for msg in it {
                let msg = msg.unwrap();
                btp.add_message(chunk_idx, msg);
            }
            btp.chunk_complete(chunk_idx).unwrap();
        });
    btp.end()?;
    Ok(())
}
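
// Usage sketch (paths are hypothetical):
//
//     rsbag_arrow recording.bag out/
//
// writes one file per topic under out/, e.g. out/camera3/image_rect_raw.parquet.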

rsbagpy/Cargo.toml (new file, 19 lines)

@@ -0,0 +1,19 @@
[package]
name = "rsbagpy"
version = "0.1.0"
edition = "2018"

[lib]
# The name of the native library. This is the name which will be used in Python to import the
# library (i.e. `import string_sum`). If you change this, you must also change the name of the
# `#[pymodule]` in `src/lib.rs`.
name = "rsbag"
# "cdylib" is necessary to produce a shared library for Python to import from.
#
# Downstream Rust code (including code in `bin/`, `examples/`, and `tests/`) will not be able
# to `use string_sum;` unless the "rlib" or "lib" crate type is also included, e.g.:
# crate-type = ["cdylib", "rlib"]
crate-type = ["cdylib"]

[dependencies]
pyo3 = { version = "0.16.4", features = ["extension-module"] }

rsbagpy/src/lib.rs (new file, 30 lines)

@@ -0,0 +1,30 @@
use std::path::PathBuf;

use pyo3::prelude::*;

#[pyclass]
struct Message {}

#[pyclass]
struct Bag {}

#[pymethods]
impl Bag {
    #[new]
    fn new(_path: PathBuf) -> Self {
        Self {}
    }

    fn read(&mut self) -> Option<Message> {
        Some(Message {})
    }
}

/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule]
fn rsbag(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
    m.add_class::<Bag>()?;
    Ok(())
}