Browse Source

Fix some naming in program_runner and add listen

master
Alex Mikhalev 4 years ago
parent
commit
25cf94abad
  1. 46
      sprinklers_actors/src/program_runner.rs

46
sprinklers_actors/src/program_runner.rs

@ -10,7 +10,7 @@ use actix::{
use std::collections::VecDeque; use std::collections::VecDeque;
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
sync::broadcast, sync::{broadcast, watch},
time::{delay_queue, DelayQueue}, time::{delay_queue, DelayQueue},
}; };
use tracing::{debug, error, trace, warn}; use tracing::{debug, error, trace, warn};
@ -275,7 +275,7 @@ impl Handler<RunProgramId> for ProgramRunnerActor {
Some(program) => program.clone(), Some(program) => program.clone(),
None => { None => {
trace!(program_id, "trying to run non-existant program"); trace!(program_id, "trying to run non-existant program");
return Err(ProgramRunnerError::InvalidProgramId(program_id)); return Err(Error::InvalidProgramId(program_id));
} }
}; };
self.run_queue.push_back(ProgRun::new(program.clone())); self.run_queue.push_back(ProgRun::new(program.clone()));
@ -317,6 +317,24 @@ impl Handler<CancelProgram> for ProgramRunnerActor {
} }
} }
impl StreamHandler<Programs> for ProgramRunnerActor {
fn handle(&mut self, item: Programs, ctx: &mut Self::Context) {
ctx.notify(UpdatePrograms(item))
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct ListenPrograms(watch::Receiver<Programs>);
impl Handler<ListenPrograms> for ProgramRunnerActor {
type Result = ();
fn handle(&mut self, msg: ListenPrograms, ctx: &mut Self::Context) -> Self::Result {
ctx.add_stream(msg.0);
}
}
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
struct Process; struct Process;
@ -394,7 +412,7 @@ impl ProgramRunnerActor {
} }
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
pub enum ProgramRunnerError { pub enum Error {
#[error("mailbox error: {0}")] #[error("mailbox error: {0}")]
Mailbox( Mailbox(
#[from] #[from]
@ -405,7 +423,7 @@ pub enum ProgramRunnerError {
InvalidProgramId(ProgramId), InvalidProgramId(ProgramId),
} }
pub type Result<T, E = ProgramRunnerError> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Clone)] #[derive(Clone)]
pub struct ProgramRunner { pub struct ProgramRunner {
@ -428,41 +446,49 @@ impl ProgramRunner {
self.addr self.addr
.send(UpdateSections(new_sections)) .send(UpdateSections(new_sections))
.await .await
.map_err(ProgramRunnerError::from) .map_err(Error::from)
} }
pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> {
self.addr self.addr
.send(UpdatePrograms(new_programs)) .send(UpdatePrograms(new_programs))
.await .await
.map_err(ProgramRunnerError::from) .map_err(Error::from)
} }
pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<ProgramRef> { pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<ProgramRef> {
self.addr self.addr
.send(RunProgramId(program_id)) .send(RunProgramId(program_id))
.await .await
.map_err(ProgramRunnerError::from)? .map_err(Error::from)?
} }
pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> {
self.addr self.addr
.send(RunProgram(program)) .send(RunProgram(program))
.await .await
.map_err(ProgramRunnerError::from) .map_err(Error::from)
} }
pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<Option<ProgramRef>> { pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<Option<ProgramRef>> {
self.addr self.addr
.send(CancelProgram(program_id)) .send(CancelProgram(program_id))
.await .await
.map_err(ProgramRunnerError::from) .map_err(Error::from)
} }
pub async fn subscribe(&mut self) -> Result<ProgramEventRecv> { pub async fn subscribe(&mut self) -> Result<ProgramEventRecv> {
let event_recv = self.addr.send(Subscribe).await?; let event_recv = self.addr.send(Subscribe).await?;
Ok(event_recv) Ok(event_recv)
} }
pub fn listen_programs(
&mut self,
programs_watch: watch::Receiver<Programs>,
) {
self.addr
.do_send(ListenPrograms(programs_watch))
}
} }
#[cfg(test)] #[cfg(test)]
@ -693,7 +719,7 @@ mod test {
// First try a non-existant program id // First try a non-existant program id
assert_matches!( assert_matches!(
runner.run_program_id(3).await, runner.run_program_id(3).await,
Err(ProgramRunnerError::InvalidProgramId(3)) Err(Error::InvalidProgramId(3))
); );
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();

Loading…
Cancel
Save