From faae15509b2c8dac1a76f3e04ff8a5d9c252638b Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Wed, 30 Sep 2020 13:48:32 -0600 Subject: [PATCH] Implement cancelling section runs over MQTT --- src/mqtt/actor.rs | 4 +-- src/mqtt/request/mod.rs | 48 +++++++++++++++++++++--------------- src/mqtt/request/sections.rs | 38 +++++++++++++++++++++++++++- 3 files changed, 67 insertions(+), 23 deletions(-) diff --git a/src/mqtt/actor.rs b/src/mqtt/actor.rs index bc39002..b5c7165 100644 --- a/src/mqtt/actor.rs +++ b/src/mqtt/actor.rs @@ -58,7 +58,7 @@ impl MqttActor { }; trace!("sending request response: {:?}", resp_with_id); if let Err(err) = interface.publish_response(resp_with_id).await { - error!("could not publish request response: {}", err); + error!("could not publish request response: {:?}", err); } }; ctx.spawn(fut.into_actor(self)); @@ -117,7 +117,7 @@ impl Handler for MqttActor { let res = interface.publish_connected(true).await; let res = res.and(interface.subscribe_requests().await); if let Err(err) = res { - error!("error in connection setup: {}", err); + error!("error in connection setup: {:?}", err); } }; ctx.spawn(fut.into_actor(self)); diff --git a/src/mqtt/request/mod.rs b/src/mqtt/request/mod.rs index 1be71b6..2dd460f 100644 --- a/src/mqtt/request/mod.rs +++ b/src/mqtt/request/mod.rs @@ -28,6 +28,8 @@ pub enum ErrorCode { NoPermission = 107, NotFound = 109, // NotUnique = 110, + NoSuchSection = 120, + NoSuchSectionRun = 121, Internal = 200, NotImplemented = 201, Timeout = 300, @@ -156,20 +158,24 @@ impl RequestError { } #[derive(Debug, Serialize, Deserialize)] -struct ResponseMessage(#[serde(rename = "message")] String); +struct ResponseMessage { + message: String, +} impl ResponseMessage { fn new(message: M) -> Self where M: ToString, { - ResponseMessage(message.to_string()) + ResponseMessage { + message: message.to_string(), + } } } impl From for ResponseMessage { fn from(message: String) -> Self { - ResponseMessage(message) + ResponseMessage { message } } } @@ -213,13 +219,6 @@ where } } -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase", tag = "type")] -pub enum Request { - RunSection(sections::RunSectionRequest), - CancelSection(sections::CancelSectionRequest), -} - #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase", tag = "result")] pub enum Response { @@ -242,6 +241,24 @@ impl From for Response { } } +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct WithRequestId { + pub rid: i32, + #[serde(flatten)] + pub rest: T, +} + +pub type ResponseWithId = WithRequestId; + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase", tag = "type")] +pub enum Request { + RunSection(sections::RunSectionRequest), + CancelSection(sections::CancelSectionRequest), + CancelSectionRunId(sections::CancelSectionRunIdRequest), +} + impl IRequest for Request { type Response = ResponseValue; @@ -249,6 +266,7 @@ impl IRequest for Request { match self { Request::RunSection(req) => req.exec_erased(ctx), Request::CancelSection(req) => req.exec_erased(ctx), + Request::CancelSectionRunId(req) => req.exec_erased(ctx), } } } @@ -258,13 +276,3 @@ impl Request { self.exec(ctx).map(Response::from) } } - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct WithRequestId { - pub rid: i32, - #[serde(flatten)] - pub rest: T, -} - -pub type ResponseWithId = WithRequestId; diff --git a/src/mqtt/request/sections.rs b/src/mqtt/request/sections.rs index 3ce50dc..ca222f4 100644 --- a/src/mqtt/request/sections.rs +++ b/src/mqtt/request/sections.rs @@ -11,7 +11,7 @@ pub struct SectionId(pub crate::model::SectionId); impl SectionId { fn get_section(self, sections: &Sections) -> Result { sections.get(&self.0).cloned().ok_or_else(|| { - RequestError::with_name(ErrorCode::NotFound, "section not found", "section") + RequestError::with_name(ErrorCode::NoSuchSection, "section not found", "section") }) } } @@ -85,3 +85,39 @@ impl IRequest for CancelSectionRequest { }) } } + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelSectionRunIdRequest { + pub run_id: SectionRunHandle, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelSectionRunIdResponse { + pub message: String, + pub cancelled: bool, +} + +impl IRequest for CancelSectionRunIdRequest { + type Response = ResponseMessage; + fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture { + let mut section_runner = ctx.section_runner.clone(); + let run_id = self.run_id.clone(); + Box::pin(async move { + let cancelled = section_runner + .cancel_run(run_id) + .await + .wrap_err("could not cancel section run")?; + if cancelled { + Ok(ResponseMessage::new("cancelled section run")) + } else { + Err(RequestError::with_name( + ErrorCode::NoSuchSectionRun, + "section run does not exist", + "section run", + )) + } + }) + } +}