From 4eb03f2e227ceb1e0a54e59c76a38e8cefe84e2a Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Tue, 29 Sep 2020 18:09:24 -0600 Subject: [PATCH] Use actix for update_listener --- src/update_listener.rs | 211 ++++++++++++++++++++++------------------- 1 file changed, 116 insertions(+), 95 deletions(-) diff --git a/src/update_listener.rs b/src/update_listener.rs index a030689..bf48773 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -3,37 +3,38 @@ use crate::{ program_runner::{ProgramEvent, ProgramEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, }; -use tokio::{ - select, - sync::{broadcast, oneshot}, - task::JoinHandle, -}; -use tracing::{trace_span, warn}; -use tracing_futures::Instrument; +use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; +use tokio::sync::broadcast; +use tracing::{trace, warn}; -struct UpdateListenerTask { - section_events: SectionEventRecv, - program_events: ProgramEventRecv, - sec_runner_state: SecRunnerStateRecv, +struct UpdateListenerActor { mqtt_interface: MqttInterface, - running: bool, } -impl UpdateListenerTask { - async fn handle_section_event( +impl Actor for UpdateListenerActor { + type Context = actix::Context; + + fn started(&mut self, _ctx: &mut Self::Context) { + trace!("starting UpdateListener"); + } + + fn stopped(&mut self, _ctx: &mut Self::Context) { + trace!("stopped UpdateListener") + } +} + +impl StreamHandler> for UpdateListenerActor { + fn handle( &mut self, event: Result, - ) -> eyre::Result<()> { + ctx: &mut Self::Context, + ) { let event = match event { Ok(ev) => ev, - Err(broadcast::RecvError::Closed) => { - warn!("section events channel closed"); - self.running = false; - return Ok(()); - } + Err(broadcast::RecvError::Closed) => unreachable!(), Err(broadcast::RecvError::Lagged(n)) => { warn!("section events lagged by {}", n); - return Ok(()); + return; } }; if let Some((sec_id, state)) = match event { @@ -45,94 +46,116 @@ impl UpdateListenerTask { | SectionEvent::RunCancel(_, sec) => Some((sec.id, false)), SectionEvent::RunnerPause | SectionEvent::RunnerUnpause => None, } { - self.mqtt_interface - .publish_section_state(sec_id, state) - .await?; + let mut mqtt_interface = self.mqtt_interface.clone(); + let fut = async move { + if let Err(err) = mqtt_interface.publish_section_state(sec_id, state).await { + warn!("could not publish section state: {}", err); + } + }; + ctx.spawn(wrap_future(fut)); } - Ok(()) } +} - async fn handle_program_event( +impl StreamHandler> for UpdateListenerActor { + fn handle( &mut self, event: Result, - ) -> eyre::Result<()> { + ctx: &mut Self::Context, + ) { let event = match event { Ok(ev) => ev, - Err(broadcast::RecvError::Closed) => { - warn!("program events channel closed"); - self.running = false; - return Ok(()); - } + Err(broadcast::RecvError::Closed) => unreachable!(), Err(broadcast::RecvError::Lagged(n)) => { warn!("program events lagged by {}", n); - return Ok(()); + return; } }; let (program_id, running) = match event { ProgramEvent::RunStart(prog) => (prog.id, true), ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false), }; - self.mqtt_interface - .publish_program_running(program_id, running) - .await?; - Ok(()) + let mut mqtt_interface = self.mqtt_interface.clone(); + let fut = async move { + if let Err(err) = mqtt_interface + .publish_program_running(program_id, running) + .await + { + warn!("could not publish program running: {}", err); + } + }; + ctx.spawn(wrap_future(fut)); } +} - async fn handle_sec_runner_state(&mut self, state: Option) -> eyre::Result<()> { - let state = match state { - Some(state) => state, - None => { - warn!("section runner events channel closed"); - self.running = false; - return Ok(()); +impl StreamHandler for UpdateListenerActor { + fn handle(&mut self, state: SecRunnerState, ctx: &mut Self::Context) { + let mut mqtt_interface = self.mqtt_interface.clone(); + let fut = async move { + if let Err(err) = mqtt_interface.publish_section_runner(&state).await { + warn!("could not publish section runner: {}", err); } }; - self.mqtt_interface.publish_section_runner(&state).await?; - Ok(()) + ctx.spawn(wrap_future(fut)); } +} - async fn run_impl(&mut self) -> eyre::Result<()> { - while self.running { - select! { - section_event = self.section_events.recv() => { - self.handle_section_event(section_event).await? - } - program_event = self.program_events.recv() => { - self.handle_program_event(program_event).await? - } - sec_runner_state = self.sec_runner_state.recv() => { - self.handle_sec_runner_state(sec_runner_state).await? - } - }; - } - Ok(()) +#[derive(actix::Message)] +#[rtype(result = "()")] +struct Quit; + +impl Handler for UpdateListenerActor { + type Result = (); + + fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result { + ctx.stop(); } +} - async fn run_or_quit(mut self, mut quit_rx: oneshot::Receiver<()>) -> eyre::Result<()> { - select! { - _ = &mut quit_rx => { - self.running = false; - Ok(()) - } - res = self.run_impl() => { - res - } - } +trait Listenable: Send +where + A: Actor, +{ + fn listen(self, ctx: &mut A::Context); +} + +#[derive(actix::Message)] +#[rtype(result = "()")] +struct Listen(L) +where + L: Listenable; + +impl Handler> for UpdateListenerActor +where + L: Listenable, +{ + type Result = (); + + fn handle(&mut self, msg: Listen, ctx: &mut Self::Context) -> Self::Result { + msg.0.listen(ctx); } +} - async fn run(self, quit_rx: oneshot::Receiver<()>) { - let span = trace_span!("update_listener task"); +impl Listenable for SectionEventRecv { + fn listen(self, ctx: &mut ::Context) { + ctx.add_stream(self); + } +} + +impl Listenable for ProgramEventRecv { + fn listen(self, ctx: &mut ::Context) { + ctx.add_stream(self); + } +} - self.run_or_quit(quit_rx) - .instrument(span) - .await - .expect("error in UpdateListenerTask"); +impl Listenable for SecRunnerStateRecv { + fn listen(self, ctx: &mut ::Context) { + ctx.add_stream(self); } } pub struct UpdateListener { - quit_tx: oneshot::Sender<()>, - join_handle: JoinHandle<()>, + addr: Addr, } impl UpdateListener { @@ -142,24 +165,22 @@ impl UpdateListener { sec_runner_state: SecRunnerStateRecv, mqtt_interface: MqttInterface, ) -> Self { - let task = UpdateListenerTask { - section_events, - program_events, - sec_runner_state, - mqtt_interface, - running: true, - }; - let (quit_tx, quit_rx) = oneshot::channel(); - let join_handle = tokio::spawn(task.run(quit_rx)); - Self { - quit_tx, - join_handle, - } + let addr = UpdateListenerActor { mqtt_interface }.start(); + let mut l = Self { addr }; + l.listen(section_events); + l.listen(program_events); + l.listen(sec_runner_state); + l + } + + fn listen(&mut self, listener: L) + where + L: Listenable, + { + self.addr.do_send(Listen(listener)); } pub async fn quit(self) -> eyre::Result<()> { - let _ = self.quit_tx.send(()); - self.join_handle.await?; - Ok(()) + Ok(self.addr.send(Quit).await?) } }