From 7541618ec80fe5a866b5e40b6f4f693dd0c4991f Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Mon, 28 Sep 2020 13:21:56 -0600 Subject: [PATCH] Implement section runner state publishing --- src/main.rs | 9 ++++- src/mqtt_interface.rs | 18 ++++++++- src/section_runner.rs | 66 +++++++++++++++++++++++++++------ src/section_runner_json.rs | 75 ++++++++++++++++++++++++++++++++++++++ src/update_listener.rs | 29 ++++++++++++--- 5 files changed, 179 insertions(+), 18 deletions(-) create mode 100644 src/section_runner_json.rs diff --git a/src/main.rs b/src/main.rs index e96452a..d02f780 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,7 @@ mod program_runner; mod schedule; mod section_interface; mod section_runner; +mod section_runner_json; #[cfg(test)] mod trace_listeners; mod update_listener; @@ -67,7 +68,13 @@ async fn main() -> Result<()> { let update_listener = { let section_events = section_runner.subscribe().await?; let program_events = program_runner.subscribe().await?; - UpdateListener::start(section_events, program_events, mqtt_interface.clone()) + let sec_runner_state = section_runner.state_receiver(); + UpdateListener::start( + section_events, + program_events, + sec_runner_state, + mqtt_interface.clone(), + ) }; program_runner.update_sections(sections.clone()).await?; diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs index 9026e3f..1926f16 100644 --- a/src/mqtt_interface.rs +++ b/src/mqtt_interface.rs @@ -1,4 +1,8 @@ -use crate::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; +use crate::{ + model::{Program, ProgramId, Programs, Section, SectionId, Sections}, + section_runner::SecRunnerState, + section_runner_json::SecRunnerStateJson, +}; use eyre::WrapErr; use rumqttc::{LastWill, MqttOptions, QoS}; use std::{ @@ -52,6 +56,10 @@ where fn program_running(&self, program_id: ProgramId) -> String { format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) } + + fn section_runner(&self) -> String { + format!("{}/section_runner", self.prefix.as_ref()) + } } #[derive(Clone, Debug)] @@ -165,6 +173,7 @@ impl MqttInterface { .wrap_err("failed to publish section") } + // Section state can be derived from section runner state... pub async fn publish_section_state( &mut self, section_id: SectionId, @@ -201,6 +210,13 @@ impl MqttInterface { .await .wrap_err("failed to publish program running") } + + pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { + let json: SecRunnerStateJson = sr_state.into(); + self.publish_data(self.topics.section_runner(), &json) + .await + .wrap_err("failed to publish section runner") + } } pub struct MqttInterfaceTask { diff --git a/src/section_runner.rs b/src/section_runner.rs index 3fd05ae..d26a1a8 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -12,7 +12,7 @@ use std::{ use thiserror::Error; use tokio::{ spawn, - sync::{broadcast, mpsc, oneshot}, + sync::{broadcast, mpsc, oneshot, watch}, time::{delay_for, Instant}, }; use tracing::{debug, trace, trace_span, warn}; @@ -21,6 +21,12 @@ use tracing_futures::Instrument; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SectionRunHandle(i32); +impl SectionRunHandle { + pub fn into_inner(self) -> i32 { + self.0 + } +} + #[derive(Debug)] struct SectionRunnerInner { next_run_id: AtomicI32, @@ -77,11 +83,11 @@ pub enum SecRunState { #[derive(Clone, Debug)] pub struct SecRun { - handle: SectionRunHandle, - section: SectionRef, - duration: Duration, - total_duration: Duration, - state: SecRunState, + pub(crate) handle: SectionRunHandle, + pub(crate) section: SectionRef, + pub(crate) duration: Duration, + pub(crate) total_duration: Duration, + pub(crate) state: SecRunState, } impl SecRun { @@ -109,8 +115,8 @@ pub type SecRunQueue = im::Vector>; #[derive(Clone, Debug)] pub struct SecRunnerState { - run_queue: SecRunQueue, - paused: bool, + pub(crate) run_queue: SecRunQueue, + pub(crate) paused: bool, } impl Default for SecRunnerState { @@ -122,19 +128,24 @@ impl Default for SecRunnerState { } } +pub type SecRunnerStateRecv = watch::Receiver; + struct RunnerTask { interface: Arc, msg_recv: mpsc::Receiver, running: bool, delay_future: OptionFuture, event_send: Option, + state_send: watch::Sender, quit_tx: Option>, + did_change: bool, } impl RunnerTask { fn new( interface: Arc, msg_recv: mpsc::Receiver, + state_send: watch::Sender, ) -> Self { Self { interface, @@ -142,7 +153,9 @@ impl RunnerTask { running: true, delay_future: None.into(), event_send: None, + state_send, quit_tx: None, + did_change: false, } } @@ -181,6 +194,7 @@ impl RunnerTask { run.handle.clone(), run.section.clone(), )); + self.did_change = true; } fn finish_run(&mut self, run: &mut Arc) { @@ -194,6 +208,7 @@ impl RunnerTask { run.handle.clone(), run.section.clone(), )); + self.did_change = true; } else { warn!( section_id = run.section.id, @@ -215,6 +230,7 @@ impl RunnerTask { run.handle.clone(), run.section.clone(), )); + self.did_change = true; } fn pause_run(&mut self, run: &mut Arc) { @@ -246,6 +262,7 @@ impl RunnerTask { run.handle.clone(), run.section.clone(), )); + self.did_change = true; } fn unpause_run(&mut self, run: &mut Arc) { @@ -277,6 +294,7 @@ impl RunnerTask { ); } } + self.did_change = true; } fn process_queue(&mut self, state: &mut SecRunnerState) { @@ -335,6 +353,7 @@ impl RunnerTask { state .run_queue .push_back(Arc::new(SecRun::new(handle, section, duration))); + self.did_change = true; } CancelRun(handle) => { for run in state.run_queue.iter_mut() { @@ -356,10 +375,12 @@ impl RunnerTask { Pause => { state.paused = true; self.send_event(SectionEvent::RunnerPause); + self.did_change = true; } Unpause => { state.paused = false; self.send_event(SectionEvent::RunnerUnpause); + self.did_change = true; } Subscribe(res_send) => { let event_recv = self.subscribe_event(); @@ -373,12 +394,28 @@ impl RunnerTask { let mut state = SecRunnerState::default(); while self.running { + // Process all pending messages + // This is so if there are many pending messages, the state + // is only broadcast once + while let Ok(msg) = self.msg_recv.try_recv() { + self.handle_msg(Some(msg), &mut state); + } + self.process_queue(&mut state); + + // If a change was made to state, broadcast it + if self.did_change { + let _ = self.state_send.broadcast(state.clone()); + self.did_change = false; + } + let delay_done = || { trace!("delay done"); }; tokio::select! { - msg = self.msg_recv.recv() => self.handle_msg(msg, &mut state), + msg = self.msg_recv.recv() => { + self.handle_msg(msg, &mut state) + }, _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() }; } @@ -417,16 +454,19 @@ impl From for ChannelClosed { pub struct SectionRunner { inner: Arc, msg_send: mpsc::Sender, + state_recv: SecRunnerStateRecv, } #[allow(dead_code)] impl SectionRunner { pub fn new(interface: Arc) -> Self { - let (msg_send, msg_recv) = mpsc::channel(8); - spawn(RunnerTask::new(interface, msg_recv).run()); + let (msg_send, msg_recv) = mpsc::channel(32); + let (state_send, state_recv) = watch::channel(SecRunnerState::default()); + spawn(RunnerTask::new(interface, msg_recv, state_send).run()); Self { inner: Arc::new(SectionRunnerInner::new()), msg_send, + state_recv, } } @@ -476,6 +516,10 @@ impl SectionRunner { let event_recv = res_recv.await?; Ok(event_recv) } + + pub fn state_receiver(&self) -> SecRunnerStateRecv { + self.state_recv.clone() + } } #[cfg(test)] diff --git a/src/section_runner_json.rs b/src/section_runner_json.rs new file mode 100644 index 0000000..65bb363 --- /dev/null +++ b/src/section_runner_json.rs @@ -0,0 +1,75 @@ +use crate::{ + model::SectionId, + section_runner::{SecRun, SecRunState, SecRunnerState}, +}; +use chrono::{DateTime, Utc}; +use serde::Serialize; +use std::time::SystemTime; +use tokio::time::Instant; + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SecRunJson { + id: i32, + section: SectionId, + total_duration: f64, + duration: f64, + start_time: Option, + pause_time: Option, + unpause_time: Option, +} + +impl SecRunJson { + fn from_run(run: &SecRun) -> Option { + let (now, system_now) = (Instant::now(), SystemTime::now()); + let instant_to_string = |instant: Instant| -> String { + DateTime::::from(system_now - now.duration_since(instant)).to_rfc3339() + }; + let (start_time, pause_time) = match run.state { + SecRunState::Finished | SecRunState::Cancelled => { + return None; + } + SecRunState::Waiting => (None, None), + SecRunState::Running { start_time } => (Some(instant_to_string(start_time)), None), + SecRunState::Paused { + start_time, + pause_time, + } => ( + Some(instant_to_string(start_time)), + Some(instant_to_string(pause_time)), + ), + }; + Some(Self { + id: run.handle.clone().into_inner(), + section: run.section.id, + total_duration: run.total_duration.as_secs_f64(), + duration: run.duration.as_secs_f64(), + start_time, + pause_time, + unpause_time: None, // TODO: this is kinda useless, should probably be removed + }) + } +} + +#[derive(Clone, Debug, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct SecRunnerStateJson { + queue: Vec, + current: Option, + paused: bool, +} + +impl From<&SecRunnerState> for SecRunnerStateJson { + fn from(state: &SecRunnerState) -> Self { + let mut run_queue = state.run_queue.iter(); + let current = run_queue.next().and_then(|run| SecRunJson::from_run(run)); + let queue = run_queue + .filter_map(|run| SecRunJson::from_run(run)) + .collect(); + Self { + queue, + current, + paused: state.paused, + } + } +} diff --git a/src/update_listener.rs b/src/update_listener.rs index a891eb8..a030689 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -1,19 +1,20 @@ use crate::{ mqtt_interface::MqttInterface, program_runner::{ProgramEvent, ProgramEventRecv}, - section_runner::{SectionEvent, SectionEventRecv}, + section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, }; use tokio::{ select, sync::{broadcast, oneshot}, task::JoinHandle, }; -use tracing::{trace, trace_span, warn}; +use tracing::{trace_span, warn}; use tracing_futures::Instrument; struct UpdateListenerTask { section_events: SectionEventRecv, program_events: ProgramEventRecv, + sec_runner_state: SecRunnerStateRecv, mqtt_interface: MqttInterface, running: bool, } @@ -26,7 +27,7 @@ impl UpdateListenerTask { let event = match event { Ok(ev) => ev, Err(broadcast::RecvError::Closed) => { - trace!("section events channel closed"); + warn!("section events channel closed"); self.running = false; return Ok(()); } @@ -58,12 +59,12 @@ impl UpdateListenerTask { let event = match event { Ok(ev) => ev, Err(broadcast::RecvError::Closed) => { - trace!("section events channel closed"); + warn!("program events channel closed"); self.running = false; return Ok(()); } Err(broadcast::RecvError::Lagged(n)) => { - warn!("section events lagged by {}", n); + warn!("program events lagged by {}", n); return Ok(()); } }; @@ -77,6 +78,19 @@ impl UpdateListenerTask { Ok(()) } + async fn handle_sec_runner_state(&mut self, state: Option) -> eyre::Result<()> { + let state = match state { + Some(state) => state, + None => { + warn!("section runner events channel closed"); + self.running = false; + return Ok(()); + } + }; + self.mqtt_interface.publish_section_runner(&state).await?; + Ok(()) + } + async fn run_impl(&mut self) -> eyre::Result<()> { while self.running { select! { @@ -86,6 +100,9 @@ impl UpdateListenerTask { program_event = self.program_events.recv() => { self.handle_program_event(program_event).await? } + sec_runner_state = self.sec_runner_state.recv() => { + self.handle_sec_runner_state(sec_runner_state).await? + } }; } Ok(()) @@ -122,11 +139,13 @@ impl UpdateListener { pub fn start( section_events: SectionEventRecv, program_events: ProgramEventRecv, + sec_runner_state: SecRunnerStateRecv, mqtt_interface: MqttInterface, ) -> Self { let task = UpdateListenerTask { section_events, program_events, + sec_runner_state, mqtt_interface, running: true, };