From 691f14b79a6d8585ee38529e8a6fdf57f50062f5 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Thu, 24 Dec 2020 16:03:53 -0700 Subject: [PATCH] Better abstraction over MQTT collectiosn --- sprinklers_mqtt/Cargo.toml | 1 + sprinklers_mqtt/src/lib.rs | 232 +++++++++++++------------ sprinklers_mqtt/src/topics.rs | 120 ++++++++----- sprinklers_mqtt/src/update_listener.rs | 64 +++---- 4 files changed, 221 insertions(+), 196 deletions(-) diff --git a/sprinklers_mqtt/Cargo.toml b/sprinklers_mqtt/Cargo.toml index db40c23..194634e 100644 --- a/sprinklers_mqtt/Cargo.toml +++ b/sprinklers_mqtt/Cargo.toml @@ -20,6 +20,7 @@ chrono = "0.4.15" num-traits = "0.2.12" num-derive = "0.3.2" futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] } +im = "15.0.0" [dependencies.tokio] version = "0.2.22" diff --git a/sprinklers_mqtt/src/lib.rs b/sprinklers_mqtt/src/lib.rs index 3ffb0cb..dfaf6c7 100644 --- a/sprinklers_mqtt/src/lib.rs +++ b/sprinklers_mqtt/src/lib.rs @@ -8,15 +8,16 @@ mod zone_runner_json; pub use request::RequestContext; pub use update_listener::UpdateListener; -use self::topics::Topics; +use self::topics::{CollectionTopics, Topics}; use sprinklers_actors::zone_runner::ZoneRunnerState; -use sprinklers_core::model::{Program, ProgramId, Programs, Zone, ZoneId, Zones}; +use sprinklers_core::model::{ProgramId, Programs, ZoneId, Zones}; use zone_runner_json::ZoneRunnerStateJson; use actix::{Actor, Addr}; use eyre::WrapErr; use rumqttc::{LastWill, MqttOptions, QoS}; use std::{ + marker::PhantomData, ops::{Deref, DerefMut}, sync::Arc, }; @@ -73,119 +74,12 @@ impl MqttInterface { self.client.cancel().await } - pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> { - let zone_ids: Vec<_> = zones.keys().cloned().collect(); - self.publish_zone_ids(&zone_ids).await?; - for zone in zones.values() { - self.publish_zone(zone).await?; - } - Ok(()) + pub fn zones(&mut self) -> MqttCollection<'_, topics::ZoneTopics, Zones> { + MqttCollection::new(self) } - // TODO: figure out how to share logic with publish_programs_diff and publish_zones - pub async fn publish_zones_diff( - &mut self, - old_zones: &Zones, - zones: &Zones, - ) -> eyre::Result<()> { - for (id, zone) in zones { - let publish = match old_zones.get(id) { - Some(old_zone) => !Arc::ptr_eq(old_zone, zone), - None => { - let zone_ids: Vec<_> = zones.keys().cloned().collect(); - self.publish_zone_ids(&zone_ids).await?; - true - } - }; - if publish { - self.publish_zone(zone).await?; - } - } - Ok(()) - } - - pub async fn publish_zone_ids(&mut self, zone_ids: &[ZoneId]) -> eyre::Result<()> { - self.publish_data(self.topics.zones(), &zone_ids) - .await - .wrap_err("failed to publish zone ids")?; - Ok(()) - } - - pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> { - self.publish_data(self.topics.zone_data(zone.id), zone) - .await - .wrap_err("failed to publish zone") - } - - // Zone state can be derived from zone runner state... - pub async fn publish_zone_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> { - self.publish_data(self.topics.zone_state(zone_id), &state) - .await - .wrap_err("failed to publish zone state") - } - - pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { - let program_ids: Vec<_> = programs.keys().cloned().collect(); - self.publish_program_ids(&program_ids).await?; - for program in programs.values() { - self.publish_program(program).await?; - } - Ok(()) - } - - pub async fn publish_program_ids(&mut self, program_ids: &[ProgramId]) -> eyre::Result<()> { - self.publish_data(self.topics.programs(), &program_ids) - .await - .wrap_err("failed to publish program ids")?; - Ok(()) - } - - pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { - self.publish_data(self.topics.program_data(program.id), &program) - .await - .wrap_err("failed to publish program") - } - - pub async fn publish_programs_diff( - &mut self, - old_programs: &Programs, - programs: &Programs, - ) -> eyre::Result<()> { - for (id, program) in programs { - let publish = match old_programs.get(id) { - Some(old_program) => !Arc::ptr_eq(old_program, program), - None => { - let program_ids: Vec<_> = programs.keys().cloned().collect(); - self.publish_program_ids(&program_ids).await?; - true - } - }; - if publish { - self.publish_program(program).await?; - } - } - Ok(()) - } - - pub async fn publish_program_running( - &mut self, - program_id: ProgramId, - running: bool, - ) -> eyre::Result<()> { - self.publish_data(self.topics.program_running(program_id), &running) - .await - .wrap_err("failed to publish program running") - } - - pub async fn publish_program_next_run( - &mut self, - program_id: ProgramId, - next_run: chrono::DateTime, - ) -> eyre::Result<()> { - let payload = next_run.to_rfc3339(); - self.publish_data(self.topics.program_next_run(program_id), &payload) - .await - .wrap_err("failed to publish program next run") + pub fn programs(&mut self) -> MqttCollection<'_, topics::ProgramTopics, Programs> { + MqttCollection::new(self) } pub async fn publish_zone_runner(&mut self, sr_state: &ZoneRunnerState) -> eyre::Result<()> { @@ -214,6 +108,118 @@ impl MqttInterface { } } +pub struct MqttCollection<'a, T, U> { + client: &'a mut rumqttc::AsyncClient, + topics: T, + collection: PhantomData, +} + +impl<'a, T: CollectionTopics<'a>, U> MqttCollection<'a, T, U> { + fn new(interface: &'a mut MqttInterface) -> Self { + Self { + client: &mut interface.client, + topics: T::new(interface.topics.prefix()), + collection: PhantomData, + } + } + + async fn publish( + &mut self, + topic: String, + payload: &P, + ) -> eyre::Result<()> { + let payload_vec = + serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?; + self.client + .publish(topic, QoS::AtLeastOnce, true, payload_vec) + .await + .wrap_err("failed to publish")?; + Ok(()) + } + + async fn publish_ids_impl(&mut self, ids: &[u32]) -> eyre::Result<()> { + self.publish(self.topics.ids(), &ids).await?; + Ok(()) + } + + async fn publish_data(&mut self, id: u32, item: &V) -> eyre::Result<()> { + self.publish(self.topics.data(id), item).await?; + Ok(()) + } +} + +impl<'a, T: CollectionTopics<'a>, V: serde::Serialize> + MqttCollection<'a, T, im::OrdMap>> +{ + async fn publish_ids(&mut self, items: &im::OrdMap>) -> eyre::Result<()> { + let ids: Vec = items.keys().cloned().collect(); + self.publish_ids_impl(&ids).await + } + + pub async fn publish_diff( + &mut self, + old_values: Option<&im::OrdMap>>, + new_values: &im::OrdMap>, + ) -> eyre::Result<()> { + let mut published_ids = false; + for (id, value) in new_values { + let new_value_different = old_values + .and_then(|old_values| old_values.get(id)) + .map(|old_value| !Arc::ptr_eq(old_value, value)); + let publish_value = if let Some(different) = new_value_different { + different + } else { + // old value does not exist + if !published_ids { + self.publish_ids(new_values).await?; + published_ids = true; + } + true + }; + if publish_value { + self.publish_data(*id, &**value).await?; + } + } + Ok(()) + } + + pub async fn publish_all(&mut self, values: &im::OrdMap>) -> eyre::Result<()> { + self.publish_diff(None, values).await + } +} + +impl<'a> MqttCollection<'a, topics::ZoneTopics<'a>, Zones> { + // Zone state can be derived from zone runner state... + pub async fn publish_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> { + self.publish(self.topics.state(zone_id), &state) + .await + .wrap_err("failed to publish zone state") + } +} + +impl<'a> MqttCollection<'a, topics::ProgramTopics<'a>, Programs> { + pub async fn publish_running( + &mut self, + program_id: ProgramId, + running: bool, + ) -> eyre::Result<()> { + self.publish(self.topics.running(program_id), &running) + .await + .wrap_err("failed to publish program running") + } + + pub async fn publish_next_run( + &mut self, + program_id: ProgramId, + next_run: chrono::DateTime, + ) -> eyre::Result<()> { + let payload = next_run.to_rfc3339(); + self.publish(self.topics.next_run(program_id), &payload) + .await + .wrap_err("failed to publish program next run") + } +} + pub struct MqttInterfaceTask { interface: MqttInterface, addr: Addr, diff --git a/sprinklers_mqtt/src/topics.rs b/sprinklers_mqtt/src/topics.rs index 0312105..2cecf5e 100644 --- a/sprinklers_mqtt/src/topics.rs +++ b/sprinklers_mqtt/src/topics.rs @@ -1,67 +1,97 @@ -use sprinklers_core::model::{ProgramId, ZoneId}; - -#[derive(Clone, Debug)] -pub struct Topics -where - T: AsRef, -{ - prefix: T, +pub trait CollectionTopics<'t> { + fn new(prefix: &'t str) -> Self; + fn ids(&self) -> String; + fn data(&self, id: u32) -> String; } -impl Topics -where - T: AsRef, -{ +#[derive(Clone, Debug)] +pub struct ZoneTopics<'a>(pub &'a str); + +impl<'a> CollectionTopics<'a> for ZoneTopics<'a> { + fn new(prefix: &'a str) -> Self { + ZoneTopics(prefix) + } + + fn ids(&self) -> String { + // TODO: change nomenclature + format!("{}/sections", self.0) + } + + fn data(&self, zone_id: u32) -> String { + // TODO: change nomenclature + format!("{}/sections/{}", self.0, zone_id) + } +} + +impl<'a> ZoneTopics<'a> { + pub fn state(&self, zone_id: u32) -> String { + // TODO: change nomenclature + format!("{}/sections/{}/state", self.0, zone_id) + } +} + +#[derive(Clone, Debug)] +pub struct ProgramTopics<'a>(pub &'a str); + +impl<'a> CollectionTopics<'a> for ProgramTopics<'a> { + fn new(prefix: &'a str) -> Self { + ProgramTopics(prefix) + } + + fn ids(&self) -> String { + format!("{}/programs", self.0) + } + + fn data(&self, zone_id: u32) -> String { + format!("{}/programs/{}", self.0, zone_id) + } +} + +impl<'a> ProgramTopics<'a> { + pub fn running(&self, zone_id: u32) -> String { + format!("{}/programs/{}/running", self.0, zone_id) + } + + pub fn next_run(&self, zone_id: u32) -> String { + // TODO: reconcile naming convention + format!("{}/programs/{}/nextRun", self.0, zone_id) + } +} + +#[derive(Clone, Debug)] +pub struct Topics>(pub T); + +impl> Topics { pub fn new(prefix: T) -> Self { - Self { prefix } + Self(prefix) + } + + pub fn prefix(&self) -> &str { + self.0.as_ref() } pub fn connected(&self) -> String { - format!("{}/connected", self.prefix.as_ref()) + format!("{}/connected", self.0.as_ref()) } - pub fn zones(&self) -> String { - // TODO: change nomenclature - format!("{}/sections", self.prefix.as_ref()) + pub fn zones(&self) -> ZoneTopics { + ZoneTopics::new(self.0.as_ref()) } - pub fn zone_data(&self, zone_id: ZoneId) -> String { - // TODO: change nomenclature - format!("{}/sections/{}", self.prefix.as_ref(), zone_id) - } - - pub fn zone_state(&self, zone_id: ZoneId) -> String { - // TODO: change nomenclature - format!("{}/sections/{}/state", self.prefix.as_ref(), zone_id) - } - - pub fn programs(&self) -> String { - format!("{}/programs", self.prefix.as_ref()) - } - - pub fn program_data(&self, program_id: ProgramId) -> String { - format!("{}/programs/{}", self.prefix.as_ref(), program_id) - } - - pub fn program_running(&self, program_id: ProgramId) -> String { - format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) - } - - pub fn program_next_run(&self, program_id: ProgramId) -> String { - // TODO: reconcile naming convention - format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id) + pub fn programs(&self) -> ProgramTopics { + ProgramTopics::new(self.0.as_ref()) } pub fn zone_runner(&self) -> String { // TODO: change nomenclature - format!("{}/section_runner", self.prefix.as_ref()) + format!("{}/section_runner", self.0.as_ref()) } pub fn requests(&self) -> String { - format!("{}/requests", self.prefix.as_ref()) + format!("{}/requests", self.0.as_ref()) } pub fn responses(&self) -> String { - format!("{}/responses", self.prefix.as_ref()) + format!("{}/responses", self.0.as_ref()) } } diff --git a/sprinklers_mqtt/src/update_listener.rs b/sprinklers_mqtt/src/update_listener.rs index c2b282e..8ceb49d 100644 --- a/sprinklers_mqtt/src/update_listener.rs +++ b/sprinklers_mqtt/src/update_listener.rs @@ -45,28 +45,21 @@ impl StreamHandler for UpdateListenerActor { let old_zones = self.old_zones.replace(zones.clone()); let fut = async move { - mqtt_interface.publish_zones(&zones).await?; - for zone_id in zones.keys() { - mqtt_interface.publish_zone_state(*zone_id, false).await?; - } - - match old_zones { - None => { - mqtt_interface.publish_zones(&zones).await?; - - // Some what of a hack - // Initialize zone running states to false the first time we - // receive zones - for zone_id in zones.keys() { - mqtt_interface.publish_zone_state(*zone_id, false).await?; - } - } - Some(old_zones) => { + if old_zones.is_none() { + // Some what of a hack + // Initialize zone running states to false the first time we + // receive zones + for zone_id in zones.keys() { mqtt_interface - .publish_zones_diff(&old_zones, &zones) + .zones() + .publish_state(*zone_id, false) .await?; } } + mqtt_interface + .zones() + .publish_diff(old_zones.as_ref(), &zones) + .await?; Ok(()) } .unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err)); @@ -93,7 +86,7 @@ impl StreamHandler> for UpdateListenerAc } { let mut mqtt_interface = self.mqtt_interface.clone(); let fut = async move { - if let Err(err) = mqtt_interface.publish_zone_state(zone_id, state).await { + if let Err(err) = mqtt_interface.zones().publish_state(zone_id, state).await { warn!("could not publish zone state: {}", err); } }; @@ -133,7 +126,8 @@ impl StreamHandler> for UpdateListene match publish { Publish::Running(running) => { if let Err(err) = mqtt_interface - .publish_program_running(program_id, running) + .programs() + .publish_running(program_id, running) .await { warn!("could not publish program running: {}", err); @@ -141,7 +135,8 @@ impl StreamHandler> for UpdateListene } Publish::NextRun(next_run) => { if let Err(err) = mqtt_interface - .publish_program_next_run(program_id, next_run) + .programs() + .publish_next_run(program_id, next_run) .await { warn!("could not publish program next run: {}", err); @@ -172,25 +167,18 @@ impl StreamHandler for UpdateListenerActor { let old_programs = self.old_programs.replace(programs.clone()); let fut = async move { - match old_programs { - None => { - mqtt_interface.publish_programs(&programs).await?; - - // Some what of a hack - // Initialize program running states to false the first time we - // receive programs - for program_id in programs.keys() { - mqtt_interface - .publish_program_running(*program_id, false) - .await?; - } - } - Some(old_programs) => { - mqtt_interface - .publish_programs_diff(&old_programs, &programs) - .await?; + let mut mqtt_progs = mqtt_interface.programs(); + if old_programs.is_none() { + // Some what of a hack + // Initialize program running states to false the first time we + // receive programs + for program_id in programs.keys() { + mqtt_progs.publish_running(*program_id, false).await?; } } + mqtt_progs + .publish_diff(old_programs.as_ref(), &programs) + .await?; Ok(()) } .unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err));