Browse Source

Use built in MQTT reconnection behavior

master
Alex Mikhalev 4 years ago
parent
commit
4e21e7b96a
  1. 23
      src/mqtt_interface.rs

23
src/mqtt_interface.rs

@ -1,4 +1,4 @@
use crate::model::{Section, SectionId, Sections, Programs, Program, ProgramId}; use crate::model::{Program, ProgramId, Programs, Section, SectionId, Sections};
use eyre::WrapErr; use eyre::WrapErr;
use rumqttc::{LastWill, MqttOptions, QoS}; use rumqttc::{LastWill, MqttOptions, QoS};
use std::{ use std::{
@ -6,8 +6,8 @@ use std::{
sync::Arc, sync::Arc,
time::Duration, time::Duration,
}; };
use tokio::{task::JoinHandle, time::delay_for}; use tokio::task::JoinHandle;
use tracing::{debug, trace, warn, info}; use tracing::{debug, info, trace, warn};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Topics<T> struct Topics<T>
@ -63,6 +63,8 @@ async fn event_loop_task(
mut event_loop: rumqttc::EventLoop, mut event_loop: rumqttc::EventLoop,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
use rumqttc::{ConnectionError, Event}; use rumqttc::{ConnectionError, Event};
let reconnect_timeout = Duration::from_secs(5);
event_loop.set_reconnection_delay(reconnect_timeout);
loop { loop {
match event_loop.poll().await { match event_loop.poll().await {
Ok(Event::Incoming(incoming)) => { Ok(Event::Incoming(incoming)) => {
@ -84,12 +86,7 @@ async fn event_loop_task(
break; break;
} }
Err(err) => { Err(err) => {
let reconnect_timeout = Duration::from_secs(5); warn!("MQTT error, reconnecting: {}", err);
warn!(
"MQTT error, will reconnect in {:?}: {}",
reconnect_timeout, err
);
delay_for(reconnect_timeout).await;
} }
} }
} }
@ -112,7 +109,7 @@ impl MqttInterface {
let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true);
mqtt_opts.set_last_will(last_will); mqtt_opts.set_last_will(last_will);
let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 32); let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16);
(Self { client, topics }, event_loop) (Self { client, topics }, event_loop)
} }
@ -164,7 +161,11 @@ impl MqttInterface {
Ok(()) Ok(())
} }
pub async fn publish_section_state(&mut self, section_id: SectionId, state: bool) -> eyre::Result<()> { pub async fn publish_section_state(
&mut self,
section_id: SectionId,
state: bool,
) -> eyre::Result<()> {
let payload: Vec<u8> = state.to_string().into(); let payload: Vec<u8> = state.to_string().into();
self.client self.client
.publish( .publish(

Loading…
Cancel
Save