Add test for SectionRunner pause/unpause #3
| @ -16,7 +16,7 @@ use tokio::{ | ||||
|     sync::mpsc, | ||||
|     time::{delay_for, Instant}, | ||||
| }; | ||||
| use tracing::{debug, trace, trace_span}; | ||||
| use tracing::{debug, trace, trace_span, warn}; | ||||
| 
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||||
| pub struct RunHandle(i32); | ||||
| @ -40,14 +40,22 @@ enum RunnerMsg { | ||||
|     QueueRun(RunHandle, SectionRef, Duration), | ||||
|     CancelRun(RunHandle), | ||||
|     CancelAll, | ||||
|     Pause, | ||||
|     Unpause, | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq)] | ||||
| enum RunState { | ||||
|     Waiting, | ||||
|     Running { start_time: Instant }, | ||||
|     Running { | ||||
|         start_time: Instant, | ||||
|     }, | ||||
|     Finished, | ||||
|     Cancelled, | ||||
|     Paused { | ||||
|         start_time: Instant, | ||||
|         pause_time: Instant, | ||||
|     }, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| @ -55,10 +63,21 @@ struct SecRun { | ||||
|     handle: RunHandle, | ||||
|     section: SectionRef, | ||||
|     duration: Duration, | ||||
|     total_duration: Duration, | ||||
|     state: RunState, | ||||
| } | ||||
| 
 | ||||
| impl SecRun { | ||||
|     fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { | ||||
|         Self { | ||||
|             handle, | ||||
|             section, | ||||
|             duration, | ||||
|             total_duration: duration, | ||||
|             state: RunState::Waiting, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn start(&mut self, interface: &dyn SectionInterface) { | ||||
|         use RunState::*; | ||||
|         debug!(section_id = self.section.id, "starting running section"); | ||||
| @ -72,11 +91,21 @@ impl SecRun { | ||||
|         matches!(self.state, RunState::Running{..}) | ||||
|     } | ||||
| 
 | ||||
|     fn is_paused(&self) -> bool { | ||||
|         matches!(self.state, RunState::Paused{..}) | ||||
|     } | ||||
| 
 | ||||
|     fn finish(&mut self, interface: &dyn SectionInterface) { | ||||
|         if self.is_running() { | ||||
|             debug!(section_id = self.section.id, "finished running section"); | ||||
|             interface.set_section_state(self.section.interface_id, false); | ||||
|             self.state = RunState::Finished; | ||||
|         } else { | ||||
|             warn!( | ||||
|                 section_id = self.section.id, | ||||
|                 state = debug(&self.state), | ||||
|                 "cannot finish run which is not running" | ||||
|             ); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| @ -87,6 +116,53 @@ impl SecRun { | ||||
|         } | ||||
|         self.state = RunState::Cancelled; | ||||
|     } | ||||
| 
 | ||||
|     fn pause(&mut self, interface: &dyn SectionInterface) { | ||||
|         use RunState::*; | ||||
|         match self.state { | ||||
|             Running { start_time } => { | ||||
|                 debug!(section_id = self.section.id, "pausing running section"); | ||||
|                 interface.set_section_state(self.section.interface_id, false); | ||||
|                 self.state = Paused { | ||||
|                     start_time, | ||||
|                     pause_time: Instant::now(), | ||||
|                 }; | ||||
|             } | ||||
|             Waiting => { | ||||
|                 debug!(section_id = self.section.id, "pausing waiting section"); | ||||
|                 self.state = Paused { | ||||
|                     start_time: Instant::now(), | ||||
|                     pause_time: Instant::now(), | ||||
|                 }; | ||||
|             } | ||||
|             Finished | Cancelled | Paused { .. } => {} | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn unpause(&mut self, interface: &dyn SectionInterface) { | ||||
|         use RunState::*; | ||||
|         match self.state { | ||||
|             Paused { | ||||
|                 start_time, | ||||
|                 pause_time, | ||||
|             } => { | ||||
|                 debug!(section_id = self.section.id, "unpausing section"); | ||||
|                 interface.set_section_state(self.section.interface_id, true); | ||||
|                 self.state = Running { | ||||
|                     start_time: Instant::now(), | ||||
|                 }; | ||||
|                 let ran_for = pause_time - start_time; | ||||
|                 self.duration = self.duration - ran_for; | ||||
|             } | ||||
|             Waiting | Finished | Cancelled | Running { .. } => { | ||||
|                 warn!( | ||||
|                     section_id = self.section.id, | ||||
|                     state = debug(&self.state), | ||||
|                     "can only unpause paused section" | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| mod option_future { | ||||
| @ -143,16 +219,25 @@ async fn runner_task( | ||||
|     let mut running = true; | ||||
|     let mut run_queue: VecDeque<SecRun> = VecDeque::new(); | ||||
|     let mut delay_future: OptionFuture<_> = None.into(); | ||||
|     let mut paused = false; | ||||
|     while running { | ||||
|         if let Some(current_run) = run_queue.front_mut() { | ||||
|             let done = match current_run.state { | ||||
|                 Waiting => { | ||||
|                     current_run.start(&*interface); | ||||
|                     delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     if paused { | ||||
|                         current_run.pause(&*interface); | ||||
|                     } else { | ||||
|                         current_run.start(&*interface); | ||||
|                         delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     } | ||||
|                     false | ||||
|                 } | ||||
|                 Running { start_time } => { | ||||
|                     if start_time.elapsed() >= current_run.duration { | ||||
|                     if paused { | ||||
|                         current_run.pause(&*interface); | ||||
|                         delay_future = None.into(); | ||||
|                         false | ||||
|                     } else if start_time.elapsed() >= current_run.duration { | ||||
|                         current_run.finish(&*interface); | ||||
|                         delay_future = None.into(); | ||||
|                         true | ||||
| @ -160,6 +245,13 @@ async fn runner_task( | ||||
|                         false | ||||
|                     } | ||||
|                 } | ||||
|                 Paused { .. } => { | ||||
|                     if !paused { | ||||
|                         current_run.unpause(&*interface); | ||||
|                         delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     } | ||||
|                     false | ||||
|                 } | ||||
|                 Cancelled | Finished => true, | ||||
|             }; | ||||
| 
 | ||||
| @ -176,12 +268,7 @@ async fn runner_task( | ||||
|             match msg { | ||||
|                 Quit => running = false, | ||||
|                 QueueRun(handle, section, duration) => { | ||||
|                     run_queue.push_back(SecRun { | ||||
|                         handle, | ||||
|                         section, | ||||
|                         duration, | ||||
|                         state: Waiting, | ||||
|                     }); | ||||
|                     run_queue.push_back(SecRun::new(handle, section, duration)); | ||||
|                 } | ||||
|                 CancelRun(handle) => { | ||||
|                     for run in &mut run_queue { | ||||
| @ -200,6 +287,12 @@ async fn runner_task( | ||||
|                         run.cancel(&*interface); | ||||
|                     } | ||||
|                 } | ||||
|                 Pause => { | ||||
|                     paused = true; | ||||
|                 } | ||||
|                 Unpause => { | ||||
|                     paused = false; | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|         let delay_done = || { | ||||
| @ -270,11 +363,13 @@ impl SectionRunner { | ||||
|     } | ||||
| 
 | ||||
|     pub async fn pause(&mut self) -> Result<()> { | ||||
|         todo!() | ||||
|         self.msg_send.send(RunnerMsg::Pause).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn unpause(&mut self) -> Result<()> { | ||||
|         todo!() | ||||
|         self.msg_send.send(RunnerMsg::Unpause).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -347,8 +442,10 @@ mod test { | ||||
| 
 | ||||
|     async fn advance(dur: Duration) { | ||||
|         // HACK: advance should really be enough, but we need another yield_now
 | ||||
|         tokio::time::pause(); | ||||
|         tokio::time::advance(dur).await; | ||||
|         tokio::task::yield_now().await; | ||||
|         tokio::time::resume(); | ||||
|     } | ||||
| 
 | ||||
|     #[tokio::test] | ||||
| @ -366,12 +463,9 @@ mod test { | ||||
| 
 | ||||
|         tokio::task::yield_now().await; | ||||
| 
 | ||||
|         pause(); | ||||
|         advance(Duration::from_secs(1)).await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[true, false]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(10)).await; | ||||
|         advance(Duration::from_secs(11)).await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
| @ -386,11 +480,11 @@ mod test { | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         advance(Duration::from_secs(1)).await; | ||||
|         tokio::task::yield_now().await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(10)).await; | ||||
|         advance(Duration::from_secs(11)).await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[true, false]); | ||||
| 
 | ||||
| @ -398,8 +492,6 @@ mod test { | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         resume(); | ||||
| 
 | ||||
|         runner.quit().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|     } | ||||
| @ -424,9 +516,7 @@ mod test { | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         pause(); | ||||
| 
 | ||||
|         advance(Duration::from_secs(1)).await; | ||||
|         tokio::task::yield_now().await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
| @ -436,12 +526,10 @@ mod test { | ||||
|         assert_section_states(&interface, &[true, false]); | ||||
| 
 | ||||
|         runner.cancel_run(run3).await.unwrap(); | ||||
|         advance(Duration::from_secs(10)).await; | ||||
|         advance(Duration::from_secs(11)).await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         resume(); | ||||
| 
 | ||||
|         runner.quit().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|     } | ||||
| @ -466,23 +554,76 @@ mod test { | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         pause(); | ||||
| 
 | ||||
|         advance(Duration::from_secs(1)).await; | ||||
| 
 | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         runner.cancel_all().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         runner.cancel_all().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
| 
 | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         resume(); | ||||
|         runner.quit().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|     } | ||||
| 
 | ||||
|     #[tokio::test] | ||||
|     async fn test_pause() { | ||||
|         let (sections, interface) = make_sections_and_interface(); | ||||
|         let mut runner = SectionRunner::new(interface.clone()); | ||||
| 
 | ||||
|         let _run1 = runner | ||||
|             .queue_run(sections[1].clone(), Duration::from_secs(10)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         let run2 = runner | ||||
|             .queue_run(sections[0].clone(), Duration::from_secs(10)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         let _run3 = runner | ||||
|             .queue_run(sections[1].clone(), Duration::from_secs(10)) | ||||
|             .await | ||||
|             .unwrap(); | ||||
| 
 | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         runner.pause().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(10)).await; | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         runner.unpause().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(8)).await; | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(2)).await; | ||||
|         assert_section_states(&interface, &[true, false]); | ||||
| 
 | ||||
|         runner.pause().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         // cancel paused run
 | ||||
|         runner.cancel_run(run2).await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         runner.unpause().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|         assert_section_states(&interface, &[false, true]); | ||||
| 
 | ||||
|         advance(Duration::from_secs(11)).await; | ||||
|         assert_section_states(&interface, &[false, false]); | ||||
| 
 | ||||
|         runner.quit().await.unwrap(); | ||||
|         tokio::task::yield_now().await; | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user