From 08c63536c460a64152ba23edf8e5a2c8d88e599b Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Dec 2020 14:54:28 -0700 Subject: [PATCH] More broker_task refactor --- src/broker_task.rs | 224 ------------------------------------ src/broker_task/error.rs | 5 + src/broker_task/mod.rs | 47 ++++++++ src/broker_task/registry.rs | 98 ++++++++++++++++ src/broker_task/request.rs | 72 ++++++++++++ src/lib.rs | 34 ++---- 6 files changed, 233 insertions(+), 247 deletions(-) delete mode 100644 src/broker_task.rs create mode 100644 src/broker_task/error.rs create mode 100644 src/broker_task/mod.rs create mode 100644 src/broker_task/registry.rs create mode 100644 src/broker_task/request.rs diff --git a/src/broker_task.rs b/src/broker_task.rs deleted file mode 100644 index 5993275..0000000 --- a/src/broker_task.rs +++ /dev/null @@ -1,224 +0,0 @@ -use log::trace; -use std::{ - any::{Any, TypeId}, - borrow::Cow, - collections::HashMap, - marker::PhantomData, -}; -use tokio::sync::{broadcast, mpsc, oneshot}; - -use crate::{message::Message, publication::Publication, subscription::Subscription}; - -pub enum BrokerError { - MismatchedType, -} - -pub type BrokerResult = Result; - -pub type ErasedSender = Box; - -fn downcast_sender_ref(erased: &ErasedSender) -> BrokerResult<&broadcast::Sender> { - (**erased) - .downcast_ref::>() - .ok_or(BrokerError::MismatchedType) -} - -pub trait MessageType { - fn message_type_id(&self) -> TypeId; - - fn message_type_name(&self) -> &'static str; - - fn create_broadcast_sender(&self) -> ErasedSender; -} - -pub struct BasicMessageType { - _phantom: PhantomData, -} - -impl Default for BasicMessageType { - fn default() -> Self { - BasicMessageType { - _phantom: PhantomData, - } - } -} - -impl MessageType for BasicMessageType { - fn message_type_id(&self) -> TypeId { - TypeId::of::() - } - - fn message_type_name(&self) -> &'static str { - std::any::type_name::() - } - - fn create_broadcast_sender(&self) -> ErasedSender { - trace!( - "Creating sender for {} ({:?})", - std::any::type_name::(), - MessageType::type_id(self) - ); - // TODO: configurable queue size (per message?) - let (sender, _) = broadcast::channel::(8); - let sender: ErasedSender = Box::new(sender); - sender - } -} - -pub trait SubscribeRequest { - fn message_type(&self) -> &dyn MessageType; - - /// `sender` must be `tokio::sync::broadcast::Sender` where - /// `MessageType::get_message_type` returns the `TypeId` of `T`. - fn send_subscribe_response(self: Box, sender: &ErasedSender); -} - -pub type SubscribeRequestBox = Box; -pub type SubscribeRequestSender = mpsc::Sender; - -pub struct BasicSubscribeRequest { - msg_type: BasicMessageType, - response_tx: oneshot::Sender>>, -} - -impl SubscribeRequest for BasicSubscribeRequest { - fn message_type(&self) -> &dyn MessageType { - &self.msg_type - } - - fn send_subscribe_response(self: Box, sender: &ErasedSender) { - let subscription = downcast_sender_ref::(sender).map(|sender| { - let receiver = sender.subscribe(); - Subscription::new(receiver) - }); - let _ = self.response_tx.send(subscription); - } -} - -impl BasicSubscribeRequest { - pub(crate) fn new() -> (Self, oneshot::Receiver>>) { - let (response_tx, response_rx) = oneshot::channel(); - ( - Self { - msg_type: Default::default(), - response_tx, - }, - response_rx, - ) - } -} - -pub trait AdvertiseRequest { - fn message_type(&self) -> &dyn MessageType; - - /// `sender` must be `tokio::sync::broadcast::Sender` where - /// `MessageType::get_message_type` returns the `TypeId` of `T`. - fn send_advertise_response(self: Box, sender: &ErasedSender); -} - -pub type AdvertiseRequestBox = Box; -pub type AdvertiseRequestSender = mpsc::Sender; - -pub struct BasicAdvertiseRequest { - msg_type: BasicMessageType, - response_tx: oneshot::Sender>>, -} - -impl AdvertiseRequest for BasicAdvertiseRequest { - fn message_type(&self) -> &dyn MessageType { - &self.msg_type - } - - fn send_advertise_response(self: Box, sender: &ErasedSender) { - let publication = - downcast_sender_ref::(sender).map(|sender| Publication::new(sender.clone())); - let _ = self.response_tx.send(publication); - } -} - -impl BasicAdvertiseRequest { - pub(crate) fn new() -> (Self, oneshot::Receiver>>) { - let (response_tx, response_rx) = oneshot::channel(); - ( - Self { - msg_type: Default::default(), - response_tx, - }, - response_rx, - ) - } -} - -#[derive(Debug)] -struct TopicEntry { - message_type_id: TypeId, - message_type_name: String, - sender: ErasedSender, -} - -#[derive(Debug)] -struct Registry { - topics: HashMap, TopicEntry>, -} - -impl Default for Registry { - fn default() -> Self { - Registry { - topics: HashMap::new(), - } - } -} - -impl Registry { - fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender { - let type_id = message_type.message_type_id(); - let type_name = message_type.message_type_name(); - let topic_entry = self.topics.entry(type_name.into()); - let topic_entry = topic_entry.or_insert_with(|| TopicEntry { - message_type_id: type_id, - message_type_name: type_name.to_string(), - sender: message_type.create_broadcast_sender(), - }); - &topic_entry.sender - } - - fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) { - let sender = self.get_sender_for_type(subscribe_request.message_type()); - subscribe_request.send_subscribe_response(sender) - } - - fn handle_advertise(&mut self, advertise_request: AdvertiseRequestBox) { - let sender = self.get_sender_for_type(advertise_request.message_type()); - advertise_request.send_advertise_response(sender) - } -} - -pub struct BrokerTask { - pub(crate) subscribe_rx: mpsc::Receiver, - pub(crate) advertise_rx: mpsc::Receiver, -} - -impl BrokerTask { - pub(crate) async fn run(mut self) { - trace!("BrokerTask starting"); - - let mut registry = Registry::default(); - - loop { - tokio::select! { - Some(subscribe_req) = self.subscribe_rx.recv() => { - registry.handle_subscribe(subscribe_req) - } - Some(advertise_req) = self.advertise_rx.recv() => { - registry.handle_advertise(advertise_req) - } - else => { - trace!("no more handles to BrokerTask"); - break; - } - } - } - - trace!("BrokerTask exiting"); - } -} diff --git a/src/broker_task/error.rs b/src/broker_task/error.rs new file mode 100644 index 0000000..78261af --- /dev/null +++ b/src/broker_task/error.rs @@ -0,0 +1,5 @@ +pub enum BrokerError { + MismatchedType, +} + +pub type BrokerResult = Result; diff --git a/src/broker_task/mod.rs b/src/broker_task/mod.rs new file mode 100644 index 0000000..03fc9be --- /dev/null +++ b/src/broker_task/mod.rs @@ -0,0 +1,47 @@ +use log::trace; +use tokio::{sync::mpsc, task::JoinHandle}; + +mod error; +mod registry; +mod request; + +pub(crate) use error::{BrokerError, BrokerResult}; +use registry::Registry; +pub(crate) use request::{ + AdvertiseRequest, BrokerRequestBox, BrokerRequestSender, SubscribeRequest, +}; + +pub(crate) struct BrokerTask { + request_rx: mpsc::Receiver, +} + +impl BrokerTask { + pub(crate) fn start() -> (BrokerRequestSender, JoinHandle<()>) { + let (request_tx, request_rx) = mpsc::channel(8); + + let broker = BrokerTask { request_rx }; + let join_handle = tokio::spawn(broker.run()); + + (request_tx, join_handle) + } + + async fn run(mut self) { + trace!("BrokerTask starting"); + + let mut registry = Registry::default(); + + loop { + tokio::select! { + Some(request) = self.request_rx.recv() => { + request.handle_request(&mut registry); + } + else => { + trace!("no more handles to BrokerTask"); + break; + } + } + } + + trace!("BrokerTask exiting"); + } +} diff --git a/src/broker_task/registry.rs b/src/broker_task/registry.rs new file mode 100644 index 0000000..9efff2a --- /dev/null +++ b/src/broker_task/registry.rs @@ -0,0 +1,98 @@ +use crate::Message; +use log::trace; +use std::{ + any::{Any, TypeId}, + borrow::Cow, + collections::HashMap, + marker::PhantomData, +}; +use tokio::sync::broadcast; +use super::{BrokerError, BrokerResult}; + +pub trait MessageType { + fn message_type_id(&self) -> TypeId; + + fn message_type_name(&self) -> &'static str; + + fn create_broadcast_sender(&self) -> ErasedSender; +} + +pub struct BasicMessageType { + _phantom: PhantomData, +} + +impl Default for BasicMessageType { + fn default() -> Self { + BasicMessageType { + _phantom: PhantomData, + } + } +} + +impl MessageType for BasicMessageType { + fn message_type_id(&self) -> TypeId { + TypeId::of::() + } + + fn message_type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn create_broadcast_sender(&self) -> ErasedSender { + trace!( + "Creating sender for {} ({:?})", + std::any::type_name::(), + MessageType::type_id(self) + ); + // TODO: configurable queue size (per message?) + let (sender, _) = broadcast::channel::(8); + let sender: ErasedSender = Box::new(sender); + sender + } +} + +pub type ErasedSender = Box; + +#[derive(Debug)] +pub struct TopicEntry { + message_type_id: TypeId, + message_type_name: String, + sender: ErasedSender, +} + +#[derive(Debug)] +pub struct Registry { + topics: HashMap, TopicEntry>, +} + +impl Default for Registry { + fn default() -> Self { + Registry { + topics: HashMap::new(), + } + } +} + +impl Registry { + fn get_erased_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender { + let type_id = message_type.message_type_id(); + let type_name = message_type.message_type_name(); + let topic_entry = self.topics.entry(type_name.into()); + let topic_entry = topic_entry.or_insert_with(|| TopicEntry { + message_type_id: type_id, + message_type_name: type_name.to_string(), + sender: message_type.create_broadcast_sender(), + }); + &topic_entry.sender + } + + pub fn get_sender_for_type( + &mut self, + message_type: &dyn MessageType, + ) -> BrokerResult<&broadcast::Sender> { + let erased = self.get_erased_sender_for_type(message_type); + (**erased) + .downcast_ref::>() + .ok_or(BrokerError::MismatchedType) + } +} diff --git a/src/broker_task/request.rs b/src/broker_task/request.rs new file mode 100644 index 0000000..a81878f --- /dev/null +++ b/src/broker_task/request.rs @@ -0,0 +1,72 @@ +use tokio::sync::{mpsc, oneshot}; + +use super::{ + registry::{BasicMessageType, Registry}, + BrokerResult, +}; +use crate::{Message, Publication, Subscription}; + +pub(crate) trait BrokerRequestInternal { + fn handle_request(self: Box, registry: &mut Registry); +} + +pub(crate) trait BrokerRequest: BrokerRequestInternal {} +impl BrokerRequest for T {} + +pub(crate) type BrokerRequestBox = Box; +pub(crate) type BrokerRequestSender = mpsc::Sender; + +pub(crate) struct SubscribeRequest { + msg_type: BasicMessageType, + response_tx: oneshot::Sender>>, +} + +impl BrokerRequestInternal for SubscribeRequest { + fn handle_request(self: Box, registry: &mut Registry) { + let sender = registry.get_sender_for_type::(&self.msg_type); + let subscription = sender.map(|sender| { + let receiver = sender.subscribe(); + Subscription::new(receiver) + }); + let _ = self.response_tx.send(subscription); + } +} + +impl SubscribeRequest { + pub(crate) fn new() -> (Box, oneshot::Receiver>>) { + let (response_tx, response_rx) = oneshot::channel(); + ( + Box::new(Self { + msg_type: Default::default(), + response_tx, + }), + response_rx, + ) + } +} + +pub(crate) struct AdvertiseRequest { + msg_type: BasicMessageType, + response_tx: oneshot::Sender>>, +} + +impl BrokerRequestInternal for AdvertiseRequest { + fn handle_request(self: Box, registry: &mut Registry) { + let sender = registry.get_sender_for_type::(&self.msg_type); + let publication = sender.map(|sender| Publication::new(sender.clone())); + let _ = self.response_tx.send(publication); + } +} + +impl AdvertiseRequest { + pub(crate) fn new() -> (Box, oneshot::Receiver>>) { + let (response_tx, response_rx) = oneshot::channel(); + ( + Box::new(Self { + msg_type: Default::default(), + response_tx, + }), + response_rx, + ) + } +} diff --git a/src/lib.rs b/src/lib.rs index 4e5655c..10cfd1f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,15 +4,14 @@ mod publication; mod subscription; use broker_task::{ - AdvertiseRequestSender, BasicAdvertiseRequest, BasicSubscribeRequest, BrokerError, - BrokerResult, BrokerTask, SubscribeRequestSender, + AdvertiseRequest, BrokerError, BrokerRequestSender, BrokerResult, BrokerTask, SubscribeRequest, }; pub use message::Message; pub use publication::{Publication, PublishError}; pub use subscription::{Subscription, SubscriptionError}; use futures::executor::block_on; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::oneshot; #[derive(Clone, Debug, PartialEq)] #[non_exhaustive] @@ -42,8 +41,7 @@ fn map_broker_response( #[derive(Debug, Clone)] pub struct Orsb { - subscribe_tx: SubscribeRequestSender, - advertise_tx: AdvertiseRequestSender, + request_tx: BrokerRequestSender, } impl Orsb { @@ -52,36 +50,26 @@ impl Orsb { } pub(crate) fn start_new2() -> (Self, tokio::task::JoinHandle<()>) { - let (subscribe_tx, subscribe_rx) = mpsc::channel(8); - let (advertise_tx, advertise_rx) = mpsc::channel(8); + let (request_tx, join_handle) = BrokerTask::start(); - let broker = BrokerTask { - subscribe_rx, - advertise_rx, - }; - let join_handle = tokio::spawn(broker.run()); - - let orsb = Orsb { - subscribe_tx, - advertise_tx, - }; + let orsb = Orsb { request_tx }; (orsb, join_handle) } pub async fn subscribe(&mut self) -> Result, OrsbError> { - let (subscribe_request, response_rx) = BasicSubscribeRequest::::new(); - self.subscribe_tx - .send(Box::new(subscribe_request)) + let (subscribe_request, response_rx) = SubscribeRequest::::new(); + self.request_tx + .send(subscribe_request) .await .or(Err(OrsbError::BrokerClosed))?; map_broker_response(response_rx.await) } pub async fn advertise(&mut self) -> Result, OrsbError> { - let (advertise_request, response_rx) = BasicAdvertiseRequest::::new(); - self.advertise_tx - .send(Box::new(advertise_request)) + let (advertise_request, response_rx) = AdvertiseRequest::::new(); + self.request_tx + .send(advertise_request) .await .or(Err(OrsbError::BrokerClosed))?; map_broker_response(response_rx.await)