Browse Source

Add SectionRunner events

master
Alex Mikhalev 4 years ago
parent
commit
bd643607a9
  1. 172
      src/section_runner.rs

172
src/section_runner.rs

@ -1,7 +1,6 @@
use crate::model::SectionRef; use crate::model::SectionRef;
use crate::option_future::OptionFuture; use crate::option_future::OptionFuture;
use crate::section_interface::SectionInterface; use crate::section_interface::SectionInterface;
use mpsc::error::SendError;
use std::{ use std::{
collections::VecDeque, collections::VecDeque,
mem::swap, mem::swap,
@ -14,7 +13,7 @@ use std::{
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
spawn, spawn,
sync::mpsc, sync::{broadcast, mpsc, oneshot},
time::{delay_for, Instant}, time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span, warn}; use tracing::{debug, trace, trace_span, warn};
@ -35,7 +34,7 @@ impl SectionRunnerInner {
} }
} }
#[derive(Clone, Debug)] #[derive(Debug)]
enum RunnerMsg { enum RunnerMsg {
Quit, Quit,
QueueRun(RunHandle, SectionRef, Duration), QueueRun(RunHandle, SectionRef, Duration),
@ -43,8 +42,25 @@ enum RunnerMsg {
CancelAll, CancelAll,
Pause, Pause,
Unpause, Unpause,
Subscribe(oneshot::Sender<RunnerEventRecv>),
}
#[derive(Clone, Debug, PartialEq)]
pub enum RunnerEvent {
RunStart(RunHandle),
RunFinish(RunHandle),
RunPause(RunHandle),
RunUnpause(RunHandle),
RunCancel(RunHandle),
RunnerPause,
RunnerUnpause,
} }
pub type RunnerEventRecv = broadcast::Receiver<RunnerEvent>;
type RunnerEventSend = broadcast::Sender<RunnerEvent>;
const EVENT_CAPACITY: usize = 8;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
enum RunState { enum RunState {
Waiting, Waiting,
@ -95,6 +111,7 @@ struct RunnerTask {
running: bool, running: bool,
delay_future: OptionFuture<tokio::time::Delay>, delay_future: OptionFuture<tokio::time::Delay>,
paused: bool, paused: bool,
event_send: Option<RunnerEventSend>,
} }
impl RunnerTask { impl RunnerTask {
@ -108,6 +125,29 @@ impl RunnerTask {
running: true, running: true,
delay_future: None.into(), delay_future: None.into(),
paused: false, paused: false,
event_send: None,
}
}
fn send_event(&mut self, event: RunnerEvent) {
if let Some(event_send) = &mut self.event_send {
match event_send.send(event) {
Ok(_) => {}
Err(_closed) => {
self.event_send = None;
}
}
}
}
fn subscribe_event(&mut self) -> RunnerEventRecv {
match &mut self.event_send {
Some(event_send) => event_send.subscribe(),
None => {
let (event_send, event_recv) = broadcast::channel(EVENT_CAPACITY);
self.event_send = Some(event_send);
event_recv
}
} }
} }
@ -119,6 +159,7 @@ impl RunnerTask {
run.state = Running { run.state = Running {
start_time: Instant::now(), start_time: Instant::now(),
}; };
self.send_event(RunnerEvent::RunStart(run.handle.clone()));
} }
fn finish_run(&mut self, run: &mut SecRun) { fn finish_run(&mut self, run: &mut SecRun) {
@ -127,6 +168,7 @@ impl RunnerTask {
self.interface self.interface
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
run.state = RunState::Finished; run.state = RunState::Finished;
self.send_event(RunnerEvent::RunFinish(run.handle.clone()));
} else { } else {
warn!( warn!(
section_id = run.section.id, section_id = run.section.id,
@ -143,29 +185,34 @@ impl RunnerTask {
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
} }
run.state = RunState::Cancelled; run.state = RunState::Cancelled;
self.send_event(RunnerEvent::RunCancel(run.handle.clone()));
} }
fn pause_run(&mut self, run: &mut SecRun) { fn pause_run(&mut self, run: &mut SecRun) {
use RunState::*; use RunState::*;
match run.state { let new_state = match run.state {
Running { start_time } => { Running { start_time } => {
debug!(section_id = run.section.id, "pausing running section"); debug!(section_id = run.section.id, "pausing running section");
self.interface self.interface
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
run.state = Paused { Paused {
start_time, start_time,
pause_time: Instant::now(), pause_time: Instant::now(),
}; }
} }
Waiting => { Waiting => {
debug!(section_id = run.section.id, "pausing waiting section"); debug!(section_id = run.section.id, "pausing waiting section");
run.state = Paused { Paused {
start_time: Instant::now(), start_time: Instant::now(),
pause_time: Instant::now(), pause_time: Instant::now(),
}; }
} }
Finished | Cancelled | Paused { .. } => {} Finished | Cancelled | Paused { .. } => {
} return;
}
};
run.state = new_state;
self.send_event(RunnerEvent::RunPause(run.handle.clone()));
} }
fn unpause_run(&mut self, run: &mut SecRun) { fn unpause_run(&mut self, run: &mut SecRun) {
@ -183,6 +230,7 @@ impl RunnerTask {
}; };
let ran_for = pause_time - start_time; let ran_for = pause_time - start_time;
run.duration -= ran_for; run.duration -= ran_for;
self.send_event(RunnerEvent::RunUnpause(run.handle.clone()));
} }
Waiting | Finished | Cancelled | Running { .. } => { Waiting | Finished | Cancelled | Running { .. } => {
warn!( warn!(
@ -265,9 +313,16 @@ impl RunnerTask {
} }
Pause => { Pause => {
self.paused = true; self.paused = true;
self.send_event(RunnerEvent::RunnerPause);
} }
Unpause => { Unpause => {
self.paused = false; self.paused = false;
self.send_event(RunnerEvent::RunnerUnpause);
}
Subscribe(res_send) => {
let event_recv = self.subscribe_event();
// Ignore error if channel closed
let _ = res_send.send(event_recv);
} }
} }
} }
@ -297,8 +352,14 @@ pub struct ChannelClosed;
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>; pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>;
impl<T> From<SendError<T>> for ChannelClosed { impl<T> From<mpsc::error::SendError<T>> for ChannelClosed {
fn from(_: SendError<T>) -> Self { fn from(_: mpsc::error::SendError<T>) -> Self {
Self
}
}
impl From<oneshot::error::RecvError> for ChannelClosed {
fn from(_: oneshot::error::RecvError) -> Self {
Self Self
} }
} }
@ -357,6 +418,13 @@ impl SectionRunner {
self.msg_send.send(RunnerMsg::Unpause).await?; self.msg_send.send(RunnerMsg::Unpause).await?;
Ok(()) Ok(())
} }
pub async fn subscribe(&mut self) -> Result<RunnerEventRecv> {
let (res_send, res_recv) = oneshot::channel();
self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?;
let event_recv = res_recv.await?;
Ok(event_recv)
}
} }
#[cfg(test)] #[cfg(test)]
@ -613,4 +681,84 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
} }
#[tokio::test]
async fn test_event() {
let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone());
let mut event_recv = runner.subscribe().await.unwrap();
let run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
let run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10))
.await
.unwrap();
let run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10))
.await
.unwrap();
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunStart(run1.clone()))
);
runner.pause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause));
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunPause(run1.clone()))
);
runner.unpause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause));
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunUnpause(run1.clone()))
);
advance(Duration::from_secs(11)).await;
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run1)));
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunStart(run2.clone()))
);
runner.pause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause));
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunPause(run2.clone()))
);
// cancel paused run
runner.cancel_run(run2.clone()).await.unwrap();
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunCancel(run2.clone()))
);
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunPause(run3.clone()))
);
runner.unpause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause));
assert_eq!(
event_recv.recv().await,
Ok(RunnerEvent::RunUnpause(run3.clone()))
);
advance(Duration::from_secs(11)).await;
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run3)));
runner.quit().await.unwrap();
tokio::task::yield_now().await;
}
} }

Loading…
Cancel
Save