From f8c9fd2def1e074f34d439da990325396e87c420 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Thu, 20 Aug 2020 10:53:17 -0600 Subject: [PATCH] Refactor SectionRunner runner_task Create a struct to stores state and split up `runner_task` in to separate functions. --- src/section_runner.rs | 264 +++++++++++++++++++++++------------------- 1 file changed, 147 insertions(+), 117 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index c92e2c1..53647b7 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -79,15 +79,6 @@ impl SecRun { } } - fn start(&mut self, interface: &dyn SectionInterface) { - use RunState::*; - debug!(section_id = self.section.id, "starting running section"); - interface.set_section_state(self.section.interface_id, true); - self.state = Running { - start_time: Instant::now(), - }; - } - fn is_running(&self) -> bool { matches!(self.state, RunState::Running{..}) } @@ -96,43 +87,79 @@ impl SecRun { fn is_paused(&self) -> bool { matches!(self.state, RunState::Paused{..}) } +} + +struct RunnerTask { + interface: Arc, + msg_recv: mpsc::Receiver, + running: bool, + delay_future: OptionFuture, + paused: bool, +} + +impl RunnerTask { + fn new( + interface: Arc, + msg_recv: mpsc::Receiver, + ) -> Self { + Self { + interface, + msg_recv, + running: true, + delay_future: None.into(), + paused: false, + } + } + + fn start_run(&mut self, run: &mut SecRun) { + use RunState::*; + debug!(section_id = run.section.id, "starting running section"); + self.interface + .set_section_state(run.section.interface_id, true); + run.state = Running { + start_time: Instant::now(), + }; + } - fn finish(&mut self, interface: &dyn SectionInterface) { - if self.is_running() { - debug!(section_id = self.section.id, "finished running section"); - interface.set_section_state(self.section.interface_id, false); - self.state = RunState::Finished; + fn finish_run(&mut self, run: &mut SecRun) { + if run.is_running() { + debug!(section_id = run.section.id, "finished running section"); + self.interface + .set_section_state(run.section.interface_id, false); + run.state = RunState::Finished; } else { warn!( - section_id = self.section.id, - state = debug(&self.state), + section_id = run.section.id, + state = debug(&run.state), "cannot finish run which is not running" ); } } - fn cancel(&mut self, interface: &dyn SectionInterface) { - if self.is_running() { - debug!(section_id = self.section.id, "cancelling running section"); - interface.set_section_state(self.section.interface_id, false); + fn cancel_run(&mut self, run: &mut SecRun) { + if run.is_running() { + debug!(section_id = run.section.id, "cancelling running section"); + self.interface + .set_section_state(run.section.interface_id, false); } - self.state = RunState::Cancelled; + run.state = RunState::Cancelled; } - fn pause(&mut self, interface: &dyn SectionInterface) { + fn pause_run(&mut self, run: &mut SecRun) { use RunState::*; - match self.state { + match run.state { Running { start_time } => { - debug!(section_id = self.section.id, "pausing running section"); - interface.set_section_state(self.section.interface_id, false); - self.state = Paused { + debug!(section_id = run.section.id, "pausing running section"); + self.interface + .set_section_state(run.section.interface_id, false); + run.state = Paused { start_time, pause_time: Instant::now(), }; } Waiting => { - debug!(section_id = self.section.id, "pausing waiting section"); - self.state = Paused { + debug!(section_id = run.section.id, "pausing waiting section"); + run.state = Paused { start_time: Instant::now(), pause_time: Instant::now(), }; @@ -141,127 +168,130 @@ impl SecRun { } } - fn unpause(&mut self, interface: &dyn SectionInterface) { + fn unpause_run(&mut self, run: &mut SecRun) { use RunState::*; - match self.state { + match run.state { Paused { start_time, pause_time, } => { - debug!(section_id = self.section.id, "unpausing section"); - interface.set_section_state(self.section.interface_id, true); - self.state = Running { + debug!(section_id = run.section.id, "unpausing section"); + self.interface + .set_section_state(run.section.interface_id, true); + run.state = Running { start_time: Instant::now(), }; let ran_for = pause_time - start_time; - self.duration -= ran_for; + run.duration -= ran_for; } Waiting | Finished | Cancelled | Running { .. } => { warn!( - section_id = self.section.id, - state = debug(&self.state), + section_id = run.section.id, + state = debug(&run.state), "can only unpause paused section" ); } } } -} -async fn runner_task( - interface: Arc, - mut msg_recv: mpsc::Receiver, -) { - use RunState::*; - - let span = trace_span!("runner_task"); - let _enter = span.enter(); - - let mut running = true; - let mut run_queue: VecDeque = VecDeque::new(); - let mut delay_future: OptionFuture<_> = None.into(); - let mut paused = false; - while running { - if let Some(current_run) = run_queue.front_mut() { - let done = match current_run.state { - Waiting => { - if paused { - current_run.pause(&*interface); - } else { - current_run.start(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); - } + fn process_queue(&mut self, run_queue: &mut VecDeque) { + use RunState::*; + loop { + let current_run = match run_queue.front_mut() { + Some(current_run) => current_run, + None => break, + }; + let run_finished = match (¤t_run.state, self.paused) { + (Waiting, false) => { + self.start_run(current_run); + self.delay_future = Some(delay_for(current_run.duration)).into(); false } - Running { start_time } => { - if paused { - current_run.pause(&*interface); - delay_future = None.into(); - false - } else if start_time.elapsed() >= current_run.duration { - current_run.finish(&*interface); - delay_future = None.into(); - true - } else { - false + (Running { start_time }, false) => { + let time_to_finish = start_time.elapsed() >= current_run.duration; + if time_to_finish { + self.finish_run(current_run); + self.delay_future = None.into(); } + time_to_finish } - Paused { .. } => { - if !paused { - current_run.unpause(&*interface); - delay_future = Some(delay_for(current_run.duration)).into(); - } + (Paused { .. }, false) => { + self.unpause_run(current_run); + self.delay_future = Some(delay_for(current_run.duration)).into(); false } - Cancelled | Finished => true, + (Waiting, true) => { + self.pause_run(current_run); + false + } + (Running { .. }, true) => { + self.pause_run(current_run); + self.delay_future = None.into(); + false + } + (Paused { .. }, true) => false, + (Cancelled, _) | (Finished, _) => true, }; - if done { + if run_finished { run_queue.pop_front(); - continue; + } else { + break; } } + } - let mut handle_msg = |msg: Option| { - let msg = msg.expect("SectionRunner channel closed"); - use RunnerMsg::*; - trace!(msg = debug(&msg), "runner_task recv"); - match msg { - Quit => running = false, - QueueRun(handle, section, duration) => { - run_queue.push_back(SecRun::new(handle, section, duration)); - } - CancelRun(handle) => { - for run in &mut run_queue { - if run.handle != handle { - continue; - } - trace!(handle = handle.0, "cancelling run by handle"); - run.cancel(&*interface); - } - } - CancelAll => { - let mut old_runs = VecDeque::new(); - swap(&mut old_runs, &mut run_queue); - trace!(count = old_runs.len(), "cancelling all runs"); - for mut run in old_runs { - run.cancel(&*interface); + fn handle_msg(&mut self, msg: Option, run_queue: &mut VecDeque) { + let msg = msg.expect("SectionRunner channel closed"); + use RunnerMsg::*; + trace!(msg = debug(&msg), "runner_task recv"); + match msg { + Quit => self.running = false, + QueueRun(handle, section, duration) => { + run_queue.push_back(SecRun::new(handle, section, duration)); + } + CancelRun(handle) => { + for run in run_queue { + if run.handle != handle { + continue; } + trace!(handle = handle.0, "cancelling run by handle"); + self.cancel_run(run); } - Pause => { - paused = true; - } - Unpause => { - paused = false; + } + CancelAll => { + let mut old_runs = VecDeque::new(); + swap(&mut old_runs, run_queue); + trace!(count = old_runs.len(), "cancelling all runs"); + for mut run in old_runs { + self.cancel_run(&mut run); } } - }; - let delay_done = || { - trace!("delay done"); - }; - tokio::select! { - msg = msg_recv.recv() => handle_msg(msg), - _ = &mut delay_future, if delay_future.is_some() => delay_done() - }; + Pause => { + self.paused = true; + } + Unpause => { + self.paused = false; + } + } + } + + async fn start(mut self) { + let span = trace_span!("runner_task"); + let _enter = span.enter(); + + let mut run_queue: VecDeque = VecDeque::new(); + + while self.running { + self.process_queue(&mut run_queue); + let delay_done = || { + trace!("delay done"); + }; + tokio::select! { + msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), + _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() + }; + } } } @@ -287,7 +317,7 @@ pub struct SectionRunner { impl SectionRunner { pub fn new(interface: Arc) -> Self { let (msg_send, msg_recv) = mpsc::channel(8); - spawn(runner_task(interface, msg_recv)); + spawn(RunnerTask::new(interface, msg_recv).start()); Self { inner: Arc::new(SectionRunnerInner::new()), msg_send, -- 2.30.2