Alex Mikhalev
4 years ago
7 changed files with 413 additions and 396 deletions
@ -0,0 +1,96 @@ |
|||||||
|
use super::{event_loop::EventLoopTask, MqttInterface}; |
||||||
|
use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; |
||||||
|
use tokio::sync::oneshot; |
||||||
|
use tracing::{debug, error, info, trace, warn}; |
||||||
|
|
||||||
|
pub(super) struct MqttActor { |
||||||
|
interface: MqttInterface, |
||||||
|
event_loop: Option<rumqttc::EventLoop>, |
||||||
|
quit_rx: Option<oneshot::Receiver<()>>, |
||||||
|
} |
||||||
|
|
||||||
|
impl MqttActor { |
||||||
|
pub(super) fn new(interface: MqttInterface, event_loop: rumqttc::EventLoop) -> Self { |
||||||
|
Self { |
||||||
|
interface, |
||||||
|
event_loop: Some(event_loop), |
||||||
|
quit_rx: None, |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl Actor for MqttActor { |
||||||
|
type Context = actix::Context<Self>; |
||||||
|
|
||||||
|
fn started(&mut self, ctx: &mut Self::Context) { |
||||||
|
trace!("MqttActor starting"); |
||||||
|
let event_loop = self.event_loop.take().expect("MqttActor already started"); |
||||||
|
let (quit_tx, quit_rx) = oneshot::channel(); |
||||||
|
ctx.spawn( |
||||||
|
EventLoopTask::new(event_loop, ctx.address(), quit_tx) |
||||||
|
.run() |
||||||
|
.into_actor(self), |
||||||
|
); |
||||||
|
self.quit_rx = Some(quit_rx); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(actix::Message)] |
||||||
|
#[rtype(result = "()")] |
||||||
|
pub(super) struct Quit; |
||||||
|
|
||||||
|
impl Handler<Quit> for MqttActor { |
||||||
|
type Result = actix::ResponseActFuture<Self, ()>; |
||||||
|
fn handle(&mut self, _msg: Quit, _ctx: &mut Self::Context) -> Self::Result { |
||||||
|
let mut interface = self.interface.clone(); |
||||||
|
let quit_rx = self.quit_rx.take().expect("MqttActor has already quit!"); |
||||||
|
let fut = async move { |
||||||
|
interface |
||||||
|
.cancel() |
||||||
|
.await |
||||||
|
.expect("could not cancel MQTT client"); |
||||||
|
let _ = quit_rx.await; |
||||||
|
} |
||||||
|
.into_actor(self) |
||||||
|
.map(|_, _, ctx| ctx.stop()); |
||||||
|
Box::pin(fut) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(actix::Message)] |
||||||
|
#[rtype(result = "()")] |
||||||
|
pub(super) struct Connected; |
||||||
|
|
||||||
|
impl Handler<Connected> for MqttActor { |
||||||
|
type Result = (); |
||||||
|
|
||||||
|
fn handle(&mut self, _msg: Connected, ctx: &mut Self::Context) -> Self::Result { |
||||||
|
info!("MQTT connected"); |
||||||
|
let mut interface = self.interface.clone(); |
||||||
|
let fut = async move { |
||||||
|
let res = interface.publish_connected(true).await; |
||||||
|
let res = res.and(interface.subscribe_requests().await); |
||||||
|
if let Err(err) = res { |
||||||
|
error!("error in connection setup: {}", err); |
||||||
|
} |
||||||
|
}; |
||||||
|
ctx.spawn(fut.into_actor(self)); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(actix::Message)] |
||||||
|
#[rtype(result = "()")] |
||||||
|
pub(super) struct PubRecieve(pub(super) rumqttc::Publish); |
||||||
|
|
||||||
|
impl Handler<PubRecieve> for MqttActor { |
||||||
|
type Result = (); |
||||||
|
|
||||||
|
fn handle(&mut self, msg: PubRecieve, _ctx: &mut Self::Context) -> Self::Result { |
||||||
|
let topic = &msg.0.topic; |
||||||
|
if topic == &self.interface.topics.requests() { |
||||||
|
debug!("received request: {:?}", msg.0); |
||||||
|
} else { |
||||||
|
warn!("received on unknown topic: {:?}", topic); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,82 @@ |
|||||||
|
use super::actor::{self, MqttActor}; |
||||||
|
use actix::Addr; |
||||||
|
use rumqttc::{Packet, QoS}; |
||||||
|
use std::{collections::HashSet, time::Duration}; |
||||||
|
use tokio::sync::oneshot; |
||||||
|
use tracing::{debug, trace, warn}; |
||||||
|
|
||||||
|
pub(super) struct EventLoopTask { |
||||||
|
event_loop: rumqttc::EventLoop, |
||||||
|
mqtt_addr: Addr<MqttActor>, |
||||||
|
quit_tx: oneshot::Sender<()>, |
||||||
|
unreleased_pubs: HashSet<u16>, |
||||||
|
} |
||||||
|
|
||||||
|
impl EventLoopTask { |
||||||
|
pub(super) fn new( |
||||||
|
event_loop: rumqttc::EventLoop, |
||||||
|
mqtt_addr: Addr<MqttActor>, |
||||||
|
quit_tx: oneshot::Sender<()>, |
||||||
|
) -> Self { |
||||||
|
Self { |
||||||
|
event_loop, |
||||||
|
mqtt_addr, |
||||||
|
quit_tx, |
||||||
|
unreleased_pubs: HashSet::default(), |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
fn handle_incoming(&mut self, incoming: Packet) { |
||||||
|
trace!(incoming = debug(&incoming), "MQTT incoming message"); |
||||||
|
#[allow(clippy::single_match)] |
||||||
|
match incoming { |
||||||
|
Packet::ConnAck(_) => { |
||||||
|
self.mqtt_addr.do_send(actor::Connected); |
||||||
|
} |
||||||
|
Packet::Publish(publish) => { |
||||||
|
// Only deliver QoS 2 packets once
|
||||||
|
let deliver = if publish.qos == QoS::ExactlyOnce { |
||||||
|
if self.unreleased_pubs.contains(&publish.pkid) { |
||||||
|
false |
||||||
|
} else { |
||||||
|
self.unreleased_pubs.insert(publish.pkid); |
||||||
|
true |
||||||
|
} |
||||||
|
} else { |
||||||
|
true |
||||||
|
}; |
||||||
|
if deliver { |
||||||
|
self.mqtt_addr.do_send(actor::PubRecieve(publish)); |
||||||
|
} |
||||||
|
} |
||||||
|
Packet::PubRel(pubrel) => { |
||||||
|
self.unreleased_pubs.remove(&pubrel.pkid); |
||||||
|
} |
||||||
|
_ => {} |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub(super) async fn run(mut self) { |
||||||
|
use rumqttc::{ConnectionError, Event}; |
||||||
|
let reconnect_timeout = Duration::from_secs(5); |
||||||
|
self.event_loop.set_reconnection_delay(reconnect_timeout); |
||||||
|
loop { |
||||||
|
match self.event_loop.poll().await { |
||||||
|
Ok(Event::Incoming(incoming)) => { |
||||||
|
self.handle_incoming(incoming); |
||||||
|
} |
||||||
|
Ok(Event::Outgoing(outgoing)) => { |
||||||
|
trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); |
||||||
|
} |
||||||
|
Err(ConnectionError::Cancel) => { |
||||||
|
debug!("MQTT disconnecting"); |
||||||
|
break; |
||||||
|
} |
||||||
|
Err(err) => { |
||||||
|
warn!("MQTT error, reconnecting: {}", err); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
let _ = self.quit_tx.send(()); |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,173 @@ |
|||||||
|
mod actor; |
||||||
|
mod event_loop; |
||||||
|
mod topics; |
||||||
|
|
||||||
|
use self::topics::Topics; |
||||||
|
use crate::{ |
||||||
|
model::{Program, ProgramId, Programs, Section, SectionId, Sections}, |
||||||
|
section_runner::SecRunnerState, |
||||||
|
section_runner_json::SecRunnerStateJson, |
||||||
|
}; |
||||||
|
use actix::{Actor, Addr}; |
||||||
|
use eyre::WrapErr; |
||||||
|
use rumqttc::{LastWill, MqttOptions, QoS}; |
||||||
|
use std::{ |
||||||
|
ops::{Deref, DerefMut}, |
||||||
|
sync::Arc, |
||||||
|
}; |
||||||
|
|
||||||
|
#[derive(Clone, Debug)] |
||||||
|
pub struct Options { |
||||||
|
pub broker_host: String, |
||||||
|
pub broker_port: u16, |
||||||
|
pub device_id: String, |
||||||
|
pub client_id: String, |
||||||
|
} |
||||||
|
|
||||||
|
#[derive(Clone)] |
||||||
|
pub struct MqttInterface { |
||||||
|
client: rumqttc::AsyncClient, |
||||||
|
topics: Topics<Arc<str>>, |
||||||
|
} |
||||||
|
|
||||||
|
impl MqttInterface { |
||||||
|
fn new(options: Options) -> (Self, rumqttc::EventLoop) { |
||||||
|
let mqtt_prefix = format!("devices/{}", options.device_id); |
||||||
|
let topics: Topics<Arc<str>> = Topics::new(mqtt_prefix.into()); |
||||||
|
let mut mqtt_opts = |
||||||
|
MqttOptions::new(options.client_id, options.broker_host, options.broker_port); |
||||||
|
|
||||||
|
let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); |
||||||
|
mqtt_opts.set_last_will(last_will); |
||||||
|
|
||||||
|
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16); |
||||||
|
|
||||||
|
(Self { client, topics }, event_loop) |
||||||
|
} |
||||||
|
|
||||||
|
async fn publish_data<P>(&mut self, topic: String, payload: &P) -> eyre::Result<()> |
||||||
|
where |
||||||
|
P: serde::Serialize, |
||||||
|
{ |
||||||
|
let payload_vec = |
||||||
|
serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?; |
||||||
|
self.client |
||||||
|
.publish(topic, QoS::AtLeastOnce, true, payload_vec) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish")?; |
||||||
|
Ok(()) |
||||||
|
} |
||||||
|
|
||||||
|
pub(super) async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { |
||||||
|
self.publish_data(self.topics.connected(), &connected) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish connected topic") |
||||||
|
} |
||||||
|
|
||||||
|
pub(super) async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { |
||||||
|
self.client.cancel().await |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { |
||||||
|
let section_ids: Vec<_> = sections.keys().cloned().collect(); |
||||||
|
self.publish_data(self.topics.sections(), §ion_ids) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish section ids")?; |
||||||
|
for section in sections.values() { |
||||||
|
self.publish_section(section).await?; |
||||||
|
} |
||||||
|
Ok(()) |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { |
||||||
|
self.publish_data(self.topics.section_data(section.id), section) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish section") |
||||||
|
} |
||||||
|
|
||||||
|
// Section state can be derived from section runner state...
|
||||||
|
pub async fn publish_section_state( |
||||||
|
&mut self, |
||||||
|
section_id: SectionId, |
||||||
|
state: bool, |
||||||
|
) -> eyre::Result<()> { |
||||||
|
self.publish_data(self.topics.section_state(section_id), &state) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish section state") |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { |
||||||
|
let program_ids: Vec<_> = programs.keys().cloned().collect(); |
||||||
|
self.publish_data(self.topics.programs(), &program_ids) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish program ids")?; |
||||||
|
for program in programs.values() { |
||||||
|
self.publish_program(program).await?; |
||||||
|
} |
||||||
|
Ok(()) |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { |
||||||
|
self.publish_data(self.topics.program_data(program.id), &program) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish program") |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_program_running( |
||||||
|
&mut self, |
||||||
|
program_id: ProgramId, |
||||||
|
running: bool, |
||||||
|
) -> eyre::Result<()> { |
||||||
|
self.publish_data(self.topics.program_running(program_id), &running) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish program running") |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { |
||||||
|
let json: SecRunnerStateJson = sr_state.into(); |
||||||
|
self.publish_data(self.topics.section_runner(), &json) |
||||||
|
.await |
||||||
|
.wrap_err("failed to publish section runner") |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { |
||||||
|
self.client |
||||||
|
.subscribe(self.topics.requests(), QoS::ExactlyOnce) |
||||||
|
.await?; |
||||||
|
Ok(()) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
pub struct MqttInterfaceTask { |
||||||
|
interface: MqttInterface, |
||||||
|
addr: Addr<actor::MqttActor>, |
||||||
|
} |
||||||
|
|
||||||
|
impl MqttInterfaceTask { |
||||||
|
pub fn start(options: Options) -> Self { |
||||||
|
let (interface, event_loop) = MqttInterface::new(options); |
||||||
|
|
||||||
|
let addr = actor::MqttActor::new(interface.clone(), event_loop).start(); |
||||||
|
|
||||||
|
Self { interface, addr } |
||||||
|
} |
||||||
|
|
||||||
|
pub async fn quit(self) -> eyre::Result<()> { |
||||||
|
self.addr.send(actor::Quit).await?; |
||||||
|
Ok(()) |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl Deref for MqttInterfaceTask { |
||||||
|
type Target = MqttInterface; |
||||||
|
|
||||||
|
fn deref(&self) -> &Self::Target { |
||||||
|
&self.interface |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
impl DerefMut for MqttInterfaceTask { |
||||||
|
fn deref_mut(&mut self) -> &mut Self::Target { |
||||||
|
&mut self.interface |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,58 @@ |
|||||||
|
use crate::model::{ProgramId, SectionId}; |
||||||
|
|
||||||
|
#[derive(Clone, Debug)] |
||||||
|
pub struct Topics<T> |
||||||
|
where |
||||||
|
T: AsRef<str>, |
||||||
|
{ |
||||||
|
prefix: T, |
||||||
|
} |
||||||
|
|
||||||
|
impl<T> Topics<T> |
||||||
|
where |
||||||
|
T: AsRef<str>, |
||||||
|
{ |
||||||
|
pub fn new(prefix: T) -> Self { |
||||||
|
Self { prefix } |
||||||
|
} |
||||||
|
|
||||||
|
pub fn connected(&self) -> String { |
||||||
|
format!("{}/connected", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn sections(&self) -> String { |
||||||
|
format!("{}/sections", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn section_data(&self, section_id: SectionId) -> String { |
||||||
|
format!("{}/sections/{}", self.prefix.as_ref(), section_id) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn section_state(&self, section_id: SectionId) -> String { |
||||||
|
format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn programs(&self) -> String { |
||||||
|
format!("{}/programs", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn program_data(&self, program_id: ProgramId) -> String { |
||||||
|
format!("{}/programs/{}", self.prefix.as_ref(), program_id) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn program_running(&self, program_id: ProgramId) -> String { |
||||||
|
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn section_runner(&self) -> String { |
||||||
|
format!("{}/section_runner", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn requests(&self) -> String { |
||||||
|
format!("{}/requests", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
|
||||||
|
pub fn responses(&self) -> String { |
||||||
|
format!("{}/responses", self.prefix.as_ref()) |
||||||
|
} |
||||||
|
} |
@ -1,392 +0,0 @@ |
|||||||
use crate::{ |
|
||||||
model::{Program, ProgramId, Programs, Section, SectionId, Sections}, |
|
||||||
section_runner::SecRunnerState, |
|
||||||
section_runner_json::SecRunnerStateJson, |
|
||||||
}; |
|
||||||
use actix::{Actor, ActorContext, ActorFuture, Addr, AsyncContext, Handler, WrapFuture}; |
|
||||||
use eyre::WrapErr; |
|
||||||
use rumqttc::{LastWill, MqttOptions, Packet, QoS}; |
|
||||||
use std::{ |
|
||||||
collections::HashSet, |
|
||||||
ops::{Deref, DerefMut}, |
|
||||||
sync::Arc, |
|
||||||
time::Duration, |
|
||||||
}; |
|
||||||
use tokio::sync::oneshot; |
|
||||||
use tracing::{debug, error, info, trace, warn}; |
|
||||||
|
|
||||||
#[derive(Clone, Debug)] |
|
||||||
struct Topics<T> |
|
||||||
where |
|
||||||
T: AsRef<str>, |
|
||||||
{ |
|
||||||
prefix: T, |
|
||||||
} |
|
||||||
|
|
||||||
impl<T> Topics<T> |
|
||||||
where |
|
||||||
T: AsRef<str>, |
|
||||||
{ |
|
||||||
fn new(prefix: T) -> Self { |
|
||||||
Self { prefix } |
|
||||||
} |
|
||||||
|
|
||||||
fn connected(&self) -> String { |
|
||||||
format!("{}/connected", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
|
|
||||||
fn sections(&self) -> String { |
|
||||||
format!("{}/sections", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
|
|
||||||
fn section_data(&self, section_id: SectionId) -> String { |
|
||||||
format!("{}/sections/{}", self.prefix.as_ref(), section_id) |
|
||||||
} |
|
||||||
|
|
||||||
fn section_state(&self, section_id: SectionId) -> String { |
|
||||||
format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) |
|
||||||
} |
|
||||||
|
|
||||||
fn programs(&self) -> String { |
|
||||||
format!("{}/programs", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
|
|
||||||
fn program_data(&self, program_id: ProgramId) -> String { |
|
||||||
format!("{}/programs/{}", self.prefix.as_ref(), program_id) |
|
||||||
} |
|
||||||
|
|
||||||
fn program_running(&self, program_id: ProgramId) -> String { |
|
||||||
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) |
|
||||||
} |
|
||||||
|
|
||||||
fn section_runner(&self) -> String { |
|
||||||
format!("{}/section_runner", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
|
|
||||||
fn requests(&self) -> String { |
|
||||||
format!("{}/requests", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
|
|
||||||
fn responses(&self) -> String { |
|
||||||
format!("{}/responses", self.prefix.as_ref()) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
struct EventLoopTask { |
|
||||||
event_loop: rumqttc::EventLoop, |
|
||||||
mqtt_addr: Addr<MqttActor>, |
|
||||||
quit_tx: oneshot::Sender<()>, |
|
||||||
unreleased_pubs: HashSet<u16>, |
|
||||||
} |
|
||||||
|
|
||||||
impl EventLoopTask { |
|
||||||
fn new( |
|
||||||
event_loop: rumqttc::EventLoop, |
|
||||||
mqtt_addr: Addr<MqttActor>, |
|
||||||
quit_tx: oneshot::Sender<()>, |
|
||||||
) -> Self { |
|
||||||
Self { |
|
||||||
event_loop, |
|
||||||
mqtt_addr, |
|
||||||
quit_tx, |
|
||||||
unreleased_pubs: HashSet::default(), |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
fn handle_incoming(&mut self, incoming: Packet) { |
|
||||||
trace!(incoming = debug(&incoming), "MQTT incoming message"); |
|
||||||
#[allow(clippy::single_match)] |
|
||||||
match incoming { |
|
||||||
Packet::ConnAck(_) => { |
|
||||||
self.mqtt_addr.do_send(Connected); |
|
||||||
} |
|
||||||
Packet::Publish(publish) => { |
|
||||||
// Only deliver QoS 2 packets once
|
|
||||||
let deliver = if publish.qos == QoS::ExactlyOnce { |
|
||||||
if self.unreleased_pubs.contains(&publish.pkid) { |
|
||||||
false |
|
||||||
} else { |
|
||||||
self.unreleased_pubs.insert(publish.pkid); |
|
||||||
true |
|
||||||
} |
|
||||||
} else { |
|
||||||
true |
|
||||||
}; |
|
||||||
if deliver { |
|
||||||
self.mqtt_addr.do_send(PubRecieve(publish)); |
|
||||||
} |
|
||||||
} |
|
||||||
Packet::PubRel(pubrel) => { |
|
||||||
self.unreleased_pubs.remove(&pubrel.pkid); |
|
||||||
} |
|
||||||
_ => {} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
async fn run(mut self) { |
|
||||||
use rumqttc::{ConnectionError, Event}; |
|
||||||
let reconnect_timeout = Duration::from_secs(5); |
|
||||||
self.event_loop.set_reconnection_delay(reconnect_timeout); |
|
||||||
loop { |
|
||||||
match self.event_loop.poll().await { |
|
||||||
Ok(Event::Incoming(incoming)) => { |
|
||||||
self.handle_incoming(incoming); |
|
||||||
} |
|
||||||
Ok(Event::Outgoing(outgoing)) => { |
|
||||||
trace!(outgoing = debug(&outgoing), "MQTT outgoing message"); |
|
||||||
} |
|
||||||
Err(ConnectionError::Cancel) => { |
|
||||||
debug!("MQTT disconnecting"); |
|
||||||
break; |
|
||||||
} |
|
||||||
Err(err) => { |
|
||||||
warn!("MQTT error, reconnecting: {}", err); |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
let _ = self.quit_tx.send(()); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Clone, Debug)] |
|
||||||
pub struct Options { |
|
||||||
pub broker_host: String, |
|
||||||
pub broker_port: u16, |
|
||||||
pub device_id: String, |
|
||||||
pub client_id: String, |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(Clone)] |
|
||||||
pub struct MqttInterface { |
|
||||||
client: rumqttc::AsyncClient, |
|
||||||
topics: Topics<Arc<str>>, |
|
||||||
} |
|
||||||
|
|
||||||
impl MqttInterface { |
|
||||||
fn new(options: Options) -> (Self, rumqttc::EventLoop) { |
|
||||||
let mqtt_prefix = format!("devices/{}", options.device_id); |
|
||||||
let topics: Topics<Arc<str>> = Topics::new(mqtt_prefix.into()); |
|
||||||
let mut mqtt_opts = |
|
||||||
MqttOptions::new(options.client_id, options.broker_host, options.broker_port); |
|
||||||
|
|
||||||
let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); |
|
||||||
mqtt_opts.set_last_will(last_will); |
|
||||||
|
|
||||||
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16); |
|
||||||
|
|
||||||
(Self { client, topics }, event_loop) |
|
||||||
} |
|
||||||
|
|
||||||
async fn publish_data<P>(&mut self, topic: String, payload: &P) -> eyre::Result<()> |
|
||||||
where |
|
||||||
P: serde::Serialize, |
|
||||||
{ |
|
||||||
let payload_vec = |
|
||||||
serde_json::to_vec(payload).wrap_err("failed to serialize publish payload")?; |
|
||||||
self.client |
|
||||||
.publish(topic, QoS::AtLeastOnce, true, payload_vec) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish")?; |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { |
|
||||||
self.publish_data(self.topics.connected(), &connected) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish connected topic") |
|
||||||
} |
|
||||||
|
|
||||||
async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { |
|
||||||
self.client.cancel().await |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { |
|
||||||
let section_ids: Vec<_> = sections.keys().cloned().collect(); |
|
||||||
self.publish_data(self.topics.sections(), §ion_ids) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish section ids")?; |
|
||||||
for section in sections.values() { |
|
||||||
self.publish_section(section).await?; |
|
||||||
} |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { |
|
||||||
self.publish_data(self.topics.section_data(section.id), section) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish section") |
|
||||||
} |
|
||||||
|
|
||||||
// Section state can be derived from section runner state...
|
|
||||||
pub async fn publish_section_state( |
|
||||||
&mut self, |
|
||||||
section_id: SectionId, |
|
||||||
state: bool, |
|
||||||
) -> eyre::Result<()> { |
|
||||||
self.publish_data(self.topics.section_state(section_id), &state) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish section state") |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { |
|
||||||
let program_ids: Vec<_> = programs.keys().cloned().collect(); |
|
||||||
self.publish_data(self.topics.programs(), &program_ids) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish program ids")?; |
|
||||||
for program in programs.values() { |
|
||||||
self.publish_program(program).await?; |
|
||||||
} |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_program(&mut self, program: &Program) -> eyre::Result<()> { |
|
||||||
self.publish_data(self.topics.program_data(program.id), &program) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish program") |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_program_running( |
|
||||||
&mut self, |
|
||||||
program_id: ProgramId, |
|
||||||
running: bool, |
|
||||||
) -> eyre::Result<()> { |
|
||||||
self.publish_data(self.topics.program_running(program_id), &running) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish program running") |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { |
|
||||||
let json: SecRunnerStateJson = sr_state.into(); |
|
||||||
self.publish_data(self.topics.section_runner(), &json) |
|
||||||
.await |
|
||||||
.wrap_err("failed to publish section runner") |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { |
|
||||||
self.client |
|
||||||
.subscribe(self.topics.requests(), QoS::ExactlyOnce) |
|
||||||
.await?; |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
struct MqttActor { |
|
||||||
interface: MqttInterface, |
|
||||||
event_loop: Option<rumqttc::EventLoop>, |
|
||||||
quit_rx: Option<oneshot::Receiver<()>>, |
|
||||||
} |
|
||||||
|
|
||||||
impl MqttActor { |
|
||||||
fn new(interface: MqttInterface, event_loop: rumqttc::EventLoop) -> Self { |
|
||||||
Self { |
|
||||||
interface, |
|
||||||
event_loop: Some(event_loop), |
|
||||||
quit_rx: None, |
|
||||||
} |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl Actor for MqttActor { |
|
||||||
type Context = actix::Context<Self>; |
|
||||||
|
|
||||||
fn started(&mut self, ctx: &mut Self::Context) { |
|
||||||
trace!("MqttActor starting"); |
|
||||||
let event_loop = self.event_loop.take().expect("MqttActor already started"); |
|
||||||
let (quit_tx, quit_rx) = oneshot::channel(); |
|
||||||
ctx.spawn( |
|
||||||
EventLoopTask::new(event_loop, ctx.address(), quit_tx) |
|
||||||
.run() |
|
||||||
.into_actor(self), |
|
||||||
); |
|
||||||
self.quit_rx = Some(quit_rx); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(actix::Message)] |
|
||||||
#[rtype(result = "()")] |
|
||||||
struct Quit; |
|
||||||
|
|
||||||
impl Handler<Quit> for MqttActor { |
|
||||||
type Result = actix::ResponseActFuture<Self, ()>; |
|
||||||
fn handle(&mut self, _msg: Quit, _ctx: &mut Self::Context) -> Self::Result { |
|
||||||
let mut interface = self.interface.clone(); |
|
||||||
let quit_rx = self.quit_rx.take().expect("MqttActor has already quit!"); |
|
||||||
let fut = async move { |
|
||||||
interface |
|
||||||
.cancel() |
|
||||||
.await |
|
||||||
.expect("could not cancel MQTT client"); |
|
||||||
let _ = quit_rx.await; |
|
||||||
} |
|
||||||
.into_actor(self) |
|
||||||
.map(|_, _, ctx| ctx.stop()); |
|
||||||
Box::pin(fut) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(actix::Message)] |
|
||||||
#[rtype(result = "()")] |
|
||||||
struct Connected; |
|
||||||
|
|
||||||
impl Handler<Connected> for MqttActor { |
|
||||||
type Result = (); |
|
||||||
|
|
||||||
fn handle(&mut self, _msg: Connected, ctx: &mut Self::Context) -> Self::Result { |
|
||||||
info!("MQTT connected"); |
|
||||||
let mut interface = self.interface.clone(); |
|
||||||
let fut = async move { |
|
||||||
let res = interface.publish_connected(true).await; |
|
||||||
let res = res.and(interface.subscribe_requests().await); |
|
||||||
if let Err(err) = res { |
|
||||||
error!("error in connection setup: {}", err); |
|
||||||
} |
|
||||||
}; |
|
||||||
ctx.spawn(fut.into_actor(self)); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
#[derive(actix::Message)] |
|
||||||
#[rtype(result = "()")] |
|
||||||
struct PubRecieve(rumqttc::Publish); |
|
||||||
|
|
||||||
impl Handler<PubRecieve> for MqttActor { |
|
||||||
type Result = (); |
|
||||||
|
|
||||||
fn handle(&mut self, msg: PubRecieve, _ctx: &mut Self::Context) -> Self::Result { |
|
||||||
debug!("received MQTT pub: {:?}", msg.0); |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
pub struct MqttInterfaceTask { |
|
||||||
interface: MqttInterface, |
|
||||||
addr: Addr<MqttActor>, |
|
||||||
} |
|
||||||
|
|
||||||
impl MqttInterfaceTask { |
|
||||||
pub fn start(options: Options) -> Self { |
|
||||||
let (interface, event_loop) = MqttInterface::new(options); |
|
||||||
|
|
||||||
let addr = MqttActor::new(interface.clone(), event_loop).start(); |
|
||||||
|
|
||||||
Self { interface, addr } |
|
||||||
} |
|
||||||
|
|
||||||
pub async fn quit(self) -> eyre::Result<()> { |
|
||||||
self.addr.send(Quit).await?; |
|
||||||
Ok(()) |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl Deref for MqttInterfaceTask { |
|
||||||
type Target = MqttInterface; |
|
||||||
|
|
||||||
fn deref(&self) -> &Self::Target { |
|
||||||
&self.interface |
|
||||||
} |
|
||||||
} |
|
||||||
|
|
||||||
impl DerefMut for MqttInterfaceTask { |
|
||||||
fn deref_mut(&mut self) -> &mut Self::Target { |
|
||||||
&mut self.interface |
|
||||||
} |
|
||||||
} |
|
Loading…
Reference in new issue