Browse Source

Improve ProgramRunner task error handling

master
Alex Mikhalev 4 years ago
parent
commit
d4e067a947
  1. 85
      src/program_runner.rs

85
src/program_runner.rs

@ -1,5 +1,6 @@
use crate::model::{ProgramId, ProgramRef, Programs, Sections}; use crate::model::{ProgramId, ProgramRef, Programs, Sections};
use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner};
use eyre::WrapErr;
use std::collections::VecDeque; use std::collections::VecDeque;
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
@ -101,13 +102,13 @@ impl RunnerTask {
} }
} }
async fn start_program_run(&mut self, run: &mut ProgRun) { async fn start_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> {
if run.state != RunState::Waiting { if run.state != RunState::Waiting {
warn!( warn!(
program_id = run.program.id, program_id = run.program.id,
"cannot run program which is already running" "cannot run program which is already running"
); );
return; return Ok(());
} }
run.sec_run_handles.reserve(run.program.sequence.len()); run.sec_run_handles.reserve(run.program.sequence.len());
for item in &run.program.sequence { for item in &run.program.sequence {
@ -122,14 +123,11 @@ impl RunnerTask {
continue; continue;
} }
}; };
let handle = match self.section_runner.queue_run(section, item.duration).await { let handle = self
Ok(handle) => handle, .section_runner
Err(_closed) => { .queue_run(section, item.duration)
error!("section runner channel closed"); .await
self.running = false; .wrap_err("failed to queue section run")?;
return;
}
};
run.sec_run_handles.push(handle); run.sec_run_handles.push(handle);
} }
run.state = RunState::Running; run.state = RunState::Running;
@ -141,31 +139,36 @@ impl RunnerTask {
} else { } else {
debug!(program_id = run.program.id, "started running program"); debug!(program_id = run.program.id, "started running program");
} }
Ok(())
} }
async fn cancel_program_run(&mut self, run: &mut ProgRun) { async fn cancel_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> {
for handle in run.sec_run_handles.drain(..) { for handle in run.sec_run_handles.drain(..) {
if let Err(_closed) = self.section_runner.cancel_run(handle).await { self.section_runner
error!("section runner channel closed"); .cancel_run(handle)
self.running = false; .await
return; .wrap_err("failed to cancel section run")?;
}
} }
debug!(program_id = run.program.id, "program run is cancelled"); debug!(program_id = run.program.id, "program run is cancelled");
self.send_event(ProgramEvent::RunCancel(run.program.clone())); self.send_event(ProgramEvent::RunCancel(run.program.clone()));
Ok(())
} }
async fn process_queue(&mut self, run_queue: &mut RunQueue) { async fn process_queue(&mut self, run_queue: &mut RunQueue) -> eyre::Result<()> {
while let Some(current_run) = run_queue.front_mut() { while let Some(current_run) = run_queue.front_mut() {
let run_finished = match current_run.state { let run_finished = match current_run.state {
RunState::Waiting => { RunState::Waiting => {
self.start_program_run(current_run).await; self.start_program_run(current_run)
.await
.wrap_err("failed to start program run")?;
false false
} }
RunState::Running => false, RunState::Running => false,
RunState::Finished => true, RunState::Finished => true,
RunState::Cancelled => { RunState::Cancelled => {
self.cancel_program_run(current_run).await; self.cancel_program_run(current_run)
.await
.wrap_err("failed to cancel program run")?;
true true
} }
}; };
@ -175,6 +178,7 @@ impl RunnerTask {
break; break;
} }
} }
Ok(())
} }
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) { fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) {
@ -239,22 +243,8 @@ impl RunnerTask {
&mut self, &mut self,
sec_event: Result<SectionEvent, broadcast::RecvError>, sec_event: Result<SectionEvent, broadcast::RecvError>,
run_queue: &mut RunQueue, run_queue: &mut RunQueue,
) { ) -> eyre::Result<()> {
let sec_event = match sec_event { let sec_event = sec_event.wrap_err("failed to receive section event")?;
Ok(ev) => ev,
Err(broadcast::RecvError::Lagged(missed)) => {
warn!(
missed,
"missed some section events, increase event channel size"
);
return;
}
Err(broadcast::RecvError::Closed) => {
error!("section events channel closed");
self.running = false;
return;
}
};
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
match sec_event { match sec_event {
SectionEvent::RunFinish(finished_run) => { SectionEvent::RunFinish(finished_run) => {
@ -262,28 +252,39 @@ impl RunnerTask {
} }
_ => {} _ => {}
} }
Ok(())
} }
async fn start(mut self) { async fn start_impl(&mut self) -> eyre::Result<()> {
let span = trace_span!("runner_task");
let _enter = span.enter();
let mut sec_events = self let mut sec_events = self
.section_runner .section_runner
.subscribe() .subscribe()
.await .await
.expect("could not subscribe to SectionRunner events"); .wrap_err("could not subscribe to SectionRunner events")?;
let mut run_queue: RunQueue = VecDeque::new(); let mut run_queue: RunQueue = VecDeque::new();
while self.running { while self.running {
self.process_queue(&mut run_queue).await; self.process_queue(&mut run_queue)
.await
.wrap_err("error during queue processing")?;
tokio::select! { tokio::select! {
msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue),
sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue), sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?,
// _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() // _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
}; };
} }
Ok(())
}
async fn start(mut self) {
let span = trace_span!("runner_task");
let _enter = span.enter();
self.start_impl()
.await
.expect("error in ProgramRunner task");
} }
} }

Loading…
Cancel
Save