Browse Source

Implement cancelling program

master
Alex Mikhalev 4 years ago
parent
commit
2eb64bbdc0
  1. 2
      src/mqtt/request/mod.rs
  2. 32
      src/mqtt/request/programs.rs
  3. 9
      src/program_runner.rs

2
src/mqtt/request/mod.rs

@ -262,6 +262,7 @@ pub enum Request {
CancelSectionRunId(sections::CancelSectionRunIdRequest), CancelSectionRunId(sections::CancelSectionRunIdRequest),
PauseSectionRunner(sections::PauseSectionRunnerRequest), PauseSectionRunner(sections::PauseSectionRunnerRequest),
RunProgram(programs::RunProgramRequest), RunProgram(programs::RunProgramRequest),
CancelProgram(programs::CancelProgramRequest),
} }
impl IRequest for Request { impl IRequest for Request {
@ -274,6 +275,7 @@ impl IRequest for Request {
Request::CancelSectionRunId(req) => req.exec_erased(ctx), Request::CancelSectionRunId(req) => req.exec_erased(ctx),
Request::PauseSectionRunner(req) => req.exec_erased(ctx), Request::PauseSectionRunner(req) => req.exec_erased(ctx),
Request::RunProgram(req) => req.exec_erased(ctx), Request::RunProgram(req) => req.exec_erased(ctx),
Request::CancelProgram(req) => req.exec_erased(ctx),
} }
} }
} }

32
src/mqtt/request/programs.rs

@ -30,3 +30,35 @@ impl IRequest for RunProgramRequest {
}) })
} }
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelProgramRequest {
program_id: ProgramId,
}
impl IRequest for CancelProgramRequest {
type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut program_runner = ctx.program_runner.clone();
let program_id = self.program_id;
Box::pin(async move {
let cancelled = program_runner
.cancel_program(program_id)
.await
.wrap_err("could not run cancel program")?;
match cancelled {
Some(program) => Ok(ResponseMessage::new(format!(
"program '{}' cancelled",
program.name
))),
None => Err(RequestError::with_name(
ErrorCode::NoSuchProgram,
"program was not running",
"program",
)),
}
})
}
}

9
src/program_runner.rs

@ -297,19 +297,22 @@ impl Handler<RunProgram> for ProgramRunnerActor {
} }
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "Option<ProgramRef>")]
struct CancelProgram(ProgramId); struct CancelProgram(ProgramId);
impl Handler<CancelProgram> for ProgramRunnerActor { impl Handler<CancelProgram> for ProgramRunnerActor {
type Result = (); type Result = Option<ProgramRef>;
fn handle(&mut self, msg: CancelProgram, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: CancelProgram, ctx: &mut Self::Context) -> Self::Result {
let CancelProgram(program_id) = msg; let CancelProgram(program_id) = msg;
let mut cancelled = None;
for run in self.run_queue.iter_mut() { for run in self.run_queue.iter_mut() {
if run.program.id == program_id { if run.program.id == program_id {
run.state = RunState::Cancelled; run.state = RunState::Cancelled;
cancelled = Some(run.program.clone());
} }
} }
ctx.notify(Process); ctx.notify(Process);
cancelled
} }
} }
@ -448,7 +451,7 @@ impl ProgramRunner {
.map_err(ProgramRunnerError::from) .map_err(ProgramRunnerError::from)
} }
pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<Option<ProgramRef>> {
self.addr self.addr
.send(CancelProgram(program_id)) .send(CancelProgram(program_id))
.await .await

Loading…
Cancel
Save