From ab994d027a1bf110f158b363fceb6b5a6c0ae27c Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 18 Aug 2020 20:55:53 -0600 Subject: [PATCH] Implement SectionRunner pause/unpause --- src/section_runner.rs | 121 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 108 insertions(+), 13 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index 14e07b1..53536e2 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -16,7 +16,7 @@ use tokio::{ sync::mpsc, time::{delay_for, Instant}, }; -use tracing::{debug, trace, trace_span}; +use tracing::{debug, trace, trace_span, warn}; #[derive(Debug, Clone, PartialEq, Eq)] pub struct RunHandle(i32); @@ -40,14 +40,22 @@ enum RunnerMsg { QueueRun(RunHandle, SectionRef, Duration), CancelRun(RunHandle), CancelAll, + Pause, + Unpause, } #[derive(Clone, Debug, PartialEq)] enum RunState { Waiting, - Running { start_time: Instant }, + Running { + start_time: Instant, + }, Finished, Cancelled, + Paused { + start_time: Instant, + pause_time: Instant, + }, } #[derive(Debug)] @@ -55,10 +63,21 @@ struct SecRun { handle: RunHandle, section: SectionRef, duration: Duration, + total_duration: Duration, state: RunState, } impl SecRun { + fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { + Self { + handle, + section, + duration, + total_duration: duration, + state: RunState::Waiting, + } + } + fn start(&mut self, interface: &dyn SectionInterface) { use RunState::*; debug!(section_id = self.section.id, "starting running section"); @@ -72,11 +91,21 @@ impl SecRun { matches!(self.state, RunState::Running{..}) } + fn is_paused(&self) -> bool { + matches!(self.state, RunState::Paused{..}) + } + fn finish(&mut self, interface: &dyn SectionInterface) { if self.is_running() { debug!(section_id = self.section.id, "finished running section"); interface.set_section_state(self.section.interface_id, false); self.state = RunState::Finished; + } else { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "cannot finish run which is not running" + ); } } @@ -87,6 +116,53 @@ impl SecRun { } self.state = RunState::Cancelled; } + + fn pause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Running { start_time } => { + debug!(section_id = self.section.id, "pausing running section"); + interface.set_section_state(self.section.interface_id, false); + self.state = Paused { + start_time, + pause_time: Instant::now(), + }; + } + Waiting => { + debug!(section_id = self.section.id, "pausing waiting section"); + self.state = Paused { + start_time: Instant::now(), + pause_time: Instant::now(), + }; + } + Finished | Cancelled | Paused { .. } => {} + } + } + + fn unpause(&mut self, interface: &dyn SectionInterface) { + use RunState::*; + match self.state { + Paused { + start_time, + pause_time, + } => { + debug!(section_id = self.section.id, "unpausing section"); + interface.set_section_state(self.section.interface_id, true); + self.state = Running { + start_time: Instant::now(), + }; + let ran_for = pause_time - start_time; + self.duration = self.duration - ran_for; + } + Waiting | Finished | Cancelled | Running { .. } => { + warn!( + section_id = self.section.id, + state = debug(&self.state), + "can only unpause paused section" + ); + } + } + } } mod option_future { @@ -143,16 +219,25 @@ async fn runner_task( let mut running = true; let mut run_queue: VecDeque = VecDeque::new(); let mut delay_future: OptionFuture<_> = None.into(); + let mut paused = false; while running { if let Some(current_run) = run_queue.front_mut() { let done = match current_run.state { Waiting => { - current_run.start(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); + if paused { + current_run.pause(&*interface); + } else { + current_run.start(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } false } Running { start_time } => { - if start_time.elapsed() >= current_run.duration { + if paused { + current_run.pause(&*interface); + delay_future = None.into(); + false + } else if start_time.elapsed() >= current_run.duration { current_run.finish(&*interface); delay_future = None.into(); true @@ -160,6 +245,13 @@ async fn runner_task( false } } + Paused { .. } => { + if !paused { + current_run.unpause(&*interface); + delay_future = Some(delay_for(current_run.duration)).into(); + } + false + } Cancelled | Finished => true, }; @@ -176,12 +268,7 @@ async fn runner_task( match msg { Quit => running = false, QueueRun(handle, section, duration) => { - run_queue.push_back(SecRun { - handle, - section, - duration, - state: Waiting, - }); + run_queue.push_back(SecRun::new(handle, section, duration)); } CancelRun(handle) => { for run in &mut run_queue { @@ -200,6 +287,12 @@ async fn runner_task( run.cancel(&*interface); } } + Pause => { + paused = true; + } + Unpause => { + paused = false; + } } }; let delay_done = || { @@ -270,11 +363,13 @@ impl SectionRunner { } pub async fn pause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Pause).await?; + Ok(()) } pub async fn unpause(&mut self) -> Result<()> { - todo!() + self.msg_send.send(RunnerMsg::Unpause).await?; + Ok(()) } }