diff --git a/src/main.rs b/src/main.rs index 00fc858..d0ab86f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,5 +1,3 @@ -#![feature(drain_filter)] - use color_eyre::eyre::Result; use rusqlite::Connection as DbConnection; use rusqlite::NO_PARAMS; diff --git a/src/section_runner.rs b/src/section_runner.rs index 8fc0a46..1b9fffb 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -2,12 +2,13 @@ use crate::model::SectionRef; use crate::section_interface::SectionInterface; use mpsc::error::SendError; use std::{ - collections::LinkedList, + collections::VecDeque, + mem::swap, sync::{ atomic::{AtomicI32, Ordering}, Arc, }, - time::Duration, mem::swap, + time::Duration, }; use thiserror::Error; use tokio::{ @@ -41,44 +42,50 @@ enum RunnerMsg { CancelAll, } +#[derive(Clone, Debug, PartialEq)] +enum RunState { + Waiting, + Running { start_time: Instant }, + Finished, + Cancelled, +} + #[derive(Debug)] struct SecRun { handle: RunHandle, section: SectionRef, duration: Duration, - start_time: Option, + state: RunState, } impl SecRun { fn start(&mut self, interface: &dyn SectionInterface) { + use RunState::*; debug!(section_id = self.section.id, "starting running section"); interface.set_section_state(self.section.interface_id, true); - self.start_time = Some(Instant::now()); + self.state = Running { + start_time: Instant::now(), + }; + } + + fn is_running(&self) -> bool { + matches!(self.state, RunState::Running{..}) } fn finish(&mut self, interface: &dyn SectionInterface) { - if self.start_time.is_some() { + if self.is_running() { debug!(section_id = self.section.id, "finished running section"); interface.set_section_state(self.section.interface_id, false); - self.start_time = None; + self.state = RunState::Finished; } } fn cancel(&mut self, interface: &dyn SectionInterface) { - if self.start_time.is_some() { + if self.is_running() { debug!(section_id = self.section.id, "cancelling running section"); interface.set_section_state(self.section.interface_id, false); - self.start_time = None; } - } - - fn elapsed(&self) -> Option { - self.start_time.map(|t| t.elapsed()) - } - - #[allow(dead_code)] - fn is_done(&self) -> Option { - self.elapsed().map(|elapsed| elapsed >= self.duration) + self.state = RunState::Cancelled; } } @@ -128,27 +135,36 @@ async fn runner_task( interface: Arc, mut msg_recv: mpsc::Receiver, ) { + use RunState::*; + let span = trace_span!("runner_task"); let _enter = span.enter(); let mut running = true; - let mut run_queue: LinkedList = LinkedList::new(); + let mut run_queue: VecDeque = VecDeque::new(); let mut delay_future: OptionFuture<_> = None.into(); while running { if let Some(current_run) = run_queue.front_mut() { - let done = if let Some(start_time) = ¤t_run.start_time { - let elapsed = Instant::now() - *start_time; - elapsed >= current_run.duration - } else { - current_run.start(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); - false + let done = match current_run.state { + Waiting => { + current_run.start(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + false + } + Running { start_time } => { + if start_time.elapsed() >= current_run.duration { + current_run.finish(&*interface); + delay_future = None.into(); + true + } else { + false + } + } + Cancelled | Finished => true, }; if done { - current_run.finish(&*interface); run_queue.pop_front(); - delay_future = None.into(); continue; } } @@ -164,17 +180,20 @@ async fn runner_task( handle, section, duration, - start_time: None, + state: Waiting, }); } CancelRun(handle) => { - for mut run in run_queue.drain_filter(|item| item.handle == handle) { + for run in &mut run_queue { + if run.handle != handle { + continue; + } trace!(handle = handle.0, "cancelling run by handle"); run.cancel(&*interface); } - }, + } CancelAll => { - let mut old_runs = LinkedList::new(); + let mut old_runs = VecDeque::new(); swap(&mut old_runs, &mut run_queue); trace!(count = old_runs.len(), "cancelling all runs"); for mut run in old_runs {