diff --git a/sprinklers_actors/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs index 66c78fe..6792ca3 100644 --- a/sprinklers_actors/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -20,6 +20,7 @@ pub enum ProgramEvent { RunStart(ProgramRef), RunFinish(ProgramRef), RunCancel(ProgramRef), + NextRun(ProgramRef, chrono::DateTime), } pub type ProgramEventRecv = broadcast::Receiver; @@ -136,7 +137,7 @@ impl ProgramRunnerInner { fn update_schedules(&mut self, ctx: &mut actix::Context) { let mut scheduled_run_queue = DelayQueue::with_capacity(self.programs.len()); - for (_, prog) in &self.programs { + for (_, prog) in self.programs.clone() { if !prog.enabled { continue; } @@ -148,6 +149,7 @@ impl ProgramRunnerInner { let delay = (next_run - ref_time).to_std().unwrap(); trace!("will run program in {:?}", delay); scheduled_run_queue.insert(prog.clone(), delay); + self.send_event(ProgramEvent::NextRun(prog, next_run)); } let fut = actix::fut::wrap_stream(scheduled_run_queue) .map(|item, act: &mut ProgramRunnerActor, ctx| act.handle_scheduled_run(item, ctx)) diff --git a/sprinklers_mqtt/src/lib.rs b/sprinklers_mqtt/src/lib.rs index 895499d..da03268 100644 --- a/sprinklers_mqtt/src/lib.rs +++ b/sprinklers_mqtt/src/lib.rs @@ -154,6 +154,17 @@ impl MqttInterface { .wrap_err("failed to publish program running") } + pub async fn publish_program_next_run( + &mut self, + program_id: ProgramId, + next_run: chrono::DateTime, + ) -> eyre::Result<()> { + let payload = next_run.to_rfc3339(); + self.publish_data(self.topics.program_next_run(program_id), &payload) + .await + .wrap_err("failed to publish program next run") + } + pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { let json: SecRunnerStateJson = sr_state.into(); self.publish_data(self.topics.section_runner(), &json) diff --git a/sprinklers_mqtt/src/topics.rs b/sprinklers_mqtt/src/topics.rs index 7a40978..3fad15a 100644 --- a/sprinklers_mqtt/src/topics.rs +++ b/sprinklers_mqtt/src/topics.rs @@ -44,6 +44,11 @@ where format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) } + pub fn program_next_run(&self, program_id: ProgramId) -> String { + // TODO: reconcile naming convention + format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id) + } + pub fn section_runner(&self) -> String { format!("{}/section_runner", self.prefix.as_ref()) } diff --git a/sprinklers_mqtt/src/update_listener.rs b/sprinklers_mqtt/src/update_listener.rs index 98b003f..bb3da63 100644 --- a/sprinklers_mqtt/src/update_listener.rs +++ b/sprinklers_mqtt/src/update_listener.rs @@ -7,7 +7,6 @@ use sprinklers_actors::{ use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use futures_util::TryFutureExt; use sprinklers_core::model::Programs; -use std::sync::Arc; use tokio::sync::{broadcast, watch}; use tracing::{trace, warn}; @@ -85,17 +84,37 @@ impl StreamHandler> for UpdateListene return; } }; - let (program_id, running) = match event { - ProgramEvent::RunStart(prog) => (prog.id, true), - ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false), - }; let mut mqtt_interface = self.mqtt_interface.clone(); let fut = async move { - if let Err(err) = mqtt_interface - .publish_program_running(program_id, running) - .await - { - warn!("could not publish program running: {}", err); + enum Publish { + Running(bool), + NextRun(chrono::DateTime), + } + + let (program_id, publish) = match event { + ProgramEvent::RunStart(prog) => (prog.id, Publish::Running(true)), + ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => { + (prog.id, Publish::Running(false)) + } + ProgramEvent::NextRun(prog, next_run) => (prog.id, Publish::NextRun(next_run)), + }; + match publish { + Publish::Running(running) => { + if let Err(err) = mqtt_interface + .publish_program_running(program_id, running) + .await + { + warn!("could not publish program running: {}", err); + } + } + Publish::NextRun(next_run) => { + if let Err(err) = mqtt_interface + .publish_program_next_run(program_id, next_run) + .await + { + warn!("could not publish program next run: {}", err); + } + } } }; ctx.spawn(wrap_future(fut)); diff --git a/sprinklers_rs/src/main.rs b/sprinklers_rs/src/main.rs index 28d4ffc..986ce22 100644 --- a/sprinklers_rs/src/main.rs +++ b/sprinklers_rs/src/main.rs @@ -39,8 +39,6 @@ async fn main() -> Result<()> { let state_manager = crate::state_manager::StateManagerThread::start(db_conn); - program_runner.listen_programs(state_manager.get_programs()); - let mqtt_options = mqtt::Options { broker_host: "localhost".into(), broker_port: 1883, @@ -62,6 +60,9 @@ async fn main() -> Result<()> { update_listener.listen_programs(state_manager.get_programs()); update_listener.listen_program_events(program_runner.subscribe().await?); + // Only listen to programs now so above subscriptions get events + program_runner.listen_programs(state_manager.get_programs()); + program_runner.update_sections(sections.clone()).await?; // TODO: update listener should probably do this mqtt_interface.publish_sections(§ions).await?;