Add test for SectionRunner pause/unpause #3

Merged
amikhalev merged 3 commits from section-runner-pause into master 4 years ago
  1. 201
      src/section_runner.rs

201
src/section_runner.rs

@ -16,7 +16,7 @@ use tokio::{ @@ -16,7 +16,7 @@ use tokio::{
sync::mpsc,
time::{delay_for, Instant},
};
use tracing::{debug, trace, trace_span};
use tracing::{debug, trace, trace_span, warn};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunHandle(i32);
@ -40,14 +40,22 @@ enum RunnerMsg { @@ -40,14 +40,22 @@ enum RunnerMsg {
QueueRun(RunHandle, SectionRef, Duration),
CancelRun(RunHandle),
CancelAll,
Pause,
Unpause,
}
#[derive(Clone, Debug, PartialEq)]
enum RunState {
Waiting,
Running { start_time: Instant },
Running {
start_time: Instant,
},
Finished,
Cancelled,
Paused {
start_time: Instant,
pause_time: Instant,
},
}
#[derive(Debug)]
@ -55,10 +63,21 @@ struct SecRun { @@ -55,10 +63,21 @@ struct SecRun {
handle: RunHandle,
section: SectionRef,
duration: Duration,
total_duration: Duration,
state: RunState,
}
impl SecRun {
fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self {
Self {
handle,
section,
duration,
total_duration: duration,
state: RunState::Waiting,
}
}
fn start(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
debug!(section_id = self.section.id, "starting running section");
@ -72,11 +91,21 @@ impl SecRun { @@ -72,11 +91,21 @@ impl SecRun {
matches!(self.state, RunState::Running{..})
}
fn is_paused(&self) -> bool {
matches!(self.state, RunState::Paused{..})
}
fn finish(&mut self, interface: &dyn SectionInterface) {
if self.is_running() {
debug!(section_id = self.section.id, "finished running section");
interface.set_section_state(self.section.interface_id, false);
self.state = RunState::Finished;
} else {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"cannot finish run which is not running"
);
}
}
@ -87,6 +116,53 @@ impl SecRun { @@ -87,6 +116,53 @@ impl SecRun {
}
self.state = RunState::Cancelled;
}
fn pause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Running { start_time } => {
debug!(section_id = self.section.id, "pausing running section");
interface.set_section_state(self.section.interface_id, false);
self.state = Paused {
start_time,
pause_time: Instant::now(),
};
}
Waiting => {
debug!(section_id = self.section.id, "pausing waiting section");
self.state = Paused {
start_time: Instant::now(),
pause_time: Instant::now(),
};
}
Finished | Cancelled | Paused { .. } => {}
}
}
fn unpause(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
match self.state {
Paused {
start_time,
pause_time,
} => {
debug!(section_id = self.section.id, "unpausing section");
interface.set_section_state(self.section.interface_id, true);
self.state = Running {
start_time: Instant::now(),
};
let ran_for = pause_time - start_time;
self.duration = self.duration - ran_for;
}
Waiting | Finished | Cancelled | Running { .. } => {
warn!(
section_id = self.section.id,
state = debug(&self.state),
"can only unpause paused section"
);
}
}
}
}
mod option_future {
@ -143,16 +219,25 @@ async fn runner_task( @@ -143,16 +219,25 @@ async fn runner_task(
let mut running = true;
let mut run_queue: VecDeque<SecRun> = VecDeque::new();
let mut delay_future: OptionFuture<_> = None.into();
let mut paused = false;
while running {
if let Some(current_run) = run_queue.front_mut() {
let done = match current_run.state {
Waiting => {
if paused {
current_run.pause(&*interface);
} else {
current_run.start(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false
}
Running { start_time } => {
if start_time.elapsed() >= current_run.duration {
if paused {
current_run.pause(&*interface);
delay_future = None.into();
false
} else if start_time.elapsed() >= current_run.duration {
current_run.finish(&*interface);
delay_future = None.into();
true
@ -160,6 +245,13 @@ async fn runner_task( @@ -160,6 +245,13 @@ async fn runner_task(
false
}
}
Paused { .. } => {
if !paused {
current_run.unpause(&*interface);
delay_future = Some(delay_for(current_run.duration)).into();
}
false
}
Cancelled | Finished => true,
};
@ -176,12 +268,7 @@ async fn runner_task( @@ -176,12 +268,7 @@ async fn runner_task(
match msg {
Quit => running = false,
QueueRun(handle, section, duration) => {
run_queue.push_back(SecRun {
handle,
section,
duration,
state: Waiting,
});
run_queue.push_back(SecRun::new(handle, section, duration));
}
CancelRun(handle) => {
for run in &mut run_queue {
@ -200,6 +287,12 @@ async fn runner_task( @@ -200,6 +287,12 @@ async fn runner_task(
run.cancel(&*interface);
}
}
Pause => {
paused = true;
}
Unpause => {
paused = false;
}
}
};
let delay_done = || {
@ -270,11 +363,13 @@ impl SectionRunner { @@ -270,11 +363,13 @@ impl SectionRunner {
}
pub async fn pause(&mut self) -> Result<()> {
todo!()
self.msg_send.send(RunnerMsg::Pause).await?;
Ok(())
}
pub async fn unpause(&mut self) -> Result<()> {
todo!()
self.msg_send.send(RunnerMsg::Unpause).await?;
Ok(())
}
}
@ -347,8 +442,10 @@ mod test { @@ -347,8 +442,10 @@ mod test {
async fn advance(dur: Duration) {
// HACK: advance should really be enough, but we need another yield_now
tokio::time::pause();
tokio::time::advance(dur).await;
tokio::task::yield_now().await;
tokio::time::resume();
}
#[tokio::test]
@ -366,12 +463,9 @@ mod test { @@ -366,12 +463,9 @@ mod test {
tokio::task::yield_now().await;
pause();
advance(Duration::from_secs(1)).await;
assert_section_states(&interface, &[true, false]);
advance(Duration::from_secs(10)).await;
advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]);
@ -386,11 +480,11 @@ mod test { @@ -386,11 +480,11 @@ mod test {
.await
.unwrap();
advance(Duration::from_secs(1)).await;
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(10)).await;
advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[true, false]);
@ -398,8 +492,6 @@ mod test { @@ -398,8 +492,6 @@ mod test {
assert_section_states(&interface, &[false, false]);
resume();
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
@ -424,9 +516,7 @@ mod test { @@ -424,9 +516,7 @@ mod test {
.await
.unwrap();
pause();
advance(Duration::from_secs(1)).await;
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
@ -436,12 +526,10 @@ mod test { @@ -436,12 +526,10 @@ mod test {
assert_section_states(&interface, &[true, false]);
runner.cancel_run(run3).await.unwrap();
advance(Duration::from_secs(10)).await;
advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]);
resume();
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
@ -466,23 +554,76 @@ mod test { @@ -466,23 +554,76 @@ mod test {
.await
.unwrap();
pause();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
advance(Duration::from_secs(1)).await;
runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
#[tokio::test]
async fn test_pause() {
let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone());
let _run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
let run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10))
.await
.unwrap();
let _run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap();
runner.pause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
advance(Duration::from_secs(10)).await;
assert_section_states(&interface, &[false, false]);
runner.cancel_all().await.unwrap();
runner.unpause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(8)).await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(2)).await;
assert_section_states(&interface, &[true, false]);
runner.pause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
resume();
// cancel paused run
runner.cancel_run(run2).await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
runner.unpause().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]);
advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]);
runner.quit().await.unwrap();
tokio::task::yield_now().await;

Loading…
Cancel
Save