From 239a289e4e82de937001d24282793eec31de4bf7 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Sep 2020 17:05:29 -0600 Subject: [PATCH] Publish programs --- src/main.rs | 1 + src/mqtt_interface.rs | 54 ++++++++++++++++++++++++++++++++++++------- 2 files changed, 47 insertions(+), 8 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9e45647..34df3f9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -65,6 +65,7 @@ async fn main() -> Result<()> { program_runner.update_sections(sections.clone()).await?; mqtt_interface.publish_sections(§ions).await?; program_runner.update_programs(programs.clone()).await?; + mqtt_interface.publish_programs(&programs).await?; info!("sprinklers_rs initialized"); diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs index 60d1787..0a1ee9f 100644 --- a/src/mqtt_interface.rs +++ b/src/mqtt_interface.rs @@ -1,4 +1,4 @@ -use crate::model::{Section, SectionId, Sections}; +use crate::model::{Section, SectionId, Sections, Programs, Program, ProgramId}; use eyre::WrapErr; use rumqttc::{LastWill, MqttOptions, QoS}; use std::{ @@ -36,6 +36,14 @@ where fn section_data(&self, section_id: SectionId) -> String { format!("{}/sections/{}", self.prefix.as_ref(), section_id) } + + fn programs(&self) -> String { + format!("{}/programs", self.prefix.as_ref()) + } + + fn program_data(&self, program_id: ProgramId) -> String { + format!("{}/programs/{}", self.prefix.as_ref(), program_id) + } } #[derive(Clone, Debug)] @@ -105,6 +113,23 @@ impl MqttInterface { (Self { client, topics }, event_loop) } + async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { + self.client + .publish( + self.topics.connected(), + QoS::AtLeastOnce, + true, + connected.to_string(), + ) + .await + .wrap_err("failed to publish connected topic")?; + Ok(()) + } + + async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { + self.client.cancel().await + } + pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { let section_ids: Vec<_> = sections.keys().cloned().collect(); let section_ids_payload = serde_json::to_vec(§ion_ids)?; @@ -135,21 +160,34 @@ impl MqttInterface { Ok(()) } - async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { + pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { + let program_ids: Vec<_> = programs.keys().cloned().collect(); + let program_ids_payload = serde_json::to_vec(&program_ids)?; self.client .publish( - self.topics.connected(), + self.topics.programs(), QoS::AtLeastOnce, true, - connected.to_string(), + program_ids_payload, ) - .await - .wrap_err("failed to publish connected topic")?; + .await?; + for program in programs.values() { + self.publish_program(program).await?; + } Ok(()) } - async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { - self.client.cancel().await + pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { + let payload = serde_json::to_vec(program).wrap_err("failed to serialize program")?; + self.client + .publish( + self.topics.program_data(program.id), + QoS::AtLeastOnce, + true, + payload, + ) + .await?; + Ok(()) } }