diff --git a/src/main.rs b/src/main.rs index 6c61ce5..3a4a7f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ mod database; mod model; -mod mqtt_interface; +mod mqtt; mod option_future; mod program_runner; mod schedule; @@ -57,13 +57,13 @@ async fn main() -> Result<()> { debug!(program = debug(&prog), "read program"); } - let mqtt_options = mqtt_interface::Options { + let mqtt_options = mqtt::Options { broker_host: "localhost".into(), broker_port: 1883, device_id: "sprinklers_rs-0001".into(), client_id: "sprinklers_rs-0001".into(), }; - let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options); + let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options); let update_listener = { let section_events = section_runner.subscribe().await?; diff --git a/src/mqtt/actor.rs b/src/mqtt/actor.rs new file mode 100644 index 0000000..e07c4e1 --- /dev/null +++ b/src/mqtt/actor.rs @@ -0,0 +1,96 @@ +use super::{event_loop::EventLoopTask, MqttInterface}; +use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; +use tokio::sync::oneshot; +use tracing::{debug, error, info, trace, warn}; + +pub(super) struct MqttActor { + interface: MqttInterface, + event_loop: Option, + quit_rx: Option>, +} + +impl MqttActor { + pub(super) fn new(interface: MqttInterface, event_loop: rumqttc::EventLoop) -> Self { + Self { + interface, + event_loop: Some(event_loop), + quit_rx: None, + } + } +} + +impl Actor for MqttActor { + type Context = actix::Context; + + fn started(&mut self, ctx: &mut Self::Context) { + trace!("MqttActor starting"); + let event_loop = self.event_loop.take().expect("MqttActor already started"); + let (quit_tx, quit_rx) = oneshot::channel(); + ctx.spawn( + EventLoopTask::new(event_loop, ctx.address(), quit_tx) + .run() + .into_actor(self), + ); + self.quit_rx = Some(quit_rx); + } +} + +#[derive(actix::Message)] +#[rtype(result = "()")] +pub(super) struct Quit; + +impl Handler for MqttActor { + type Result = actix::ResponseActFuture; + fn handle(&mut self, _msg: Quit, _ctx: &mut Self::Context) -> Self::Result { + let mut interface = self.interface.clone(); + let quit_rx = self.quit_rx.take().expect("MqttActor has already quit!"); + let fut = async move { + interface + .cancel() + .await + .expect("could not cancel MQTT client"); + let _ = quit_rx.await; + } + .into_actor(self) + .map(|_, _, ctx| ctx.stop()); + Box::pin(fut) + } +} + +#[derive(actix::Message)] +#[rtype(result = "()")] +pub(super) struct Connected; + +impl Handler for MqttActor { + type Result = (); + + fn handle(&mut self, _msg: Connected, ctx: &mut Self::Context) -> Self::Result { + info!("MQTT connected"); + let mut interface = self.interface.clone(); + let fut = async move { + let res = interface.publish_connected(true).await; + let res = res.and(interface.subscribe_requests().await); + if let Err(err) = res { + error!("error in connection setup: {}", err); + } + }; + ctx.spawn(fut.into_actor(self)); + } +} + +#[derive(actix::Message)] +#[rtype(result = "()")] +pub(super) struct PubRecieve(pub(super) rumqttc::Publish); + +impl Handler for MqttActor { + type Result = (); + + fn handle(&mut self, msg: PubRecieve, _ctx: &mut Self::Context) -> Self::Result { + let topic = &msg.0.topic; + if topic == &self.interface.topics.requests() { + debug!("received request: {:?}", msg.0); + } else { + warn!("received on unknown topic: {:?}", topic); + } + } +} diff --git a/src/mqtt/event_loop.rs b/src/mqtt/event_loop.rs new file mode 100644 index 0000000..acd96e7 --- /dev/null +++ b/src/mqtt/event_loop.rs @@ -0,0 +1,82 @@ +use super::actor::{self, MqttActor}; +use actix::Addr; +use rumqttc::{Packet, QoS}; +use std::{collections::HashSet, time::Duration}; +use tokio::sync::oneshot; +use tracing::{debug, trace, warn}; + +pub(super) struct EventLoopTask { + event_loop: rumqttc::EventLoop, + mqtt_addr: Addr, + quit_tx: oneshot::Sender<()>, + unreleased_pubs: HashSet, +} + +impl EventLoopTask { + pub(super) fn new( + event_loop: rumqttc::EventLoop, + mqtt_addr: Addr, + quit_tx: oneshot::Sender<()>, + ) -> Self { + Self { + event_loop, + mqtt_addr, + quit_tx, + unreleased_pubs: HashSet::default(), + } + } + + fn handle_incoming(&mut self, incoming: Packet) { + trace!(incoming = debug(&incoming), "MQTT incoming message"); + #[allow(clippy::single_match)] + match incoming { + Packet::ConnAck(_) => { + self.mqtt_addr.do_send(actor::Connected); + } + Packet::Publish(publish) => { + // Only deliver QoS 2 packets once + let deliver = if publish.qos == QoS::ExactlyOnce { + if self.unreleased_pubs.contains(&publish.pkid) { + false + } else { + self.unreleased_pubs.insert(publish.pkid); + true + } + } else { + true + }; + if deliver { + self.mqtt_addr.do_send(actor::PubRecieve(publish)); + } + } + Packet::PubRel(pubrel) => { + self.unreleased_pubs.remove(&pubrel.pkid); + } + _ => {} + } + } + + pub(super) async fn run(mut self) { + use rumqttc::{ConnectionError, Event}; + let reconnect_timeout = Duration::from_secs(5); + self.event_loop.set_reconnection_delay(reconnect_timeout); + loop { + match self.event_loop.poll().await { + Ok(Event::Incoming(incoming)) => { + self.handle_incoming(incoming); + } + Ok(Event::Outgoing(outgoing)) => { + trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); + } + Err(ConnectionError::Cancel) => { + debug!("MQTT disconnecting"); + break; + } + Err(err) => { + warn!("MQTT error, reconnecting: {}", err); + } + } + } + let _ = self.quit_tx.send(()); + } +} diff --git a/src/mqtt/mod.rs b/src/mqtt/mod.rs new file mode 100644 index 0000000..091f22c --- /dev/null +++ b/src/mqtt/mod.rs @@ -0,0 +1,173 @@ +mod actor; +mod event_loop; +mod topics; + +use self::topics::Topics; +use crate::{ + model::{Program, ProgramId, Programs, Section, SectionId, Sections}, + section_runner::SecRunnerState, + section_runner_json::SecRunnerStateJson, +}; +use actix::{Actor, Addr}; +use eyre::WrapErr; +use rumqttc::{LastWill, MqttOptions, QoS}; +use std::{ + ops::{Deref, DerefMut}, + sync::Arc, +}; + +#[derive(Clone, Debug)] +pub struct Options { + pub broker_host: String, + pub broker_port: u16, + pub device_id: String, + pub client_id: String, +} + +#[derive(Clone)] +pub struct MqttInterface { + client: rumqttc::AsyncClient, + topics: Topics>, +} + +impl MqttInterface { + fn new(options: Options) -> (Self, rumqttc::EventLoop) { + let mqtt_prefix = format!("devices/{}", options.device_id); + let topics: Topics> = Topics::new(mqtt_prefix.into()); + let mut mqtt_opts = + MqttOptions::new(options.client_id, options.broker_host, options.broker_port); + + let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); + mqtt_opts.set_last_will(last_will); + + let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16); + + (Self { client, topics }, event_loop) + } + + async fn publish_data

(&mut self, topic: String, payload: &P) -> eyre::Result<()> + where + P: serde::Serialize, + { + let payload_vec = + serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?; + self.client + .publish(topic, QoS::AtLeastOnce, true, payload_vec) + .await + .wrap_err("failed to publish")?; + Ok(()) + } + + pub(super) async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { + self.publish_data(self.topics.connected(), &connected) + .await + .wrap_err("failed to publish connected topic") + } + + pub(super) async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { + self.client.cancel().await + } + + pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { + let section_ids: Vec<_> = sections.keys().cloned().collect(); + self.publish_data(self.topics.sections(), §ion_ids) + .await + .wrap_err("failed to publish section ids")?; + for section in sections.values() { + self.publish_section(section).await?; + } + Ok(()) + } + + pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { + self.publish_data(self.topics.section_data(section.id), section) + .await + .wrap_err("failed to publish section") + } + + // Section state can be derived from section runner state... + pub async fn publish_section_state( + &mut self, + section_id: SectionId, + state: bool, + ) -> eyre::Result<()> { + self.publish_data(self.topics.section_state(section_id), &state) + .await + .wrap_err("failed to publish section state") + } + + pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { + let program_ids: Vec<_> = programs.keys().cloned().collect(); + self.publish_data(self.topics.programs(), &program_ids) + .await + .wrap_err("failed to publish program ids")?; + for program in programs.values() { + self.publish_program(program).await?; + } + Ok(()) + } + + pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { + self.publish_data(self.topics.program_data(program.id), &program) + .await + .wrap_err("failed to publish program") + } + + pub async fn publish_program_running( + &mut self, + program_id: ProgramId, + running: bool, + ) -> eyre::Result<()> { + self.publish_data(self.topics.program_running(program_id), &running) + .await + .wrap_err("failed to publish program running") + } + + pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { + let json: SecRunnerStateJson = sr_state.into(); + self.publish_data(self.topics.section_runner(), &json) + .await + .wrap_err("failed to publish section runner") + } + + pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { + self.client + .subscribe(self.topics.requests(), QoS::ExactlyOnce) + .await?; + Ok(()) + } +} + +pub struct MqttInterfaceTask { + interface: MqttInterface, + addr: Addr, +} + +impl MqttInterfaceTask { + pub fn start(options: Options) -> Self { + let (interface, event_loop) = MqttInterface::new(options); + + let addr = actor::MqttActor::new(interface.clone(), event_loop).start(); + + Self { interface, addr } + } + + pub async fn quit(self) -> eyre::Result<()> { + self.addr.send(actor::Quit).await?; + Ok(()) + } +} + +impl Deref for MqttInterfaceTask { + type Target = MqttInterface; + + fn deref(&self) -> &Self::Target { + &self.interface + } +} + +impl DerefMut for MqttInterfaceTask { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.interface + } +} diff --git a/src/mqtt/topics.rs b/src/mqtt/topics.rs new file mode 100644 index 0000000..6df8e49 --- /dev/null +++ b/src/mqtt/topics.rs @@ -0,0 +1,58 @@ +use crate::model::{ProgramId, SectionId}; + +#[derive(Clone, Debug)] +pub struct Topics +where + T: AsRef, +{ + prefix: T, +} + +impl Topics +where + T: AsRef, +{ + pub fn new(prefix: T) -> Self { + Self { prefix } + } + + pub fn connected(&self) -> String { + format!("{}/connected", self.prefix.as_ref()) + } + + pub fn sections(&self) -> String { + format!("{}/sections", self.prefix.as_ref()) + } + + pub fn section_data(&self, section_id: SectionId) -> String { + format!("{}/sections/{}", self.prefix.as_ref(), section_id) + } + + pub fn section_state(&self, section_id: SectionId) -> String { + format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) + } + + pub fn programs(&self) -> String { + format!("{}/programs", self.prefix.as_ref()) + } + + pub fn program_data(&self, program_id: ProgramId) -> String { + format!("{}/programs/{}", self.prefix.as_ref(), program_id) + } + + pub fn program_running(&self, program_id: ProgramId) -> String { + format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) + } + + pub fn section_runner(&self) -> String { + format!("{}/section_runner", self.prefix.as_ref()) + } + + pub fn requests(&self) -> String { + format!("{}/requests", self.prefix.as_ref()) + } + + pub fn responses(&self) -> String { + format!("{}/responses", self.prefix.as_ref()) + } +} diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs deleted file mode 100644 index d775b9c..0000000 --- a/src/mqtt_interface.rs +++ /dev/null @@ -1,392 +0,0 @@ -use crate::{ - model::{Program, ProgramId, Programs, Section, SectionId, Sections}, - section_runner::SecRunnerState, - section_runner_json::SecRunnerStateJson, -}; -use actix::{Actor, ActorContext, ActorFuture, Addr, AsyncContext, Handler, WrapFuture}; -use eyre::WrapErr; -use rumqttc::{LastWill, MqttOptions, Packet, QoS}; -use std::{ - collections::HashSet, - ops::{Deref, DerefMut}, - sync::Arc, - time::Duration, -}; -use tokio::sync::oneshot; -use tracing::{debug, error, info, trace, warn}; - -#[derive(Clone, Debug)] -struct Topics -where - T: AsRef, -{ - prefix: T, -} - -impl Topics -where - T: AsRef, -{ - fn new(prefix: T) -> Self { - Self { prefix } - } - - fn connected(&self) -> String { - format!("{}/connected", self.prefix.as_ref()) - } - - fn sections(&self) -> String { - format!("{}/sections", self.prefix.as_ref()) - } - - fn section_data(&self, section_id: SectionId) -> String { - format!("{}/sections/{}", self.prefix.as_ref(), section_id) - } - - fn section_state(&self, section_id: SectionId) -> String { - format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) - } - - fn programs(&self) -> String { - format!("{}/programs", self.prefix.as_ref()) - } - - fn program_data(&self, program_id: ProgramId) -> String { - format!("{}/programs/{}", self.prefix.as_ref(), program_id) - } - - fn program_running(&self, program_id: ProgramId) -> String { - format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) - } - - fn section_runner(&self) -> String { - format!("{}/section_runner", self.prefix.as_ref()) - } - - fn requests(&self) -> String { - format!("{}/requests", self.prefix.as_ref()) - } - - fn responses(&self) -> String { - format!("{}/responses", self.prefix.as_ref()) - } -} - -struct EventLoopTask { - event_loop: rumqttc::EventLoop, - mqtt_addr: Addr, - quit_tx: oneshot::Sender<()>, - unreleased_pubs: HashSet, -} - -impl EventLoopTask { - fn new( - event_loop: rumqttc::EventLoop, - mqtt_addr: Addr, - quit_tx: oneshot::Sender<()>, - ) -> Self { - Self { - event_loop, - mqtt_addr, - quit_tx, - unreleased_pubs: HashSet::default(), - } - } - - fn handle_incoming(&mut self, incoming: Packet) { - trace!(incoming = debug(&incoming), "MQTT incoming message"); - #[allow(clippy::single_match)] - match incoming { - Packet::ConnAck(_) => { - self.mqtt_addr.do_send(Connected); - } - Packet::Publish(publish) => { - // Only deliver QoS 2 packets once - let deliver = if publish.qos == QoS::ExactlyOnce { - if self.unreleased_pubs.contains(&publish.pkid) { - false - } else { - self.unreleased_pubs.insert(publish.pkid); - true - } - } else { - true - }; - if deliver { - self.mqtt_addr.do_send(PubRecieve(publish)); - } - } - Packet::PubRel(pubrel) => { - self.unreleased_pubs.remove(&pubrel.pkid); - } - _ => {} - } - } - - async fn run(mut self) { - use rumqttc::{ConnectionError, Event}; - let reconnect_timeout = Duration::from_secs(5); - self.event_loop.set_reconnection_delay(reconnect_timeout); - loop { - match self.event_loop.poll().await { - Ok(Event::Incoming(incoming)) => { - self.handle_incoming(incoming); - } - Ok(Event::Outgoing(outgoing)) => { - trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); - } - Err(ConnectionError::Cancel) => { - debug!("MQTT disconnecting"); - break; - } - Err(err) => { - warn!("MQTT error, reconnecting: {}", err); - } - } - } - let _ = self.quit_tx.send(()); - } -} - -#[derive(Clone, Debug)] -pub struct Options { - pub broker_host: String, - pub broker_port: u16, - pub device_id: String, - pub client_id: String, -} - -#[derive(Clone)] -pub struct MqttInterface { - client: rumqttc::AsyncClient, - topics: Topics>, -} - -impl MqttInterface { - fn new(options: Options) -> (Self, rumqttc::EventLoop) { - let mqtt_prefix = format!("devices/{}", options.device_id); - let topics: Topics> = Topics::new(mqtt_prefix.into()); - let mut mqtt_opts = - MqttOptions::new(options.client_id, options.broker_host, options.broker_port); - - let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); - mqtt_opts.set_last_will(last_will); - - let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16); - - (Self { client, topics }, event_loop) - } - - async fn publish_data

(&mut self, topic: String, payload: &P) -> eyre::Result<()> - where - P: serde::Serialize, - { - let payload_vec = - serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?; - self.client - .publish(topic, QoS::AtLeastOnce, true, payload_vec) - .await - .wrap_err("failed to publish")?; - Ok(()) - } - - async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { - self.publish_data(self.topics.connected(), &connected) - .await - .wrap_err("failed to publish connected topic") - } - - async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { - self.client.cancel().await - } - - pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { - let section_ids: Vec<_> = sections.keys().cloned().collect(); - self.publish_data(self.topics.sections(), §ion_ids) - .await - .wrap_err("failed to publish section ids")?; - for section in sections.values() { - self.publish_section(section).await?; - } - Ok(()) - } - - pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { - self.publish_data(self.topics.section_data(section.id), section) - .await - .wrap_err("failed to publish section") - } - - // Section state can be derived from section runner state... - pub async fn publish_section_state( - &mut self, - section_id: SectionId, - state: bool, - ) -> eyre::Result<()> { - self.publish_data(self.topics.section_state(section_id), &state) - .await - .wrap_err("failed to publish section state") - } - - pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { - let program_ids: Vec<_> = programs.keys().cloned().collect(); - self.publish_data(self.topics.programs(), &program_ids) - .await - .wrap_err("failed to publish program ids")?; - for program in programs.values() { - self.publish_program(program).await?; - } - Ok(()) - } - - pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { - self.publish_data(self.topics.program_data(program.id), &program) - .await - .wrap_err("failed to publish program") - } - - pub async fn publish_program_running( - &mut self, - program_id: ProgramId, - running: bool, - ) -> eyre::Result<()> { - self.publish_data(self.topics.program_running(program_id), &running) - .await - .wrap_err("failed to publish program running") - } - - pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { - let json: SecRunnerStateJson = sr_state.into(); - self.publish_data(self.topics.section_runner(), &json) - .await - .wrap_err("failed to publish section runner") - } - - pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { - self.client - .subscribe(self.topics.requests(), QoS::ExactlyOnce) - .await?; - Ok(()) - } -} - -struct MqttActor { - interface: MqttInterface, - event_loop: Option, - quit_rx: Option>, -} - -impl MqttActor { - fn new(interface: MqttInterface, event_loop: rumqttc::EventLoop) -> Self { - Self { - interface, - event_loop: Some(event_loop), - quit_rx: None, - } - } -} - -impl Actor for MqttActor { - type Context = actix::Context; - - fn started(&mut self, ctx: &mut Self::Context) { - trace!("MqttActor starting"); - let event_loop = self.event_loop.take().expect("MqttActor already started"); - let (quit_tx, quit_rx) = oneshot::channel(); - ctx.spawn( - EventLoopTask::new(event_loop, ctx.address(), quit_tx) - .run() - .into_actor(self), - ); - self.quit_rx = Some(quit_rx); - } -} - -#[derive(actix::Message)] -#[rtype(result = "()")] -struct Quit; - -impl Handler for MqttActor { - type Result = actix::ResponseActFuture; - fn handle(&mut self, _msg: Quit, _ctx: &mut Self::Context) -> Self::Result { - let mut interface = self.interface.clone(); - let quit_rx = self.quit_rx.take().expect("MqttActor has already quit!"); - let fut = async move { - interface - .cancel() - .await - .expect("could not cancel MQTT client"); - let _ = quit_rx.await; - } - .into_actor(self) - .map(|_, _, ctx| ctx.stop()); - Box::pin(fut) - } -} - -#[derive(actix::Message)] -#[rtype(result = "()")] -struct Connected; - -impl Handler for MqttActor { - type Result = (); - - fn handle(&mut self, _msg: Connected, ctx: &mut Self::Context) -> Self::Result { - info!("MQTT connected"); - let mut interface = self.interface.clone(); - let fut = async move { - let res = interface.publish_connected(true).await; - let res = res.and(interface.subscribe_requests().await); - if let Err(err) = res { - error!("error in connection setup: {}", err); - } - }; - ctx.spawn(fut.into_actor(self)); - } -} - -#[derive(actix::Message)] -#[rtype(result = "()")] -struct PubRecieve(rumqttc::Publish); - -impl Handler for MqttActor { - type Result = (); - - fn handle(&mut self, msg: PubRecieve, _ctx: &mut Self::Context) -> Self::Result { - debug!("received MQTT pub: {:?}", msg.0); - } -} - -pub struct MqttInterfaceTask { - interface: MqttInterface, - addr: Addr, -} - -impl MqttInterfaceTask { - pub fn start(options: Options) -> Self { - let (interface, event_loop) = MqttInterface::new(options); - - let addr = MqttActor::new(interface.clone(), event_loop).start(); - - Self { interface, addr } - } - - pub async fn quit(self) -> eyre::Result<()> { - self.addr.send(Quit).await?; - Ok(()) - } -} - -impl Deref for MqttInterfaceTask { - type Target = MqttInterface; - - fn deref(&self) -> &Self::Target { - &self.interface - } -} - -impl DerefMut for MqttInterfaceTask { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.interface - } -} diff --git a/src/update_listener.rs b/src/update_listener.rs index bf48773..e28e746 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -1,5 +1,5 @@ use crate::{ - mqtt_interface::MqttInterface, + mqtt::MqttInterface, program_runner::{ProgramEvent, ProgramEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, };