diff --git a/sprinklers_mqtt/src/lib.rs b/sprinklers_mqtt/src/lib.rs index 58649c3..895499d 100644 --- a/sprinklers_mqtt/src/lib.rs +++ b/sprinklers_mqtt/src/lib.rs @@ -103,21 +103,47 @@ impl MqttInterface { pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { let program_ids: Vec<_> = programs.keys().cloned().collect(); - self.publish_data(self.topics.programs(), &program_ids) - .await - .wrap_err("failed to publish program ids")?; + self.publish_program_ids(&program_ids).await?; for program in programs.values() { self.publish_program(program).await?; } Ok(()) } + pub async fn publish_program_ids(&mut self, program_ids: &[ProgramId]) -> eyre::Result<()> { + self.publish_data(self.topics.programs(), &program_ids) + .await + .wrap_err("failed to publish program ids")?; + Ok(()) + } + pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { self.publish_data(self.topics.program_data(program.id), &program) .await .wrap_err("failed to publish program") } + pub async fn publish_programs_diff( + &mut self, + old_programs: &Programs, + programs: &Programs, + ) -> eyre::Result<()> { + for (id, program) in programs { + let publish = match old_programs.get(id) { + Some(old_program) => !Arc::ptr_eq(old_program, program), + None => { + let program_ids: Vec<_> = programs.keys().cloned().collect(); + self.publish_program_ids(&program_ids).await?; + true + } + }; + if publish { + self.publish_program(program).await?; + } + } + Ok(()) + } + pub async fn publish_program_running( &mut self, program_id: ProgramId, diff --git a/sprinklers_mqtt/src/update_listener.rs b/sprinklers_mqtt/src/update_listener.rs index 9850be4..98b003f 100644 --- a/sprinklers_mqtt/src/update_listener.rs +++ b/sprinklers_mqtt/src/update_listener.rs @@ -5,20 +5,22 @@ use sprinklers_actors::{ }; use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; +use futures_util::TryFutureExt; use sprinklers_core::model::Programs; +use std::sync::Arc; use tokio::sync::{broadcast, watch}; use tracing::{trace, warn}; struct UpdateListenerActor { mqtt_interface: MqttInterface, - has_published_program_states: bool, + old_programs: Option, } impl UpdateListenerActor { fn new(mqtt_interface: MqttInterface) -> Self { Self { mqtt_interface, - has_published_program_states: false, + old_programs: None, } } } @@ -116,27 +118,31 @@ impl StreamHandler for UpdateListenerActor { fn handle(&mut self, programs: Programs, ctx: &mut Self::Context) { let mut mqtt_interface = self.mqtt_interface.clone(); - let has_published_program_states = self.has_published_program_states; - self.has_published_program_states = true; + let old_programs = self.old_programs.replace(programs.clone()); let fut = async move { - if let Err(err) = mqtt_interface.publish_programs(&programs).await { - warn!("could not publish programs: {:?}", err); - } - // Some what of a hack - // Initialize program running states to false the first time we - // receive programs - if !has_published_program_states { - for program_id in programs.keys() { - if let Err(err) = mqtt_interface - .publish_program_running(*program_id, false) - .await - { - warn!("could not publish program running: {:?}", err); + match old_programs { + None => { + mqtt_interface.publish_programs(&programs).await?; + + // Some what of a hack + // Initialize program running states to false the first time we + // receive programs + for program_id in programs.keys() { + mqtt_interface + .publish_program_running(*program_id, false) + .await?; } } + Some(old_programs) => { + mqtt_interface + .publish_programs_diff(&old_programs, &programs) + .await?; + } } - }; + Ok(()) + } + .unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err)); ctx.spawn(wrap_future(fut)); } }