Alex Mikhalev
4 years ago
5 changed files with 220 additions and 3 deletions
@ -0,0 +1,199 @@
@@ -0,0 +1,199 @@
|
||||
use crate::model::{Section, SectionId, Sections}; |
||||
use eyre::WrapErr; |
||||
use rumqttc::{LastWill, MqttOptions, QoS}; |
||||
use std::{ |
||||
ops::{Deref, DerefMut}, |
||||
sync::Arc, |
||||
time::Duration, |
||||
}; |
||||
use tokio::{task::JoinHandle, time::delay_for}; |
||||
use tracing::{debug, trace, warn, info}; |
||||
|
||||
#[derive(Clone, Debug)] |
||||
struct Topics<T> |
||||
where |
||||
T: AsRef<str>, |
||||
{ |
||||
prefix: T, |
||||
} |
||||
|
||||
impl<T> Topics<T> |
||||
where |
||||
T: AsRef<str>, |
||||
{ |
||||
fn new(prefix: T) -> Self { |
||||
Self { prefix } |
||||
} |
||||
|
||||
fn connected(&self) -> String { |
||||
format!("{}/connected", self.prefix.as_ref()) |
||||
} |
||||
|
||||
fn sections(&self) -> String { |
||||
format!("{}/sections", self.prefix.as_ref()) |
||||
} |
||||
|
||||
fn section_data(&self, section_id: SectionId) -> String { |
||||
format!("{}/sections/{}", self.prefix.as_ref(), section_id) |
||||
} |
||||
} |
||||
|
||||
#[derive(Clone, Debug)] |
||||
pub struct Options { |
||||
pub broker_host: String, |
||||
pub broker_port: u16, |
||||
pub device_id: String, |
||||
pub client_id: String, |
||||
} |
||||
|
||||
async fn event_loop_task( |
||||
mut interface: MqttInterface, |
||||
mut event_loop: rumqttc::EventLoop, |
||||
) -> eyre::Result<()> { |
||||
use rumqttc::{ConnectionError, Event}; |
||||
loop { |
||||
match event_loop.poll().await { |
||||
Ok(Event::Incoming(incoming)) => { |
||||
debug!(incoming = debug(&incoming), "MQTT incoming message"); |
||||
#[allow(clippy::single_match)] |
||||
match incoming { |
||||
rumqttc::Packet::ConnAck(_) => { |
||||
info!("MQTT connected"); |
||||
interface.publish_connected(true).await?; |
||||
} |
||||
_ => {} |
||||
} |
||||
} |
||||
Ok(Event::Outgoing(outgoing)) => { |
||||
trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); |
||||
} |
||||
Err(ConnectionError::Cancel) => { |
||||
debug!("MQTT disconnecting"); |
||||
break; |
||||
} |
||||
Err(err) => { |
||||
let reconnect_timeout = Duration::from_secs(5); |
||||
warn!( |
||||
"MQTT error, will reconnect in {:?}: {}", |
||||
reconnect_timeout, err |
||||
); |
||||
delay_for(reconnect_timeout).await; |
||||
} |
||||
} |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
#[derive(Clone)] |
||||
pub struct MqttInterface { |
||||
client: rumqttc::AsyncClient, |
||||
topics: Topics<Arc<str>>, |
||||
} |
||||
|
||||
impl MqttInterface { |
||||
fn new(options: Options) -> (Self, rumqttc::EventLoop) { |
||||
let mqtt_prefix = format!("devices/{}", options.device_id); |
||||
let topics: Topics<Arc<str>> = Topics::new(mqtt_prefix.into()); |
||||
let mut mqtt_opts = |
||||
MqttOptions::new(options.client_id, options.broker_host, options.broker_port); |
||||
|
||||
let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); |
||||
mqtt_opts.set_last_will(last_will); |
||||
|
||||
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 32); |
||||
|
||||
(Self { client, topics }, event_loop) |
||||
} |
||||
|
||||
pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { |
||||
let section_ids: Vec<_> = sections.keys().cloned().collect(); |
||||
let section_ids_payload = serde_json::to_vec(§ion_ids)?; |
||||
self.client |
||||
.publish( |
||||
self.topics.sections(), |
||||
QoS::AtLeastOnce, |
||||
true, |
||||
section_ids_payload, |
||||
) |
||||
.await?; |
||||
for section in sections.values() { |
||||
self.publish_section(section).await?; |
||||
} |
||||
Ok(()) |
||||
} |
||||
|
||||
pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { |
||||
let payload = serde_json::to_vec(section).wrap_err("failed to serialize section")?; |
||||
self.client |
||||
.publish( |
||||
self.topics.section_data(section.id), |
||||
QoS::AtLeastOnce, |
||||
true, |
||||
payload, |
||||
) |
||||
.await?; |
||||
Ok(()) |
||||
} |
||||
|
||||
async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { |
||||
self.client |
||||
.publish( |
||||
self.topics.connected(), |
||||
QoS::AtLeastOnce, |
||||
true, |
||||
connected.to_string(), |
||||
) |
||||
.await |
||||
.wrap_err("failed to publish connected topic")?; |
||||
Ok(()) |
||||
} |
||||
|
||||
async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { |
||||
self.client.cancel().await |
||||
} |
||||
} |
||||
|
||||
pub struct MqttInterfaceTask { |
||||
interface: MqttInterface, |
||||
join_handle: JoinHandle<()>, |
||||
} |
||||
|
||||
impl MqttInterfaceTask { |
||||
pub async fn start(options: Options) -> eyre::Result<Self> { |
||||
let (interface, event_loop) = MqttInterface::new(options); |
||||
|
||||
let join_handle = tokio::spawn({ |
||||
let interface = interface.clone(); |
||||
async move { |
||||
event_loop_task(interface, event_loop) |
||||
.await |
||||
.expect("error in event loop task") |
||||
} |
||||
}); |
||||
|
||||
Ok(Self { |
||||
interface, |
||||
join_handle, |
||||
}) |
||||
} |
||||
|
||||
pub async fn quit(mut self) -> eyre::Result<()> { |
||||
self.interface.cancel().await?; |
||||
self.join_handle.await?; |
||||
Ok(()) |
||||
} |
||||
} |
||||
|
||||
impl Deref for MqttInterfaceTask { |
||||
type Target = MqttInterface; |
||||
|
||||
fn deref(&self) -> &Self::Target { |
||||
&self.interface |
||||
} |
||||
} |
||||
|
||||
impl DerefMut for MqttInterfaceTask { |
||||
fn deref_mut(&mut self) -> &mut Self::Target { |
||||
&mut self.interface |
||||
} |
||||
} |
Loading…
Reference in new issue