From 46fab5b50f66749716aec2534e84c1ad348cd62f Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Sep 2020 18:28:18 -0600 Subject: [PATCH] Use immutable collections in section_runner --- src/section_runner.rs | 97 ++++++++++++++++++++++++++----------------- 1 file changed, 59 insertions(+), 38 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index c9235b2..62c0737 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -2,7 +2,6 @@ use crate::model::SectionRef; use crate::option_future::OptionFuture; use crate::section_interface::SectionInterface; use std::{ - collections::VecDeque, mem::swap, sync::{ atomic::{AtomicI32, Ordering}, @@ -62,7 +61,7 @@ type SectionEventSend = broadcast::Sender; const EVENT_CAPACITY: usize = 8; #[derive(Clone, Debug, PartialEq)] -enum RunState { +pub enum SecRunState { Waiting, Running { start_time: Instant, @@ -75,13 +74,13 @@ enum RunState { }, } -#[derive(Debug)] -struct SecRun { +#[derive(Clone, Debug)] +pub struct SecRun { handle: SectionRunHandle, section: SectionRef, duration: Duration, total_duration: Duration, - state: RunState, + state: SecRunState, } impl SecRun { @@ -91,17 +90,34 @@ impl SecRun { section, duration, total_duration: duration, - state: RunState::Waiting, + state: SecRunState::Waiting, } } - fn is_running(&self) -> bool { - matches!(self.state, RunState::Running{..}) + pub fn is_running(&self) -> bool { + matches!(self.state, SecRunState::Running{..}) } #[allow(dead_code)] - fn is_paused(&self) -> bool { - matches!(self.state, RunState::Paused{..}) + pub fn is_paused(&self) -> bool { + matches!(self.state, SecRunState::Paused{..}) + } +} + +pub type SecRunQueue = im::Vector>; + +#[derive(Clone, Debug)] +pub struct SecRunnerState { + run_queue: SecRunQueue, + paused: bool, +} + +impl Default for SecRunnerState { + fn default() -> Self { + Self { + run_queue: SecRunQueue::default(), + paused: false, + } } } @@ -110,7 +126,6 @@ struct RunnerTask { msg_recv: mpsc::Receiver, running: bool, delay_future: OptionFuture, - paused: bool, event_send: Option, quit_tx: Option>, } @@ -125,7 +140,6 @@ impl RunnerTask { msg_recv, running: true, delay_future: None.into(), - paused: false, event_send: None, quit_tx: None, } @@ -153,8 +167,9 @@ impl RunnerTask { } } - fn start_run(&mut self, run: &mut SecRun) { - use RunState::*; + fn start_run(&mut self, run: &mut Arc) { + use SecRunState::*; + let run = Arc::make_mut(run); debug!(section_id = run.section.id, "starting running section"); self.interface .set_section_state(run.section.interface_id, true); @@ -164,12 +179,13 @@ impl RunnerTask { self.send_event(SectionEvent::RunStart(run.handle.clone())); } - fn finish_run(&mut self, run: &mut SecRun) { + fn finish_run(&mut self, run: &mut Arc) { + let run = Arc::make_mut(run); if run.is_running() { debug!(section_id = run.section.id, "finished running section"); self.interface .set_section_state(run.section.interface_id, false); - run.state = RunState::Finished; + run.state = SecRunState::Finished; self.send_event(SectionEvent::RunFinish(run.handle.clone())); } else { warn!( @@ -180,18 +196,20 @@ impl RunnerTask { } } - fn cancel_run(&mut self, run: &mut SecRun) { + fn cancel_run(&mut self, run: &mut Arc) { + let run = Arc::make_mut(run); if run.is_running() { debug!(section_id = run.section.id, "cancelling running section"); self.interface .set_section_state(run.section.interface_id, false); } - run.state = RunState::Cancelled; + run.state = SecRunState::Cancelled; self.send_event(SectionEvent::RunCancel(run.handle.clone())); } - fn pause_run(&mut self, run: &mut SecRun) { - use RunState::*; + fn pause_run(&mut self, run: &mut Arc) { + use SecRunState::*; + let run = Arc::make_mut(run); let new_state = match run.state { Running { start_time } => { debug!(section_id = run.section.id, "pausing running section"); @@ -217,8 +235,9 @@ impl RunnerTask { self.send_event(SectionEvent::RunPause(run.handle.clone())); } - fn unpause_run(&mut self, run: &mut SecRun) { - use RunState::*; + fn unpause_run(&mut self, run: &mut Arc) { + use SecRunState::*; + let run = Arc::make_mut(run); match run.state { Paused { start_time, @@ -244,10 +263,10 @@ impl RunnerTask { } } - fn process_queue(&mut self, run_queue: &mut VecDeque) { - use RunState::*; - while let Some(current_run) = run_queue.front_mut() { - let run_finished = match (¤t_run.state, self.paused) { + fn process_queue(&mut self, state: &mut SecRunnerState) { + use SecRunState::*; + while let Some(current_run) = state.run_queue.front_mut() { + let run_finished = match (¤t_run.state, state.paused) { (Waiting, false) => { self.start_run(current_run); self.delay_future = Some(delay_for(current_run.duration)).into(); @@ -280,14 +299,14 @@ impl RunnerTask { }; if run_finished { - run_queue.pop_front(); + state.run_queue.pop_front(); } else { break; } } } - fn handle_msg(&mut self, msg: Option, run_queue: &mut VecDeque) { + fn handle_msg(&mut self, msg: Option, state: &mut SecRunnerState) { let msg = msg.expect("SectionRunner channel closed"); use RunnerMsg::*; trace!(msg = debug(&msg), "runner_task recv"); @@ -295,12 +314,14 @@ impl RunnerTask { Quit(quit_tx) => { self.quit_tx = Some(quit_tx); self.running = false; - }, + } QueueRun(handle, section, duration) => { - run_queue.push_back(SecRun::new(handle, section, duration)); + state + .run_queue + .push_back(Arc::new(SecRun::new(handle, section, duration))); } CancelRun(handle) => { - for run in run_queue { + for run in state.run_queue.iter_mut() { if run.handle != handle { continue; } @@ -309,19 +330,19 @@ impl RunnerTask { } } CancelAll => { - let mut old_runs = VecDeque::new(); - swap(&mut old_runs, run_queue); + let mut old_runs = SecRunQueue::new(); + swap(&mut old_runs, &mut state.run_queue); trace!(count = old_runs.len(), "cancelling all runs"); for mut run in old_runs { self.cancel_run(&mut run); } } Pause => { - self.paused = true; + state.paused = true; self.send_event(SectionEvent::RunnerPause); } Unpause => { - self.paused = false; + state.paused = false; self.send_event(SectionEvent::RunnerUnpause); } Subscribe(res_send) => { @@ -336,15 +357,15 @@ impl RunnerTask { let span = trace_span!("runner_task"); let _enter = span.enter(); - let mut run_queue: VecDeque = VecDeque::new(); + let mut state = SecRunnerState::default(); while self.running { - self.process_queue(&mut run_queue); + self.process_queue(&mut state); let delay_done = || { trace!("delay done"); }; tokio::select! { - msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), + msg = self.msg_recv.recv() => self.handle_msg(msg, &mut state), _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() }; }