From ba3104ec6f3d7cca4938b84ca121506e00131779 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Dec 2020 13:58:29 -0700 Subject: [PATCH] Refactor BrokerTask --- src/broker_task.rs | 45 ++++++++++++++++++++++++++++++++------------- 1 file changed, 32 insertions(+), 13 deletions(-) diff --git a/src/broker_task.rs b/src/broker_task.rs index f0e1fcb..ec1c59b 100644 --- a/src/broker_task.rs +++ b/src/broker_task.rs @@ -1,6 +1,7 @@ use log::trace; use std::{ any::{Any, TypeId}, + borrow::Cow, collections::HashMap, marker::PhantomData, }; @@ -13,7 +14,9 @@ pub type ErasedSender = Box; pub trait MessageType { fn message_type_id(&self) -> TypeId; - fn create_sender(&self) -> ErasedSender; + fn message_type_name(&self) -> &'static str; + + fn create_broadcast_sender(&self) -> ErasedSender; } pub struct BasicMessageType { @@ -33,7 +36,11 @@ impl MessageType for BasicMessageType { TypeId::of::() } - fn create_sender(&self) -> ErasedSender { + fn message_type_name(&self) -> &'static str { + std::any::type_name::() + } + + fn create_broadcast_sender(&self) -> ErasedSender { trace!( "Creating sender for {} ({:?})", std::any::type_name::(), @@ -131,14 +138,22 @@ impl BasicAdvertiseRequest { } } +#[derive(Debug)] +struct TopicEntry { + message_type_id: TypeId, + message_type_name: String, + sender: ErasedSender, +} + +#[derive(Debug)] struct Registry { - senders: HashMap, + topics: HashMap, TopicEntry>, } impl Default for Registry { fn default() -> Self { Registry { - senders: HashMap::new(), + topics: HashMap::new(), } } } @@ -146,8 +161,14 @@ impl Default for Registry { impl Registry { fn get_sender_for_type(&mut self, message_type: &dyn MessageType) -> &ErasedSender { let type_id = message_type.message_type_id(); - let sender_entry = self.senders.entry(type_id); - sender_entry.or_insert_with(|| message_type.create_sender()) + let type_name = message_type.message_type_name(); + let topic_entry = self.topics.entry(type_name.into()); + let topic_entry = topic_entry.or_insert_with(|| TopicEntry { + message_type_id: type_id, + message_type_name: type_name.to_string(), + sender: message_type.create_broadcast_sender(), + }); + &topic_entry.sender } fn handle_subscribe(&mut self, subscribe_request: SubscribeRequestBox) { @@ -180,15 +201,13 @@ impl BrokerTask { Some(advertise_req) = self.advertise_rx.recv() => { registry.handle_advertise(advertise_req) } + else => { + trace!("no more handles to BrokerTask"); + break; + } } } - // trace!("BrokerTask exiting"); - } -} - -impl Drop for BrokerTask { - fn drop(&mut self) { - trace!("BrokerTask dropped"); + trace!("BrokerTask exiting"); } }