Better abstraction over MQTT collectiosn

This commit is contained in:
Alex Mikhalev 2020-12-24 16:03:53 -07:00
parent 4eb2043ad7
commit 691f14b79a
4 changed files with 221 additions and 196 deletions

View File

@ -20,6 +20,7 @@ chrono = "0.4.15"
num-traits = "0.2.12" num-traits = "0.2.12"
num-derive = "0.3.2" num-derive = "0.3.2"
futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] } futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] }
im = "15.0.0"
[dependencies.tokio] [dependencies.tokio]
version = "0.2.22" version = "0.2.22"

View File

@ -8,15 +8,16 @@ mod zone_runner_json;
pub use request::RequestContext; pub use request::RequestContext;
pub use update_listener::UpdateListener; pub use update_listener::UpdateListener;
use self::topics::Topics; use self::topics::{CollectionTopics, Topics};
use sprinklers_actors::zone_runner::ZoneRunnerState; use sprinklers_actors::zone_runner::ZoneRunnerState;
use sprinklers_core::model::{Program, ProgramId, Programs, Zone, ZoneId, Zones}; use sprinklers_core::model::{ProgramId, Programs, ZoneId, Zones};
use zone_runner_json::ZoneRunnerStateJson; use zone_runner_json::ZoneRunnerStateJson;
use actix::{Actor, Addr}; use actix::{Actor, Addr};
use eyre::WrapErr; use eyre::WrapErr;
use rumqttc::{LastWill, MqttOptions, QoS}; use rumqttc::{LastWill, MqttOptions, QoS};
use std::{ use std::{
marker::PhantomData,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::Arc,
}; };
@ -73,119 +74,12 @@ impl MqttInterface {
self.client.cancel().await self.client.cancel().await
} }
pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> { pub fn zones(&mut self) -> MqttCollection<'_, topics::ZoneTopics, Zones> {
let zone_ids: Vec<_> = zones.keys().cloned().collect(); MqttCollection::new(self)
self.publish_zone_ids(&zone_ids).await?;
for zone in zones.values() {
self.publish_zone(zone).await?;
}
Ok(())
} }
// TODO: figure out how to share logic with publish_programs_diff and publish_zones pub fn programs(&mut self) -> MqttCollection<'_, topics::ProgramTopics, Programs> {
pub async fn publish_zones_diff( MqttCollection::new(self)
&mut self,
old_zones: &Zones,
zones: &Zones,
) -> eyre::Result<()> {
for (id, zone) in zones {
let publish = match old_zones.get(id) {
Some(old_zone) => !Arc::ptr_eq(old_zone, zone),
None => {
let zone_ids: Vec<_> = zones.keys().cloned().collect();
self.publish_zone_ids(&zone_ids).await?;
true
}
};
if publish {
self.publish_zone(zone).await?;
}
}
Ok(())
}
pub async fn publish_zone_ids(&mut self, zone_ids: &[ZoneId]) -> eyre::Result<()> {
self.publish_data(self.topics.zones(), &zone_ids)
.await
.wrap_err("failed to publish zone ids")?;
Ok(())
}
pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> {
self.publish_data(self.topics.zone_data(zone.id), zone)
.await
.wrap_err("failed to publish zone")
}
// Zone state can be derived from zone runner state...
pub async fn publish_zone_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> {
self.publish_data(self.topics.zone_state(zone_id), &state)
.await
.wrap_err("failed to publish zone state")
}
pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> {
let program_ids: Vec<_> = programs.keys().cloned().collect();
self.publish_program_ids(&program_ids).await?;
for program in programs.values() {
self.publish_program(program).await?;
}
Ok(())
}
pub async fn publish_program_ids(&mut self, program_ids: &[ProgramId]) -> eyre::Result<()> {
self.publish_data(self.topics.programs(), &program_ids)
.await
.wrap_err("failed to publish program ids")?;
Ok(())
}
pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> {
self.publish_data(self.topics.program_data(program.id), &program)
.await
.wrap_err("failed to publish program")
}
pub async fn publish_programs_diff(
&mut self,
old_programs: &Programs,
programs: &Programs,
) -> eyre::Result<()> {
for (id, program) in programs {
let publish = match old_programs.get(id) {
Some(old_program) => !Arc::ptr_eq(old_program, program),
None => {
let program_ids: Vec<_> = programs.keys().cloned().collect();
self.publish_program_ids(&program_ids).await?;
true
}
};
if publish {
self.publish_program(program).await?;
}
}
Ok(())
}
pub async fn publish_program_running(
&mut self,
program_id: ProgramId,
running: bool,
) -> eyre::Result<()> {
self.publish_data(self.topics.program_running(program_id), &running)
.await
.wrap_err("failed to publish program running")
}
pub async fn publish_program_next_run(
&mut self,
program_id: ProgramId,
next_run: chrono::DateTime<chrono::Local>,
) -> eyre::Result<()> {
let payload = next_run.to_rfc3339();
self.publish_data(self.topics.program_next_run(program_id), &payload)
.await
.wrap_err("failed to publish program next run")
} }
pub async fn publish_zone_runner(&mut self, sr_state: &ZoneRunnerState) -> eyre::Result<()> { pub async fn publish_zone_runner(&mut self, sr_state: &ZoneRunnerState) -> eyre::Result<()> {
@ -214,6 +108,118 @@ impl MqttInterface {
} }
} }
pub struct MqttCollection<'a, T, U> {
client: &'a mut rumqttc::AsyncClient,
topics: T,
collection: PhantomData<U>,
}
impl<'a, T: CollectionTopics<'a>, U> MqttCollection<'a, T, U> {
fn new(interface: &'a mut MqttInterface) -> Self {
Self {
client: &mut interface.client,
topics: T::new(interface.topics.prefix()),
collection: PhantomData,
}
}
async fn publish<P: serde::Serialize>(
&mut self,
topic: String,
payload: &P,
) -> eyre::Result<()> {
let payload_vec =
serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?;
self.client
.publish(topic, QoS::AtLeastOnce, true, payload_vec)
.await
.wrap_err("failed to publish")?;
Ok(())
}
async fn publish_ids_impl(&mut self, ids: &[u32]) -> eyre::Result<()> {
self.publish(self.topics.ids(), &ids).await?;
Ok(())
}
async fn publish_data<V: serde::Serialize>(&mut self, id: u32, item: &V) -> eyre::Result<()> {
self.publish(self.topics.data(id), item).await?;
Ok(())
}
}
impl<'a, T: CollectionTopics<'a>, V: serde::Serialize>
MqttCollection<'a, T, im::OrdMap<u32, Arc<V>>>
{
async fn publish_ids(&mut self, items: &im::OrdMap<u32, Arc<V>>) -> eyre::Result<()> {
let ids: Vec<u32> = items.keys().cloned().collect();
self.publish_ids_impl(&ids).await
}
pub async fn publish_diff(
&mut self,
old_values: Option<&im::OrdMap<u32, Arc<V>>>,
new_values: &im::OrdMap<u32, Arc<V>>,
) -> eyre::Result<()> {
let mut published_ids = false;
for (id, value) in new_values {
let new_value_different = old_values
.and_then(|old_values| old_values.get(id))
.map(|old_value| !Arc::ptr_eq(old_value, value));
let publish_value = if let Some(different) = new_value_different {
different
} else {
// old value does not exist
if !published_ids {
self.publish_ids(new_values).await?;
published_ids = true;
}
true
};
if publish_value {
self.publish_data(*id, &**value).await?;
}
}
Ok(())
}
pub async fn publish_all(&mut self, values: &im::OrdMap<u32, Arc<V>>) -> eyre::Result<()> {
self.publish_diff(None, values).await
}
}
impl<'a> MqttCollection<'a, topics::ZoneTopics<'a>, Zones> {
// Zone state can be derived from zone runner state...
pub async fn publish_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> {
self.publish(self.topics.state(zone_id), &state)
.await
.wrap_err("failed to publish zone state")
}
}
impl<'a> MqttCollection<'a, topics::ProgramTopics<'a>, Programs> {
pub async fn publish_running(
&mut self,
program_id: ProgramId,
running: bool,
) -> eyre::Result<()> {
self.publish(self.topics.running(program_id), &running)
.await
.wrap_err("failed to publish program running")
}
pub async fn publish_next_run(
&mut self,
program_id: ProgramId,
next_run: chrono::DateTime<chrono::Local>,
) -> eyre::Result<()> {
let payload = next_run.to_rfc3339();
self.publish(self.topics.next_run(program_id), &payload)
.await
.wrap_err("failed to publish program next run")
}
}
pub struct MqttInterfaceTask { pub struct MqttInterfaceTask {
interface: MqttInterface, interface: MqttInterface,
addr: Addr<actor::MqttActor>, addr: Addr<actor::MqttActor>,

View File

@ -1,67 +1,97 @@
use sprinklers_core::model::{ProgramId, ZoneId}; pub trait CollectionTopics<'t> {
fn new(prefix: &'t str) -> Self;
#[derive(Clone, Debug)] fn ids(&self) -> String;
pub struct Topics<T> fn data(&self, id: u32) -> String;
where
T: AsRef<str>,
{
prefix: T,
} }
impl<T> Topics<T> #[derive(Clone, Debug)]
where pub struct ZoneTopics<'a>(pub &'a str);
T: AsRef<str>,
{ impl<'a> CollectionTopics<'a> for ZoneTopics<'a> {
fn new(prefix: &'a str) -> Self {
ZoneTopics(prefix)
}
fn ids(&self) -> String {
// TODO: change nomenclature
format!("{}/sections", self.0)
}
fn data(&self, zone_id: u32) -> String {
// TODO: change nomenclature
format!("{}/sections/{}", self.0, zone_id)
}
}
impl<'a> ZoneTopics<'a> {
pub fn state(&self, zone_id: u32) -> String {
// TODO: change nomenclature
format!("{}/sections/{}/state", self.0, zone_id)
}
}
#[derive(Clone, Debug)]
pub struct ProgramTopics<'a>(pub &'a str);
impl<'a> CollectionTopics<'a> for ProgramTopics<'a> {
fn new(prefix: &'a str) -> Self {
ProgramTopics(prefix)
}
fn ids(&self) -> String {
format!("{}/programs", self.0)
}
fn data(&self, zone_id: u32) -> String {
format!("{}/programs/{}", self.0, zone_id)
}
}
impl<'a> ProgramTopics<'a> {
pub fn running(&self, zone_id: u32) -> String {
format!("{}/programs/{}/running", self.0, zone_id)
}
pub fn next_run(&self, zone_id: u32) -> String {
// TODO: reconcile naming convention
format!("{}/programs/{}/nextRun", self.0, zone_id)
}
}
#[derive(Clone, Debug)]
pub struct Topics<T: AsRef<str>>(pub T);
impl<T: AsRef<str>> Topics<T> {
pub fn new(prefix: T) -> Self { pub fn new(prefix: T) -> Self {
Self { prefix } Self(prefix)
}
pub fn prefix(&self) -> &str {
self.0.as_ref()
} }
pub fn connected(&self) -> String { pub fn connected(&self) -> String {
format!("{}/connected", self.prefix.as_ref()) format!("{}/connected", self.0.as_ref())
} }
pub fn zones(&self) -> String { pub fn zones(&self) -> ZoneTopics {
// TODO: change nomenclature ZoneTopics::new(self.0.as_ref())
format!("{}/sections", self.prefix.as_ref())
} }
pub fn zone_data(&self, zone_id: ZoneId) -> String { pub fn programs(&self) -> ProgramTopics {
// TODO: change nomenclature ProgramTopics::new(self.0.as_ref())
format!("{}/sections/{}", self.prefix.as_ref(), zone_id)
}
pub fn zone_state(&self, zone_id: ZoneId) -> String {
// TODO: change nomenclature
format!("{}/sections/{}/state", self.prefix.as_ref(), zone_id)
}
pub fn programs(&self) -> String {
format!("{}/programs", self.prefix.as_ref())
}
pub fn program_data(&self, program_id: ProgramId) -> String {
format!("{}/programs/{}", self.prefix.as_ref(), program_id)
}
pub fn program_running(&self, program_id: ProgramId) -> String {
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id)
}
pub fn program_next_run(&self, program_id: ProgramId) -> String {
// TODO: reconcile naming convention
format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id)
} }
pub fn zone_runner(&self) -> String { pub fn zone_runner(&self) -> String {
// TODO: change nomenclature // TODO: change nomenclature
format!("{}/section_runner", self.prefix.as_ref()) format!("{}/section_runner", self.0.as_ref())
} }
pub fn requests(&self) -> String { pub fn requests(&self) -> String {
format!("{}/requests", self.prefix.as_ref()) format!("{}/requests", self.0.as_ref())
} }
pub fn responses(&self) -> String { pub fn responses(&self) -> String {
format!("{}/responses", self.prefix.as_ref()) format!("{}/responses", self.0.as_ref())
} }
} }

View File

@ -45,28 +45,21 @@ impl StreamHandler<Zones> for UpdateListenerActor {
let old_zones = self.old_zones.replace(zones.clone()); let old_zones = self.old_zones.replace(zones.clone());
let fut = async move { let fut = async move {
mqtt_interface.publish_zones(&zones).await?; if old_zones.is_none() {
for zone_id in zones.keys() { // Some what of a hack
mqtt_interface.publish_zone_state(*zone_id, false).await?; // Initialize zone running states to false the first time we
} // receive zones
for zone_id in zones.keys() {
match old_zones {
None => {
mqtt_interface.publish_zones(&zones).await?;
// Some what of a hack
// Initialize zone running states to false the first time we
// receive zones
for zone_id in zones.keys() {
mqtt_interface.publish_zone_state(*zone_id, false).await?;
}
}
Some(old_zones) => {
mqtt_interface mqtt_interface
.publish_zones_diff(&old_zones, &zones) .zones()
.publish_state(*zone_id, false)
.await?; .await?;
} }
} }
mqtt_interface
.zones()
.publish_diff(old_zones.as_ref(), &zones)
.await?;
Ok(()) Ok(())
} }
.unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err)); .unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err));
@ -93,7 +86,7 @@ impl StreamHandler<Result<ZoneEvent, broadcast::RecvError>> for UpdateListenerAc
} { } {
let mut mqtt_interface = self.mqtt_interface.clone(); let mut mqtt_interface = self.mqtt_interface.clone();
let fut = async move { let fut = async move {
if let Err(err) = mqtt_interface.publish_zone_state(zone_id, state).await { if let Err(err) = mqtt_interface.zones().publish_state(zone_id, state).await {
warn!("could not publish zone state: {}", err); warn!("could not publish zone state: {}", err);
} }
}; };
@ -133,7 +126,8 @@ impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListene
match publish { match publish {
Publish::Running(running) => { Publish::Running(running) => {
if let Err(err) = mqtt_interface if let Err(err) = mqtt_interface
.publish_program_running(program_id, running) .programs()
.publish_running(program_id, running)
.await .await
{ {
warn!("could not publish program running: {}", err); warn!("could not publish program running: {}", err);
@ -141,7 +135,8 @@ impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListene
} }
Publish::NextRun(next_run) => { Publish::NextRun(next_run) => {
if let Err(err) = mqtt_interface if let Err(err) = mqtt_interface
.publish_program_next_run(program_id, next_run) .programs()
.publish_next_run(program_id, next_run)
.await .await
{ {
warn!("could not publish program next run: {}", err); warn!("could not publish program next run: {}", err);
@ -172,25 +167,18 @@ impl StreamHandler<Programs> for UpdateListenerActor {
let old_programs = self.old_programs.replace(programs.clone()); let old_programs = self.old_programs.replace(programs.clone());
let fut = async move { let fut = async move {
match old_programs { let mut mqtt_progs = mqtt_interface.programs();
None => { if old_programs.is_none() {
mqtt_interface.publish_programs(&programs).await?; // Some what of a hack
// Initialize program running states to false the first time we
// Some what of a hack // receive programs
// Initialize program running states to false the first time we for program_id in programs.keys() {
// receive programs mqtt_progs.publish_running(*program_id, false).await?;
for program_id in programs.keys() {
mqtt_interface
.publish_program_running(*program_id, false)
.await?;
}
}
Some(old_programs) => {
mqtt_interface
.publish_programs_diff(&old_programs, &programs)
.await?;
} }
} }
mqtt_progs
.publish_diff(old_programs.as_ref(), &programs)
.await?;
Ok(()) Ok(())
} }
.unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err)); .unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err));