Browse Source

Refactor SectionRunner runner_task

Create a struct to stores state and split up `runner_task` in
to separate functions.
refactor-section-runner
Alex Mikhalev 4 years ago
parent
commit
f8c9fd2def
  1. 208
      src/section_runner.rs

208
src/section_runner.rs

@ -79,15 +79,6 @@ impl SecRun {
} }
} }
fn start(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
debug!(section_id = self.section.id, "starting running section");
interface.set_section_state(self.section.interface_id, true);
self.state = Running {
start_time: Instant::now(),
};
}
fn is_running(&self) -> bool { fn is_running(&self) -> bool {
matches!(self.state, RunState::Running{..}) matches!(self.state, RunState::Running{..})
} }
@ -96,43 +87,79 @@ impl SecRun {
fn is_paused(&self) -> bool { fn is_paused(&self) -> bool {
matches!(self.state, RunState::Paused{..}) matches!(self.state, RunState::Paused{..})
} }
}
fn finish(&mut self, interface: &dyn SectionInterface) { struct RunnerTask {
if self.is_running() { interface: Arc<dyn SectionInterface + Sync>,
debug!(section_id = self.section.id, "finished running section"); msg_recv: mpsc::Receiver<RunnerMsg>,
interface.set_section_state(self.section.interface_id, false); running: bool,
self.state = RunState::Finished; delay_future: OptionFuture<tokio::time::Delay>,
paused: bool,
}
impl RunnerTask {
fn new(
interface: Arc<dyn SectionInterface + Sync>,
msg_recv: mpsc::Receiver<RunnerMsg>,
) -> Self {
Self {
interface,
msg_recv,
running: true,
delay_future: None.into(),
paused: false,
}
}
fn start_run(&mut self, run: &mut SecRun) {
use RunState::*;
debug!(section_id = run.section.id, "starting running section");
self.interface
.set_section_state(run.section.interface_id, true);
run.state = Running {
start_time: Instant::now(),
};
}
fn finish_run(&mut self, run: &mut SecRun) {
if run.is_running() {
debug!(section_id = run.section.id, "finished running section");
self.interface
.set_section_state(run.section.interface_id, false);
run.state = RunState::Finished;
} else { } else {
warn!( warn!(
section_id = self.section.id, section_id = run.section.id,
state = debug(&self.state), state = debug(&run.state),
"cannot finish run which is not running" "cannot finish run which is not running"
); );
} }
} }
fn cancel(&mut self, interface: &dyn SectionInterface) { fn cancel_run(&mut self, run: &mut SecRun) {
if self.is_running() { if run.is_running() {
debug!(section_id = self.section.id, "cancelling running section"); debug!(section_id = run.section.id, "cancelling running section");
interface.set_section_state(self.section.interface_id, false); self.interface
.set_section_state(run.section.interface_id, false);
} }
self.state = RunState::Cancelled; run.state = RunState::Cancelled;
} }
fn pause(&mut self, interface: &dyn SectionInterface) { fn pause_run(&mut self, run: &mut SecRun) {
use RunState::*; use RunState::*;
match self.state { match run.state {
Running { start_time } => { Running { start_time } => {
debug!(section_id = self.section.id, "pausing running section"); debug!(section_id = run.section.id, "pausing running section");
interface.set_section_state(self.section.interface_id, false); self.interface
self.state = Paused { .set_section_state(run.section.interface_id, false);
run.state = Paused {
start_time, start_time,
pause_time: Instant::now(), pause_time: Instant::now(),
}; };
} }
Waiting => { Waiting => {
debug!(section_id = self.section.id, "pausing waiting section"); debug!(section_id = run.section.id, "pausing waiting section");
self.state = Paused { run.state = Paused {
start_time: Instant::now(), start_time: Instant::now(),
pause_time: Instant::now(), pause_time: Instant::now(),
}; };
@ -141,129 +168,132 @@ impl SecRun {
} }
} }
fn unpause(&mut self, interface: &dyn SectionInterface) { fn unpause_run(&mut self, run: &mut SecRun) {
use RunState::*; use RunState::*;
match self.state { match run.state {
Paused { Paused {
start_time, start_time,
pause_time, pause_time,
} => { } => {
debug!(section_id = self.section.id, "unpausing section"); debug!(section_id = run.section.id, "unpausing section");
interface.set_section_state(self.section.interface_id, true); self.interface
self.state = Running { .set_section_state(run.section.interface_id, true);
run.state = Running {
start_time: Instant::now(), start_time: Instant::now(),
}; };
let ran_for = pause_time - start_time; let ran_for = pause_time - start_time;
self.duration -= ran_for; run.duration -= ran_for;
} }
Waiting | Finished | Cancelled | Running { .. } => { Waiting | Finished | Cancelled | Running { .. } => {
warn!( warn!(
section_id = self.section.id, section_id = run.section.id,
state = debug(&self.state), state = debug(&run.state),
"can only unpause paused section" "can only unpause paused section"
); );
} }
} }
} }
}
async fn runner_task( fn process_queue(&mut self, run_queue: &mut VecDeque<SecRun>) {
interface: Arc<dyn SectionInterface + Sync>,
mut msg_recv: mpsc::Receiver<RunnerMsg>,
) {
use RunState::*; use RunState::*;
loop {
let span = trace_span!("runner_task"); let current_run = match run_queue.front_mut() {
let _enter = span.enter(); Some(current_run) => current_run,
None => break,
let mut running = true; };
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); let run_finished = match (&current_run.state, self.paused) {
let mut delay_future: OptionFuture<_> = None.into(); (Waiting, false) => {
let mut paused = false; self.start_run(current_run);
while running { self.delay_future = Some(delay_for(current_run.duration)).into();
if let Some(current_run) = run_queue.front_mut() {
let done = match current_run.state {
Waiting => {
if paused {
current_run.pause(&*interface);
} else {
current_run.start(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false false
} }
Running { start_time } => { (Running { start_time }, false) => {
if paused { let time_to_finish = start_time.elapsed() >= current_run.duration;
current_run.pause(&*interface); if time_to_finish {
delay_future = None.into(); self.finish_run(current_run);
false self.delay_future = None.into();
} else if start_time.elapsed() >= current_run.duration {
current_run.finish(&*interface);
delay_future = None.into();
true
} else {
false
} }
time_to_finish
} }
Paused { .. } => { (Paused { .. }, false) => {
if !paused { self.unpause_run(current_run);
current_run.unpause(&*interface); self.delay_future = Some(delay_for(current_run.duration)).into();
delay_future = Some(delay_for(current_run.duration)).into(); false
}
(Waiting, true) => {
self.pause_run(current_run);
false
} }
(Running { .. }, true) => {
self.pause_run(current_run);
self.delay_future = None.into();
false false
} }
Cancelled | Finished => true, (Paused { .. }, true) => false,
(Cancelled, _) | (Finished, _) => true,
}; };
if done { if run_finished {
run_queue.pop_front(); run_queue.pop_front();
continue; } else {
break;
}
} }
} }
let mut handle_msg = |msg: Option<RunnerMsg>| { fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut VecDeque<SecRun>) {
let msg = msg.expect("SectionRunner channel closed"); let msg = msg.expect("SectionRunner channel closed");
use RunnerMsg::*; use RunnerMsg::*;
trace!(msg = debug(&msg), "runner_task recv"); trace!(msg = debug(&msg), "runner_task recv");
match msg { match msg {
Quit => running = false, Quit => self.running = false,
QueueRun(handle, section, duration) => { QueueRun(handle, section, duration) => {
run_queue.push_back(SecRun::new(handle, section, duration)); run_queue.push_back(SecRun::new(handle, section, duration));
} }
CancelRun(handle) => { CancelRun(handle) => {
for run in &mut run_queue { for run in run_queue {
if run.handle != handle { if run.handle != handle {
continue; continue;
} }
trace!(handle = handle.0, "cancelling run by handle"); trace!(handle = handle.0, "cancelling run by handle");
run.cancel(&*interface); self.cancel_run(run);
} }
} }
CancelAll => { CancelAll => {
let mut old_runs = VecDeque::new(); let mut old_runs = VecDeque::new();
swap(&mut old_runs, &mut run_queue); swap(&mut old_runs, run_queue);
trace!(count = old_runs.len(), "cancelling all runs"); trace!(count = old_runs.len(), "cancelling all runs");
for mut run in old_runs { for mut run in old_runs {
run.cancel(&*interface); self.cancel_run(&mut run);
} }
} }
Pause => { Pause => {
paused = true; self.paused = true;
} }
Unpause => { Unpause => {
paused = false; self.paused = false;
} }
} }
}; }
async fn start(mut self) {
let span = trace_span!("runner_task");
let _enter = span.enter();
let mut run_queue: VecDeque<SecRun> = VecDeque::new();
while self.running {
self.process_queue(&mut run_queue);
let delay_done = || { let delay_done = || {
trace!("delay done"); trace!("delay done");
}; };
tokio::select! { tokio::select! {
msg = msg_recv.recv() => handle_msg(msg), msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue),
_ = &mut delay_future, if delay_future.is_some() => delay_done() _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
}; };
} }
} }
}
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
#[error("the SectionRunner channel is closed")] #[error("the SectionRunner channel is closed")]
@ -287,7 +317,7 @@ pub struct SectionRunner {
impl SectionRunner { impl SectionRunner {
pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self {
let (msg_send, msg_recv) = mpsc::channel(8); let (msg_send, msg_recv) = mpsc::channel(8);
spawn(runner_task(interface, msg_recv)); spawn(RunnerTask::new(interface, msg_recv).start());
Self { Self {
inner: Arc::new(SectionRunnerInner::new()), inner: Arc::new(SectionRunnerInner::new()),
msg_send, msg_send,

Loading…
Cancel
Save