Browse Source

Refactor broker requests

master
Alex Mikhalev 4 years ago
parent
commit
cf6431fbe1
  1. 2
      src/broker_task/mod.rs
  2. 103
      src/broker_task/request.rs

2
src/broker_task/mod.rs

@ -33,7 +33,7 @@ impl BrokerTask {
loop { loop {
tokio::select! { tokio::select! {
Some(request) = self.request_rx.recv() => { Some(request) = self.request_rx.recv() => {
request.handle_request(&mut registry); request.run_request(&mut registry);
} }
else => { else => {
trace!("no more handles to BrokerTask"); trace!("no more handles to BrokerTask");

103
src/broker_task/request.rs

@ -6,67 +6,88 @@ use super::{
}; };
use crate::{Message, Publication, Subscription}; use crate::{Message, Publication, Subscription};
pub(crate) trait BrokerRequestInternal { pub trait BrokerRequestInternal {
fn handle_request(self: Box<Self>, registry: &mut Registry); fn run_request(self: Box<Self>, registry: &mut Registry);
} }
pub(crate) trait BrokerRequest: BrokerRequestInternal {} pub trait RequestHandler {
impl<T: BrokerRequestInternal> BrokerRequest for T {} type Response;
pub(crate) type BrokerRequestBox = Box<dyn BrokerRequest + Send + Sync>; fn handle(self, registry: &mut Registry) -> Self::Response;
pub(crate) type BrokerRequestSender = mpsc::Sender<BrokerRequestBox>; }
pub(crate) struct SubscribeRequest<T> { pub struct BrokerRequest<H: RequestHandler> {
msg_type: BasicMessageType<T>, handler: H,
response_tx: oneshot::Sender<BrokerResult<Subscription<T>>>, response_tx: oneshot::Sender<H::Response>,
} }
impl<T: Message> BrokerRequestInternal for SubscribeRequest<T> { impl<H: RequestHandler> BrokerRequestInternal for BrokerRequest<H> {
fn handle_request(self: Box<Self>, registry: &mut Registry) { fn run_request(self: Box<Self>, registry: &mut Registry) {
let sender = registry.get_sender_for_type::<T>(&self.msg_type); let response = self.handler.handle(registry);
let subscription = sender.map(|sender| { let _ = self.response_tx.send(response);
let receiver = sender.subscribe();
Subscription::new(receiver)
});
let _ = self.response_tx.send(subscription);
} }
} }
impl<T: Message> SubscribeRequest<T> { impl<H: RequestHandler + Default> BrokerRequest<H> {
pub(crate) fn new() -> (Box<Self>, oneshot::Receiver<BrokerResult<Subscription<T>>>) { fn create(response_tx: oneshot::Sender<H::Response>) -> Box<Self> {
let (response_tx, response_rx) = oneshot::channel();
(
Box::new(Self { Box::new(Self {
msg_type: Default::default(), handler: H::default(),
response_tx, response_tx,
}), })
response_rx,
)
} }
pub(crate) fn new() -> (Box<Self>, oneshot::Receiver<H::Response>) {
let (response_tx, response_rx) = oneshot::channel();
(Self::create(response_tx), response_rx)
} }
}
pub type BrokerRequestBox = Box<dyn BrokerRequestInternal + Send + Sync>;
pub type BrokerRequestSender = mpsc::Sender<BrokerRequestBox>;
pub(crate) struct AdvertiseRequest<T> { pub type SubscribeRequest<T> = BrokerRequest<Subscribe<T>>;
pub struct Subscribe<T: Message> {
msg_type: BasicMessageType<T>, msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Publication<T>>>,
} }
impl<T: Message> BrokerRequestInternal for AdvertiseRequest<T> { impl<T: Message> Default for Subscribe<T> {
fn handle_request(self: Box<Self>, registry: &mut Registry) { fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for Subscribe<T> {
type Response = BrokerResult<Subscription<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_sender_for_type::<T>(&self.msg_type); let sender = registry.get_sender_for_type::<T>(&self.msg_type);
let publication = sender.map(|sender| Publication::new(sender.clone())); sender.map(|sender| {
let _ = self.response_tx.send(publication); let receiver = sender.subscribe();
Subscription::new(receiver)
})
} }
} }
impl<T: Message> AdvertiseRequest<T> { pub type AdvertiseRequest<T> = BrokerRequest<Advertise<T>>;
pub(crate) fn new() -> (Box<Self>, oneshot::Receiver<BrokerResult<Publication<T>>>) { pub struct Advertise<T: Message> {
let (response_tx, response_rx) = oneshot::channel(); msg_type: BasicMessageType<T>,
( }
Box::new(Self {
msg_type: Default::default(), impl<T: Message> Default for Advertise<T> {
response_tx, fn default() -> Self {
}), Self {
response_rx, msg_type: BasicMessageType::<T>::default(),
) }
}
}
impl<T: Message> RequestHandler for Advertise<T> {
type Response = BrokerResult<Publication<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_sender_for_type::<T>(&self.msg_type);
sender.map(|sender| Publication::new(sender.clone()))
} }
} }

Loading…
Cancel
Save