|
|
@ -1,6 +1,6 @@ |
|
|
|
use super::{event_loop::EventLoopTask, request, MqttInterface}; |
|
|
|
use super::{event_loop::EventLoopTask, request, MqttInterface}; |
|
|
|
use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; |
|
|
|
use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; |
|
|
|
use request::{ErrorCode, RequestContext, RequestError, WithRequestId}; |
|
|
|
use request::{ErrorCode, RequestContext, RequestError, WithRequestId, Response}; |
|
|
|
use tokio::sync::oneshot; |
|
|
|
use tokio::sync::oneshot; |
|
|
|
use tracing::{debug, error, info, trace, warn}; |
|
|
|
use tracing::{debug, error, info, trace, warn}; |
|
|
|
|
|
|
|
|
|
|
@ -37,7 +37,7 @@ impl MqttActor { |
|
|
|
let rid = request_value.rid; |
|
|
|
let rid = request_value.rid; |
|
|
|
let request_fut = |
|
|
|
let request_fut = |
|
|
|
serde_json::from_value::<request::Request>(request_value.rest).map(|mut request| { |
|
|
|
serde_json::from_value::<request::Request>(request_value.rest).map(|mut request| { |
|
|
|
trace!("deserialized request: {:?}", request); |
|
|
|
debug!(rid, "about to execute request: {:?}", request); |
|
|
|
request.execute(&mut self.request_context) |
|
|
|
request.execute(&mut self.request_context) |
|
|
|
}); |
|
|
|
}); |
|
|
|
let mut interface = self.interface.clone(); |
|
|
|
let mut interface = self.interface.clone(); |
|
|
@ -52,11 +52,18 @@ impl MqttActor { |
|
|
|
) |
|
|
|
) |
|
|
|
.into(), |
|
|
|
.into(), |
|
|
|
}; |
|
|
|
}; |
|
|
|
|
|
|
|
match &response { |
|
|
|
|
|
|
|
Response::Success(res) => { |
|
|
|
|
|
|
|
debug!(rid, response = display(res), "success response:"); |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
Response::Error(err) => { |
|
|
|
|
|
|
|
debug!(rid, "request error: {}", err); |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
}; |
|
|
|
let resp_with_id = WithRequestId::<request::Response> { |
|
|
|
let resp_with_id = WithRequestId::<request::Response> { |
|
|
|
rid, |
|
|
|
rid, |
|
|
|
rest: response, |
|
|
|
rest: response, |
|
|
|
}; |
|
|
|
}; |
|
|
|
trace!("sending request response: {:?}", resp_with_id); |
|
|
|
|
|
|
|
if let Err(err) = interface.publish_response(resp_with_id).await { |
|
|
|
if let Err(err) = interface.publish_response(resp_with_id).await { |
|
|
|
error!("could not publish request response: {:?}", err); |
|
|
|
error!("could not publish request response: {:?}", err); |
|
|
|
} |
|
|
|
} |
|
|
@ -134,7 +141,6 @@ impl Handler<PubRecieve> for MqttActor { |
|
|
|
fn handle(&mut self, msg: PubRecieve, ctx: &mut Self::Context) -> Self::Result { |
|
|
|
fn handle(&mut self, msg: PubRecieve, ctx: &mut Self::Context) -> Self::Result { |
|
|
|
let topic = &msg.0.topic; |
|
|
|
let topic = &msg.0.topic; |
|
|
|
if topic == &self.interface.topics.requests() { |
|
|
|
if topic == &self.interface.topics.requests() { |
|
|
|
debug!("received request: {:?}", msg.0); |
|
|
|
|
|
|
|
self.handle_request(msg.0.payload.as_ref(), ctx); |
|
|
|
self.handle_request(msg.0.payload.as_ref(), ctx); |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
warn!("received on unknown topic: {:?}", topic); |
|
|
|
warn!("received on unknown topic: {:?}", topic); |
|
|
|