diff --git a/src/main.rs b/src/main.rs index 903cadf..e96452a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ mod update_listener; use eyre::Result; use std::sync::Arc; -use tracing::info; +use tracing::{debug, info}; use tracing_subscriber::EnvFilter; use update_listener::UpdateListener; @@ -34,7 +34,7 @@ async fn main() -> Result<()> { let sections = database::query_sections(&conn)?; for sec in sections.values() { - info!(section = debug(&sec), "read section"); + debug!(section = debug(&sec), "read section"); } // TODO: Section interface which actual does something. Preferrably selectable somehow @@ -53,7 +53,7 @@ async fn main() -> Result<()> { } for prog in programs.values() { - info!(program = debug(&prog), "read program"); + debug!(program = debug(&prog), "read program"); } let mqtt_options = mqtt_interface::Options { @@ -64,10 +64,11 @@ async fn main() -> Result<()> { }; let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; - let update_listener = UpdateListener::start( - section_runner.subscribe().await?, - mqtt_interface.clone(), - ); + let update_listener = { + let section_events = section_runner.subscribe().await?; + let program_events = program_runner.subscribe().await?; + UpdateListener::start(section_events, program_events, mqtt_interface.clone()) + }; program_runner.update_sections(sections.clone()).await?; mqtt_interface.publish_sections(§ions).await?; @@ -77,6 +78,11 @@ async fn main() -> Result<()> { .await?; } program_runner.update_programs(programs.clone()).await?; + for program_id in programs.keys() { + mqtt_interface + .publish_program_running(*program_id, false) + .await?; + } mqtt_interface.publish_programs(&programs).await?; info!("sprinklers_rs initialized"); diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs index aa41672..81615d9 100644 --- a/src/mqtt_interface.rs +++ b/src/mqtt_interface.rs @@ -48,6 +48,10 @@ where fn program_data(&self, program_id: ProgramId) -> String { format!("{}/programs/{}", self.prefix.as_ref(), program_id) } + + fn program_running(&self, program_id: ProgramId) -> String { + format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) + } } #[derive(Clone, Debug)] @@ -59,7 +63,7 @@ pub struct Options { } async fn event_loop_task( - mut interface: MqttInterface, + interface: MqttInterface, mut event_loop: rumqttc::EventLoop, ) -> eyre::Result<()> { use rumqttc::{ConnectionError, Event}; @@ -77,9 +81,7 @@ async fn event_loop_task( // HACK: this really should just be await // but that can sometimes deadlock if the publish channel is full let mut interface = interface.clone(); - let fut = async move { - interface.publish_connected(true).await - }; + let fut = async move { interface.publish_connected(true).await }; tokio::spawn(fut); } //.await?; @@ -216,6 +218,23 @@ impl MqttInterface { .await?; Ok(()) } + + pub async fn publish_program_running( + &mut self, + program_id: ProgramId, + running: bool, + ) -> eyre::Result<()> { + let payload = running.to_string(); + self.client + .publish( + self.topics.program_running(program_id), + QoS::AtLeastOnce, + true, + payload, + ) + .await?; + Ok(()) + } } pub struct MqttInterfaceTask { diff --git a/src/update_listener.rs b/src/update_listener.rs index 91945f7..a891eb8 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -1,5 +1,6 @@ use crate::{ mqtt_interface::MqttInterface, + program_runner::{ProgramEvent, ProgramEventRecv}, section_runner::{SectionEvent, SectionEventRecv}, }; use tokio::{ @@ -11,6 +12,8 @@ use tracing::{trace, trace_span, warn}; use tracing_futures::Instrument; struct UpdateListenerTask { + section_events: SectionEventRecv, + program_events: ProgramEventRecv, mqtt_interface: MqttInterface, running: bool, } @@ -48,37 +51,62 @@ impl UpdateListenerTask { Ok(()) } - async fn run_impl(&mut self, mut section_events: SectionEventRecv) -> eyre::Result<()> { + async fn handle_program_event( + &mut self, + event: Result, + ) -> eyre::Result<()> { + let event = match event { + Ok(ev) => ev, + Err(broadcast::RecvError::Closed) => { + trace!("section events channel closed"); + self.running = false; + return Ok(()); + } + Err(broadcast::RecvError::Lagged(n)) => { + warn!("section events lagged by {}", n); + return Ok(()); + } + }; + let (program_id, running) = match event { + ProgramEvent::RunStart(prog) => (prog.id, true), + ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false), + }; + self.mqtt_interface + .publish_program_running(program_id, running) + .await?; + Ok(()) + } + + async fn run_impl(&mut self) -> eyre::Result<()> { while self.running { select! { - section_event = section_events.recv() => { + section_event = self.section_events.recv() => { self.handle_section_event(section_event).await? } + program_event = self.program_events.recv() => { + self.handle_program_event(program_event).await? + } }; } Ok(()) } - async fn run_or_quit( - mut self, - mut quit_rx: oneshot::Receiver<()>, - section_events: SectionEventRecv, - ) -> eyre::Result<()> { + async fn run_or_quit(mut self, mut quit_rx: oneshot::Receiver<()>) -> eyre::Result<()> { select! { _ = &mut quit_rx => { self.running = false; Ok(()) } - res = self.run_impl(section_events) => { + res = self.run_impl() => { res } } } - async fn run(self, quit_rx: oneshot::Receiver<()>, section_events: SectionEventRecv) { + async fn run(self, quit_rx: oneshot::Receiver<()>) { let span = trace_span!("update_listener task"); - self.run_or_quit(quit_rx, section_events) + self.run_or_quit(quit_rx) .instrument(span) .await .expect("error in UpdateListenerTask"); @@ -91,13 +119,19 @@ pub struct UpdateListener { } impl UpdateListener { - pub fn start(section_events: SectionEventRecv, mqtt_interface: MqttInterface) -> Self { + pub fn start( + section_events: SectionEventRecv, + program_events: ProgramEventRecv, + mqtt_interface: MqttInterface, + ) -> Self { let task = UpdateListenerTask { + section_events, + program_events, mqtt_interface, running: true, }; let (quit_tx, quit_rx) = oneshot::channel(); - let join_handle = tokio::spawn(task.run(quit_rx, section_events)); + let join_handle = tokio::spawn(task.run(quit_rx)); Self { quit_tx, join_handle,