Browse Source

Use immutable collections in section_runner

master
Alex Mikhalev 4 years ago
parent
commit
46fab5b50f
  1. 97
      src/section_runner.rs

97
src/section_runner.rs

@ -2,7 +2,6 @@ use crate::model::SectionRef;
use crate::option_future::OptionFuture; use crate::option_future::OptionFuture;
use crate::section_interface::SectionInterface; use crate::section_interface::SectionInterface;
use std::{ use std::{
collections::VecDeque,
mem::swap, mem::swap,
sync::{ sync::{
atomic::{AtomicI32, Ordering}, atomic::{AtomicI32, Ordering},
@ -62,7 +61,7 @@ type SectionEventSend = broadcast::Sender<SectionEvent>;
const EVENT_CAPACITY: usize = 8; const EVENT_CAPACITY: usize = 8;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
enum RunState { pub enum SecRunState {
Waiting, Waiting,
Running { Running {
start_time: Instant, start_time: Instant,
@ -75,13 +74,13 @@ enum RunState {
}, },
} }
#[derive(Debug)] #[derive(Clone, Debug)]
struct SecRun { pub struct SecRun {
handle: SectionRunHandle, handle: SectionRunHandle,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
total_duration: Duration, total_duration: Duration,
state: RunState, state: SecRunState,
} }
impl SecRun { impl SecRun {
@ -91,17 +90,34 @@ impl SecRun {
section, section,
duration, duration,
total_duration: duration, total_duration: duration,
state: RunState::Waiting, state: SecRunState::Waiting,
} }
} }
fn is_running(&self) -> bool { pub fn is_running(&self) -> bool {
matches!(self.state, RunState::Running{..}) matches!(self.state, SecRunState::Running{..})
} }
#[allow(dead_code)] #[allow(dead_code)]
fn is_paused(&self) -> bool { pub fn is_paused(&self) -> bool {
matches!(self.state, RunState::Paused{..}) matches!(self.state, SecRunState::Paused{..})
}
}
pub type SecRunQueue = im::Vector<Arc<SecRun>>;
#[derive(Clone, Debug)]
pub struct SecRunnerState {
run_queue: SecRunQueue,
paused: bool,
}
impl Default for SecRunnerState {
fn default() -> Self {
Self {
run_queue: SecRunQueue::default(),
paused: false,
}
} }
} }
@ -110,7 +126,6 @@ struct RunnerTask {
msg_recv: mpsc::Receiver<RunnerMsg>, msg_recv: mpsc::Receiver<RunnerMsg>,
running: bool, running: bool,
delay_future: OptionFuture<tokio::time::Delay>, delay_future: OptionFuture<tokio::time::Delay>,
paused: bool,
event_send: Option<SectionEventSend>, event_send: Option<SectionEventSend>,
quit_tx: Option<oneshot::Sender<()>>, quit_tx: Option<oneshot::Sender<()>>,
} }
@ -125,7 +140,6 @@ impl RunnerTask {
msg_recv, msg_recv,
running: true, running: true,
delay_future: None.into(), delay_future: None.into(),
paused: false,
event_send: None, event_send: None,
quit_tx: None, quit_tx: None,
} }
@ -153,8 +167,9 @@ impl RunnerTask {
} }
} }
fn start_run(&mut self, run: &mut SecRun) { fn start_run(&mut self, run: &mut Arc<SecRun>) {
use RunState::*; use SecRunState::*;
let run = Arc::make_mut(run);
debug!(section_id = run.section.id, "starting running section"); debug!(section_id = run.section.id, "starting running section");
self.interface self.interface
.set_section_state(run.section.interface_id, true); .set_section_state(run.section.interface_id, true);
@ -164,12 +179,13 @@ impl RunnerTask {
self.send_event(SectionEvent::RunStart(run.handle.clone())); self.send_event(SectionEvent::RunStart(run.handle.clone()));
} }
fn finish_run(&mut self, run: &mut SecRun) { fn finish_run(&mut self, run: &mut Arc<SecRun>) {
let run = Arc::make_mut(run);
if run.is_running() { if run.is_running() {
debug!(section_id = run.section.id, "finished running section"); debug!(section_id = run.section.id, "finished running section");
self.interface self.interface
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
run.state = RunState::Finished; run.state = SecRunState::Finished;
self.send_event(SectionEvent::RunFinish(run.handle.clone())); self.send_event(SectionEvent::RunFinish(run.handle.clone()));
} else { } else {
warn!( warn!(
@ -180,18 +196,20 @@ impl RunnerTask {
} }
} }
fn cancel_run(&mut self, run: &mut SecRun) { fn cancel_run(&mut self, run: &mut Arc<SecRun>) {
let run = Arc::make_mut(run);
if run.is_running() { if run.is_running() {
debug!(section_id = run.section.id, "cancelling running section"); debug!(section_id = run.section.id, "cancelling running section");
self.interface self.interface
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
} }
run.state = RunState::Cancelled; run.state = SecRunState::Cancelled;
self.send_event(SectionEvent::RunCancel(run.handle.clone())); self.send_event(SectionEvent::RunCancel(run.handle.clone()));
} }
fn pause_run(&mut self, run: &mut SecRun) { fn pause_run(&mut self, run: &mut Arc<SecRun>) {
use RunState::*; use SecRunState::*;
let run = Arc::make_mut(run);
let new_state = match run.state { let new_state = match run.state {
Running { start_time } => { Running { start_time } => {
debug!(section_id = run.section.id, "pausing running section"); debug!(section_id = run.section.id, "pausing running section");
@ -217,8 +235,9 @@ impl RunnerTask {
self.send_event(SectionEvent::RunPause(run.handle.clone())); self.send_event(SectionEvent::RunPause(run.handle.clone()));
} }
fn unpause_run(&mut self, run: &mut SecRun) { fn unpause_run(&mut self, run: &mut Arc<SecRun>) {
use RunState::*; use SecRunState::*;
let run = Arc::make_mut(run);
match run.state { match run.state {
Paused { Paused {
start_time, start_time,
@ -244,10 +263,10 @@ impl RunnerTask {
} }
} }
fn process_queue(&mut self, run_queue: &mut VecDeque<SecRun>) { fn process_queue(&mut self, state: &mut SecRunnerState) {
use RunState::*; use SecRunState::*;
while let Some(current_run) = run_queue.front_mut() { while let Some(current_run) = state.run_queue.front_mut() {
let run_finished = match (&current_run.state, self.paused) { let run_finished = match (&current_run.state, state.paused) {
(Waiting, false) => { (Waiting, false) => {
self.start_run(current_run); self.start_run(current_run);
self.delay_future = Some(delay_for(current_run.duration)).into(); self.delay_future = Some(delay_for(current_run.duration)).into();
@ -280,14 +299,14 @@ impl RunnerTask {
}; };
if run_finished { if run_finished {
run_queue.pop_front(); state.run_queue.pop_front();
} else { } else {
break; break;
} }
} }
} }
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut VecDeque<SecRun>) { fn handle_msg(&mut self, msg: Option<RunnerMsg>, state: &mut SecRunnerState) {
let msg = msg.expect("SectionRunner channel closed"); let msg = msg.expect("SectionRunner channel closed");
use RunnerMsg::*; use RunnerMsg::*;
trace!(msg = debug(&msg), "runner_task recv"); trace!(msg = debug(&msg), "runner_task recv");
@ -295,12 +314,14 @@ impl RunnerTask {
Quit(quit_tx) => { Quit(quit_tx) => {
self.quit_tx = Some(quit_tx); self.quit_tx = Some(quit_tx);
self.running = false; self.running = false;
}, }
QueueRun(handle, section, duration) => { QueueRun(handle, section, duration) => {
run_queue.push_back(SecRun::new(handle, section, duration)); state
.run_queue
.push_back(Arc::new(SecRun::new(handle, section, duration)));
} }
CancelRun(handle) => { CancelRun(handle) => {
for run in run_queue { for run in state.run_queue.iter_mut() {
if run.handle != handle { if run.handle != handle {
continue; continue;
} }
@ -309,19 +330,19 @@ impl RunnerTask {
} }
} }
CancelAll => { CancelAll => {
let mut old_runs = VecDeque::new(); let mut old_runs = SecRunQueue::new();
swap(&mut old_runs, run_queue); swap(&mut old_runs, &mut state.run_queue);
trace!(count = old_runs.len(), "cancelling all runs"); trace!(count = old_runs.len(), "cancelling all runs");
for mut run in old_runs { for mut run in old_runs {
self.cancel_run(&mut run); self.cancel_run(&mut run);
} }
} }
Pause => { Pause => {
self.paused = true; state.paused = true;
self.send_event(SectionEvent::RunnerPause); self.send_event(SectionEvent::RunnerPause);
} }
Unpause => { Unpause => {
self.paused = false; state.paused = false;
self.send_event(SectionEvent::RunnerUnpause); self.send_event(SectionEvent::RunnerUnpause);
} }
Subscribe(res_send) => { Subscribe(res_send) => {
@ -336,15 +357,15 @@ impl RunnerTask {
let span = trace_span!("runner_task"); let span = trace_span!("runner_task");
let _enter = span.enter(); let _enter = span.enter();
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); let mut state = SecRunnerState::default();
while self.running { while self.running {
self.process_queue(&mut run_queue); self.process_queue(&mut state);
let delay_done = || { let delay_done = || {
trace!("delay done"); trace!("delay done");
}; };
tokio::select! { tokio::select! {
msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), msg = self.msg_recv.recv() => self.handle_msg(msg, &mut state),
_ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
}; };
} }

Loading…
Cancel
Save