Browse Source

Add cancel_all test

drone-volume-cache
Alex Mikhalev 4 years ago
parent
commit
43f1e59516
  1. 98
      src/section_runner.rs

98
src/section_runner.rs

@ -2,7 +2,7 @@ use crate::model::SectionRef;
use crate::section_interface::SectionInterface; use crate::section_interface::SectionInterface;
use mpsc::error::SendError; use mpsc::error::SendError;
use std::{ use std::{
collections::VecDeque, collections::LinkedList,
sync::{ sync::{
atomic::{AtomicI32, Ordering}, atomic::{AtomicI32, Ordering},
Arc, Arc,
@ -17,7 +17,7 @@ use tokio::{
}; };
use tracing::{debug, trace, trace_span}; use tracing::{debug, trace, trace_span};
#[derive(Debug, Clone)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunHandle(i32); pub struct RunHandle(i32);
#[derive(Debug)] #[derive(Debug)]
@ -37,6 +37,7 @@ impl SectionRunnerInner {
enum RunnerMsg { enum RunnerMsg {
Quit, Quit,
QueueRun(RunHandle, SectionRef, Duration), QueueRun(RunHandle, SectionRef, Duration),
CancelRun(RunHandle),
} }
#[derive(Debug)] #[derive(Debug)]
@ -47,6 +48,38 @@ struct SecRun {
start_time: Option<Instant>, start_time: Option<Instant>,
} }
impl SecRun {
fn start(&mut self, interface: &dyn SectionInterface) {
debug!(section_id = self.section.id, "starting running section");
interface.set_section_state(self.section.interface_id, true);
self.start_time = Some(Instant::now());
}
fn finish(&mut self, interface: &dyn SectionInterface) {
if self.start_time.is_some() {
debug!(section_id = self.section.id, "finished running section");
interface.set_section_state(self.section.interface_id, false);
self.start_time = None;
}
}
fn cancel(&mut self, interface: &dyn SectionInterface) {
if self.start_time.is_some() {
debug!(section_id = self.section.id, "cancelling running section");
interface.set_section_state(self.section.interface_id, false);
self.start_time = None;
}
}
fn elapsed(&self) -> Option<Duration> {
self.start_time.map(|t| t.elapsed())
}
fn is_done(&self) -> Option<bool> {
self.elapsed().map(|elapsed| elapsed >= self.duration)
}
}
mod option_future { mod option_future {
use pin_project::pin_project; use pin_project::pin_project;
use std::{ use std::{
@ -97,25 +130,21 @@ async fn runner_task(
let _enter = span.enter(); let _enter = span.enter();
let mut running = true; let mut running = true;
let mut run_queue: VecDeque<SecRun> = VecDeque::new(); let mut run_queue: LinkedList<SecRun> = LinkedList::new();
let mut delay_future: OptionFuture<_> = None.into(); let mut delay_future: OptionFuture<_> = None.into();
while running { while running {
if let Some(current_run) = run_queue.front_mut() { if let Some(current_run) = run_queue.front_mut() {
let current_sec = &current_run.section;
let done = if let Some(start_time) = &current_run.start_time { let done = if let Some(start_time) = &current_run.start_time {
let elapsed = Instant::now() - *start_time; let elapsed = Instant::now() - *start_time;
elapsed >= current_run.duration elapsed >= current_run.duration
} else { } else {
debug!(section_id = current_sec.id, "starting running section"); current_run.start(&*interface);
interface.set_section_state(current_sec.interface_id, true);
current_run.start_time = Some(Instant::now());
delay_future = Some(delay_for(current_run.duration)).into(); delay_future = Some(delay_for(current_run.duration)).into();
false false
}; };
if done { if done {
debug!(section_id = current_sec.id, "finished running section"); current_run.finish(&*interface);
interface.set_section_state(current_sec.interface_id, false);
run_queue.pop_front(); run_queue.pop_front();
delay_future = None.into(); delay_future = None.into();
continue; continue;
@ -136,6 +165,12 @@ async fn runner_task(
start_time: None, start_time: None,
}); });
} }
CancelRun(handle) => {
for mut run in run_queue.drain_filter(|item| item.handle == handle) {
trace!(handle = handle.0, "cancelling run by handle");
run.cancel(&*interface);
}
}
} }
}; };
let delay_done = || { let delay_done = || {
@ -195,7 +230,8 @@ impl SectionRunner {
} }
pub async fn cancel_run(&mut self, handle: RunHandle) -> Result<()> { pub async fn cancel_run(&mut self, handle: RunHandle) -> Result<()> {
todo!() self.msg_send.send(RunnerMsg::CancelRun(handle)).await?;
Ok(())
} }
pub async fn cancel_all(&mut self) -> Result<()> { pub async fn cancel_all(&mut self) -> Result<()> {
@ -378,4 +414,46 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
#[tokio::test]
async fn test_cancel_all() {
let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone());
runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
runner
.queue_run(sections[0].clone(), Duration::from_secs(10))
.await
.unwrap();
runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
pause();
advance(Duration::from_secs(1)).await;
assert_section_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
runner.cancel_all().await.unwrap();
tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]);
resume();
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
} }

Loading…
Cancel
Save