use std::io::{self, SeekFrom}; use bytes::{Buf, BytesMut}; use nom::number::streaming::le_u32; use crate::error::{Error, Result}; use crate::index::{ChunkInfo, ConnInfo}; use crate::parse::{self, Header, Op, Version}; const READ_SIZE: usize = 4096; pub trait BagReader { fn read_parser<'a, O: 'a, P>(&'a mut self, parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>; fn seek(&mut self, pos: SeekFrom) -> Result<()>; fn read_version(&mut self) -> Result { self.read_parser(Version::parse) } fn read_header(&mut self) -> Result
{ self.read_parser(Header::parse) } fn read_header_op(&mut self, op: Op) -> Result
{ let header = self.read_header()?; if header.read_op()? != op { return Err(Error::other(format!("Expected {:?} op", op))); } Ok(header) } fn read_data_length(&mut self) -> Result { self.read_parser(le_u32) } fn skip_data(&mut self) -> Result<()> { let data_length = self.read_data_length()?; self.seek(io::SeekFrom::Current(data_length as i64))?; Ok(()) } fn read_conn_info(&mut self) -> Result { let record_header = self.read_header_op(Op::Connection)?; let conn_header = self.read_header()?; ConnInfo::from_headers(record_header, conn_header) } fn read_chunk_info(&mut self) -> Result { let header = self.read_header_op(Op::ChunkInfo)?; // TODO: read connection message counts self.skip_data()?; ChunkInfo::from_header(header) } } pub struct IoReader { read: R, buffer: BytesMut, consumed: usize, } impl IoReader { pub fn new(read: R) -> Self { Self { read, buffer: BytesMut::with_capacity(READ_SIZE), consumed: 0, } } fn read_more(&mut self, n: usize) -> Result<()> { if n == 0 { return Ok(()); } let old_size = self.buffer.len(); self.buffer.resize(old_size + n, 0); let read = self.read.read(&mut self.buffer[old_size..])?; self.buffer.truncate(old_size + read); if read == 0 { return Err(Error::Eof); } Ok(()) } } impl BagReader for IoReader { fn read_parser<'a, O: 'a, P>(&'a mut self, mut parser: P) -> Result where P: nom::Parser<&'a [u8], O, parse::Error<&'a [u8]>>, { self.buffer.advance(self.consumed); self.consumed = 0; let this = self as *mut Self; loop { match parser.parse(&self.buffer) { Ok((rest, output)) => { self.consumed += self.buffer.len() - rest.len(); return Ok(output); } Err(nom::Err::Incomplete(needed)) => { let needed = match needed { nom::Needed::Unknown => 0, nom::Needed::Size(n) => n.get(), }; // Safety: this.buffer is only borrowed in the Ok case above, which // immediately returns. unsafe { &mut *this }.read_more(needed.max(READ_SIZE))?; } Err(nom::Err::Error(e) | nom::Err::Failure(e)) => { return Err(e.into()); } } } } fn seek(&mut self, mut pos: io::SeekFrom) -> Result<()> { if let io::SeekFrom::Current(pos) = &mut pos { // If seeking relative to current position, subtract data // read from the file but not yet consumed. let remaining = (self.buffer.len() - self.consumed) as i64; let new_pos = *pos - remaining; if *pos >= 0 && new_pos < 0 { // The new position is within the already read data, just consume more data self.consumed += *pos as usize; return Ok(()); } *pos = new_pos; } self.buffer.clear(); self.consumed = 0; self.read.seek(pos)?; Ok(()) } }