diff --git a/src/section_runner.rs b/src/section_runner.rs index 19715dd..94ea457 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -2,7 +2,7 @@ use crate::model::SectionRef; use crate::section_interface::SectionInterface; use mpsc::error::SendError; use std::{ - collections::VecDeque, + collections::LinkedList, sync::{ atomic::{AtomicI32, Ordering}, Arc, @@ -17,7 +17,7 @@ use tokio::{ }; use tracing::{debug, trace, trace_span}; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq, Eq)] pub struct RunHandle(i32); #[derive(Debug)] @@ -37,6 +37,7 @@ impl SectionRunnerInner { enum RunnerMsg { Quit, QueueRun(RunHandle, SectionRef, Duration), + CancelRun(RunHandle), } #[derive(Debug)] @@ -47,6 +48,38 @@ struct SecRun { start_time: Option, } +impl SecRun { + fn start(&mut self, interface: &dyn SectionInterface) { + debug!(section_id = self.section.id, "starting running section"); + interface.set_section_state(self.section.interface_id, true); + self.start_time = Some(Instant::now()); + } + + fn finish(&mut self, interface: &dyn SectionInterface) { + if self.start_time.is_some() { + debug!(section_id = self.section.id, "finished running section"); + interface.set_section_state(self.section.interface_id, false); + self.start_time = None; + } + } + + fn cancel(&mut self, interface: &dyn SectionInterface) { + if self.start_time.is_some() { + debug!(section_id = self.section.id, "cancelling running section"); + interface.set_section_state(self.section.interface_id, false); + self.start_time = None; + } + } + + fn elapsed(&self) -> Option { + self.start_time.map(|t| t.elapsed()) + } + + fn is_done(&self) -> Option { + self.elapsed().map(|elapsed| elapsed >= self.duration) + } +} + mod option_future { use pin_project::pin_project; use std::{ @@ -97,25 +130,21 @@ async fn runner_task( let _enter = span.enter(); let mut running = true; - let mut run_queue: VecDeque = VecDeque::new(); + let mut run_queue: LinkedList = LinkedList::new(); let mut delay_future: OptionFuture<_> = None.into(); while running { if let Some(current_run) = run_queue.front_mut() { - let current_sec = ¤t_run.section; let done = if let Some(start_time) = ¤t_run.start_time { let elapsed = Instant::now() - *start_time; elapsed >= current_run.duration } else { - debug!(section_id = current_sec.id, "starting running section"); - interface.set_section_state(current_sec.interface_id, true); - current_run.start_time = Some(Instant::now()); + current_run.start(&*interface); delay_future = Some(delay_for(current_run.duration)).into(); false }; if done { - debug!(section_id = current_sec.id, "finished running section"); - interface.set_section_state(current_sec.interface_id, false); + current_run.finish(&*interface); run_queue.pop_front(); delay_future = None.into(); continue; @@ -136,6 +165,12 @@ async fn runner_task( start_time: None, }); } + CancelRun(handle) => { + for mut run in run_queue.drain_filter(|item| item.handle == handle) { + trace!(handle = handle.0, "cancelling run by handle"); + run.cancel(&*interface); + } + } } }; let delay_done = || { @@ -195,7 +230,8 @@ impl SectionRunner { } pub async fn cancel_run(&mut self, handle: RunHandle) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::CancelRun(handle)).await?; + Ok(()) } pub async fn cancel_all(&mut self) -> Result<()> { @@ -378,4 +414,46 @@ mod test { runner.quit().await.unwrap(); tokio::task::yield_now().await; } + + #[tokio::test] + async fn test_cancel_all() { + let (sections, interface) = make_sections_and_interface(); + let mut runner = SectionRunner::new(interface.clone()); + + runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + runner + .queue_run(sections[0].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + pause(); + + advance(Duration::from_secs(1)).await; + + assert_section_states(&interface, &[false, true]); + + runner.cancel_all().await.unwrap(); + tokio::task::yield_now().await; + + assert_section_states(&interface, &[false, false]); + + runner.cancel_all().await.unwrap(); + tokio::task::yield_now().await; + + assert_section_states(&interface, &[false, false]); + + resume(); + + runner.quit().await.unwrap(); + tokio::task::yield_now().await; + } }