Browse Source

Use actix for MQTT interface

master
Alex Mikhalev 4 years ago
parent
commit
1eb1d2b058
  1. 4
      src/main.rs
  2. 245
      src/mqtt_interface.rs

4
src/main.rs

@ -63,7 +63,7 @@ async fn main() -> Result<()> {
device_id: "sprinklers_rs-0001".into(), device_id: "sprinklers_rs-0001".into(),
client_id: "sprinklers_rs-0001".into(), client_id: "sprinklers_rs-0001".into(),
}; };
let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options);
let update_listener = { let update_listener = {
let section_events = section_runner.subscribe().await?; let section_events = section_runner.subscribe().await?;
@ -101,7 +101,7 @@ async fn main() -> Result<()> {
mqtt_interface.quit().await?; mqtt_interface.quit().await?;
program_runner.quit().await?; program_runner.quit().await?;
section_runner.quit().await?; section_runner.quit().await?;
tokio::task::yield_now().await; actix::System::current().stop();
Ok(()) Ok(())
} }

245
src/mqtt_interface.rs

@ -3,15 +3,17 @@ use crate::{
section_runner::SecRunnerState, section_runner::SecRunnerState,
section_runner_json::SecRunnerStateJson, section_runner_json::SecRunnerStateJson,
}; };
use actix::{Actor, ActorContext, ActorFuture, Addr, AsyncContext, Handler, WrapFuture};
use eyre::WrapErr; use eyre::WrapErr;
use rumqttc::{LastWill, MqttOptions, QoS}; use rumqttc::{LastWill, MqttOptions, Packet, QoS};
use std::{ use std::{
collections::HashSet,
ops::{Deref, DerefMut}, ops::{Deref, DerefMut},
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use tokio::task::JoinHandle; use tokio::sync::oneshot;
use tracing::{debug, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Topics<T> struct Topics<T>
@ -60,56 +62,98 @@ where
fn section_runner(&self) -> String { fn section_runner(&self) -> String {
format!("{}/section_runner", self.prefix.as_ref()) format!("{}/section_runner", self.prefix.as_ref())
} }
fn requests(&self) -> String {
format!("{}/requests", self.prefix.as_ref())
}
fn responses(&self) -> String {
format!("{}/responses", self.prefix.as_ref())
}
} }
#[derive(Clone, Debug)] struct EventLoopTask {
pub struct Options { event_loop: rumqttc::EventLoop,
pub broker_host: String, mqtt_addr: Addr<MqttActor>,
pub broker_port: u16, quit_tx: oneshot::Sender<()>,
pub device_id: String, unreleased_pubs: HashSet<u16>,
pub client_id: String,
} }
async fn event_loop_task( impl EventLoopTask {
interface: MqttInterface, fn new(
mut event_loop: rumqttc::EventLoop, event_loop: rumqttc::EventLoop,
) -> eyre::Result<()> { mqtt_addr: Addr<MqttActor>,
use rumqttc::{ConnectionError, Event}; quit_tx: oneshot::Sender<()>,
let reconnect_timeout = Duration::from_secs(5); ) -> Self {
event_loop.set_reconnection_delay(reconnect_timeout); Self {
loop { event_loop,
match event_loop.poll().await { mqtt_addr,
Ok(Event::Incoming(incoming)) => { quit_tx,
debug!(incoming = debug(&incoming), "MQTT incoming message"); unreleased_pubs: HashSet::default(),
#[allow(clippy::single_match)] }
match incoming { }
rumqttc::Packet::ConnAck(_) => {
info!("MQTT connected"); fn handle_incoming(&mut self, incoming: Packet) {
{ trace!(incoming = debug(&incoming), "MQTT incoming message");
// HACK: this really should just be await #[allow(clippy::single_match)]
// but that can sometimes deadlock if the publish channel is full match incoming {
let mut interface = interface.clone(); Packet::ConnAck(_) => {
let fut = async move { interface.publish_connected(true).await }; self.mqtt_addr.do_send(Connected);
tokio::spawn(fut); }
} Packet::Publish(publish) => {
//.await?; // Only deliver QoS 2 packets once
let deliver = if publish.qos == QoS::ExactlyOnce {
if self.unreleased_pubs.contains(&publish.pkid) {
false
} else {
self.unreleased_pubs.insert(publish.pkid);
true
} }
_ => {} } else {
true
};
if deliver {
self.mqtt_addr.do_send(PubRecieve(publish));
} }
} }
Ok(Event::Outgoing(outgoing)) => { Packet::PubRel(pubrel) => {
trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); self.unreleased_pubs.remove(&pubrel.pkid);
}
Err(ConnectionError::Cancel) => {
debug!("MQTT disconnecting");
break;
} }
Err(err) => { _ => {}
warn!("MQTT error, reconnecting: {}", err); }
}
async fn run(mut self) {
use rumqttc::{ConnectionError, Event};
let reconnect_timeout = Duration::from_secs(5);
self.event_loop.set_reconnection_delay(reconnect_timeout);
loop {
match self.event_loop.poll().await {
Ok(Event::Incoming(incoming)) => {
self.handle_incoming(incoming);
}
Ok(Event::Outgoing(outgoing)) => {
trace!(outgoing = debug(&outgoing), "MQTT outgoing message");
}
Err(ConnectionError::Cancel) => {
debug!("MQTT disconnecting");
break;
}
Err(err) => {
warn!("MQTT error, reconnecting: {}", err);
}
} }
} }
let _ = self.quit_tx.send(());
} }
Ok(()) }
#[derive(Clone, Debug)]
pub struct Options {
pub broker_host: String,
pub broker_port: u16,
pub device_id: String,
pub client_id: String,
} }
#[derive(Clone)] #[derive(Clone)]
@ -217,35 +261,118 @@ impl MqttInterface {
.await .await
.wrap_err("failed to publish section runner") .wrap_err("failed to publish section runner")
} }
pub async fn subscribe_requests(&mut self) -> eyre::Result<()> {
self.client
.subscribe(self.topics.requests(), QoS::ExactlyOnce)
.await?;
Ok(())
}
}
struct MqttActor {
interface: MqttInterface,
event_loop: Option<rumqttc::EventLoop>,
quit_rx: Option<oneshot::Receiver<()>>,
}
impl MqttActor {
fn new(interface: MqttInterface, event_loop: rumqttc::EventLoop) -> Self {
Self {
interface,
event_loop: Some(event_loop),
quit_rx: None,
}
}
}
impl Actor for MqttActor {
type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) {
trace!("MqttActor starting");
let event_loop = self.event_loop.take().expect("MqttActor already started");
let (quit_tx, quit_rx) = oneshot::channel();
ctx.spawn(
EventLoopTask::new(event_loop, ctx.address(), quit_tx)
.run()
.into_actor(self),
);
self.quit_rx = Some(quit_rx);
}
}
#[derive(actix::Message)]
#[rtype(result = "()")]
struct Quit;
impl Handler<Quit> for MqttActor {
type Result = actix::ResponseActFuture<Self, ()>;
fn handle(&mut self, _msg: Quit, _ctx: &mut Self::Context) -> Self::Result {
let mut interface = self.interface.clone();
let quit_rx = self.quit_rx.take().expect("MqttActor has already quit!");
let fut = async move {
interface
.cancel()
.await
.expect("could not cancel MQTT client");
let _ = quit_rx.await;
}
.into_actor(self)
.map(|_, _, ctx| ctx.stop());
Box::pin(fut)
}
}
#[derive(actix::Message)]
#[rtype(result = "()")]
struct Connected;
impl Handler<Connected> for MqttActor {
type Result = ();
fn handle(&mut self, _msg: Connected, ctx: &mut Self::Context) -> Self::Result {
info!("MQTT connected");
let mut interface = self.interface.clone();
let fut = async move {
let res = interface.publish_connected(true).await;
let res = res.and(interface.subscribe_requests().await);
if let Err(err) = res {
error!("error in connection setup: {}", err);
}
};
ctx.spawn(fut.into_actor(self));
}
}
#[derive(actix::Message)]
#[rtype(result = "()")]
struct PubRecieve(rumqttc::Publish);
impl Handler<PubRecieve> for MqttActor {
type Result = ();
fn handle(&mut self, msg: PubRecieve, _ctx: &mut Self::Context) -> Self::Result {
debug!("received MQTT pub: {:?}", msg.0);
}
} }
pub struct MqttInterfaceTask { pub struct MqttInterfaceTask {
interface: MqttInterface, interface: MqttInterface,
join_handle: JoinHandle<()>, addr: Addr<MqttActor>,
} }
impl MqttInterfaceTask { impl MqttInterfaceTask {
pub async fn start(options: Options) -> eyre::Result<Self> { pub fn start(options: Options) -> Self {
let (interface, event_loop) = MqttInterface::new(options); let (interface, event_loop) = MqttInterface::new(options);
let join_handle = tokio::spawn({ let addr = MqttActor::new(interface.clone(), event_loop).start();
let interface = interface.clone();
async move {
event_loop_task(interface, event_loop)
.await
.expect("error in event loop task")
}
});
Ok(Self { Self { interface, addr }
interface,
join_handle,
})
} }
pub async fn quit(mut self) -> eyre::Result<()> { pub async fn quit(self) -> eyre::Result<()> {
self.interface.cancel().await?; self.addr.send(Quit).await?;
self.join_handle.await?;
Ok(()) Ok(())
} }
} }

Loading…
Cancel
Save