Browse Source

Rename types pertaining to SectionRunner

master
Alex Mikhalev 4 years ago
parent
commit
10abb912d4
  1. 129
      src/section_runner.rs

129
src/section_runner.rs

@ -19,7 +19,7 @@ use tokio::{
use tracing::{debug, trace, trace_span, warn}; use tracing::{debug, trace, trace_span, warn};
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunHandle(i32); pub struct SectionRunHandle(i32);
#[derive(Debug)] #[derive(Debug)]
struct SectionRunnerInner { struct SectionRunnerInner {
@ -37,27 +37,27 @@ impl SectionRunnerInner {
#[derive(Debug)] #[derive(Debug)]
enum RunnerMsg { enum RunnerMsg {
Quit, Quit,
QueueRun(RunHandle, SectionRef, Duration), QueueRun(SectionRunHandle, SectionRef, Duration),
CancelRun(RunHandle), CancelRun(SectionRunHandle),
CancelAll, CancelAll,
Pause, Pause,
Unpause, Unpause,
Subscribe(oneshot::Sender<RunnerEventRecv>), Subscribe(oneshot::Sender<SectionEventRecv>),
} }
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum RunnerEvent { pub enum SectionEvent {
RunStart(RunHandle), RunStart(SectionRunHandle),
RunFinish(RunHandle), RunFinish(SectionRunHandle),
RunPause(RunHandle), RunPause(SectionRunHandle),
RunUnpause(RunHandle), RunUnpause(SectionRunHandle),
RunCancel(RunHandle), RunCancel(SectionRunHandle),
RunnerPause, RunnerPause,
RunnerUnpause, RunnerUnpause,
} }
pub type RunnerEventRecv = broadcast::Receiver<RunnerEvent>; pub type SectionEventRecv = broadcast::Receiver<SectionEvent>;
type RunnerEventSend = broadcast::Sender<RunnerEvent>; type SectionEventSend = broadcast::Sender<SectionEvent>;
const EVENT_CAPACITY: usize = 8; const EVENT_CAPACITY: usize = 8;
@ -77,7 +77,7 @@ enum RunState {
#[derive(Debug)] #[derive(Debug)]
struct SecRun { struct SecRun {
handle: RunHandle, handle: SectionRunHandle,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
total_duration: Duration, total_duration: Duration,
@ -85,7 +85,7 @@ struct SecRun {
} }
impl SecRun { impl SecRun {
fn new(handle: RunHandle, section: SectionRef, duration: Duration) -> Self { fn new(handle: SectionRunHandle, section: SectionRef, duration: Duration) -> Self {
Self { Self {
handle, handle,
section, section,
@ -111,7 +111,7 @@ struct RunnerTask {
running: bool, running: bool,
delay_future: OptionFuture<tokio::time::Delay>, delay_future: OptionFuture<tokio::time::Delay>,
paused: bool, paused: bool,
event_send: Option<RunnerEventSend>, event_send: Option<SectionEventSend>,
} }
impl RunnerTask { impl RunnerTask {
@ -129,7 +129,7 @@ impl RunnerTask {
} }
} }
fn send_event(&mut self, event: RunnerEvent) { fn send_event(&mut self, event: SectionEvent) {
if let Some(event_send) = &mut self.event_send { if let Some(event_send) = &mut self.event_send {
match event_send.send(event) { match event_send.send(event) {
Ok(_) => {} Ok(_) => {}
@ -140,7 +140,7 @@ impl RunnerTask {
} }
} }
fn subscribe_event(&mut self) -> RunnerEventRecv { fn subscribe_event(&mut self) -> SectionEventRecv {
match &mut self.event_send { match &mut self.event_send {
Some(event_send) => event_send.subscribe(), Some(event_send) => event_send.subscribe(),
None => { None => {
@ -159,7 +159,7 @@ impl RunnerTask {
run.state = Running { run.state = Running {
start_time: Instant::now(), start_time: Instant::now(),
}; };
self.send_event(RunnerEvent::RunStart(run.handle.clone())); self.send_event(SectionEvent::RunStart(run.handle.clone()));
} }
fn finish_run(&mut self, run: &mut SecRun) { fn finish_run(&mut self, run: &mut SecRun) {
@ -168,7 +168,7 @@ impl RunnerTask {
self.interface self.interface
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
run.state = RunState::Finished; run.state = RunState::Finished;
self.send_event(RunnerEvent::RunFinish(run.handle.clone())); self.send_event(SectionEvent::RunFinish(run.handle.clone()));
} else { } else {
warn!( warn!(
section_id = run.section.id, section_id = run.section.id,
@ -185,7 +185,7 @@ impl RunnerTask {
.set_section_state(run.section.interface_id, false); .set_section_state(run.section.interface_id, false);
} }
run.state = RunState::Cancelled; run.state = RunState::Cancelled;
self.send_event(RunnerEvent::RunCancel(run.handle.clone())); self.send_event(SectionEvent::RunCancel(run.handle.clone()));
} }
fn pause_run(&mut self, run: &mut SecRun) { fn pause_run(&mut self, run: &mut SecRun) {
@ -212,7 +212,7 @@ impl RunnerTask {
} }
}; };
run.state = new_state; run.state = new_state;
self.send_event(RunnerEvent::RunPause(run.handle.clone())); self.send_event(SectionEvent::RunPause(run.handle.clone()));
} }
fn unpause_run(&mut self, run: &mut SecRun) { fn unpause_run(&mut self, run: &mut SecRun) {
@ -230,7 +230,7 @@ impl RunnerTask {
}; };
let ran_for = pause_time - start_time; let ran_for = pause_time - start_time;
run.duration -= ran_for; run.duration -= ran_for;
self.send_event(RunnerEvent::RunUnpause(run.handle.clone())); self.send_event(SectionEvent::RunUnpause(run.handle.clone()));
} }
Waiting | Finished | Cancelled | Running { .. } => { Waiting | Finished | Cancelled | Running { .. } => {
warn!( warn!(
@ -313,11 +313,11 @@ impl RunnerTask {
} }
Pause => { Pause => {
self.paused = true; self.paused = true;
self.send_event(RunnerEvent::RunnerPause); self.send_event(SectionEvent::RunnerPause);
} }
Unpause => { Unpause => {
self.paused = false; self.paused = false;
self.send_event(RunnerEvent::RunnerUnpause); self.send_event(SectionEvent::RunnerUnpause);
} }
Subscribe(res_send) => { Subscribe(res_send) => {
let event_recv = self.subscribe_event(); let event_recv = self.subscribe_event();
@ -390,16 +390,16 @@ impl SectionRunner {
&mut self, &mut self,
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
) -> Result<RunHandle> { ) -> Result<SectionRunHandle> {
let run_id = self.inner.next_run_id.fetch_add(1, Ordering::Relaxed); let run_id = self.inner.next_run_id.fetch_add(1, Ordering::Relaxed);
let handle = RunHandle(run_id); let handle = SectionRunHandle(run_id);
self.msg_send self.msg_send
.send(RunnerMsg::QueueRun(handle.clone(), section, duration)) .send(RunnerMsg::QueueRun(handle.clone(), section, duration))
.await?; .await?;
Ok(handle) Ok(handle)
} }
pub async fn cancel_run(&mut self, handle: RunHandle) -> Result<()> { pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> {
self.msg_send.send(RunnerMsg::CancelRun(handle)).await?; self.msg_send.send(RunnerMsg::CancelRun(handle)).await?;
Ok(()) Ok(())
} }
@ -419,7 +419,7 @@ impl SectionRunner {
Ok(()) Ok(())
} }
pub async fn subscribe(&mut self) -> Result<RunnerEventRecv> { pub async fn subscribe(&mut self) -> Result<SectionEventRecv> {
let (res_send, res_recv) = oneshot::channel(); let (res_send, res_recv) = oneshot::channel();
self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?;
let event_recv = res_recv.await?; let event_recv = res_recv.await?;
@ -432,9 +432,10 @@ mod test {
use super::*; use super::*;
use crate::section_interface::MockSectionInterface; use crate::section_interface::MockSectionInterface;
use crate::{ use crate::{
model::Section, model::{Section, Sections},
trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, trace_listeners::{EventListener, Filters, SpanFilters, SpanListener},
}; };
use im::ordmap;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
#[tokio::test] #[tokio::test]
@ -465,19 +466,19 @@ mod test {
assert_eq!(task_span.get_exit_count(), 1); assert_eq!(task_span.get_exit_count(), 1);
} }
fn make_sections_and_interface() -> (Vec<SectionRef>, Arc<MockSectionInterface>) { fn make_sections_and_interface() -> (Sections, Arc<MockSectionInterface>) {
let interface = Arc::new(MockSectionInterface::new(2)); let interface = Arc::new(MockSectionInterface::new(2));
let sections: Vec<SectionRef> = vec![ let sections: Sections = ordmap![
Arc::new(Section { 1 => Section {
id: 1, id: 1,
name: "Section 1".into(), name: "Section 1".into(),
interface_id: 0, interface_id: 0,
}), }.into(),
Arc::new(Section { 2 => Section {
id: 2, id: 2,
name: "Section 2".into(), name: "Section 2".into(),
interface_id: 1, interface_id: 1,
}), }.into()
]; ];
(sections, interface) (sections, interface)
} }
@ -510,7 +511,7 @@ mod test {
// Queue single section, make sure it runs // Queue single section, make sure it runs
runner runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
@ -524,12 +525,12 @@ mod test {
// Queue two sections, make sure they run one at a time // Queue two sections, make sure they run one at a time
runner runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
@ -555,17 +556,17 @@ mod test {
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
let run1 = runner let run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let _run2 = runner let _run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run3 = runner let run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
@ -593,17 +594,17 @@ mod test {
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
runner runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
@ -628,17 +629,17 @@ mod test {
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
let _run1 = runner let _run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run2 = runner let run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let _run3 = runner let _run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
@ -690,73 +691,73 @@ mod test {
let mut event_recv = runner.subscribe().await.unwrap(); let mut event_recv = runner.subscribe().await.unwrap();
let run1 = runner let run1 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run2 = runner let run2 = runner
.queue_run(sections[0].clone(), Duration::from_secs(10)) .queue_run(sections[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run3 = runner let run3 = runner
.queue_run(sections[1].clone(), Duration::from_secs(10)) .queue_run(sections[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunStart(run1.clone())) Ok(SectionEvent::RunStart(run1.clone()))
); );
runner.pause().await.unwrap(); runner.pause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerPause));
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunPause(run1.clone())) Ok(SectionEvent::RunPause(run1.clone()))
); );
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause));
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunUnpause(run1.clone())) Ok(SectionEvent::RunUnpause(run1.clone()))
); );
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run1))); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunFinish(run1)));
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunStart(run2.clone())) Ok(SectionEvent::RunStart(run2.clone()))
); );
runner.pause().await.unwrap(); runner.pause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerPause)); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerPause));
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunPause(run2.clone())) Ok(SectionEvent::RunPause(run2.clone()))
); );
// cancel paused run // cancel paused run
runner.cancel_run(run2.clone()).await.unwrap(); runner.cancel_run(run2.clone()).await.unwrap();
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunCancel(run2.clone())) Ok(SectionEvent::RunCancel(run2.clone()))
); );
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunPause(run3.clone())) Ok(SectionEvent::RunPause(run3.clone()))
); );
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunnerUnpause)); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause));
assert_eq!( assert_eq!(
event_recv.recv().await, event_recv.recv().await,
Ok(RunnerEvent::RunUnpause(run3.clone())) Ok(SectionEvent::RunUnpause(run3.clone()))
); );
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_eq!(event_recv.recv().await, Ok(RunnerEvent::RunFinish(run3))); assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunFinish(run3)));
runner.quit().await.unwrap(); runner.quit().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;

Loading…
Cancel
Save