Alex Mikhalev
4 years ago
6 changed files with 339 additions and 38 deletions
@ -0,0 +1,194 @@
@@ -0,0 +1,194 @@
|
||||
use log::trace; |
||||
use std::{ |
||||
any::{Any, TypeId}, |
||||
collections::HashMap, |
||||
marker::PhantomData, |
||||
}; |
||||
use tokio::sync::{broadcast, mpsc, oneshot}; |
||||
|
||||
use crate::{message::Message, publication::Publication, subscription::Subscription}; |
||||
|
||||
pub type ErasedSender = Box<dyn Any + Send + Sync>; |
||||
|
||||
pub trait MessageType { |
||||
fn message_type_id(&self) -> TypeId; |
||||
|
||||
fn create_sender(&self) -> ErasedSender; |
||||
} |
||||
|
||||
pub struct BasicMessageType<T> { |
||||
_phantom: PhantomData<T>, |
||||
} |
||||
|
||||
impl<T> Default for BasicMessageType<T> { |
||||
fn default() -> Self { |
||||
BasicMessageType { |
||||
_phantom: PhantomData, |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl<T: Message> MessageType for BasicMessageType<T> { |
||||
fn message_type_id(&self) -> TypeId { |
||||
TypeId::of::<T>() |
||||
} |
||||
|
||||
fn create_sender(&self) -> ErasedSender { |
||||
trace!( |
||||
"Creating sender for {} ({:?})", |
||||
std::any::type_name::<T>(), |
||||
MessageType::type_id(self) |
||||
); |
||||
// TODO: configurable queue size (per message?)
|
||||
let (sender, _) = broadcast::channel::<T>(8); |
||||
let sender: ErasedSender = Box::new(sender); |
||||
sender |
||||
} |
||||
} |
||||
|
||||
pub trait SubscribeRequest { |
||||
fn message_type(&self) -> &dyn MessageType; |
||||
|
||||
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
|
||||
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
|
||||
unsafe fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender); |
||||
} |
||||
|
||||
pub type SubscribeRequestBox = Box<dyn SubscribeRequest + Send + Sync>; |
||||
pub type SubscribeRequestSender = mpsc::Sender<SubscribeRequestBox>; |
||||
|
||||
pub struct BasicSubscribeRequest<T> { |
||||
msg_type: BasicMessageType<T>, |
||||
response_tx: oneshot::Sender<Subscription<T>>, |
||||
} |
||||
|
||||
impl<T: Message> SubscribeRequest for BasicSubscribeRequest<T> { |
||||
fn message_type(&self) -> &dyn MessageType { |
||||
&self.msg_type |
||||
} |
||||
|
||||
unsafe fn send_subscribe_response(self: Box<Self>, sender: &ErasedSender) { |
||||
let sender = &*(&**sender as *const dyn Any as *const broadcast::Sender<T>); |
||||
// let sender = (**sender).downcast_ref::<broadcast::Sender<T>>().unwrap();
|
||||
let receiver = sender.subscribe(); |
||||
let subscription = Subscription::new(receiver); |
||||
let _ = self.response_tx.send(subscription); |
||||
} |
||||
} |
||||
|
||||
impl<T: Message> BasicSubscribeRequest<T> { |
||||
pub(crate) fn new() -> (Self, oneshot::Receiver<Subscription<T>>) { |
||||
let (response_tx, response_rx) = oneshot::channel(); |
||||
( |
||||
Self { |
||||
msg_type: Default::default(), |
||||
response_tx, |
||||
}, |
||||
response_rx, |
||||
) |
||||
} |
||||
} |
||||
|
||||
pub trait AdvertiseRequest { |
||||
fn message_type(&self) -> &dyn MessageType; |
||||
|
||||
/// `sender` must be `tokio::sync::broadcast::Sender<T>` where
|
||||
/// `MessageType::get_message_type` returns the `TypeId` of `T`.
|
||||
unsafe fn send_advertise_response(self: Box<Self>, sender: &ErasedSender); |
||||
} |
||||
|
||||
pub type AdvertiseRequestBox = Box<dyn AdvertiseRequest + Send + Sync>; |
||||
pub type AdvertiseRequestSender = mpsc::Sender<AdvertiseRequestBox>; |
||||
|
||||
pub struct BasicAdvertiseRequest<T> { |
||||
msg_type: BasicMessageType<T>, |
||||
response_tx: oneshot::Sender<Publication<T>>, |
||||
} |
||||
|
||||
impl<T: Message> AdvertiseRequest for BasicAdvertiseRequest<T> { |
||||
fn message_type(&self) -> &dyn MessageType { |
||||
&self.msg_type |
||||
} |
||||
|
||||
unsafe fn send_advertise_response(self: Box<Self>, sender: &ErasedSender) { |
||||
let sender = &*(&**sender as *const dyn Any as *const broadcast::Sender<T>); |
||||
// let sender = (**sender).downcast_ref::<broadcast::Sender<T>>().unwrap();
|
||||
let publication = Publication::new(sender.clone()); |
||||
let _ = self.response_tx.send(publication); |
||||
} |
||||
} |
||||
|
||||
impl<T: Message> BasicAdvertiseRequest<T> { |
||||
pub(crate) fn new() -> (Self, oneshot::Receiver<Publication<T>>) { |
||||
let (response_tx, response_rx) = oneshot::channel(); |
||||
( |
||||
Self { |
||||
msg_type: Default::default(), |
||||
response_tx, |
||||
}, |
||||
response_rx, |
||||
) |
||||
} |
||||
} |
||||
|
||||
struct Registry { |
||||
senders: HashMap<TypeId, ErasedSender>, |
||||
} |
||||
|
||||
impl Default for Registry { |
||||
fn default() -> Self { |
||||
Registry { |
||||
senders: HashMap::new(), |
||||
} |
||||
} |
||||
} |
||||
|
||||
impl Registry { |
||||
fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender { |
||||
let type_id = message_type.message_type_id(); |
||||
let sender_entry = self.senders.entry(type_id); |
||||
sender_entry.or_insert_with(|| message_type.create_sender()) |
||||
} |
||||
|
||||
fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) { |
||||
let sender = self.get_sender_for_type(subscribe_request.message_type()); |
||||
unsafe { subscribe_request.send_subscribe_response(sender) } |
||||
} |
||||
|
||||
fn handle_advertise(&mut self, advertise_request: AdvertiseRequestBox) { |
||||
let sender = self.get_sender_for_type(advertise_request.message_type()); |
||||
unsafe { advertise_request.send_advertise_response(sender) } |
||||
} |
||||
} |
||||
|
||||
pub struct BrokerTask { |
||||
pub(crate) subscribe_rx: mpsc::Receiver<SubscribeRequestBox>, |
||||
pub(crate) advertise_rx: mpsc::Receiver<AdvertiseRequestBox>, |
||||
} |
||||
|
||||
impl BrokerTask { |
||||
pub(crate) async fn run(mut self) { |
||||
trace!("BrokerTask starting"); |
||||
|
||||
let mut registry = Registry::default(); |
||||
|
||||
loop { |
||||
tokio::select! { |
||||
Some(subscribe_req) = self.subscribe_rx.recv() => { |
||||
registry.handle_subscribe(subscribe_req) |
||||
} |
||||
Some(advertise_req) = self.advertise_rx.recv() => { |
||||
registry.handle_advertise(advertise_req) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// trace!("BrokerTask exiting");
|
||||
} |
||||
} |
||||
|
||||
impl Drop for BrokerTask { |
||||
fn drop(&mut self) { |
||||
trace!("BrokerTask dropped"); |
||||
} |
||||
} |
@ -1,3 +1,5 @@
@@ -1,3 +1,5 @@
|
||||
pub trait Message: 'static + Clone + Send + Sync {} |
||||
use std::any::Any; |
||||
|
||||
pub trait Message: 'static + Any + Clone + Send + Sync {} |
||||
|
||||
impl<T> Message for T where T: 'static + Clone + Send + Sync {} |
||||
|
@ -1,20 +1,25 @@
@@ -1,20 +1,25 @@
|
||||
use std::marker::PhantomData; |
||||
use tokio::sync::broadcast; |
||||
|
||||
use crate::message::Message; |
||||
|
||||
#[derive(Clone, Debug, PartialEq)] |
||||
#[non_exhaustive] |
||||
pub enum SendError { |
||||
NoListeners, |
||||
} |
||||
pub enum SendError {} |
||||
|
||||
#[derive(Debug)] |
||||
pub struct Publication<T> { |
||||
_phantom: PhantomData<T>, |
||||
sender: broadcast::Sender<T>, |
||||
} |
||||
|
||||
impl<T: Message> Publication<T> { |
||||
pub fn send(&mut self, message: T) -> Result<(), SendError> { |
||||
let _ = message; |
||||
todo!() |
||||
pub(crate) fn new(sender: broadcast::Sender<T>) -> Self { |
||||
Publication { sender } |
||||
} |
||||
|
||||
pub fn send(&mut self, message: T) -> Result<usize, SendError> { |
||||
match self.sender.send(message) { |
||||
Ok(subscribers) => Ok(subscribers), |
||||
Err(_) => Ok(0), |
||||
} |
||||
} |
||||
} |
||||
|
Loading…
Reference in new issue