Browse Source

Change SectionRunner to use VecDeque again

Change SecRun to have a RunState enum with a cancelled state instead
of removing run from the queue when cancelled.
This means we no longer need `drain_filter` which is unstable
drone-volume-cache
Alex Mikhalev 4 years ago
parent
commit
7e24b03f22
  1. 2
      src/main.rs
  2. 81
      src/section_runner.rs

2
src/main.rs

@ -1,5 +1,3 @@
#![feature(drain_filter)]
use color_eyre::eyre::Result; use color_eyre::eyre::Result;
use rusqlite::Connection as DbConnection; use rusqlite::Connection as DbConnection;
use rusqlite::NO_PARAMS; use rusqlite::NO_PARAMS;

81
src/section_runner.rs

@ -2,12 +2,13 @@ use crate::model::SectionRef;
use crate::section_interface::SectionInterface; use crate::section_interface::SectionInterface;
use mpsc::error::SendError; use mpsc::error::SendError;
use std::{ use std::{
collections::LinkedList, collections::VecDeque,
mem::swap,
sync::{ sync::{
atomic::{AtomicI32, Ordering}, atomic::{AtomicI32, Ordering},
Arc, Arc,
}, },
time::Duration, mem::swap, time::Duration,
}; };
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
@ -41,44 +42,50 @@ enum RunnerMsg {
CancelAll, CancelAll,
} }
#[derive(Clone, Debug, PartialEq)]
enum RunState {
Waiting,
Running { start_time: Instant },
Finished,
Cancelled,
}
#[derive(Debug)] #[derive(Debug)]
struct SecRun { struct SecRun {
handle: RunHandle, handle: RunHandle,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
start_time: Option<Instant>, state: RunState,
} }
impl SecRun { impl SecRun {
fn start(&mut self, interface: &dyn SectionInterface) { fn start(&mut self, interface: &dyn SectionInterface) {
use RunState::*;
debug!(section_id = self.section.id, "starting running section"); debug!(section_id = self.section.id, "starting running section");
interface.set_section_state(self.section.interface_id, true); interface.set_section_state(self.section.interface_id, true);
self.start_time = Some(Instant::now()); self.state = Running {
start_time: Instant::now(),
};
}
fn is_running(&self) -> bool {
matches!(self.state, RunState::Running{..})
} }
fn finish(&mut self, interface: &dyn SectionInterface) { fn finish(&mut self, interface: &dyn SectionInterface) {
if self.start_time.is_some() { if self.is_running() {
debug!(section_id = self.section.id, "finished running section"); debug!(section_id = self.section.id, "finished running section");
interface.set_section_state(self.section.interface_id, false); interface.set_section_state(self.section.interface_id, false);
self.start_time = None; self.state = RunState::Finished;
} }
} }
fn cancel(&mut self, interface: &dyn SectionInterface) { fn cancel(&mut self, interface: &dyn SectionInterface) {
if self.start_time.is_some() { if self.is_running() {
debug!(section_id = self.section.id, "cancelling running section"); debug!(section_id = self.section.id, "cancelling running section");
interface.set_section_state(self.section.interface_id, false); interface.set_section_state(self.section.interface_id, false);
self.start_time = None;
} }
} self.state = RunState::Cancelled;
fn elapsed(&self) -> Option<Duration> {
self.start_time.map(|t| t.elapsed())
}
#[allow(dead_code)]
fn is_done(&self) -> Option<bool> {
self.elapsed().map(|elapsed| elapsed >= self.duration)
} }
} }
@ -128,27 +135,36 @@ async fn runner_task(
interface: Arc<dyn SectionInterface + Sync>, interface: Arc<dyn SectionInterface + Sync>,
mut msg_recv: mpsc::Receiver<RunnerMsg>, mut msg_recv: mpsc::Receiver<RunnerMsg>,
) { ) {
use RunState::*;
let span = trace_span!("runner_task"); let span = trace_span!("runner_task");
let _enter = span.enter(); let _enter = span.enter();
let mut running = true; let mut running = true;
let mut run_queue: LinkedList<SecRun> = LinkedList::new(); let mut run_queue: VecDeque<SecRun> = VecDeque::new();
let mut delay_future: OptionFuture<_> = None.into(); let mut delay_future: OptionFuture<_> = None.into();
while running { while running {
if let Some(current_run) = run_queue.front_mut() { if let Some(current_run) = run_queue.front_mut() {
let done = if let Some(start_time) = &current_run.start_time { let done = match current_run.state {
let elapsed = Instant::now() - *start_time; Waiting => {
elapsed >= current_run.duration current_run.start(&*interface);
} else { delay_future = Some(delay_for(current_run.duration)).into();
current_run.start(&*interface); false
delay_future = Some(delay_for(current_run.duration)).into(); }
false Running { start_time } => {
if start_time.elapsed() >= current_run.duration {
current_run.finish(&*interface);
delay_future = None.into();
true
} else {
false
}
}
Cancelled | Finished => true,
}; };
if done { if done {
current_run.finish(&*interface);
run_queue.pop_front(); run_queue.pop_front();
delay_future = None.into();
continue; continue;
} }
} }
@ -164,17 +180,20 @@ async fn runner_task(
handle, handle,
section, section,
duration, duration,
start_time: None, state: Waiting,
}); });
} }
CancelRun(handle) => { CancelRun(handle) => {
for mut run in run_queue.drain_filter(|item| item.handle == handle) { for run in &mut run_queue {
if run.handle != handle {
continue;
}
trace!(handle = handle.0, "cancelling run by handle"); trace!(handle = handle.0, "cancelling run by handle");
run.cancel(&*interface); run.cancel(&*interface);
} }
}, }
CancelAll => { CancelAll => {
let mut old_runs = LinkedList::new(); let mut old_runs = VecDeque::new();
swap(&mut old_runs, &mut run_queue); swap(&mut old_runs, &mut run_queue);
trace!(count = old_runs.len(), "cancelling all runs"); trace!(count = old_runs.len(), "cancelling all runs");
for mut run in old_runs { for mut run in old_runs {

Loading…
Cancel
Save