Browse Source

Refactor BrokerTask

master
Alex Mikhalev 4 years ago
parent
commit
ba3104ec6f
  1. 45
      src/broker_task.rs

45
src/broker_task.rs

@ -1,6 +1,7 @@
use log::trace; use log::trace;
use std::{ use std::{
any::{Any, TypeId}, any::{Any, TypeId},
borrow::Cow,
collections::HashMap, collections::HashMap,
marker::PhantomData, marker::PhantomData,
}; };
@ -13,7 +14,9 @@ pub type ErasedSender = Box<dyn Any + Send + Sync>;
pub trait MessageType { pub trait MessageType {
fn message_type_id(&self) -> TypeId; fn message_type_id(&self) -> TypeId;
fn create_sender(&self) -> ErasedSender; fn message_type_name(&self) -> &'static str;
fn create_broadcast_sender(&self) -> ErasedSender;
} }
pub struct BasicMessageType<T> { pub struct BasicMessageType<T> {
@ -33,7 +36,11 @@ impl<T: Message> MessageType for BasicMessageType<T> {
TypeId::of::<T>() TypeId::of::<T>()
} }
fn create_sender(&self) -> ErasedSender { fn message_type_name(&self) -> &'static str {
std::any::type_name::<T>()
}
fn create_broadcast_sender(&self) -> ErasedSender {
trace!( trace!(
"Creating sender for {} ({:?})", "Creating sender for {} ({:?})",
std::any::type_name::<T>(), std::any::type_name::<T>(),
@ -131,14 +138,22 @@ impl<T: Message> BasicAdvertiseRequest<T> {
} }
} }
#[derive(Debug)]
struct TopicEntry {
message_type_id: TypeId,
message_type_name: String,
sender: ErasedSender,
}
#[derive(Debug)]
struct Registry { struct Registry {
senders: HashMap<TypeId, ErasedSender>, topics: HashMap<Cow<'static, str>, TopicEntry>,
} }
impl Default for Registry { impl Default for Registry {
fn default() -> Self { fn default() -> Self {
Registry { Registry {
senders: HashMap::new(), topics: HashMap::new(),
} }
} }
} }
@ -146,8 +161,14 @@ impl Default for Registry {
impl Registry { impl Registry {
fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender { fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender {
let type_id = message_type.message_type_id(); let type_id = message_type.message_type_id();
let sender_entry = self.senders.entry(type_id); let type_name = message_type.message_type_name();
sender_entry.or_insert_with(|| message_type.create_sender()) let topic_entry = self.topics.entry(type_name.into());
let topic_entry = topic_entry.or_insert_with(|| TopicEntry {
message_type_id: type_id,
message_type_name: type_name.to_string(),
sender: message_type.create_broadcast_sender(),
});
&topic_entry.sender
} }
fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) { fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) {
@ -180,15 +201,13 @@ impl BrokerTask {
Some(advertise_req) = self.advertise_rx.recv() => { Some(advertise_req) = self.advertise_rx.recv() => {
registry.handle_advertise(advertise_req) registry.handle_advertise(advertise_req)
} }
else => {
trace!("no more handles to BrokerTask");
break;
}
} }
} }
// trace!("BrokerTask exiting"); trace!("BrokerTask exiting");
}
}
impl Drop for BrokerTask {
fn drop(&mut self) {
trace!("BrokerTask dropped");
} }
} }

Loading…
Cancel
Save