From bd643607a9ebe0cde64905425d70e2b0a82749f2 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Wed, 16 Sep 2020 21:23:05 -0600 Subject: [PATCH] Add SectionRunner events --- src/section_runner.rs | 172 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 160 insertions(+), 12 deletions(-) diff --git a/src/section_runner.rs b/src/section_runner.rs index 106b339..ce0d9c6 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -1,7 +1,6 @@ use crate::model::SectionRef; use crate::option_future::OptionFuture; use crate::section_interface::SectionInterface; -use mpsc::error::SendError; use std::{ collections::VecDeque, mem::swap, @@ -14,7 +13,7 @@ use std::{ use thiserror::Error; use tokio::{ spawn, - sync::mpsc, + sync::{broadcast, mpsc, oneshot}, time::{delay_for, Instant}, }; use tracing::{debug, trace, trace_span, warn}; @@ -35,7 +34,7 @@ impl SectionRunnerInner { } } -#[derive(Clone, Debug)] +#[derive(Debug)] enum RunnerMsg { Quit, QueueRun(RunHandle, SectionRef, Duration), @@ -43,8 +42,25 @@ enum RunnerMsg { CancelAll, Pause, Unpause, + Subscribe(oneshot::Sender), +} + +#[derive(Clone, Debug, PartialEq)] +pub enum RunnerEvent { + RunStart(RunHandle), + RunFinish(RunHandle), + RunPause(RunHandle), + RunUnpause(RunHandle), + RunCancel(RunHandle), + RunnerPause, + RunnerUnpause, } +pub type RunnerEventRecv = broadcast::Receiver; +type RunnerEventSend = broadcast::Sender; + +const EVENT_CAPACITY: usize = 8; + #[derive(Clone, Debug, PartialEq)] enum RunState { Waiting, @@ -95,6 +111,7 @@ struct RunnerTask { running: bool, delay_future: OptionFuture, paused: bool, + event_send: Option, } impl RunnerTask { @@ -108,6 +125,29 @@ impl RunnerTask { running: true, delay_future: None.into(), paused: false, + event_send: None, + } + } + + fn send_event(&mut self, event: RunnerEvent) { + if let Some(event_send) = &mut self.event_send { + match event_send.send(event) { + Ok(_) => {} + Err(_closed) => { + self.event_send = None; + } + } + } + } + + fn subscribe_event(&mut self) -> RunnerEventRecv { + match &mut self.event_send { + Some(event_send) => event_send.subscribe(), + None => { + let (event_send, event_recv) = broadcast::channel(EVENT_CAPACITY); + self.event_send = Some(event_send); + event_recv + } } } @@ -119,6 +159,7 @@ impl RunnerTask { run.state = Running { start_time: Instant::now(), }; + self.send_event(RunnerEvent::RunStart(run.handle.clone())); } fn finish_run(&mut self, run: &mut SecRun) { @@ -127,6 +168,7 @@ impl RunnerTask { self.interface .set_section_state(run.section.interface_id, false); run.state = RunState::Finished; + self.send_event(RunnerEvent::RunFinish(run.handle.clone())); } else { warn!( section_id = run.section.id, @@ -143,29 +185,34 @@ impl RunnerTask { .set_section_state(run.section.interface_id, false); } run.state = RunState::Cancelled; + self.send_event(RunnerEvent::RunCancel(run.handle.clone())); } fn pause_run(&mut self, run: &mut SecRun) { use RunState::*; - match run.state { + let new_state = match run.state { Running { start_time } => { debug!(section_id = run.section.id, "pausing running section"); self.interface .set_section_state(run.section.interface_id, false); - run.state = Paused { + Paused { start_time, pause_time: Instant::now(), - }; + } } Waiting => { debug!(section_id = run.section.id, "pausing waiting section"); - run.state = Paused { + Paused { start_time: Instant::now(), pause_time: Instant::now(), - }; + } } - Finished | Cancelled | Paused { .. } => {} - } + Finished | Cancelled | Paused { .. } => { + return; + } + }; + run.state = new_state; + self.send_event(RunnerEvent::RunPause(run.handle.clone())); } fn unpause_run(&mut self, run: &mut SecRun) { @@ -183,6 +230,7 @@ impl RunnerTask { }; let ran_for = pause_time - start_time; run.duration -= ran_for; + self.send_event(RunnerEvent::RunUnpause(run.handle.clone())); } Waiting | Finished | Cancelled | Running { .. } => { warn!( @@ -265,9 +313,16 @@ impl RunnerTask { } Pause => { self.paused = true; + self.send_event(RunnerEvent::RunnerPause); } Unpause => { self.paused = false; + self.send_event(RunnerEvent::RunnerUnpause); + } + Subscribe(res_send) => { + let event_recv = self.subscribe_event(); + // Ignore error if channel closed + let _ = res_send.send(event_recv); } } } @@ -297,8 +352,14 @@ pub struct ChannelClosed; pub type Result = std::result::Result; -impl From> for ChannelClosed { - fn from(_: SendError) -> Self { +impl From> for ChannelClosed { + fn from(_: mpsc::error::SendError) -> Self { + Self + } +} + +impl From for ChannelClosed { + fn from(_: oneshot::error::RecvError) -> Self { Self } } @@ -357,6 +418,13 @@ impl SectionRunner { self.msg_send.send(RunnerMsg::Unpause).await?; Ok(()) } + + pub async fn subscribe(&mut self) -> Result { + let (res_send, res_recv) = oneshot::channel(); + self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; + let event_recv = res_recv.await?; + Ok(event_recv) + } } #[cfg(test)] @@ -613,4 +681,84 @@ mod test { runner.quit().await.unwrap(); tokio::task::yield_now().await; } + + #[tokio::test] + async fn test_event() { + let (sections, interface) = make_sections_and_interface(); + let mut runner = SectionRunner::new(interface.clone()); + + let mut event_recv = runner.subscribe().await.unwrap(); + + let run1 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + let run2 = runner + .queue_run(sections[0].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + let run3 = runner + .queue_run(sections[1].clone(), Duration::from_secs(10)) + .await + .unwrap(); + + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunStart(run1.clone())) + ); + + runner.pause().await.unwrap(); + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunPause(run1.clone())) + ); + + runner.unpause().await.unwrap(); + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunUnpause(run1.clone())) + ); + + advance(Duration::from_secs(11)).await; + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run1))); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunStart(run2.clone())) + ); + + runner.pause().await.unwrap(); + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunPause(run2.clone())) + ); + + // cancel paused run + runner.cancel_run(run2.clone()).await.unwrap(); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunCancel(run2.clone())) + ); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunPause(run3.clone())) + ); + + runner.unpause().await.unwrap(); + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); + assert_eq!( + event_recv.recv().await, + Ok(RunnerEvent::RunUnpause(run3.clone())) + ); + + advance(Duration::from_secs(11)).await; + assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run3))); + + runner.quit().await.unwrap(); + tokio::task::yield_now().await; + } }