diff --git a/src/section_runner.rs b/src/section_runner.rs index 1b9fffb..caca6f3 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -16,7 +16,7 @@ use tokio::{ sync::mpsc, time::{delay_for, Instant}, }; -use tracing::{debug, trace, trace_span}; +use tracing::{debug, trace, trace_span, warn}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct RunHandle(i32); @@ -40,14 +40,22 @@ enum RunnerMsg { QueueRun(RunHandle, SectionRef, Duration), CancelRun(RunHandle), CancelAll, + Pause, + Unpause, } #[derive(Clone, Debug, PartialEq)] enum RunState { Waiting, - Running { start_time: Instant }, + Running { + start_time: Instant, + }, Finished, Cancelled, + Paused { + start_time: Instant, + pause_time: Instant, + }, } #[derive(Debug)] @@ -55,10 +63,21 @@ struct SecRun { handle: RunHandle, section: SectionRef, duration: Duration, + total_duration: Duration, state: RunState, } impl SecRun { + fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { + Self { + handle, + section, + duration, + total_duration: duration, + state: RunState::Waiting, + } + } + fn start(&mut self, interface: &dyn SectionInterface) { use RunState::*; debug!(section_id = self.section.id, "starting running section"); @@ -72,11 +91,21 @@ impl SecRun { matches!(self.state, RunState::Running{..}) } + fn is_paused(&self) -> bool { + matches!(self.state, RunState::Paused{..}) + } + fn finish(&mut self, interface: &dyn SectionInterface) { if self.is_running() { debug!(section_id = self.section.id, "finished running section"); interface.set_section_state(self.section.interface_id, false); self.state = RunState::Finished; + } else { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "cannot finish run which is not running" + ); } } @@ -87,6 +116,53 @@ impl SecRun { } self.state = RunState::Cancelled; } + + fn pause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Running { start_time } => { + debug!(section_id = self.section.id, "pausing running section"); + interface.set_section_state(self.section.interface_id, false); + self.state = Paused { + start_time, + pause_time: Instant::now(), + }; + } + Waiting => { + debug!(section_id = self.section.id, "pausing waiting section"); + self.state = Paused { + start_time: Instant::now(), + pause_time: Instant::now(), + }; + } + Finished | Cancelled | Paused { .. } => {} + } + } + + fn unpause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Paused { + start_time, + pause_time, + } => { + debug!(section_id = self.section.id, "unpausing section"); + interface.set_section_state(self.section.interface_id, true); + self.state = Running { + start_time: Instant::now(), + }; + let ran_for = pause_time - start_time; + self.duration = self.duration - ran_for; + } + Waiting | Finished | Cancelled | Running { .. } => { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "can only unpause paused section" + ); + } + } + } } mod option_future { @@ -143,16 +219,25 @@ async fn runner_task( let mut running = true; let mut run_queue: VecDeque = VecDeque::new(); let mut delay_future: OptionFuture<_> = None.into(); + let mut paused = false; while running { if let Some(current_run) = run_queue.front_mut() { let done = match current_run.state { Waiting => { - current_run.start(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); + if paused { + current_run.pause(&*interface); + } else { + current_run.start(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } false } Running { start_time } => { - if start_time.elapsed() >= current_run.duration { + if paused { + current_run.pause(&*interface); + delay_future = None.into(); + false + } else if start_time.elapsed() >= current_run.duration { current_run.finish(&*interface); delay_future = None.into(); true @@ -160,6 +245,13 @@ async fn runner_task( false } } + Paused { .. } => { + if !paused { + current_run.unpause(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } + false + } Cancelled | Finished => true, }; @@ -176,12 +268,7 @@ async fn runner_task( match msg { Quit => running = false, QueueRun(handle, section, duration) => { - run_queue.push_back(SecRun { - handle, - section, - duration, - state: Waiting, - }); + run_queue.push_back(SecRun::new(handle, section, duration)); } CancelRun(handle) => { for run in &mut run_queue { @@ -200,6 +287,12 @@ async fn runner_task( run.cancel(&*interface); } } + Pause => { + paused = true; + } + Unpause => { + paused = false; + } } }; let delay_done = || { @@ -270,11 +363,13 @@ impl SectionRunner { } pub async fn pause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Pause).await?; + Ok(()) } pub async fn unpause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Unpause).await?; + Ok(()) } } @@ -347,8 +442,10 @@ mod test { async fn advance(dur: Duration) { // HACK: advance should really be enough, but we need another yield_now + tokio::time::pause(); tokio::time::advance(dur).await; tokio::task::yield_now().await; + tokio::time::resume(); } #[tokio::test] @@ -366,12 +463,9 @@ mod test { tokio::task::yield_now().await; - pause(); - advance(Duration::from_secs(1)).await; - assert_section_states(&interface, &[true, false]); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[false, false]); @@ -386,11 +480,11 @@ mod test { .await .unwrap(); - advance(Duration::from_secs(1)).await; + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[true, false]); @@ -398,8 +492,6 @@ mod test { assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; } @@ -424,9 +516,7 @@ mod test { .await .unwrap(); - pause(); - - advance(Duration::from_secs(1)).await; + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); @@ -436,12 +526,10 @@ mod test { assert_section_states(&interface, &[true, false]); runner.cancel_run(run3).await.unwrap(); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; } @@ -466,23 +554,76 @@ mod test { .await .unwrap(); - pause(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, true]); + + runner.cancel_all().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + + runner.cancel_all().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + + runner.quit().await.unwrap(); + tokio::task::yield_now().await; + } + + #[tokio::test] + async fn test_pause() { + let (sections, interface) = make_sections_and_interface(); + let mut runner = SectionRunner::new(interface.clone()); + + let _run1 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + let run2 = runner + .queue_run(sections[0].clone(), Duration::from_secs(10)) + .await + .unwrap(); - advance(Duration::from_secs(1)).await; + let _run3 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); - runner.cancel_all().await.unwrap(); + runner.pause().await.unwrap(); tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + advance(Duration::from_secs(10)).await; assert_section_states(&interface, &[false, false]); - runner.cancel_all().await.unwrap(); + runner.unpause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(8)).await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(2)).await; + assert_section_states(&interface, &[true, false]); + + runner.pause().await.unwrap(); tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + // cancel paused run + runner.cancel_run(run2).await.unwrap(); + tokio::task::yield_now().await; assert_section_states(&interface, &[false, false]); - resume(); + runner.unpause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(11)).await; + assert_section_states(&interface, &[false, false]); runner.quit().await.unwrap(); tokio::task::yield_now().await;