From 2eb64bbdc03b3d299f0c5f0c8f2b82d30f71e8b8 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Wed, 30 Sep 2020 14:58:26 -0600 Subject: [PATCH] Implement cancelling program --- src/mqtt/request/mod.rs | 2 ++ src/mqtt/request/programs.rs | 32 ++++++++++++++++++++++++++++++++ src/program_runner.rs | 9 ++++++--- 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/src/mqtt/request/mod.rs b/src/mqtt/request/mod.rs index e90b08b..97a056f 100644 --- a/src/mqtt/request/mod.rs +++ b/src/mqtt/request/mod.rs @@ -262,6 +262,7 @@ pub enum Request { CancelSectionRunId(sections::CancelSectionRunIdRequest), PauseSectionRunner(sections::PauseSectionRunnerRequest), RunProgram(programs::RunProgramRequest), + CancelProgram(programs::CancelProgramRequest), } impl IRequest for Request { @@ -274,6 +275,7 @@ impl IRequest for Request { Request::CancelSectionRunId(req) => req.exec_erased(ctx), Request::PauseSectionRunner(req) => req.exec_erased(ctx), Request::RunProgram(req) => req.exec_erased(ctx), + Request::CancelProgram(req) => req.exec_erased(ctx), } } } diff --git a/src/mqtt/request/programs.rs b/src/mqtt/request/programs.rs index b105990..237ba60 100644 --- a/src/mqtt/request/programs.rs +++ b/src/mqtt/request/programs.rs @@ -30,3 +30,35 @@ impl IRequest for RunProgramRequest { }) } } + +#[derive(Debug, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelProgramRequest { + program_id: ProgramId, +} + +impl IRequest for CancelProgramRequest { + type Response = ResponseMessage; + + fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture { + let mut program_runner = ctx.program_runner.clone(); + let program_id = self.program_id; + Box::pin(async move { + let cancelled = program_runner + .cancel_program(program_id) + .await + .wrap_err("could not run cancel program")?; + match cancelled { + Some(program) => Ok(ResponseMessage::new(format!( + "program '{}' cancelled", + program.name + ))), + None => Err(RequestError::with_name( + ErrorCode::NoSuchProgram, + "program was not running", + "program", + )), + } + }) + } +} diff --git a/src/program_runner.rs b/src/program_runner.rs index dc1307b..c69a008 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -297,19 +297,22 @@ impl Handler for ProgramRunnerActor { } #[derive(Message)] -#[rtype(result = "()")] +#[rtype(result = "Option")] struct CancelProgram(ProgramId); impl Handler for ProgramRunnerActor { - type Result = (); + type Result = Option; fn handle(&mut self, msg: CancelProgram, ctx: &mut Self::Context) -> Self::Result { let CancelProgram(program_id) = msg; + let mut cancelled = None; for run in self.run_queue.iter_mut() { if run.program.id == program_id { run.state = RunState::Cancelled; + cancelled = Some(run.program.clone()); } } ctx.notify(Process); + cancelled } } @@ -448,7 +451,7 @@ impl ProgramRunner { .map_err(ProgramRunnerError::from) } - pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { + pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result> { self.addr .send(CancelProgram(program_id)) .await