Browse Source

More BrokerTask refactor

master
Alex Mikhalev 4 years ago
parent
commit
92e1739f90
  1. 45
      src/broker_task.rs
  2. 29
      src/lib.rs

45
src/broker_task.rs

@ -9,8 +9,20 @@ use tokio::sync::{broadcast, mpsc, oneshot}; @@ -9,8 +9,20 @@ use tokio::sync::{broadcast, mpsc, oneshot};
use crate::{message::Message, publication::Publication, subscription::Subscription};
pub enum BrokerError {
MismatchedType,
}
pub type BrokerResult<T> = Result<T, BrokerError>;
pub type ErasedSender = Box<dyn Any + Send + Sync>;
fn downcast_sender_ref<T: Message>(erased: &ErasedSender) -> BrokerResult<&broadcast::Sender<T>> {
(**erased)
.downcast_ref::<broadcast::Sender<T>>()
.ok_or(BrokerError::MismatchedType)
}
pub trait MessageType {
fn message_type_id(&self) -> TypeId;
@ -58,7 +70,7 @@ pub trait SubscribeRequest { @@ -58,7 +70,7 @@ pub trait SubscribeRequest {
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
unsafe fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender);
fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender);
}
pub type SubscribeRequestBox = Box<dyn SubscribeRequest + Send + Sync>;
@ -66,7 +78,7 @@ pub type SubscribeRequestSender = mpsc::Sender<SubscribeRequestBox>; @@ -66,7 +78,7 @@ pub type SubscribeRequestSender = mpsc::Sender<SubscribeRequestBox>;
pub struct BasicSubscribeRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<Subscription<T>>,
response_tx: oneshot::Sender<BrokerResult<Subscription<T>>>,
}
impl<T: Message> SubscribeRequest for BasicSubscribeRequest<T> {
@ -74,17 +86,17 @@ impl<T: Message> SubscribeRequest for BasicSubscribeRequest<T> { @@ -74,17 +86,17 @@ impl<T: Message> SubscribeRequest for BasicSubscribeRequest<T> {
&self.msg_type
}
unsafe fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender) {
let sender = &*(&**sender as *const dyn Any as *const broadcast::Sender<T>);
// let sender = (**sender).downcast_ref::<broadcast::Sender<T>>().unwrap();
let receiver = sender.subscribe();
let subscription = Subscription::new(receiver);
fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender) {
let subscription = downcast_sender_ref::<T>(sender).map(|sender| {
let receiver = sender.subscribe();
Subscription::new(receiver)
});
let _ = self.response_tx.send(subscription);
}
}
impl<T: Message> BasicSubscribeRequest<T> {
pub(crate) fn new() -> (Self, oneshot::Receiver<Subscription<T>>) {
pub(crate) fn new() -> (Self, oneshot::Receiver<BrokerResult<Subscription<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Self {
@ -101,7 +113,7 @@ pub trait AdvertiseRequest { @@ -101,7 +113,7 @@ pub trait AdvertiseRequest {
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
unsafe fn send_advertise_response(self: Box<Self>, sender: &ErasedSender);
fn send_advertise_response(self: Box<Self>, sender: &ErasedSender);
}
pub type AdvertiseRequestBox = Box<dyn AdvertiseRequest + Send + Sync>;
@ -109,7 +121,7 @@ pub type AdvertiseRequestSender = mpsc::Sender<AdvertiseRequestBox>; @@ -109,7 +121,7 @@ pub type AdvertiseRequestSender = mpsc::Sender<AdvertiseRequestBox>;
pub struct BasicAdvertiseRequest<T> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<Publication<T>>,
response_tx: oneshot::Sender<BrokerResult<Publication<T>>>,
}
impl<T: Message> AdvertiseRequest for BasicAdvertiseRequest<T> {
@ -117,16 +129,15 @@ impl<T: Message> AdvertiseRequest for BasicAdvertiseRequest<T> { @@ -117,16 +129,15 @@ impl<T: Message> AdvertiseRequest for BasicAdvertiseRequest<T> {
&self.msg_type
}
unsafe fn send_advertise_response(self: Box<Self>, sender: &ErasedSender) {
let sender = &*(&**sender as *const dyn Any as *const broadcast::Sender<T>);
// let sender = (**sender).downcast_ref::<broadcast::Sender<T>>().unwrap();
let publication = Publication::new(sender.clone());
fn send_advertise_response(self: Box<Self>, sender: &ErasedSender) {
let publication =
downcast_sender_ref::<T>(sender).map(|sender| Publication::new(sender.clone()));
let _ = self.response_tx.send(publication);
}
}
impl<T: Message> BasicAdvertiseRequest<T> {
pub(crate) fn new() -> (Self, oneshot::Receiver<Publication<T>>) {
pub(crate) fn new() -> (Self, oneshot::Receiver<BrokerResult<Publication<T>>>) {
let (response_tx, response_rx) = oneshot::channel();
(
Self {
@ -173,12 +184,12 @@ impl Registry { @@ -173,12 +184,12 @@ impl Registry {
fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) {
let sender = self.get_sender_for_type(subscribe_request.message_type());
unsafe { subscribe_request.send_subscribe_response(sender) }
subscribe_request.send_subscribe_response(sender)
}
fn handle_advertise(&mut self, advertise_request: AdvertiseRequestBox) {
let sender = self.get_sender_for_type(advertise_request.message_type());
unsafe { advertise_request.send_advertise_response(sender) }
advertise_request.send_advertise_response(sender)
}
}

29
src/lib.rs

@ -4,21 +4,40 @@ mod publication; @@ -4,21 +4,40 @@ mod publication;
mod subscription;
use broker_task::{
AdvertiseRequestSender, BasicAdvertiseRequest, BasicSubscribeRequest, BrokerTask,
SubscribeRequestSender,
AdvertiseRequestSender, BasicAdvertiseRequest, BasicSubscribeRequest, BrokerError,
BrokerResult, BrokerTask, SubscribeRequestSender,
};
pub use message::Message;
pub use publication::{Publication, PublishError};
pub use subscription::{Subscription, SubscriptionError};
use futures::executor::block_on;
use tokio::sync::mpsc;
use tokio::sync::{mpsc, oneshot};
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
pub enum OrsbError {
BrokerClosed,
NoResponse,
MismatchedType,
}
impl From<BrokerError> for OrsbError {
fn from(err: BrokerError) -> Self {
match err {
BrokerError::MismatchedType => OrsbError::MismatchedType,
}
}
}
fn map_broker_response<T>(
result: Result<BrokerResult<T>, oneshot::error::RecvError>,
) -> Result<T, OrsbError> {
match result {
Ok(Ok(value)) => Ok(value),
Ok(Err(err)) => Err(err.into()),
Err(_) => Err(OrsbError::NoResponse),
}
}
#[derive(Debug, Clone)]
@ -56,7 +75,7 @@ impl Orsb { @@ -56,7 +75,7 @@ impl Orsb {
.send(Box::new(subscribe_request))
.await
.or(Err(OrsbError::BrokerClosed))?;
response_rx.await.or(Err(OrsbError::NoResponse))
map_broker_response(response_rx.await)
}
pub async fn advertise<T: Message>(&mut self) -> Result<Publication<T>, OrsbError> {
@ -65,7 +84,7 @@ impl Orsb { @@ -65,7 +84,7 @@ impl Orsb {
.send(Box::new(advertise_request))
.await
.or(Err(OrsbError::BrokerClosed))?;
response_rx.await.or(Err(OrsbError::NoResponse))
map_broker_response(response_rx.await)
}
pub fn subscribe_blocking<T: Message>(&mut self) -> Result<Subscription<T>, OrsbError> {

Loading…
Cancel
Save