diff --git a/src/main.rs b/src/main.rs index 0b671df..903cadf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,6 +17,7 @@ use eyre::Result; use std::sync::Arc; use tracing::info; use tracing_subscriber::EnvFilter; +use update_listener::UpdateListener; #[tokio::main] async fn main() -> Result<()> { @@ -63,7 +64,7 @@ async fn main() -> Result<()> { }; let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; - update_listener::UpdateListener::start( + let update_listener = UpdateListener::start( section_runner.subscribe().await?, mqtt_interface.clone(), ); @@ -83,6 +84,7 @@ async fn main() -> Result<()> { tokio::signal::ctrl_c().await?; info!("Interrupt received, shutting down"); + update_listener.quit().await?; mqtt_interface.quit().await?; program_runner.quit().await?; section_runner.quit().await?; diff --git a/src/update_listener.rs b/src/update_listener.rs index 8ddb434..1f39bf8 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -2,26 +2,19 @@ use crate::{ mqtt_interface::MqttInterface, section_runner::{SectionEvent, SectionEventRecv}, }; -use tokio::{select, sync::broadcast, task::JoinHandle}; -use tracing::trace; +use tokio::{ + select, + sync::{broadcast, oneshot}, + task::JoinHandle, +}; +use tracing::{trace, trace_span}; -pub struct UpdateListener { +struct UpdateListenerTask { mqtt_interface: MqttInterface, running: bool, } -impl UpdateListener { - pub fn start( - section_events: SectionEventRecv, - mqtt_interface: MqttInterface, - ) -> JoinHandle<()> { - let update_listener = UpdateListener { - mqtt_interface, - running: true, - }; - tokio::spawn(update_listener.run(section_events)) - } - +impl UpdateListenerTask { async fn handle_section_event( &mut self, event: Result, @@ -50,14 +43,63 @@ impl UpdateListener { Ok(()) } - pub async fn run(mut self, mut section_events: SectionEventRecv) { + async fn run_impl( + &mut self, + mut section_events: SectionEventRecv, + ) -> eyre::Result<()> { while self.running { - let result = select! { + select! { section_event = section_events.recv() => { - self.handle_section_event(section_event).await + self.handle_section_event(section_event).await? } }; - result.expect("error in update_listener task"); } + Ok(()) + } + + async fn run( + mut self, + mut quit_rx: oneshot::Receiver<()>, + section_events: SectionEventRecv, + ) { + let span = trace_span!("UpdateListenerTask::run"); + let _enter = span.enter(); + + let result = select! { + _ = &mut quit_rx => { + self.running = false; + Ok(()) + } + res = self.run_impl(section_events) => { + res + } + }; + result.expect("error in UpdateListenerTask"); + } +} + +pub struct UpdateListener { + quit_tx: oneshot::Sender<()>, + join_handle: JoinHandle<()>, +} + +impl UpdateListener { + pub fn start(section_events: SectionEventRecv, mqtt_interface: MqttInterface) -> Self { + let task = UpdateListenerTask { + mqtt_interface, + running: true, + }; + let (quit_tx, quit_rx) = oneshot::channel(); + let join_handle = tokio::spawn(task.run(quit_rx, section_events)); + Self { + quit_tx, + join_handle, + } + } + + pub async fn quit(self) -> eyre::Result<()> { + let _ = self.quit_tx.send(()); + self.join_handle.await?; + Ok(()) } }