|
|
|
@ -16,7 +16,7 @@ use tokio::{
@@ -16,7 +16,7 @@ use tokio::{
|
|
|
|
|
sync::mpsc, |
|
|
|
|
time::{delay_for, Instant}, |
|
|
|
|
}; |
|
|
|
|
use tracing::{debug, trace, trace_span}; |
|
|
|
|
use tracing::{debug, trace, trace_span, warn}; |
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, PartialEq, Eq)] |
|
|
|
|
pub struct RunHandle(i32); |
|
|
|
@ -40,14 +40,22 @@ enum RunnerMsg {
@@ -40,14 +40,22 @@ enum RunnerMsg {
|
|
|
|
|
QueueRun(RunHandle, SectionRef, Duration), |
|
|
|
|
CancelRun(RunHandle), |
|
|
|
|
CancelAll, |
|
|
|
|
Pause, |
|
|
|
|
Unpause, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Clone, Debug, PartialEq)] |
|
|
|
|
enum RunState { |
|
|
|
|
Waiting, |
|
|
|
|
Running { start_time: Instant }, |
|
|
|
|
Running { |
|
|
|
|
start_time: Instant, |
|
|
|
|
}, |
|
|
|
|
Finished, |
|
|
|
|
Cancelled, |
|
|
|
|
Paused { |
|
|
|
|
start_time: Instant, |
|
|
|
|
pause_time: Instant, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[derive(Debug)] |
|
|
|
@ -55,10 +63,21 @@ struct SecRun {
@@ -55,10 +63,21 @@ struct SecRun {
|
|
|
|
|
handle: RunHandle, |
|
|
|
|
section: SectionRef, |
|
|
|
|
duration: Duration, |
|
|
|
|
total_duration: Duration, |
|
|
|
|
state: RunState, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
impl SecRun { |
|
|
|
|
fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { |
|
|
|
|
Self { |
|
|
|
|
handle, |
|
|
|
|
section, |
|
|
|
|
duration, |
|
|
|
|
total_duration: duration, |
|
|
|
|
state: RunState::Waiting, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn start(&mut self, interface: &dyn SectionInterface) { |
|
|
|
|
use RunState::*; |
|
|
|
|
debug!(section_id = self.section.id, "starting running section"); |
|
|
|
@ -72,11 +91,21 @@ impl SecRun {
@@ -72,11 +91,21 @@ impl SecRun {
|
|
|
|
|
matches!(self.state, RunState::Running{..}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn is_paused(&self) -> bool { |
|
|
|
|
matches!(self.state, RunState::Paused{..}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn finish(&mut self, interface: &dyn SectionInterface) { |
|
|
|
|
if self.is_running() { |
|
|
|
|
debug!(section_id = self.section.id, "finished running section"); |
|
|
|
|
interface.set_section_state(self.section.interface_id, false); |
|
|
|
|
self.state = RunState::Finished; |
|
|
|
|
} else { |
|
|
|
|
warn!( |
|
|
|
|
section_id = self.section.id, |
|
|
|
|
state = debug(&self.state), |
|
|
|
|
"cannot finish run which is not running" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -87,6 +116,53 @@ impl SecRun {
@@ -87,6 +116,53 @@ impl SecRun {
|
|
|
|
|
} |
|
|
|
|
self.state = RunState::Cancelled; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn pause(&mut self, interface: &dyn SectionInterface) { |
|
|
|
|
use RunState::*; |
|
|
|
|
match self.state { |
|
|
|
|
Running { start_time } => { |
|
|
|
|
debug!(section_id = self.section.id, "pausing running section"); |
|
|
|
|
interface.set_section_state(self.section.interface_id, false); |
|
|
|
|
self.state = Paused { |
|
|
|
|
start_time, |
|
|
|
|
pause_time: Instant::now(), |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
Waiting => { |
|
|
|
|
debug!(section_id = self.section.id, "pausing waiting section"); |
|
|
|
|
self.state = Paused { |
|
|
|
|
start_time: Instant::now(), |
|
|
|
|
pause_time: Instant::now(), |
|
|
|
|
}; |
|
|
|
|
} |
|
|
|
|
Finished | Cancelled | Paused { .. } => {} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
fn unpause(&mut self, interface: &dyn SectionInterface) { |
|
|
|
|
use RunState::*; |
|
|
|
|
match self.state { |
|
|
|
|
Paused { |
|
|
|
|
start_time, |
|
|
|
|
pause_time, |
|
|
|
|
} => { |
|
|
|
|
debug!(section_id = self.section.id, "unpausing section"); |
|
|
|
|
interface.set_section_state(self.section.interface_id, true); |
|
|
|
|
self.state = Running { |
|
|
|
|
start_time: Instant::now(), |
|
|
|
|
}; |
|
|
|
|
let ran_for = pause_time - start_time; |
|
|
|
|
self.duration = self.duration - ran_for; |
|
|
|
|
} |
|
|
|
|
Waiting | Finished | Cancelled | Running { .. } => { |
|
|
|
|
warn!( |
|
|
|
|
section_id = self.section.id, |
|
|
|
|
state = debug(&self.state), |
|
|
|
|
"can only unpause paused section" |
|
|
|
|
); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
mod option_future { |
|
|
|
@ -143,16 +219,25 @@ async fn runner_task(
@@ -143,16 +219,25 @@ async fn runner_task(
|
|
|
|
|
let mut running = true; |
|
|
|
|
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); |
|
|
|
|
let mut delay_future: OptionFuture<_> = None.into(); |
|
|
|
|
let mut paused = false; |
|
|
|
|
while running { |
|
|
|
|
if let Some(current_run) = run_queue.front_mut() { |
|
|
|
|
let done = match current_run.state { |
|
|
|
|
Waiting => { |
|
|
|
|
if paused { |
|
|
|
|
current_run.pause(&*interface); |
|
|
|
|
} else { |
|
|
|
|
current_run.start(&*interface); |
|
|
|
|
delay_future = Some(delay_for(current_run.duration)).into(); |
|
|
|
|
} |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
Running { start_time } => { |
|
|
|
|
if start_time.elapsed() >= current_run.duration { |
|
|
|
|
if paused { |
|
|
|
|
current_run.pause(&*interface); |
|
|
|
|
delay_future = None.into(); |
|
|
|
|
false |
|
|
|
|
} else if start_time.elapsed() >= current_run.duration { |
|
|
|
|
current_run.finish(&*interface); |
|
|
|
|
delay_future = None.into(); |
|
|
|
|
true |
|
|
|
@ -160,6 +245,13 @@ async fn runner_task(
@@ -160,6 +245,13 @@ async fn runner_task(
|
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Paused { .. } => { |
|
|
|
|
if !paused { |
|
|
|
|
current_run.unpause(&*interface); |
|
|
|
|
delay_future = Some(delay_for(current_run.duration)).into(); |
|
|
|
|
} |
|
|
|
|
false |
|
|
|
|
} |
|
|
|
|
Cancelled | Finished => true, |
|
|
|
|
}; |
|
|
|
|
|
|
|
|
@ -176,12 +268,7 @@ async fn runner_task(
@@ -176,12 +268,7 @@ async fn runner_task(
|
|
|
|
|
match msg { |
|
|
|
|
Quit => running = false, |
|
|
|
|
QueueRun(handle, section, duration) => { |
|
|
|
|
run_queue.push_back(SecRun { |
|
|
|
|
handle, |
|
|
|
|
section, |
|
|
|
|
duration, |
|
|
|
|
state: Waiting, |
|
|
|
|
}); |
|
|
|
|
run_queue.push_back(SecRun::new(handle, section, duration)); |
|
|
|
|
} |
|
|
|
|
CancelRun(handle) => { |
|
|
|
|
for run in &mut run_queue { |
|
|
|
@ -200,6 +287,12 @@ async fn runner_task(
@@ -200,6 +287,12 @@ async fn runner_task(
|
|
|
|
|
run.cancel(&*interface); |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
Pause => { |
|
|
|
|
paused = true; |
|
|
|
|
} |
|
|
|
|
Unpause => { |
|
|
|
|
paused = false; |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
}; |
|
|
|
|
let delay_done = || { |
|
|
|
@ -270,11 +363,13 @@ impl SectionRunner {
@@ -270,11 +363,13 @@ impl SectionRunner {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub async fn pause(&mut self) -> Result<()> { |
|
|
|
|
todo!() |
|
|
|
|
self.msg_send.send(RunnerMsg::Pause).await?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
pub async fn unpause(&mut self) -> Result<()> { |
|
|
|
|
todo!() |
|
|
|
|
self.msg_send.send(RunnerMsg::Unpause).await?; |
|
|
|
|
Ok(()) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -347,8 +442,10 @@ mod test {
@@ -347,8 +442,10 @@ mod test {
|
|
|
|
|
|
|
|
|
|
async fn advance(dur: Duration) { |
|
|
|
|
// HACK: advance should really be enough, but we need another yield_now
|
|
|
|
|
tokio::time::pause(); |
|
|
|
|
tokio::time::advance(dur).await; |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
tokio::time::resume(); |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
@ -366,12 +463,9 @@ mod test {
@@ -366,12 +463,9 @@ mod test {
|
|
|
|
|
|
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
|
|
|
|
|
pause(); |
|
|
|
|
advance(Duration::from_secs(1)).await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[true, false]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(10)).await; |
|
|
|
|
advance(Duration::from_secs(11)).await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
@ -386,11 +480,11 @@ mod test {
@@ -386,11 +480,11 @@ mod test {
|
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(1)).await; |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(10)).await; |
|
|
|
|
advance(Duration::from_secs(11)).await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[true, false]); |
|
|
|
|
|
|
|
|
@ -398,8 +492,6 @@ mod test {
@@ -398,8 +492,6 @@ mod test {
|
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
resume(); |
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
} |
|
|
|
@ -424,9 +516,7 @@ mod test {
@@ -424,9 +516,7 @@ mod test {
|
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
pause(); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(1)).await; |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
@ -436,12 +526,10 @@ mod test {
@@ -436,12 +526,10 @@ mod test {
|
|
|
|
|
assert_section_states(&interface, &[true, false]); |
|
|
|
|
|
|
|
|
|
runner.cancel_run(run3).await.unwrap(); |
|
|
|
|
advance(Duration::from_secs(10)).await; |
|
|
|
|
advance(Duration::from_secs(11)).await; |
|
|
|
|
|
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
resume(); |
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
} |
|
|
|
@ -466,23 +554,76 @@ mod test {
@@ -466,23 +554,76 @@ mod test {
|
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
pause(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
runner.cancel_all().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(1)).await; |
|
|
|
|
runner.cancel_all().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
|
async fn test_pause() { |
|
|
|
|
let (sections, interface) = make_sections_and_interface(); |
|
|
|
|
let mut runner = SectionRunner::new(interface.clone()); |
|
|
|
|
|
|
|
|
|
let _run1 = runner |
|
|
|
|
.queue_run(sections[1].clone(), Duration::from_secs(10)) |
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
let run2 = runner |
|
|
|
|
.queue_run(sections[0].clone(), Duration::from_secs(10)) |
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
let _run3 = runner |
|
|
|
|
.queue_run(sections[1].clone(), Duration::from_secs(10)) |
|
|
|
|
.await |
|
|
|
|
.unwrap(); |
|
|
|
|
|
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
runner.cancel_all().await.unwrap(); |
|
|
|
|
runner.pause().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(10)).await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
runner.cancel_all().await.unwrap(); |
|
|
|
|
runner.unpause().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(8)).await; |
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(2)).await; |
|
|
|
|
assert_section_states(&interface, &[true, false]); |
|
|
|
|
|
|
|
|
|
runner.pause().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
resume(); |
|
|
|
|
// cancel paused run
|
|
|
|
|
runner.cancel_run(run2).await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
runner.unpause().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|
assert_section_states(&interface, &[false, true]); |
|
|
|
|
|
|
|
|
|
advance(Duration::from_secs(11)).await; |
|
|
|
|
assert_section_states(&interface, &[false, false]); |
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
tokio::task::yield_now().await; |
|
|
|
|