Browse Source

Implement SectionRunner pause/unpause

pull/3/head
Alex Mikhalev 4 years ago
parent
commit
ab994d027a
  1. 121
      src/section_runner.rs

121
src/section_runner.rs

@ -16,7 +16,7 @@ use tokio::{
sync::mpsc, sync::mpsc,
time::{delay_for, Instant}, time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span}; use tracing::{debug, trace, trace_span, warn};
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunHandle(i32); pub struct RunHandle(i32);
@ -40,14 +40,22 @@ enum RunnerMsg {
QueueRun(RunHandle, SectionRef, Duration), QueueRun(RunHandle, SectionRef, Duration),
CancelRun(RunHandle), CancelRun(RunHandle),
CancelAll, CancelAll,
Pause,
Unpause,
} }
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
enum RunState { enum RunState {
Waiting, Waiting,
Running { start_time: Instant }, Running {
start_time: Instant,
},
Finished, Finished,
Cancelled, Cancelled,
Paused {
start_time: Instant,
pause_time: Instant,
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -55,10 +63,21 @@ struct SecRun {
handle: RunHandle, handle: RunHandle,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
total_duration: Duration,
state: RunState, state: RunState,
} }
impl SecRun { impl SecRun {
fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self {
Self {
handle,
section,
duration,
total_duration: duration,
state: RunState::Waiting,
}
}
fn start(&mut self, interface: &dyn SectionInterface) { fn start(&mut self, interface: &dyn SectionInterface) {
use RunState::*; use RunState::*;
debug!(section_id = self.section.id, "starting running section"); debug!(section_id = self.section.id, "starting running section");
@ -72,11 +91,21 @@ impl SecRun {
matches!(self.state, RunState::Running{..}) matches!(self.state, RunState::Running{..})
} }
fn is_paused(&self) -> bool {
matches!(self.state, RunState::Paused{..})
}
fn finish(&mut self, interface: &dyn SectionInterface) { fn finish(&mut self, interface: &dyn SectionInterface) {
if self.is_running() { if self.is_running() {
debug!(section_id = self.section.id, "finished running section"); debug!(section_id = self.section.id, "finished running section");
interface.set_section_state(self.section.interface_id, false); interface.set_section_state(self.section.interface_id, false);
self.state = RunState::Finished; self.state = RunState::Finished;
} else {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"cannot finish run which is not running"
);
} }
} }
@ -87,6 +116,53 @@ impl SecRun {
} }
self.state = RunState::Cancelled; self.state = RunState::Cancelled;
} }
fn pause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Running { start_time } => {
debug!(section_id = self.section.id, "pausing running section");
interface.set_section_state(self.section.interface_id, false);
self.state = Paused {
start_time,
pause_time: Instant::now(),
};
}
Waiting => {
debug!(section_id = self.section.id, "pausing waiting section");
self.state = Paused {
start_time: Instant::now(),
pause_time: Instant::now(),
};
}
Finished | Cancelled | Paused { .. } => {}
}
}
fn unpause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Paused {
start_time,
pause_time,
} => {
debug!(section_id = self.section.id, "unpausing section");
interface.set_section_state(self.section.interface_id, true);
self.state = Running {
start_time: Instant::now(),
};
let ran_for = pause_time - start_time;
self.duration = self.duration - ran_for;
}
Waiting | Finished | Cancelled | Running { .. } => {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"can only unpause paused section"
);
}
}
}
} }
mod option_future { mod option_future {
@ -143,16 +219,25 @@ async fn runner_task(
let mut running = true; let mut running = true;
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); let mut run_queue: VecDeque<SecRun> = VecDeque::new();
let mut delay_future: OptionFuture<_> = None.into(); let mut delay_future: OptionFuture<_> = None.into();
let mut paused = false;
while running { while running {
if let Some(current_run) = run_queue.front_mut() { if let Some(current_run) = run_queue.front_mut() {
let done = match current_run.state { let done = match current_run.state {
Waiting => { Waiting => {
current_run.start(&*interface); if paused {
delay_future = Some(delay_for(current_run.duration)).into(); current_run.pause(&*interface);
} else {
current_run.start(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false false
} }
Running { start_time } => { Running { start_time } => {
if start_time.elapsed() >= current_run.duration { if paused {
current_run.pause(&*interface);
delay_future = None.into();
false
} else if start_time.elapsed() >= current_run.duration {
current_run.finish(&*interface); current_run.finish(&*interface);
delay_future = None.into(); delay_future = None.into();
true true
@ -160,6 +245,13 @@ async fn runner_task(
false false
} }
} }
Paused { .. } => {
if !paused {
current_run.unpause(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false
}
Cancelled | Finished => true, Cancelled | Finished => true,
}; };
@ -176,12 +268,7 @@ async fn runner_task(
match msg { match msg {
Quit => running = false, Quit => running = false,
QueueRun(handle, section, duration) => { QueueRun(handle, section, duration) => {
run_queue.push_back(SecRun { run_queue.push_back(SecRun::new(handle, section, duration));
handle,
section,
duration,
state: Waiting,
});
} }
CancelRun(handle) => { CancelRun(handle) => {
for run in &mut run_queue { for run in &mut run_queue {
@ -200,6 +287,12 @@ async fn runner_task(
run.cancel(&*interface); run.cancel(&*interface);
} }
} }
Pause => {
paused = true;
}
Unpause => {
paused = false;
}
} }
}; };
let delay_done = || { let delay_done = || {
@ -270,11 +363,13 @@ impl SectionRunner {
} }
pub async fn pause(&mut self) -> Result<()> { pub async fn pause(&mut self) -> Result<()> {
todo!() self.msg_send.send(RunnerMsg::Pause).await?;
Ok(())
} }
pub async fn unpause(&mut self) -> Result<()> { pub async fn unpause(&mut self) -> Result<()> {
todo!() self.msg_send.send(RunnerMsg::Unpause).await?;
Ok(())
} }
} }

Loading…
Cancel
Save