From cf6431fbe1dec1b820c923d9e5751b4bd1cafe39 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Dec 2020 23:56:12 -0700 Subject: [PATCH] Refactor broker requests --- src/broker_task/mod.rs | 2 +- src/broker_task/request.rs | 105 ++++++++++++++++++++++--------------- 2 files changed, 64 insertions(+), 43 deletions(-) diff --git a/src/broker_task/mod.rs b/src/broker_task/mod.rs index 03fc9be..54fdf6f 100644 --- a/src/broker_task/mod.rs +++ b/src/broker_task/mod.rs @@ -33,7 +33,7 @@ impl BrokerTask { loop { tokio::select! { Some(request) = self.request_rx.recv() => { - request.handle_request(&mut registry); + request.run_request(&mut registry); } else => { trace!("no more handles to BrokerTask"); diff --git a/src/broker_task/request.rs b/src/broker_task/request.rs index a81878f..a7d9c0a 100644 --- a/src/broker_task/request.rs +++ b/src/broker_task/request.rs @@ -6,67 +6,88 @@ use super::{ }; use crate::{Message, Publication, Subscription}; -pub(crate) trait BrokerRequestInternal { - fn handle_request(self: Box, registry: &mut Registry); +pub trait BrokerRequestInternal { + fn run_request(self: Box, registry: &mut Registry); } -pub(crate) trait BrokerRequest: BrokerRequestInternal {} -impl BrokerRequest for T {} +pub trait RequestHandler { + type Response; -pub(crate) type BrokerRequestBox = Box; -pub(crate) type BrokerRequestSender = mpsc::Sender; + fn handle(self, registry: &mut Registry) -> Self::Response; +} -pub(crate) struct SubscribeRequest { - msg_type: BasicMessageType, - response_tx: oneshot::Sender>>, +pub struct BrokerRequest { + handler: H, + response_tx: oneshot::Sender, } -impl BrokerRequestInternal for SubscribeRequest { - fn handle_request(self: Box, registry: &mut Registry) { - let sender = registry.get_sender_for_type::(&self.msg_type); - let subscription = sender.map(|sender| { - let receiver = sender.subscribe(); - Subscription::new(receiver) - }); - let _ = self.response_tx.send(subscription); +impl BrokerRequestInternal for BrokerRequest { + fn run_request(self: Box, registry: &mut Registry) { + let response = self.handler.handle(registry); + let _ = self.response_tx.send(response); } } -impl SubscribeRequest { - pub(crate) fn new() -> (Box, oneshot::Receiver>>) { +impl BrokerRequest { + fn create(response_tx: oneshot::Sender) -> Box { + Box::new(Self { + handler: H::default(), + response_tx, + }) + } + + pub(crate) fn new() -> (Box, oneshot::Receiver) { let (response_tx, response_rx) = oneshot::channel(); - ( - Box::new(Self { - msg_type: Default::default(), - response_tx, - }), - response_rx, - ) + (Self::create(response_tx), response_rx) } } -pub(crate) struct AdvertiseRequest { +pub type BrokerRequestBox = Box; +pub type BrokerRequestSender = mpsc::Sender; + +pub type SubscribeRequest = BrokerRequest>; +pub struct Subscribe { msg_type: BasicMessageType, - response_tx: oneshot::Sender>>, } -impl BrokerRequestInternal for AdvertiseRequest { - fn handle_request(self: Box, registry: &mut Registry) { +impl Default for Subscribe { + fn default() -> Self { + Self { + msg_type: BasicMessageType::::default(), + } + } +} + +impl RequestHandler for Subscribe { + type Response = BrokerResult>; + + fn handle(self, registry: &mut Registry) -> Self::Response { let sender = registry.get_sender_for_type::(&self.msg_type); - let publication = sender.map(|sender| Publication::new(sender.clone())); - let _ = self.response_tx.send(publication); + sender.map(|sender| { + let receiver = sender.subscribe(); + Subscription::new(receiver) + }) } } -impl AdvertiseRequest { - pub(crate) fn new() -> (Box, oneshot::Receiver>>) { - let (response_tx, response_rx) = oneshot::channel(); - ( - Box::new(Self { - msg_type: Default::default(), - response_tx, - }), - response_rx, - ) +pub type AdvertiseRequest = BrokerRequest>; +pub struct Advertise { + msg_type: BasicMessageType, +} + +impl Default for Advertise { + fn default() -> Self { + Self { + msg_type: BasicMessageType::::default(), + } + } +} + +impl RequestHandler for Advertise { + type Response = BrokerResult>; + + fn handle(self, registry: &mut Registry) -> Self::Response { + let sender = registry.get_sender_for_type::(&self.msg_type); + sender.map(|sender| Publication::new(sender.clone())) } }