Browse Source

Implement cancelling section runs over MQTT

master
Alex Mikhalev 4 years ago
parent
commit
faae15509b
  1. 4
      src/mqtt/actor.rs
  2. 48
      src/mqtt/request/mod.rs
  3. 38
      src/mqtt/request/sections.rs

4
src/mqtt/actor.rs

@ -58,7 +58,7 @@ impl MqttActor {
}; };
trace!("sending request response: {:?}", resp_with_id); trace!("sending request response: {:?}", resp_with_id);
if let Err(err) = interface.publish_response(resp_with_id).await { if let Err(err) = interface.publish_response(resp_with_id).await {
error!("could not publish request response: {}", err); error!("could not publish request response: {:?}", err);
} }
}; };
ctx.spawn(fut.into_actor(self)); ctx.spawn(fut.into_actor(self));
@ -117,7 +117,7 @@ impl Handler<Connected> for MqttActor {
let res = interface.publish_connected(true).await; let res = interface.publish_connected(true).await;
let res = res.and(interface.subscribe_requests().await); let res = res.and(interface.subscribe_requests().await);
if let Err(err) = res { if let Err(err) = res {
error!("error in connection setup: {}", err); error!("error in connection setup: {:?}", err);
} }
}; };
ctx.spawn(fut.into_actor(self)); ctx.spawn(fut.into_actor(self));

48
src/mqtt/request/mod.rs

@ -28,6 +28,8 @@ pub enum ErrorCode {
NoPermission = 107, NoPermission = 107,
NotFound = 109, NotFound = 109,
// NotUnique = 110, // NotUnique = 110,
NoSuchSection = 120,
NoSuchSectionRun = 121,
Internal = 200, Internal = 200,
NotImplemented = 201, NotImplemented = 201,
Timeout = 300, Timeout = 300,
@ -156,20 +158,24 @@ impl RequestError {
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
struct ResponseMessage(#[serde(rename = "message")] String); struct ResponseMessage {
message: String,
}
impl ResponseMessage { impl ResponseMessage {
fn new<M>(message: M) -> Self fn new<M>(message: M) -> Self
where where
M: ToString, M: ToString,
{ {
ResponseMessage(message.to_string()) ResponseMessage {
message: message.to_string(),
}
} }
} }
impl From<String> for ResponseMessage { impl From<String> for ResponseMessage {
fn from(message: String) -> Self { fn from(message: String) -> Self {
ResponseMessage(message) ResponseMessage { message }
} }
} }
@ -213,13 +219,6 @@ where
} }
} }
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Request {
RunSection(sections::RunSectionRequest),
CancelSection(sections::CancelSectionRequest),
}
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase", tag = "result")] #[serde(rename_all = "camelCase", tag = "result")]
pub enum Response { pub enum Response {
@ -242,6 +241,24 @@ impl From<RequestError> for Response {
} }
} }
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WithRequestId<T> {
pub rid: i32,
#[serde(flatten)]
pub rest: T,
}
pub type ResponseWithId = WithRequestId<Response>;
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase", tag = "type")]
pub enum Request {
RunSection(sections::RunSectionRequest),
CancelSection(sections::CancelSectionRequest),
CancelSectionRunId(sections::CancelSectionRunIdRequest),
}
impl IRequest for Request { impl IRequest for Request {
type Response = ResponseValue; type Response = ResponseValue;
@ -249,6 +266,7 @@ impl IRequest for Request {
match self { match self {
Request::RunSection(req) => req.exec_erased(ctx), Request::RunSection(req) => req.exec_erased(ctx),
Request::CancelSection(req) => req.exec_erased(ctx), Request::CancelSection(req) => req.exec_erased(ctx),
Request::CancelSectionRunId(req) => req.exec_erased(ctx),
} }
} }
} }
@ -258,13 +276,3 @@ impl Request {
self.exec(ctx).map(Response::from) self.exec(ctx).map(Response::from)
} }
} }
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct WithRequestId<T> {
pub rid: i32,
#[serde(flatten)]
pub rest: T,
}
pub type ResponseWithId = WithRequestId<Response>;

38
src/mqtt/request/sections.rs

@ -11,7 +11,7 @@ pub struct SectionId(pub crate::model::SectionId);
impl SectionId { impl SectionId {
fn get_section(self, sections: &Sections) -> Result<SectionRef, RequestError> { fn get_section(self, sections: &Sections) -> Result<SectionRef, RequestError> {
sections.get(&self.0).cloned().ok_or_else(|| { sections.get(&self.0).cloned().ok_or_else(|| {
RequestError::with_name(ErrorCode::NotFound, "section not found", "section") RequestError::with_name(ErrorCode::NoSuchSection, "section not found", "section")
}) })
} }
} }
@ -85,3 +85,39 @@ impl IRequest for CancelSectionRequest {
}) })
} }
} }
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionRunIdRequest {
pub run_id: SectionRunHandle,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionRunIdResponse {
pub message: String,
pub cancelled: bool,
}
impl IRequest for CancelSectionRunIdRequest {
type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone();
let run_id = self.run_id.clone();
Box::pin(async move {
let cancelled = section_runner
.cancel_run(run_id)
.await
.wrap_err("could not cancel section run")?;
if cancelled {
Ok(ResponseMessage::new("cancelled section run"))
} else {
Err(RequestError::with_name(
ErrorCode::NoSuchSectionRun,
"section run does not exist",
"section run",
))
}
})
}
}

Loading…
Cancel
Save