Browse Source

Implement proper quit behavior for update_listener

master
Alex Mikhalev 4 years ago
parent
commit
f9cdb0e57b
  1. 4
      src/main.rs
  2. 80
      src/update_listener.rs

4
src/main.rs

@ -17,6 +17,7 @@ use eyre::Result;
use std::sync::Arc; use std::sync::Arc;
use tracing::info; use tracing::info;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use update_listener::UpdateListener;
#[tokio::main] #[tokio::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
@ -63,7 +64,7 @@ async fn main() -> Result<()> {
}; };
let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?;
update_listener::UpdateListener::start( let update_listener = UpdateListener::start(
section_runner.subscribe().await?, section_runner.subscribe().await?,
mqtt_interface.clone(), mqtt_interface.clone(),
); );
@ -83,6 +84,7 @@ async fn main() -> Result<()> {
tokio::signal::ctrl_c().await?; tokio::signal::ctrl_c().await?;
info!("Interrupt received, shutting down"); info!("Interrupt received, shutting down");
update_listener.quit().await?;
mqtt_interface.quit().await?; mqtt_interface.quit().await?;
program_runner.quit().await?; program_runner.quit().await?;
section_runner.quit().await?; section_runner.quit().await?;

80
src/update_listener.rs

@ -2,26 +2,19 @@ use crate::{
mqtt_interface::MqttInterface, mqtt_interface::MqttInterface,
section_runner::{SectionEvent, SectionEventRecv}, section_runner::{SectionEvent, SectionEventRecv},
}; };
use tokio::{select, sync::broadcast, task::JoinHandle}; use tokio::{
use tracing::trace; select,
sync::{broadcast, oneshot},
task::JoinHandle,
};
use tracing::{trace, trace_span};
pub struct UpdateListener { struct UpdateListenerTask {
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
running: bool, running: bool,
} }
impl UpdateListener { impl UpdateListenerTask {
pub fn start(
section_events: SectionEventRecv,
mqtt_interface: MqttInterface,
) -> JoinHandle<()> {
let update_listener = UpdateListener {
mqtt_interface,
running: true,
};
tokio::spawn(update_listener.run(section_events))
}
async fn handle_section_event( async fn handle_section_event(
&mut self, &mut self,
event: Result<SectionEvent, broadcast::RecvError>, event: Result<SectionEvent, broadcast::RecvError>,
@ -50,14 +43,63 @@ impl UpdateListener {
Ok(()) Ok(())
} }
pub async fn run(mut self, mut section_events: SectionEventRecv) { async fn run_impl(
&mut self,
mut section_events: SectionEventRecv,
) -> eyre::Result<()> {
while self.running { while self.running {
let result = select! { select! {
section_event = section_events.recv() => { section_event = section_events.recv() => {
self.handle_section_event(section_event).await self.handle_section_event(section_event).await?
} }
}; };
result.expect("error in update_listener task");
} }
Ok(())
}
async fn run(
mut self,
mut quit_rx: oneshot::Receiver<()>,
section_events: SectionEventRecv,
) {
let span = trace_span!("UpdateListenerTask::run");
let _enter = span.enter();
let result = select! {
_ = &mut quit_rx => {
self.running = false;
Ok(())
}
res = self.run_impl(section_events) => {
res
}
};
result.expect("error in UpdateListenerTask");
}
}
pub struct UpdateListener {
quit_tx: oneshot::Sender<()>,
join_handle: JoinHandle<()>,
}
impl UpdateListener {
pub fn start(section_events: SectionEventRecv, mqtt_interface: MqttInterface) -> Self {
let task = UpdateListenerTask {
mqtt_interface,
running: true,
};
let (quit_tx, quit_rx) = oneshot::channel();
let join_handle = tokio::spawn(task.run(quit_rx, section_events));
Self {
quit_tx,
join_handle,
}
}
pub async fn quit(self) -> eyre::Result<()> {
let _ = self.quit_tx.send(());
self.join_handle.await?;
Ok(())
} }
} }

Loading…
Cancel
Save