diff --git a/src/section_runner.rs b/src/section_runner.rs index ce0d9c6..5f610f1 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -19,7 +19,7 @@ use tokio::{ use tracing::{debug, trace, trace_span, warn}; #[derive(Debug, Clone, PartialEq, Eq)] -pub struct RunHandle(i32); +pub struct SectionRunHandle(i32); #[derive(Debug)] struct SectionRunnerInner { @@ -37,27 +37,27 @@ impl SectionRunnerInner { #[derive(Debug)] enum RunnerMsg { Quit, - QueueRun(RunHandle, SectionRef, Duration), - CancelRun(RunHandle), + QueueRun(SectionRunHandle, SectionRef, Duration), + CancelRun(SectionRunHandle), CancelAll, Pause, Unpause, - Subscribe(oneshot::Sender), + Subscribe(oneshot::Sender), } #[derive(Clone, Debug, PartialEq)] -pub enum RunnerEvent { - RunStart(RunHandle), - RunFinish(RunHandle), - RunPause(RunHandle), - RunUnpause(RunHandle), - RunCancel(RunHandle), +pub enum SectionEvent { + RunStart(SectionRunHandle), + RunFinish(SectionRunHandle), + RunPause(SectionRunHandle), + RunUnpause(SectionRunHandle), + RunCancel(SectionRunHandle), RunnerPause, RunnerUnpause, } -pub type RunnerEventRecv = broadcast::Receiver; -type RunnerEventSend = broadcast::Sender; +pub type SectionEventRecv = broadcast::Receiver; +type SectionEventSend = broadcast::Sender; const EVENT_CAPACITY: usize = 8; @@ -77,7 +77,7 @@ enum RunState { #[derive(Debug)] struct SecRun { - handle: RunHandle, + handle: SectionRunHandle, section: SectionRef, duration: Duration, total_duration: Duration, @@ -85,7 +85,7 @@ struct SecRun { } impl SecRun { - fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { + fn new(handle: SectionRunHandle, section: SectionRef, duration: Duration) -> Self { Self { handle, section, @@ -111,7 +111,7 @@ struct RunnerTask { running: bool, delay_future: OptionFuture, paused: bool, - event_send: Option, + event_send: Option, } impl RunnerTask { @@ -129,7 +129,7 @@ impl RunnerTask { } } - fn send_event(&mut self, event: RunnerEvent) { + fn send_event(&mut self, event: SectionEvent) { if let Some(event_send) = &mut self.event_send { match event_send.send(event) { Ok(_) => {} @@ -140,7 +140,7 @@ impl RunnerTask { } } - fn subscribe_event(&mut self) -> RunnerEventRecv { + fn subscribe_event(&mut self) -> SectionEventRecv { match &mut self.event_send { Some(event_send) => event_send.subscribe(), None => { @@ -159,7 +159,7 @@ impl RunnerTask { run.state = Running { start_time: Instant::now(), }; - self.send_event(RunnerEvent::RunStart(run.handle.clone())); + self.send_event(SectionEvent::RunStart(run.handle.clone())); } fn finish_run(&mut self, run: &mut SecRun) { @@ -168,7 +168,7 @@ impl RunnerTask { self.interface .set_section_state(run.section.interface_id, false); run.state = RunState::Finished; - self.send_event(RunnerEvent::RunFinish(run.handle.clone())); + self.send_event(SectionEvent::RunFinish(run.handle.clone())); } else { warn!( section_id = run.section.id, @@ -185,7 +185,7 @@ impl RunnerTask { .set_section_state(run.section.interface_id, false); } run.state = RunState::Cancelled; - self.send_event(RunnerEvent::RunCancel(run.handle.clone())); + self.send_event(SectionEvent::RunCancel(run.handle.clone())); } fn pause_run(&mut self, run: &mut SecRun) { @@ -212,7 +212,7 @@ impl RunnerTask { } }; run.state = new_state; - self.send_event(RunnerEvent::RunPause(run.handle.clone())); + self.send_event(SectionEvent::RunPause(run.handle.clone())); } fn unpause_run(&mut self, run: &mut SecRun) { @@ -230,7 +230,7 @@ impl RunnerTask { }; let ran_for = pause_time - start_time; run.duration -= ran_for; - self.send_event(RunnerEvent::RunUnpause(run.handle.clone())); + self.send_event(SectionEvent::RunUnpause(run.handle.clone())); } Waiting | Finished | Cancelled | Running { .. } => { warn!( @@ -313,11 +313,11 @@ impl RunnerTask { } Pause => { self.paused = true; - self.send_event(RunnerEvent::RunnerPause); + self.send_event(SectionEvent::RunnerPause); } Unpause => { self.paused = false; - self.send_event(RunnerEvent::RunnerUnpause); + self.send_event(SectionEvent::RunnerUnpause); } Subscribe(res_send) => { let event_recv = self.subscribe_event(); @@ -390,16 +390,16 @@ impl SectionRunner { &mut self, section: SectionRef, duration: Duration, - ) -> Result { + ) -> Result { let run_id = self.inner.next_run_id.fetch_add(1, Ordering::Relaxed); - let handle = RunHandle(run_id); + let handle = SectionRunHandle(run_id); self.msg_send .send(RunnerMsg::QueueRun(handle.clone(), section, duration)) .await?; Ok(handle) } - pub async fn cancel_run(&mut self, handle: RunHandle) -> Result<()> { + pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> { self.msg_send.send(RunnerMsg::CancelRun(handle)).await?; Ok(()) } @@ -419,7 +419,7 @@ impl SectionRunner { Ok(()) } - pub async fn subscribe(&mut self) -> Result { + pub async fn subscribe(&mut self) -> Result { let (res_send, res_recv) = oneshot::channel(); self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; let event_recv = res_recv.await?; @@ -432,9 +432,10 @@ mod test { use super::*; use crate::section_interface::MockSectionInterface; use crate::{ - model::Section, + model::{Section, Sections}, trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, }; + use im::ordmap; use tracing_subscriber::prelude::*; #[tokio::test] @@ -465,19 +466,19 @@ mod test { assert_eq!(task_span.get_exit_count(), 1); } - fn make_sections_and_interface() -> (Vec, Arc) { + fn make_sections_and_interface() -> (Sections, Arc) { let interface = Arc::new(MockSectionInterface::new(2)); - let sections: Vec = vec![ - Arc::new(Section { + let sections: Sections = ordmap![ + 1 => Section { id: 1, name: "Section 1".into(), interface_id: 0, - }), - Arc::new(Section { + }.into(), + 2 => Section { id: 2, name: "Section 2".into(), interface_id: 1, - }), + }.into() ]; (sections, interface) } @@ -510,7 +511,7 @@ mod test { // Queue single section, make sure it runs runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); @@ -524,12 +525,12 @@ mod test { // Queue two sections, make sure they run one at a time runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); @@ -555,17 +556,17 @@ mod test { let mut runner = SectionRunner::new(interface.clone()); let run1 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let _run2 = runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let run3 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); @@ -593,17 +594,17 @@ mod test { let mut runner = SectionRunner::new(interface.clone()); runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); @@ -628,17 +629,17 @@ mod test { let mut runner = SectionRunner::new(interface.clone()); let _run1 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let run2 = runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let _run3 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); @@ -690,73 +691,73 @@ mod test { let mut event_recv = runner.subscribe().await.unwrap(); let run1 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let run2 = runner - .queue_run(sections[0].clone(), Duration::from_secs(10)) + .queue_run(sections[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let run3 = runner - .queue_run(sections[1].clone(), Duration::from_secs(10)) + .queue_run(sections[&2].clone(), Duration::from_secs(10)) .await .unwrap(); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunStart(run1.clone())) + Ok(SectionEvent::RunStart(run1.clone())) ); runner.pause().await.unwrap(); - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunPause(run1.clone())) + Ok(SectionEvent::RunPause(run1.clone())) ); runner.unpause().await.unwrap(); - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunUnpause(run1.clone())) + Ok(SectionEvent::RunUnpause(run1.clone())) ); advance(Duration::from_secs(11)).await; - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run1))); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunFinish(run1))); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunStart(run2.clone())) + Ok(SectionEvent::RunStart(run2.clone())) ); runner.pause().await.unwrap(); - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunPause(run2.clone())) + Ok(SectionEvent::RunPause(run2.clone())) ); // cancel paused run runner.cancel_run(run2.clone()).await.unwrap(); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunCancel(run2.clone())) + Ok(SectionEvent::RunCancel(run2.clone())) ); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunPause(run3.clone())) + Ok(SectionEvent::RunPause(run3.clone())) ); runner.unpause().await.unwrap(); - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); assert_eq!( event_recv.recv().await, - Ok(RunnerEvent::RunUnpause(run3.clone())) + Ok(SectionEvent::RunUnpause(run3.clone())) ); advance(Duration::from_secs(11)).await; - assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run3))); + assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunFinish(run3))); runner.quit().await.unwrap(); tokio::task::yield_now().await;