Browse Source

Only publish programs which have changed

master
Alex Mikhalev 4 years ago
parent
commit
5f54aa037b
  1. 32
      sprinklers_mqtt/src/lib.rs
  2. 42
      sprinklers_mqtt/src/update_listener.rs

32
sprinklers_mqtt/src/lib.rs

@ -103,21 +103,47 @@ impl MqttInterface { @@ -103,21 +103,47 @@ impl MqttInterface {
pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> {
let program_ids: Vec<_> = programs.keys().cloned().collect();
self.publish_data(self.topics.programs(), &program_ids)
.await
.wrap_err("failed to publish program ids")?;
self.publish_program_ids(&program_ids).await?;
for program in programs.values() {
self.publish_program(program).await?;
}
Ok(())
}
pub async fn publish_program_ids(&mut self, program_ids: &[ProgramId]) -> eyre::Result<()> {
self.publish_data(self.topics.programs(), &program_ids)
.await
.wrap_err("failed to publish program ids")?;
Ok(())
}
pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> {
self.publish_data(self.topics.program_data(program.id), &program)
.await
.wrap_err("failed to publish program")
}
pub async fn publish_programs_diff(
&mut self,
old_programs: &Programs,
programs: &Programs,
) -> eyre::Result<()> {
for (id, program) in programs {
let publish = match old_programs.get(id) {
Some(old_program) => !Arc::ptr_eq(old_program, program),
None => {
let program_ids: Vec<_> = programs.keys().cloned().collect();
self.publish_program_ids(&program_ids).await?;
true
}
};
if publish {
self.publish_program(program).await?;
}
}
Ok(())
}
pub async fn publish_program_running(
&mut self,
program_id: ProgramId,

42
sprinklers_mqtt/src/update_listener.rs

@ -5,20 +5,22 @@ use sprinklers_actors::{ @@ -5,20 +5,22 @@ use sprinklers_actors::{
};
use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use futures_util::TryFutureExt;
use sprinklers_core::model::Programs;
use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tracing::{trace, warn};
struct UpdateListenerActor {
mqtt_interface: MqttInterface,
has_published_program_states: bool,
old_programs: Option<Programs>,
}
impl UpdateListenerActor {
fn new(mqtt_interface: MqttInterface) -> Self {
Self {
mqtt_interface,
has_published_program_states: false,
old_programs: None,
}
}
}
@ -116,27 +118,31 @@ impl StreamHandler<Programs> for UpdateListenerActor { @@ -116,27 +118,31 @@ impl StreamHandler<Programs> for UpdateListenerActor {
fn handle(&mut self, programs: Programs, ctx: &mut Self::Context) {
let mut mqtt_interface = self.mqtt_interface.clone();
let has_published_program_states = self.has_published_program_states;
self.has_published_program_states = true;
let old_programs = self.old_programs.replace(programs.clone());
let fut = async move {
if let Err(err) = mqtt_interface.publish_programs(&programs).await {
warn!("could not publish programs: {:?}", err);
}
// Some what of a hack
// Initialize program running states to false the first time we
// receive programs
if !has_published_program_states {
for program_id in programs.keys() {
if let Err(err) = mqtt_interface
.publish_program_running(*program_id, false)
.await
{
warn!("could not publish program running: {:?}", err);
match old_programs {
None => {
mqtt_interface.publish_programs(&programs).await?;
// Some what of a hack
// Initialize program running states to false the first time we
// receive programs
for program_id in programs.keys() {
mqtt_interface
.publish_program_running(*program_id, false)
.await?;
}
}
Some(old_programs) => {
mqtt_interface
.publish_programs_diff(&old_programs, &programs)
.await?;
}
}
};
Ok(())
}
.unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err));
ctx.spawn(wrap_future(fut));
}
}

Loading…
Cancel
Save