From 2270c69f2b4bcab079da71634fb06573f44be452 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Sep 2020 16:54:40 -0600 Subject: [PATCH] Add support for publishing sections --- Cargo.toml | 4 + src/main.rs | 11 +++ src/model/program.rs | 3 +- src/model/section.rs | 6 +- src/mqtt_interface.rs | 199 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 220 insertions(+), 3 deletions(-) create mode 100644 src/mqtt_interface.rs diff --git a/Cargo.toml b/Cargo.toml index 64d79a9..8ca179a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,10 @@ assert_matches = "1.3.0" serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.57" +[dependencies.rumqttc] +git = "https://github.com/bytebeamio/rumqtt.git" +rev = "814dc891" + [dependencies.tracing-subscriber] version = "0.2.11" default-features = false diff --git a/src/main.rs b/src/main.rs index 47fc14f..9e45647 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ mod database; mod model; +mod mqtt_interface; mod option_future; mod program_runner; mod schedule; @@ -53,7 +54,16 @@ async fn main() -> Result<()> { info!(program = debug(&prog), "read program"); } + let mqtt_options = mqtt_interface::Options { + broker_host: "localhost".into(), + broker_port: 1883, + device_id: "sprinklers_rs-0001".into(), + client_id: "sprinklers_rs-0001".into(), + }; + let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; + program_runner.update_sections(sections.clone()).await?; + mqtt_interface.publish_sections(§ions).await?; program_runner.update_programs(programs.clone()).await?; info!("sprinklers_rs initialized"); @@ -61,6 +71,7 @@ async fn main() -> Result<()> { tokio::signal::ctrl_c().await?; info!("Interrupt received, shutting down"); + mqtt_interface.quit().await?; program_runner.quit().await?; section_runner.quit().await?; tokio::task::yield_now().await; diff --git a/src/model/program.rs b/src/model/program.rs index 0e47658..5b45cee 100644 --- a/src/model/program.rs +++ b/src/model/program.rs @@ -37,7 +37,8 @@ pub type ProgramSequence = Vec; pub type ProgramId = u32; -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Program { pub id: ProgramId, pub name: String, diff --git a/src/model/section.rs b/src/model/section.rs index f847718..60c2f03 100644 --- a/src/model/section.rs +++ b/src/model/section.rs @@ -1,19 +1,21 @@ //! Data models for sprinklers sections //! //! A section represents a group of sprinkler heads actuated by a single -//! valve. Physically controllable (or virtual) valves are handled by implementations of +//! valve. Physically controllable (or virtual) valves are handled by implementations of //! [SectionInterface](../../section_interface/trait.SectionInterface.html), but the model //! describes a logical section and how it maps to a physical one. use crate::section_interface::SecId; use rusqlite::{Error as SqlError, Row as SqlRow, ToSql}; +use serde::{Deserialize, Serialize}; use std::sync::Arc; /// Identifying integer type for a Section pub type SectionId = u32; /// A single logical section -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] pub struct Section { pub id: SectionId, pub name: String, diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs new file mode 100644 index 0000000..60d1787 --- /dev/null +++ b/src/mqtt_interface.rs @@ -0,0 +1,199 @@ +use crate::model::{Section, SectionId, Sections}; +use eyre::WrapErr; +use rumqttc::{LastWill, MqttOptions, QoS}; +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, + time::Duration, +}; +use tokio::{task::JoinHandle, time::delay_for}; +use tracing::{debug, trace, warn, info}; + +#[derive(Clone, Debug)] +struct Topics +where + T: AsRef, +{ + prefix: T, +} + +impl Topics +where + T: AsRef, +{ + fn new(prefix: T) -> Self { + Self { prefix } + } + + fn connected(&self) -> String { + format!("{}/connected", self.prefix.as_ref()) + } + + fn sections(&self) -> String { + format!("{}/sections", self.prefix.as_ref()) + } + + fn section_data(&self, section_id: SectionId) -> String { + format!("{}/sections/{}", self.prefix.as_ref(), section_id) + } +} + +#[derive(Clone, Debug)] +pub struct Options { + pub broker_host: String, + pub broker_port: u16, + pub device_id: String, + pub client_id: String, +} + +async fn event_loop_task( + mut interface: MqttInterface, + mut event_loop: rumqttc::EventLoop, +) -> eyre::Result<()> { + use rumqttc::{ConnectionError, Event}; + loop { + match event_loop.poll().await { + Ok(Event::Incoming(incoming)) => { + debug!(incoming = debug(&incoming), "MQTT incoming message"); + #[allow(clippy::single_match)] + match incoming { + rumqttc::Packet::ConnAck(_) => { + info!("MQTT connected"); + interface.publish_connected(true).await?; + } + _ => {} + } + } + Ok(Event::Outgoing(outgoing)) => { + trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); + } + Err(ConnectionError::Cancel) => { + debug!("MQTT disconnecting"); + break; + } + Err(err) => { + let reconnect_timeout = Duration::from_secs(5); + warn!( + "MQTT error, will reconnect in {:?}: {}", + reconnect_timeout, err + ); + delay_for(reconnect_timeout).await; + } + } + } + Ok(()) +} + +#[derive(Clone)] +pub struct MqttInterface { + client: rumqttc::AsyncClient, + topics: Topics>, +} + +impl MqttInterface { + fn new(options: Options) -> (Self, rumqttc::EventLoop) { + let mqtt_prefix = format!("devices/{}", options.device_id); + let topics: Topics> = Topics::new(mqtt_prefix.into()); + let mut mqtt_opts = + MqttOptions::new(options.client_id, options.broker_host, options.broker_port); + + let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); + mqtt_opts.set_last_will(last_will); + + let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 32); + + (Self { client, topics }, event_loop) + } + + pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { + let section_ids: Vec<_> = sections.keys().cloned().collect(); + let section_ids_payload = serde_json::to_vec(§ion_ids)?; + self.client + .publish( + self.topics.sections(), + QoS::AtLeastOnce, + true, + section_ids_payload, + ) + .await?; + for section in sections.values() { + self.publish_section(section).await?; + } + Ok(()) + } + + pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { + let payload = serde_json::to_vec(section).wrap_err("failed to serialize section")?; + self.client + .publish( + self.topics.section_data(section.id), + QoS::AtLeastOnce, + true, + payload, + ) + .await?; + Ok(()) + } + + async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { + self.client + .publish( + self.topics.connected(), + QoS::AtLeastOnce, + true, + connected.to_string(), + ) + .await + .wrap_err("failed to publish connected topic")?; + Ok(()) + } + + async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { + self.client.cancel().await + } +} + +pub struct MqttInterfaceTask { + interface: MqttInterface, + join_handle: JoinHandle<()>, +} + +impl MqttInterfaceTask { + pub async fn start(options: Options) -> eyre::Result { + let (interface, event_loop) = MqttInterface::new(options); + + let join_handle = tokio::spawn({ + let interface = interface.clone(); + async move { + event_loop_task(interface, event_loop) + .await + .expect("error in event loop task") + } + }); + + Ok(Self { + interface, + join_handle, + }) + } + + pub async fn quit(mut self) -> eyre::Result<()> { + self.interface.cancel().await?; + self.join_handle.await?; + Ok(()) + } +} + +impl Deref for MqttInterfaceTask { + type Target = MqttInterface; + + fn deref(&self) -> &Self::Target { + &self.interface + } +} + +impl DerefMut for MqttInterfaceTask { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.interface + } +}