diff --git a/src/program_runner.rs b/src/program_runner.rs index 03f0f15..dc1307b 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -9,7 +9,7 @@ use actix::{ use std::collections::VecDeque; use thiserror::Error; use tokio::{ - sync::{broadcast, mpsc, oneshot}, + sync::broadcast, time::{delay_queue, DelayQueue}, }; use tracing::{debug, error, trace, warn}; @@ -263,22 +263,23 @@ impl Handler for ProgramRunnerActor { } #[derive(Message)] -#[rtype(result = "()")] +#[rtype(result = "Result")] struct RunProgramId(ProgramId); impl Handler for ProgramRunnerActor { - type Result = (); + type Result = Result; fn handle(&mut self, msg: RunProgramId, ctx: &mut Self::Context) -> Self::Result { let RunProgramId(program_id) = msg; let program = match self.inner.programs.get(&program_id) { Some(program) => program.clone(), None => { - warn!(program_id, "trying to run non-existant program"); - return; + trace!(program_id, "trying to run non-existant program"); + return Err(ProgramRunnerError::InvalidProgramId(program_id)); } }; - self.run_queue.push_back(ProgRun::new(program)); + self.run_queue.push_back(ProgRun::new(program.clone())); ctx.notify(Process); + Ok(program) } } @@ -389,29 +390,18 @@ impl ProgramRunnerActor { } #[derive(Debug, Clone, Error)] -#[error("the ProgramRunner channel is closed")] -pub struct ChannelClosed; - -pub type Result = std::result::Result; - -impl From> for ChannelClosed { - fn from(_: mpsc::error::SendError) -> Self { - Self - } -} - -impl From for ChannelClosed { - fn from(_: oneshot::error::RecvError) -> Self { - Self - } +pub enum ProgramRunnerError { + #[error("mailbox error: {0}")] + Mailbox( + #[from] + #[source] + actix::MailboxError, + ), + #[error("no such program id: {0}")] + InvalidProgramId(ProgramId), } -impl From for ChannelClosed { - fn from(_: actix::MailboxError) -> Self { - // TODO: - Self - } -} +pub type Result = std::result::Result; #[derive(Clone)] pub struct ProgramRunner { @@ -434,35 +424,35 @@ impl ProgramRunner { self.addr .send(UpdateSections(new_sections)) .await - .map_err(From::from) + .map_err(ProgramRunnerError::from) } pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { self.addr .send(UpdatePrograms(new_programs)) .await - .map_err(From::from) + .map_err(ProgramRunnerError::from) } - pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> { + pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result { self.addr .send(RunProgramId(program_id)) .await - .map_err(From::from) + .map_err(ProgramRunnerError::from)? } pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { self.addr .send(RunProgram(program)) .await - .map_err(From::from) + .map_err(ProgramRunnerError::from) } pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { self.addr .send(CancelProgram(program_id)) .await - .map_err(From::from) + .map_err(ProgramRunnerError::from) } pub async fn subscribe(&mut self) -> Result {