diff --git a/sprinklers_actors/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs index c5e1b53..448c7fa 100644 --- a/sprinklers_actors/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -10,7 +10,7 @@ use actix::{ use std::collections::VecDeque; use thiserror::Error; use tokio::{ - sync::broadcast, + sync::{broadcast, watch}, time::{delay_queue, DelayQueue}, }; use tracing::{debug, error, trace, warn}; @@ -275,7 +275,7 @@ impl Handler for ProgramRunnerActor { Some(program) => program.clone(), None => { trace!(program_id, "trying to run non-existant program"); - return Err(ProgramRunnerError::InvalidProgramId(program_id)); + return Err(Error::InvalidProgramId(program_id)); } }; self.run_queue.push_back(ProgRun::new(program.clone())); @@ -317,6 +317,24 @@ impl Handler for ProgramRunnerActor { } } +impl StreamHandler for ProgramRunnerActor { + fn handle(&mut self, item: Programs, ctx: &mut Self::Context) { + ctx.notify(UpdatePrograms(item)) + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct ListenPrograms(watch::Receiver); + +impl Handler for ProgramRunnerActor { + type Result = (); + + fn handle(&mut self, msg: ListenPrograms, ctx: &mut Self::Context) -> Self::Result { + ctx.add_stream(msg.0); + } +} + #[derive(Message)] #[rtype(result = "()")] struct Process; @@ -394,7 +412,7 @@ impl ProgramRunnerActor { } #[derive(Debug, Clone, Error)] -pub enum ProgramRunnerError { +pub enum Error { #[error("mailbox error: {0}")] Mailbox( #[from] @@ -405,7 +423,7 @@ pub enum ProgramRunnerError { InvalidProgramId(ProgramId), } -pub type Result = std::result::Result; +pub type Result = std::result::Result; #[derive(Clone)] pub struct ProgramRunner { @@ -428,41 +446,49 @@ impl ProgramRunner { self.addr .send(UpdateSections(new_sections)) .await - .map_err(ProgramRunnerError::from) + .map_err(Error::from) } pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { self.addr .send(UpdatePrograms(new_programs)) .await - .map_err(ProgramRunnerError::from) + .map_err(Error::from) } pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result { self.addr .send(RunProgramId(program_id)) .await - .map_err(ProgramRunnerError::from)? + .map_err(Error::from)? } pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { self.addr .send(RunProgram(program)) .await - .map_err(ProgramRunnerError::from) + .map_err(Error::from) } pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result> { self.addr .send(CancelProgram(program_id)) .await - .map_err(ProgramRunnerError::from) + .map_err(Error::from) } pub async fn subscribe(&mut self) -> Result { let event_recv = self.addr.send(Subscribe).await?; Ok(event_recv) } + + pub fn listen_programs( + &mut self, + programs_watch: watch::Receiver, + ) { + self.addr + .do_send(ListenPrograms(programs_watch)) + } } #[cfg(test)] @@ -693,7 +719,7 @@ mod test { // First try a non-existant program id assert_matches!( runner.run_program_id(3).await, - Err(ProgramRunnerError::InvalidProgramId(3)) + Err(Error::InvalidProgramId(3)) ); runner.run_program_id(1).await.unwrap();