Browse Source

Use actix for update_listener

master
Alex Mikhalev 4 years ago
parent
commit
4eb03f2e22
  1. 211
      src/update_listener.rs

211
src/update_listener.rs

@ -3,37 +3,38 @@ use crate::{
program_runner::{ProgramEvent, ProgramEventRecv}, program_runner::{ProgramEvent, ProgramEventRecv},
section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv},
}; };
use tokio::{ use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
select, use tokio::sync::broadcast;
sync::{broadcast, oneshot}, use tracing::{trace, warn};
task::JoinHandle,
};
use tracing::{trace_span, warn};
use tracing_futures::Instrument;
struct UpdateListenerTask { struct UpdateListenerActor {
section_events: SectionEventRecv,
program_events: ProgramEventRecv,
sec_runner_state: SecRunnerStateRecv,
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
running: bool,
} }
impl UpdateListenerTask { impl Actor for UpdateListenerActor {
async fn handle_section_event( type Context = actix::Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
trace!("starting UpdateListener");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
trace!("stopped UpdateListener")
}
}
impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for UpdateListenerActor {
fn handle(
&mut self, &mut self,
event: Result<SectionEvent, broadcast::RecvError>, event: Result<SectionEvent, broadcast::RecvError>,
) -> eyre::Result<()> { ctx: &mut Self::Context,
) {
let event = match event { let event = match event {
Ok(ev) => ev, Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => { Err(broadcast::RecvError::Closed) => unreachable!(),
warn!("section events channel closed");
self.running = false;
return Ok(());
}
Err(broadcast::RecvError::Lagged(n)) => { Err(broadcast::RecvError::Lagged(n)) => {
warn!("section events lagged by {}", n); warn!("section events lagged by {}", n);
return Ok(()); return;
} }
}; };
if let Some((sec_id, state)) = match event { if let Some((sec_id, state)) = match event {
@ -45,94 +46,116 @@ impl UpdateListenerTask {
| SectionEvent::RunCancel(_, sec) => Some((sec.id, false)), | SectionEvent::RunCancel(_, sec) => Some((sec.id, false)),
SectionEvent::RunnerPause | SectionEvent::RunnerUnpause => None, SectionEvent::RunnerPause | SectionEvent::RunnerUnpause => None,
} { } {
self.mqtt_interface let mut mqtt_interface = self.mqtt_interface.clone();
.publish_section_state(sec_id, state) let fut = async move {
.await?; if let Err(err) = mqtt_interface.publish_section_state(sec_id, state).await {
warn!("could not publish section state: {}", err);
}
};
ctx.spawn(wrap_future(fut));
} }
Ok(())
} }
}
async fn handle_program_event( impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListenerActor {
fn handle(
&mut self, &mut self,
event: Result<ProgramEvent, broadcast::RecvError>, event: Result<ProgramEvent, broadcast::RecvError>,
) -> eyre::Result<()> { ctx: &mut Self::Context,
) {
let event = match event { let event = match event {
Ok(ev) => ev, Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => { Err(broadcast::RecvError::Closed) => unreachable!(),
warn!("program events channel closed");
self.running = false;
return Ok(());
}
Err(broadcast::RecvError::Lagged(n)) => { Err(broadcast::RecvError::Lagged(n)) => {
warn!("program events lagged by {}", n); warn!("program events lagged by {}", n);
return Ok(()); return;
} }
}; };
let (program_id, running) = match event { let (program_id, running) = match event {
ProgramEvent::RunStart(prog) => (prog.id, true), ProgramEvent::RunStart(prog) => (prog.id, true),
ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false), ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false),
}; };
self.mqtt_interface let mut mqtt_interface = self.mqtt_interface.clone();
.publish_program_running(program_id, running) let fut = async move {
.await?; if let Err(err) = mqtt_interface
Ok(()) .publish_program_running(program_id, running)
.await
{
warn!("could not publish program running: {}", err);
}
};
ctx.spawn(wrap_future(fut));
} }
}
async fn handle_sec_runner_state(&mut self, state: Option<SecRunnerState>) -> eyre::Result<()> { impl StreamHandler<SecRunnerState> for UpdateListenerActor {
let state = match state { fn handle(&mut self, state: SecRunnerState, ctx: &mut Self::Context) {
Some(state) => state, let mut mqtt_interface = self.mqtt_interface.clone();
None => { let fut = async move {
warn!("section runner events channel closed"); if let Err(err) = mqtt_interface.publish_section_runner(&state).await {
self.running = false; warn!("could not publish section runner: {}", err);
return Ok(());
} }
}; };
self.mqtt_interface.publish_section_runner(&state).await?; ctx.spawn(wrap_future(fut));
Ok(())
} }
}
async fn run_impl(&mut self) -> eyre::Result<()> { #[derive(actix::Message)]
while self.running { #[rtype(result = "()")]
select! { struct Quit;
section_event = self.section_events.recv() => {
self.handle_section_event(section_event).await? impl Handler<Quit> for UpdateListenerActor {
} type Result = ();
program_event = self.program_events.recv() => {
self.handle_program_event(program_event).await? fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result {
} ctx.stop();
sec_runner_state = self.sec_runner_state.recv() => {
self.handle_sec_runner_state(sec_runner_state).await?
}
};
}
Ok(())
} }
}
async fn run_or_quit(mut self, mut quit_rx: oneshot::Receiver<()>) -> eyre::Result<()> { trait Listenable<A>: Send
select! { where
_ = &mut quit_rx => { A: Actor,
self.running = false; {
Ok(()) fn listen(self, ctx: &mut A::Context);
} }
res = self.run_impl() => {
res #[derive(actix::Message)]
} #[rtype(result = "()")]
} struct Listen<L>(L)
where
L: Listenable<UpdateListenerActor>;
impl<L> Handler<Listen<L>> for UpdateListenerActor
where
L: Listenable<Self>,
{
type Result = ();
fn handle(&mut self, msg: Listen<L>, ctx: &mut Self::Context) -> Self::Result {
msg.0.listen(ctx);
} }
}
async fn run(self, quit_rx: oneshot::Receiver<()>) { impl Listenable<UpdateListenerActor> for SectionEventRecv {
let span = trace_span!("update_listener task"); fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self);
}
}
impl Listenable<UpdateListenerActor> for ProgramEventRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self);
}
}
self.run_or_quit(quit_rx) impl Listenable<UpdateListenerActor> for SecRunnerStateRecv {
.instrument(span) fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
.await ctx.add_stream(self);
.expect("error in UpdateListenerTask");
} }
} }
pub struct UpdateListener { pub struct UpdateListener {
quit_tx: oneshot::Sender<()>, addr: Addr<UpdateListenerActor>,
join_handle: JoinHandle<()>,
} }
impl UpdateListener { impl UpdateListener {
@ -142,24 +165,22 @@ impl UpdateListener {
sec_runner_state: SecRunnerStateRecv, sec_runner_state: SecRunnerStateRecv,
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
) -> Self { ) -> Self {
let task = UpdateListenerTask { let addr = UpdateListenerActor { mqtt_interface }.start();
section_events, let mut l = Self { addr };
program_events, l.listen(section_events);
sec_runner_state, l.listen(program_events);
mqtt_interface, l.listen(sec_runner_state);
running: true, l
}; }
let (quit_tx, quit_rx) = oneshot::channel();
let join_handle = tokio::spawn(task.run(quit_rx)); fn listen<L: 'static>(&mut self, listener: L)
Self { where
quit_tx, L: Listenable<UpdateListenerActor>,
join_handle, {
} self.addr.do_send(Listen(listener));
} }
pub async fn quit(self) -> eyre::Result<()> { pub async fn quit(self) -> eyre::Result<()> {
let _ = self.quit_tx.send(()); Ok(self.addr.send(Quit).await?)
self.join_handle.await?;
Ok(())
} }
} }

Loading…
Cancel
Save