Browse Source

More broker_task refactor

master
Alex Mikhalev 4 years ago
parent
commit
08c63536c4
  1. 224
      src/broker_task.rs
  2. 5
      src/broker_task/error.rs
  3. 47
      src/broker_task/mod.rs
  4. 98
      src/broker_task/registry.rs
  5. 72
      src/broker_task/request.rs
  6. 34
      src/lib.rs

224
src/broker_task.rs

@ -1,224 +0,0 @@
use log::trace;
use std::{
any::{Any, TypeId},
borrow::Cow,
collections::HashMap,
marker::PhantomData,
};
use tokio::sync::{broadcast, mpsc, oneshot};
use crate::{message::Message, publication::Publication, subscription::Subscription};
pub enum BrokerError {
MismatchedType,
}
pub type BrokerResult<T> = Result<T, BrokerError>;
pub type ErasedSender = Box<dyn Any + Send + Sync>;
fn downcast_sender_ref<T: Message>(erased: &ErasedSender) -> BrokerResult<&broadcast::Sender<T>> {
(**erased)
.downcast_ref::<broadcast::Sender<T>>()
.ok_or(BrokerError::MismatchedType)
}
pub trait MessageType {
fn message_type_id(&self) -> TypeId;
fn message_type_name(&self) -> &'static str;
fn create_broadcast_sender(&self) -> ErasedSender;
}
pub struct BasicMessageType<T> {
_phantom: PhantomData<T>,
}
impl<T> Default for BasicMessageType<T> {
fn default() -> Self {
BasicMessageType {
_phantom: PhantomData,
}
}
}
impl<T: Message> MessageType for BasicMessageType<T> {
fn message_type_id(&self) -> TypeId {
TypeId::of::<T>()
}
fn message_type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
fn create_broadcast_sender(&self) -> ErasedSender {
trace!(
"Creating sender for {} ({:?})",
std::any::type_name::<T>(),
MessageType::type_id(self)
);
// TODO: configurable queue size (per message?)
let (sender, _) = broadcast::channel::<T>(8);
let sender: ErasedSender = Box::new(sender);
sender
}
}
pub trait SubscribeRequest {
fn message_type(&self) -> &dyn MessageType;
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender);
}
pub type SubscribeRequestBox = Box<dyn SubscribeRequest + Send + Sync>;
pub type SubscribeRequestSender = mpsc::Sender<SubscribeRequestBox>;
pub struct BasicSubscribeRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Subscription<T>>>,
}
impl<T: Message> SubscribeRequest for BasicSubscribeRequest<T> {
fn message_type(&self) -> &dyn MessageType {
&self.msg_type
}
fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender) {
let subscription = downcast_sender_ref::<T>(sender).map(|sender| {
let receiver = sender.subscribe();
Subscription::new(receiver)
});
let _ = self.response_tx.send(subscription);
}
}
impl<T: Message> BasicSubscribeRequest<T> {
pub(crate) fn new() -> (Self, oneshot::Receiver<BrokerResult<Subscription<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Self {
msg_type: Default::default(),
response_tx,
},
response_rx,
)
}
}
pub trait AdvertiseRequest {
fn message_type(&self) -> &dyn MessageType;
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
fn send_advertise_response(self: Box<Self>, sender: &ErasedSender);
}
pub type AdvertiseRequestBox = Box<dyn AdvertiseRequest + Send + Sync>;
pub type AdvertiseRequestSender = mpsc::Sender<AdvertiseRequestBox>;
pub struct BasicAdvertiseRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Publication<T>>>,
}
impl<T: Message> AdvertiseRequest for BasicAdvertiseRequest<T> {
fn message_type(&self) -> &dyn MessageType {
&self.msg_type
}
fn send_advertise_response(self: Box<Self>, sender: &ErasedSender) {
let publication =
downcast_sender_ref::<T>(sender).map(|sender| Publication::new(sender.clone()));
let _ = self.response_tx.send(publication);
}
}
impl<T: Message> BasicAdvertiseRequest<T> {
pub(crate) fn new() -> (Self, oneshot::Receiver<BrokerResult<Publication<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Self {
msg_type: Default::default(),
response_tx,
},
response_rx,
)
}
}
#[derive(Debug)]
struct TopicEntry {
message_type_id: TypeId,
message_type_name: String,
sender: ErasedSender,
}
#[derive(Debug)]
struct Registry {
topics: HashMap<Cow<'static, str>, TopicEntry>,
}
impl Default for Registry {
fn default() -> Self {
Registry {
topics: HashMap::new(),
}
}
}
impl Registry {
fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender {
let type_id = message_type.message_type_id();
let type_name = message_type.message_type_name();
let topic_entry = self.topics.entry(type_name.into());
let topic_entry = topic_entry.or_insert_with(|| TopicEntry {
message_type_id: type_id,
message_type_name: type_name.to_string(),
sender: message_type.create_broadcast_sender(),
});
&topic_entry.sender
}
fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) {
let sender = self.get_sender_for_type(subscribe_request.message_type());
subscribe_request.send_subscribe_response(sender)
}
fn handle_advertise(&mut self, advertise_request: AdvertiseRequestBox) {
let sender = self.get_sender_for_type(advertise_request.message_type());
advertise_request.send_advertise_response(sender)
}
}
pub struct BrokerTask {
pub(crate) subscribe_rx: mpsc::Receiver<SubscribeRequestBox>,
pub(crate) advertise_rx: mpsc::Receiver<AdvertiseRequestBox>,
}
impl BrokerTask {
pub(crate) async fn run(mut self) {
trace!("BrokerTask starting");
let mut registry = Registry::default();
loop {
tokio::select! {
Some(subscribe_req) = self.subscribe_rx.recv() => {
registry.handle_subscribe(subscribe_req)
}
Some(advertise_req) = self.advertise_rx.recv() => {
registry.handle_advertise(advertise_req)
}
else => {
trace!("no more handles to BrokerTask");
break;
}
}
}
trace!("BrokerTask exiting");
}
}

5
src/broker_task/error.rs

@ -0,0 +1,5 @@
pub enum BrokerError {
MismatchedType,
}
pub type BrokerResult<T> = Result<T, BrokerError>;

47
src/broker_task/mod.rs

@ -0,0 +1,47 @@
use log::trace;
use tokio::{sync::mpsc, task::JoinHandle};
mod error;
mod registry;
mod request;
pub(crate) use error::{BrokerError, BrokerResult};
use registry::Registry;
pub(crate) use request::{
AdvertiseRequest, BrokerRequestBox, BrokerRequestSender, SubscribeRequest,
};
pub(crate) struct BrokerTask {
request_rx: mpsc::Receiver<BrokerRequestBox>,
}
impl BrokerTask {
pub(crate) fn start() -> (BrokerRequestSender, JoinHandle<()>) {
let (request_tx, request_rx) = mpsc::channel(8);
let broker = BrokerTask { request_rx };
let join_handle = tokio::spawn(broker.run());
(request_tx, join_handle)
}
async fn run(mut self) {
trace!("BrokerTask starting");
let mut registry = Registry::default();
loop {
tokio::select! {
Some(request) = self.request_rx.recv() => {
request.handle_request(&mut registry);
}
else => {
trace!("no more handles to BrokerTask");
break;
}
}
}
trace!("BrokerTask exiting");
}
}

98
src/broker_task/registry.rs

@ -0,0 +1,98 @@
use crate::Message;
use log::trace;
use std::{
any::{Any, TypeId},
borrow::Cow,
collections::HashMap,
marker::PhantomData,
};
use tokio::sync::broadcast;
use super::{BrokerError, BrokerResult};
pub trait MessageType {
fn message_type_id(&self) -> TypeId;
fn message_type_name(&self) -> &'static str;
fn create_broadcast_sender(&self) -> ErasedSender;
}
pub struct BasicMessageType<T> {
_phantom: PhantomData<T>,
}
impl<T> Default for BasicMessageType<T> {
fn default() -> Self {
BasicMessageType {
_phantom: PhantomData,
}
}
}
impl<T: Message> MessageType for BasicMessageType<T> {
fn message_type_id(&self) -> TypeId {
TypeId::of::<T>()
}
fn message_type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
fn create_broadcast_sender(&self) -> ErasedSender {
trace!(
"Creating sender for {} ({:?})",
std::any::type_name::<T>(),
MessageType::type_id(self)
);
// TODO: configurable queue size (per message?)
let (sender, _) = broadcast::channel::<T>(8);
let sender: ErasedSender = Box::new(sender);
sender
}
}
pub type ErasedSender = Box<dyn Any + Send + Sync>;
#[derive(Debug)]
pub struct TopicEntry {
message_type_id: TypeId,
message_type_name: String,
sender: ErasedSender,
}
#[derive(Debug)]
pub struct Registry {
topics: HashMap<Cow<'static, str>, TopicEntry>,
}
impl Default for Registry {
fn default() -> Self {
Registry {
topics: HashMap::new(),
}
}
}
impl Registry {
fn get_erased_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender {
let type_id = message_type.message_type_id();
let type_name = message_type.message_type_name();
let topic_entry = self.topics.entry(type_name.into());
let topic_entry = topic_entry.or_insert_with(|| TopicEntry {
message_type_id: type_id,
message_type_name: type_name.to_string(),
sender: message_type.create_broadcast_sender(),
});
&topic_entry.sender
}
pub fn get_sender_for_type<T: Message>(
&mut self,
message_type: &dyn MessageType,
) -> BrokerResult<&broadcast::Sender<T>> {
let erased = self.get_erased_sender_for_type(message_type);
(**erased)
.downcast_ref::<broadcast::Sender<T>>()
.ok_or(BrokerError::MismatchedType)
}
}

72
src/broker_task/request.rs

@ -0,0 +1,72 @@
use tokio::sync::{mpsc, oneshot};
use super::{
registry::{BasicMessageType, Registry},
BrokerResult,
};
use crate::{Message, Publication, Subscription};
pub(crate) trait BrokerRequestInternal {
fn handle_request(self: Box<Self>, registry: &mut Registry);
}
pub(crate) trait BrokerRequest: BrokerRequestInternal {}
impl<T: BrokerRequestInternal> BrokerRequest for T {}
pub(crate) type BrokerRequestBox = Box<dyn BrokerRequest + Send + Sync>;
pub(crate) type BrokerRequestSender = mpsc::Sender<BrokerRequestBox>;
pub(crate) struct SubscribeRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Subscription<T>>>,
}
impl<T: Message> BrokerRequestInternal for SubscribeRequest<T> {
fn handle_request(self: Box<Self>, registry: &mut Registry) {
let sender = registry.get_sender_for_type::<T>(&self.msg_type);
let subscription = sender.map(|sender| {
let receiver = sender.subscribe();
Subscription::new(receiver)
});
let _ = self.response_tx.send(subscription);
}
}
impl<T: Message> SubscribeRequest<T> {
pub(crate) fn new() -> (Box<Self>, oneshot::Receiver<BrokerResult<Subscription<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Box::new(Self {
msg_type: Default::default(),
response_tx,
}),
response_rx,
)
}
}
pub(crate) struct AdvertiseRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Publication<T>>>,
}
impl<T: Message> BrokerRequestInternal for AdvertiseRequest<T> {
fn handle_request(self: Box<Self>, registry: &mut Registry) {
let sender = registry.get_sender_for_type::<T>(&self.msg_type);
let publication = sender.map(|sender| Publication::new(sender.clone()));
let _ = self.response_tx.send(publication);
}
}
impl<T: Message> AdvertiseRequest<T> {
pub(crate) fn new() -> (Box<Self>, oneshot::Receiver<BrokerResult<Publication<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Box::new(Self {
msg_type: Default::default(),
response_tx,
}),
response_rx,
)
}
}

34
src/lib.rs

@ -4,15 +4,14 @@ mod publication;
mod subscription; mod subscription;
use broker_task::{ use broker_task::{
AdvertiseRequestSender, BasicAdvertiseRequest, BasicSubscribeRequest, BrokerError, AdvertiseRequest, BrokerError, BrokerRequestSender, BrokerResult, BrokerTask, SubscribeRequest,
BrokerResult, BrokerTask, SubscribeRequestSender,
}; };
pub use message::Message; pub use message::Message;
pub use publication::{Publication, PublishError}; pub use publication::{Publication, PublishError};
pub use subscription::{Subscription, SubscriptionError}; pub use subscription::{Subscription, SubscriptionError};
use futures::executor::block_on; use futures::executor::block_on;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::oneshot;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
#[non_exhaustive] #[non_exhaustive]
@ -42,8 +41,7 @@ fn map_broker_response<T>(
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Orsb { pub struct Orsb {
subscribe_tx: SubscribeRequestSender, request_tx: BrokerRequestSender,
advertise_tx: AdvertiseRequestSender,
} }
impl Orsb { impl Orsb {
@ -52,36 +50,26 @@ impl Orsb {
} }
pub(crate) fn start_new2() -> (Self, tokio::task::JoinHandle<()>) { pub(crate) fn start_new2() -> (Self, tokio::task::JoinHandle<()>) {
let (subscribe_tx, subscribe_rx) = mpsc::channel(8); let (request_tx, join_handle) = BrokerTask::start();
let (advertise_tx, advertise_rx) = mpsc::channel(8);
let broker = BrokerTask { let orsb = Orsb { request_tx };
subscribe_rx,
advertise_rx,
};
let join_handle = tokio::spawn(broker.run());
let orsb = Orsb {
subscribe_tx,
advertise_tx,
};
(orsb, join_handle) (orsb, join_handle)
} }
pub async fn subscribe<T: Message>(&mut self) -> Result<Subscription<T>, OrsbError> { pub async fn subscribe<T: Message>(&mut self) -> Result<Subscription<T>, OrsbError> {
let (subscribe_request, response_rx) = BasicSubscribeRequest::<T>::new(); let (subscribe_request, response_rx) = SubscribeRequest::<T>::new();
self.subscribe_tx self.request_tx
.send(Box::new(subscribe_request)) .send(subscribe_request)
.await .await
.or(Err(OrsbError::BrokerClosed))?; .or(Err(OrsbError::BrokerClosed))?;
map_broker_response(response_rx.await) map_broker_response(response_rx.await)
} }
pub async fn advertise<T: Message>(&mut self) -> Result<Publication<T>, OrsbError> { pub async fn advertise<T: Message>(&mut self) -> Result<Publication<T>, OrsbError> {
let (advertise_request, response_rx) = BasicAdvertiseRequest::<T>::new(); let (advertise_request, response_rx) = AdvertiseRequest::<T>::new();
self.advertise_tx self.request_tx
.send(Box::new(advertise_request)) .send(advertise_request)
.await .await
.or(Err(OrsbError::BrokerClosed))?; .or(Err(OrsbError::BrokerClosed))?;
map_broker_response(response_rx.await) map_broker_response(response_rx.await)

Loading…
Cancel
Save