From 42cce06d688c598e7ffdc5e3bf96f38c4cd45bae Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 18 Aug 2020 19:34:11 -0600 Subject: [PATCH 1/3] Add test for SectionRunner pause/unpause --- src/section_runner.rs | 64 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/src/section_runner.rs b/src/section_runner.rs index 1b9fffb..14e07b1 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -487,4 +487,68 @@ mod test { runner.quit().await.unwrap(); tokio::task::yield_now().await; } + + #[tokio::test] + async fn test_pause() { + let (sections, interface) = make_sections_and_interface(); + let mut runner = SectionRunner::new(interface.clone()); + + let _run1 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + let run2 = runner + .queue_run(sections[0].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + let _run3 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + pause(); + + advance(Duration::from_secs(1)).await; + assert_section_states(&interface, &[false, true]); + + runner.pause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + + advance(Duration::from_secs(10)).await; + assert_section_states(&interface, &[false, false]); + + runner.unpause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(8)).await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(2)).await; + assert_section_states(&interface, &[true, false]); + + runner.pause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + + // cancel paused run + runner.cancel_run(run2).await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, false]); + + runner.unpause().await.unwrap(); + tokio::task::yield_now().await; + assert_section_states(&interface, &[false, true]); + + advance(Duration::from_secs(10)).await; + assert_section_states(&interface, &[false, false]); + + resume(); + + runner.quit().await.unwrap(); + tokio::task::yield_now().await; + } } From ab994d027a1bf110f158b363fceb6b5a6c0ae27c Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 18 Aug 2020 20:55:53 -0600 Subject: [PATCH 2/3] Implement SectionRunner pause/unpause --- src/section_runner.rs | 121 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 13 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index 14e07b1..53536e2 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -16,7 +16,7 @@ use tokio::{ sync::mpsc, time::{delay_for, Instant}, }; -use tracing::{debug, trace, trace_span}; +use tracing::{debug, trace, trace_span, warn}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct RunHandle(i32); @@ -40,14 +40,22 @@ enum RunnerMsg { QueueRun(RunHandle, SectionRef, Duration), CancelRun(RunHandle), CancelAll, + Pause, + Unpause, } #[derive(Clone, Debug, PartialEq)] enum RunState { Waiting, - Running { start_time: Instant }, + Running { + start_time: Instant, + }, Finished, Cancelled, + Paused { + start_time: Instant, + pause_time: Instant, + }, } #[derive(Debug)] @@ -55,10 +63,21 @@ struct SecRun { handle: RunHandle, section: SectionRef, duration: Duration, + total_duration: Duration, state: RunState, } impl SecRun { + fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { + Self { + handle, + section, + duration, + total_duration: duration, + state: RunState::Waiting, + } + } + fn start(&mut self, interface: &dyn SectionInterface) { use RunState::*; debug!(section_id = self.section.id, "starting running section"); @@ -72,11 +91,21 @@ impl SecRun { matches!(self.state, RunState::Running{..}) } + fn is_paused(&self) -> bool { + matches!(self.state, RunState::Paused{..}) + } + fn finish(&mut self, interface: &dyn SectionInterface) { if self.is_running() { debug!(section_id = self.section.id, "finished running section"); interface.set_section_state(self.section.interface_id, false); self.state = RunState::Finished; + } else { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "cannot finish run which is not running" + ); } } @@ -87,6 +116,53 @@ impl SecRun { } self.state = RunState::Cancelled; } + + fn pause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Running { start_time } => { + debug!(section_id = self.section.id, "pausing running section"); + interface.set_section_state(self.section.interface_id, false); + self.state = Paused { + start_time, + pause_time: Instant::now(), + }; + } + Waiting => { + debug!(section_id = self.section.id, "pausing waiting section"); + self.state = Paused { + start_time: Instant::now(), + pause_time: Instant::now(), + }; + } + Finished | Cancelled | Paused { .. } => {} + } + } + + fn unpause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Paused { + start_time, + pause_time, + } => { + debug!(section_id = self.section.id, "unpausing section"); + interface.set_section_state(self.section.interface_id, true); + self.state = Running { + start_time: Instant::now(), + }; + let ran_for = pause_time - start_time; + self.duration = self.duration - ran_for; + } + Waiting | Finished | Cancelled | Running { .. } => { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "can only unpause paused section" + ); + } + } + } } mod option_future { @@ -143,16 +219,25 @@ async fn runner_task( let mut running = true; let mut run_queue: VecDeque = VecDeque::new(); let mut delay_future: OptionFuture<_> = None.into(); + let mut paused = false; while running { if let Some(current_run) = run_queue.front_mut() { let done = match current_run.state { Waiting => { - current_run.start(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); + if paused { + current_run.pause(&*interface); + } else { + current_run.start(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } false } Running { start_time } => { - if start_time.elapsed() >= current_run.duration { + if paused { + current_run.pause(&*interface); + delay_future = None.into(); + false + } else if start_time.elapsed() >= current_run.duration { current_run.finish(&*interface); delay_future = None.into(); true @@ -160,6 +245,13 @@ async fn runner_task( false } } + Paused { .. } => { + if !paused { + current_run.unpause(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } + false + } Cancelled | Finished => true, }; @@ -176,12 +268,7 @@ async fn runner_task( match msg { Quit => running = false, QueueRun(handle, section, duration) => { - run_queue.push_back(SecRun { - handle, - section, - duration, - state: Waiting, - }); + run_queue.push_back(SecRun::new(handle, section, duration)); } CancelRun(handle) => { for run in &mut run_queue { @@ -200,6 +287,12 @@ async fn runner_task( run.cancel(&*interface); } } + Pause => { + paused = true; + } + Unpause => { + paused = false; + } } }; let delay_done = || { @@ -270,11 +363,13 @@ impl SectionRunner { } pub async fn pause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Pause).await?; + Ok(()) } pub async fn unpause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Unpause).await?; + Ok(()) } } From 4cf93a9568d12fe2a73c5e81f9f6ddef433b206d Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 18 Aug 2020 20:56:04 -0600 Subject: [PATCH 3/3] Fix test usage of `tokio::time::advance` --- src/section_runner.rs | 38 ++++++++++---------------------------- 1 file changed, 10 insertions(+), 28 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index 53536e2..caca6f3 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -442,8 +442,10 @@ mod test { async fn advance(dur: Duration) { // HACK: advance should really be enough, but we need another yield_now + tokio::time::pause(); tokio::time::advance(dur).await; tokio::task::yield_now().await; + tokio::time::resume(); } #[tokio::test] @@ -461,12 +463,9 @@ mod test { tokio::task::yield_now().await; - pause(); - advance(Duration::from_secs(1)).await; - assert_section_states(&interface, &[true, false]); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[false, false]); @@ -481,11 +480,11 @@ mod test { .await .unwrap(); - advance(Duration::from_secs(1)).await; + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[true, false]); @@ -493,8 +492,6 @@ mod test { assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; } @@ -519,9 +516,7 @@ mod test { .await .unwrap(); - pause(); - - advance(Duration::from_secs(1)).await; + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); @@ -531,12 +526,10 @@ mod test { assert_section_states(&interface, &[true, false]); runner.cancel_run(run3).await.unwrap(); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; } @@ -561,24 +554,17 @@ mod test { .await .unwrap(); - pause(); - - advance(Duration::from_secs(1)).await; - + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); runner.cancel_all().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); runner.cancel_all().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; } @@ -603,9 +589,7 @@ mod test { .await .unwrap(); - pause(); - - advance(Duration::from_secs(1)).await; + tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); runner.pause().await.unwrap(); @@ -638,11 +622,9 @@ mod test { tokio::task::yield_now().await; assert_section_states(&interface, &[false, true]); - advance(Duration::from_secs(10)).await; + advance(Duration::from_secs(11)).await; assert_section_states(&interface, &[false, false]); - resume(); - runner.quit().await.unwrap(); tokio::task::yield_now().await; }