diff --git a/src/program_runner.rs b/src/program_runner.rs index 4c08e5c..9acda05 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -1,5 +1,6 @@ use crate::model::{ProgramId, ProgramRef, Programs, Sections}; use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; +use eyre::WrapErr; use std::collections::VecDeque; use thiserror::Error; use tokio::{ @@ -101,13 +102,13 @@ impl RunnerTask { } } - async fn start_program_run(&mut self, run: &mut ProgRun) { + async fn start_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { if run.state != RunState::Waiting { warn!( program_id = run.program.id, "cannot run program which is already running" ); - return; + return Ok(()); } run.sec_run_handles.reserve(run.program.sequence.len()); for item in &run.program.sequence { @@ -122,14 +123,11 @@ impl RunnerTask { continue; } }; - let handle = match self.section_runner.queue_run(section, item.duration).await { - Ok(handle) => handle, - Err(_closed) => { - error!("section runner channel closed"); - self.running = false; - return; - } - }; + let handle = self + .section_runner + .queue_run(section, item.duration) + .await + .wrap_err("failed to queue section run")?; run.sec_run_handles.push(handle); } run.state = RunState::Running; @@ -141,31 +139,36 @@ impl RunnerTask { } else { debug!(program_id = run.program.id, "started running program"); } + Ok(()) } - async fn cancel_program_run(&mut self, run: &mut ProgRun) { + async fn cancel_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { for handle in run.sec_run_handles.drain(..) { - if let Err(_closed) = self.section_runner.cancel_run(handle).await { - error!("section runner channel closed"); - self.running = false; - return; - } + self.section_runner + .cancel_run(handle) + .await + .wrap_err("failed to cancel section run")?; } debug!(program_id = run.program.id, "program run is cancelled"); self.send_event(ProgramEvent::RunCancel(run.program.clone())); + Ok(()) } - async fn process_queue(&mut self, run_queue: &mut RunQueue) { + async fn process_queue(&mut self, run_queue: &mut RunQueue) -> eyre::Result<()> { while let Some(current_run) = run_queue.front_mut() { let run_finished = match current_run.state { RunState::Waiting => { - self.start_program_run(current_run).await; + self.start_program_run(current_run) + .await + .wrap_err("failed to start program run")?; false } RunState::Running => false, RunState::Finished => true, RunState::Cancelled => { - self.cancel_program_run(current_run).await; + self.cancel_program_run(current_run) + .await + .wrap_err("failed to cancel program run")?; true } }; @@ -175,6 +178,7 @@ impl RunnerTask { break; } } + Ok(()) } fn handle_msg(&mut self, msg: Option, run_queue: &mut RunQueue) { @@ -239,22 +243,8 @@ impl RunnerTask { &mut self, sec_event: Result, run_queue: &mut RunQueue, - ) { - let sec_event = match sec_event { - Ok(ev) => ev, - Err(broadcast::RecvError::Lagged(missed)) => { - warn!( - missed, - "missed some section events, increase event channel size" - ); - return; - } - Err(broadcast::RecvError::Closed) => { - error!("section events channel closed"); - self.running = false; - return; - } - }; + ) -> eyre::Result<()> { + let sec_event = sec_event.wrap_err("failed to receive section event")?; #[allow(clippy::single_match)] match sec_event { SectionEvent::RunFinish(finished_run) => { @@ -262,28 +252,39 @@ impl RunnerTask { } _ => {} } + Ok(()) } - async fn start(mut self) { - let span = trace_span!("runner_task"); - let _enter = span.enter(); - + async fn start_impl(&mut self) -> eyre::Result<()> { let mut sec_events = self .section_runner .subscribe() .await - .expect("could not subscribe to SectionRunner events"); + .wrap_err("could not subscribe to SectionRunner events")?; let mut run_queue: RunQueue = VecDeque::new(); while self.running { - self.process_queue(&mut run_queue).await; + self.process_queue(&mut run_queue) + .await + .wrap_err("error during queue processing")?; tokio::select! { msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), - sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue), + sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, // _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() }; } + + Ok(()) + } + + async fn start(mut self) { + let span = trace_span!("runner_task"); + let _enter = span.enter(); + + self.start_impl() + .await + .expect("error in ProgramRunner task"); } }