diff --git a/src/mqtt_interface.rs b/src/mqtt_interface.rs index 3b24dec..036dca1 100644 --- a/src/mqtt_interface.rs +++ b/src/mqtt_interface.rs @@ -1,4 +1,4 @@ -use crate::model::{Section, SectionId, Sections, Programs, Program, ProgramId}; +use crate::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; use eyre::WrapErr; use rumqttc::{LastWill, MqttOptions, QoS}; use std::{ @@ -6,8 +6,8 @@ use std::{ sync::Arc, time::Duration, }; -use tokio::{task::JoinHandle, time::delay_for}; -use tracing::{debug, trace, warn, info}; +use tokio::task::JoinHandle; +use tracing::{debug, info, trace, warn}; #[derive(Clone, Debug)] struct Topics @@ -63,6 +63,8 @@ async fn event_loop_task( mut event_loop: rumqttc::EventLoop, ) -> eyre::Result<()> { use rumqttc::{ConnectionError, Event}; + let reconnect_timeout = Duration::from_secs(5); + event_loop.set_reconnection_delay(reconnect_timeout); loop { match event_loop.poll().await { Ok(Event::Incoming(incoming)) => { @@ -84,12 +86,7 @@ async fn event_loop_task( break; } Err(err) => { - let reconnect_timeout = Duration::from_secs(5); - warn!( - "MQTT error, will reconnect in {:?}: {}", - reconnect_timeout, err - ); - delay_for(reconnect_timeout).await; + warn!("MQTT error, reconnecting: {}", err); } } } @@ -112,7 +109,7 @@ impl MqttInterface { let last_will = LastWill::new(topics.connected(), "false", QoS::AtLeastOnce, true); mqtt_opts.set_last_will(last_will); - let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 32); + let (client, event_loop) = rumqttc::AsyncClient::new(mqtt_opts, 16); (Self { client, topics }, event_loop) } @@ -164,7 +161,11 @@ impl MqttInterface { Ok(()) } - pub async fn publish_section_state(&mut self, section_id: SectionId, state: bool) -> eyre::Result<()> { + pub async fn publish_section_state( + &mut self, + section_id: SectionId, + state: bool, + ) -> eyre::Result<()> { let payload: Vec = state.to_string().into(); self.client .publish(