Browse Source

Merge pull request 'Add test for SectionRunner pause/unpause' (#3) from section-runner-pause into master

Reviewed-on: #3
refactor-section-runner
amikhalev 4 years ago
parent
commit
e75a326f7d
  1. 201
      src/section_runner.rs

201
src/section_runner.rs

@ -16,7 +16,7 @@ use tokio::{
sync::mpsc, sync::mpsc,
time::{delay_for, Instant}, time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span}; use tracing::{debug, trace, trace_span, warn};
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunHandle(i32); pub struct RunHandle(i32);
@ -40,14 +40,22 @@ enum RunnerMsg {
QueueRun(RunHandle, SectionRef, Duration), QueueRun(RunHandle, SectionRef, Duration),
CancelRun(RunHandle), CancelRun(RunHandle),
CancelAll, CancelAll,
Pause,
Unpause,
} }
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
enum RunState { enum RunState {
Waiting, Waiting,
Running { start_time: Instant }, Running {
start_time: Instant,
},
Finished, Finished,
Cancelled, Cancelled,
Paused {
start_time: Instant,
pause_time: Instant,
},
} }
#[derive(Debug)] #[derive(Debug)]
@ -55,10 +63,21 @@ struct SecRun {
handle: RunHandle, handle: RunHandle,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
total_duration: Duration,
state: RunState, state: RunState,
} }
impl SecRun { impl SecRun {
fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self {
Self {
handle,
section,
duration,
total_duration: duration,
state: RunState::Waiting,
}
}
fn start(&mut self, interface: &dyn SectionInterface) { fn start(&mut self, interface: &dyn SectionInterface) {
use RunState::*; use RunState::*;
debug!(section_id = self.section.id, "starting running section"); debug!(section_id = self.section.id, "starting running section");
@ -72,11 +91,21 @@ impl SecRun {
matches!(self.state, RunState::Running{..}) matches!(self.state, RunState::Running{..})
} }
fn is_paused(&self) -> bool {
matches!(self.state, RunState::Paused{..})
}
fn finish(&mut self, interface: &dyn SectionInterface) { fn finish(&mut self, interface: &dyn SectionInterface) {
if self.is_running() { if self.is_running() {
debug!(section_id = self.section.id, "finished running section"); debug!(section_id = self.section.id, "finished running section");
interface.set_section_state(self.section.interface_id, false); interface.set_section_state(self.section.interface_id, false);
self.state = RunState::Finished; self.state = RunState::Finished;
} else {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"cannot finish run which is not running"
);
} }
} }
@ -87,6 +116,53 @@ impl SecRun {
} }
self.state = RunState::Cancelled; self.state = RunState::Cancelled;
} }
fn pause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Running { start_time } => {
debug!(section_id = self.section.id, "pausing running section");
interface.set_section_state(self.section.interface_id, false);
self.state = Paused {
start_time,
pause_time: Instant::now(),
};
}
Waiting => {
debug!(section_id = self.section.id, "pausing waiting section");
self.state = Paused {
start_time: Instant::now(),
pause_time: Instant::now(),
};
}
Finished | Cancelled | Paused { .. } => {}
}
}
fn unpause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Paused {
start_time,
pause_time,
} => {
debug!(section_id = self.section.id, "unpausing section");
interface.set_section_state(self.section.interface_id, true);
self.state = Running {
start_time: Instant::now(),
};
let ran_for = pause_time - start_time;
self.duration = self.duration - ran_for;
}
Waiting | Finished | Cancelled | Running { .. } => {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"can only unpause paused section"
);
}
}
}
} }
mod option_future { mod option_future {
@ -143,16 +219,25 @@ async fn runner_task(
let mut running = true; let mut running = true;
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); let mut run_queue: VecDeque<SecRun> = VecDeque::new();
let mut delay_future: OptionFuture<_> = None.into(); let mut delay_future: OptionFuture<_> = None.into();
let mut paused = false;
while running { while running {
if let Some(current_run) = run_queue.front_mut() { if let Some(current_run) = run_queue.front_mut() {
let done = match current_run.state { let done = match current_run.state {
Waiting => { Waiting => {
if paused {
current_run.pause(&*interface);
} else {
current_run.start(&*interface); current_run.start(&*interface);
delay_future = Some(delay_for(current_run.duration)).into(); delay_future = Some(delay_for(current_run.duration)).into();
}
false false
} }
Running { start_time } => { Running { start_time } => {
if start_time.elapsed() >= current_run.duration { if paused {
current_run.pause(&*interface);
delay_future = None.into();
false
} else if start_time.elapsed() >= current_run.duration {
current_run.finish(&*interface); current_run.finish(&*interface);
delay_future = None.into(); delay_future = None.into();
true true
@ -160,6 +245,13 @@ async fn runner_task(
false false
} }
} }
Paused { .. } => {
if !paused {
current_run.unpause(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false
}
Cancelled | Finished => true, Cancelled | Finished => true,
}; };
@ -176,12 +268,7 @@ async fn runner_task(
match msg { match msg {
Quit => running = false, Quit => running = false,
QueueRun(handle, section, duration) => { QueueRun(handle, section, duration) => {
run_queue.push_back(SecRun { run_queue.push_back(SecRun::new(handle, section, duration));
handle,
section,
duration,
state: Waiting,
});
} }
CancelRun(handle) => { CancelRun(handle) => {
for run in &mut run_queue { for run in &mut run_queue {
@ -200,6 +287,12 @@ async fn runner_task(
run.cancel(&*interface); run.cancel(&*interface);
} }
} }
Pause => {
paused = true;
}
Unpause => {
paused = false;
}
} }
}; };
let delay_done = || { let delay_done = || {
@ -270,11 +363,13 @@ impl SectionRunner {
} }
pub async fn pause(&mut self) -> Result<()> { pub async fn pause(&mut self) -> Result<()> {
todo!() self.msg_send.send(RunnerMsg::Pause).await?;
Ok(())
} }
pub async fn unpause(&mut self) -> Result<()> { pub async fn unpause(&mut self) -> Result<()> {
todo!() self.msg_send.send(RunnerMsg::Unpause).await?;
Ok(())
} }
} }
@ -347,8 +442,10 @@ mod test {
async fn advance(dur: Duration) { async fn advance(dur: Duration) {
// HACK: advance should really be enough, but we need another yield_now // HACK: advance should really be enough, but we need another yield_now
tokio::time::pause();
tokio::time::advance(dur).await; tokio::time::advance(dur).await;
tokio::task::yield_now().await; tokio::task::yield_now().await;
tokio::time::resume();
} }
#[tokio::test] #[tokio::test]
@ -366,12 +463,9 @@ mod test {
tokio::task::yield_now().await; tokio::task::yield_now().await;
pause();
advance(Duration::from_secs(1)).await;
assert_section_states(&interface, &[true, false]); assert_section_states(&interface, &[true, false]);
advance(Duration::from_secs(10)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]); assert_section_states(&interface, &[false, false]);
@ -386,11 +480,11 @@ mod test {
.await .await
.unwrap(); .unwrap();
advance(Duration::from_secs(1)).await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(10)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[true, false]); assert_section_states(&interface, &[true, false]);
@ -398,8 +492,6 @@ mod test {
assert_section_states(&interface, &[false, false]); assert_section_states(&interface, &[false, false]);
resume();
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
@ -424,9 +516,7 @@ mod test {
.await .await
.unwrap(); .unwrap();
pause(); tokio::task::yield_now().await;
advance(Duration::from_secs(1)).await;
assert_section_states(&interface, &[false, true]); assert_section_states(&interface, &[false, true]);
@ -436,12 +526,10 @@ mod test {
assert_section_states(&interface, &[true, false]); assert_section_states(&interface, &[true, false]);
runner.cancel_run(run3).await.unwrap(); runner.cancel_run(run3).await.unwrap();
advance(Duration::from_secs(10)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]); assert_section_states(&interface, &[false, false]);
resume();
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
@ -466,23 +554,76 @@ mod test {
.await .await
.unwrap(); .unwrap();
pause(); tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
advance(Duration::from_secs(1)).await; runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
#[tokio::test]
async fn test_pause() {
let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone());
let _run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
let run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10))
.await
.unwrap();
let _run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_section_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap(); runner.pause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
advance(Duration::from_secs(10)).await;
assert_section_states(&interface, &[false, false]); assert_section_states(&interface, &[false, false]);
runner.cancel_all().await.unwrap(); runner.unpause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(8)).await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(2)).await;
assert_section_states(&interface, &[true, false]);
runner.pause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_section_states(&interface, &[false, false]);
resume(); // cancel paused run
runner.cancel_run(run2).await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
runner.unpause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;

Loading…
Cancel
Save