From 6e024a07e636a4ba4f752fbe39ef7d3a2ce37142 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Mon, 4 Jan 2021 13:04:23 -0700 Subject: [PATCH] Refactor broker_task::request --- src/broker_task/mod.rs | 18 ++--- src/broker_task/mpsc.rs | 94 +++++++++++++++++++++++++ src/broker_task/pub_sub.rs | 53 ++++++++++++++ src/broker_task/request.rs | 138 +------------------------------------ 4 files changed, 158 insertions(+), 145 deletions(-) create mode 100644 src/broker_task/mpsc.rs create mode 100644 src/broker_task/pub_sub.rs diff --git a/src/broker_task/mod.rs b/src/broker_task/mod.rs index 219cf7b..fc2315e 100644 --- a/src/broker_task/mod.rs +++ b/src/broker_task/mod.rs @@ -1,25 +1,27 @@ use log::trace; -use tokio::{sync::mpsc, task::JoinHandle}; +use tokio::{sync::mpsc as tmpsc, task::JoinHandle}; mod error; +mod mpsc; +mod pub_sub; mod registry; mod request; pub(crate) use error::{BrokerError, BrokerResult}; -use registry::Registry; -pub(crate) use request::{ - AdvertiseRequest, BrokerRequestBox, BrokerRequestSender, ClaimReceiverError, - ClaimReceiverRequest, MakeBrokerRequest, SenderRequestNoWait, SenderRequestWait, - SubscribeRequest, +pub(crate) use mpsc::{ + ClaimReceiverError, ClaimReceiverRequest, SenderRequestNoWait, SenderRequestWait, }; +pub(crate) use pub_sub::{AdvertiseRequest, SubscribeRequest}; +use registry::Registry; +pub(crate) use request::{BrokerRequestBox, BrokerRequestSender, MakeBrokerRequest}; pub(crate) struct BrokerTask { - request_rx: mpsc::Receiver, + request_rx: tmpsc::Receiver, } impl BrokerTask { pub(crate) fn start() -> (BrokerRequestSender, JoinHandle<()>) { - let (request_tx, request_rx) = mpsc::channel(8); + let (request_tx, request_rx) = tmpsc::channel(8); let broker = BrokerTask { request_rx }; let join_handle = tokio::spawn(broker.run()); diff --git a/src/broker_task/mpsc.rs b/src/broker_task/mpsc.rs new file mode 100644 index 0000000..9b834ac --- /dev/null +++ b/src/broker_task/mpsc.rs @@ -0,0 +1,94 @@ +use super::{ + error::{BrokerError, BrokerResult}, + registry::{BasicMessageType, Registry}, + request::{BrokerRequest, BrokerRequestInternal, MakeBrokerRequest, RequestHandler}, +}; + +use crate::{mpsc, Message}; + +use std::marker::PhantomData; +use tokio::sync::oneshot; + +pub type ClaimReceiverRequest = BrokerRequest>; + +pub enum ClaimReceiverError { + AlreadyClaimed, + Broker(BrokerError), +} + +pub struct ClaimReceiver { + msg_type: BasicMessageType, +} + +impl Default for ClaimReceiver { + fn default() -> Self { + Self { + msg_type: BasicMessageType::::default(), + } + } +} + +impl RequestHandler for ClaimReceiver { + type Response = Result, ClaimReceiverError>; + + fn handle(self, registry: &mut Registry) -> Self::Response { + registry + .get_mpsc_sender::(&self.msg_type) + .map_err(ClaimReceiverError::Broker) + .and_then(|sender| { + sender + .claim_receiver() + .ok_or(ClaimReceiverError::AlreadyClaimed) + }) + .map(mpsc::Receiver::new) + } +} + +pub struct SenderRequest { + msg_type: BasicMessageType, + response_tx: oneshot::Sender>>, + _wait: PhantomData, +} + +pub trait GetSenderWait: Send { + const WAIT: bool; +} + +pub struct NoWait; +impl GetSenderWait for NoWait { + const WAIT: bool = false; +} +pub type SenderRequestNoWait = SenderRequest; + +pub struct Wait; +impl GetSenderWait for Wait { + const WAIT: bool = true; +} +pub type SenderRequestWait = SenderRequest; + +impl MakeBrokerRequest for SenderRequest { + type Response = BrokerResult>; + + fn create(response_tx: oneshot::Sender) -> Box { + Box::new(Self { + msg_type: BasicMessageType::default(), + response_tx, + _wait: PhantomData, + }) + } +} + +impl BrokerRequestInternal for SenderRequest { + fn run_request(self: Box, registry: &mut Registry) { + let response = match registry.get_mpsc_sender::(&self.msg_type) { + Ok(sender) => Ok(if W::WAIT { + sender.clone_sender_or_wait(self.response_tx); + return; + } else { + sender.clone_sender() + }), + Err(err) => Err(err), + }; + let _ = self.response_tx.send(response); + } +} diff --git a/src/broker_task/pub_sub.rs b/src/broker_task/pub_sub.rs new file mode 100644 index 0000000..823d216 --- /dev/null +++ b/src/broker_task/pub_sub.rs @@ -0,0 +1,53 @@ +use super::{ + error::BrokerResult, + registry::{BasicMessageType, Registry}, + request::{BrokerRequest, RequestHandler}, +}; +use crate::{pub_sub, Message}; + +pub type SubscribeRequest = BrokerRequest>; +pub struct Subscribe { + msg_type: BasicMessageType, +} + +impl Default for Subscribe { + fn default() -> Self { + Self { + msg_type: BasicMessageType::::default(), + } + } +} + +impl RequestHandler for Subscribe { + type Response = BrokerResult>; + + fn handle(self, registry: &mut Registry) -> Self::Response { + let sender = registry.get_broadcast_sender::(&self.msg_type); + sender.map(|sender| { + let receiver = sender.subscribe(); + pub_sub::Subscription::new(receiver) + }) + } +} + +pub type AdvertiseRequest = BrokerRequest>; +pub struct Advertise { + msg_type: BasicMessageType, +} + +impl Default for Advertise { + fn default() -> Self { + Self { + msg_type: BasicMessageType::::default(), + } + } +} + +impl RequestHandler for Advertise { + type Response = BrokerResult>; + + fn handle(self, registry: &mut Registry) -> Self::Response { + let sender = registry.get_broadcast_sender::(&self.msg_type); + sender.map(|sender| pub_sub::Publication::new(sender.clone())) + } +} diff --git a/src/broker_task/request.rs b/src/broker_task/request.rs index 7451098..f1822f9 100644 --- a/src/broker_task/request.rs +++ b/src/broker_task/request.rs @@ -1,11 +1,6 @@ -use std::marker::PhantomData; use tokio::sync::{mpsc, oneshot}; -use super::{ - registry::{BasicMessageType, Registry}, - BrokerError, BrokerResult, -}; -use crate::{Message, Publication, Receiver, Sender, Subscription}; +use super::registry::Registry; pub trait BrokerRequestInternal: Send { fn run_request(self: Box, registry: &mut Registry); @@ -53,134 +48,3 @@ impl MakeBrokerRequest for BrokerRequest { pub type BrokerRequestBox = Box; pub type BrokerRequestSender = mpsc::Sender; - -pub type SubscribeRequest = BrokerRequest>; -pub struct Subscribe { - msg_type: BasicMessageType, -} - -impl Default for Subscribe { - fn default() -> Self { - Self { - msg_type: BasicMessageType::::default(), - } - } -} - -impl RequestHandler for Subscribe { - type Response = BrokerResult>; - - fn handle(self, registry: &mut Registry) -> Self::Response { - let sender = registry.get_broadcast_sender::(&self.msg_type); - sender.map(|sender| { - let receiver = sender.subscribe(); - Subscription::new(receiver) - }) - } -} - -pub type AdvertiseRequest = BrokerRequest>; -pub struct Advertise { - msg_type: BasicMessageType, -} - -impl Default for Advertise { - fn default() -> Self { - Self { - msg_type: BasicMessageType::::default(), - } - } -} - -impl RequestHandler for Advertise { - type Response = BrokerResult>; - - fn handle(self, registry: &mut Registry) -> Self::Response { - let sender = registry.get_broadcast_sender::(&self.msg_type); - sender.map(|sender| Publication::new(sender.clone())) - } -} - -pub type ClaimReceiverRequest = BrokerRequest>; - -pub enum ClaimReceiverError { - AlreadyClaimed, - Broker(BrokerError), -} - -pub struct ClaimReceiver { - msg_type: BasicMessageType, -} - -impl Default for ClaimReceiver { - fn default() -> Self { - Self { - msg_type: BasicMessageType::::default(), - } - } -} - -impl RequestHandler for ClaimReceiver { - type Response = Result, ClaimReceiverError>; - - fn handle(self, registry: &mut Registry) -> Self::Response { - registry - .get_mpsc_sender::(&self.msg_type) - .map_err(ClaimReceiverError::Broker) - .and_then(|sender| { - sender - .claim_receiver() - .ok_or(ClaimReceiverError::AlreadyClaimed) - }) - .map(Receiver::new) - } -} - -pub struct SenderRequest { - msg_type: BasicMessageType, - response_tx: oneshot::Sender>>, - _wait: PhantomData, -} - -pub trait GetSenderWait: Send { - const WAIT: bool; -} - -pub struct NoWait; -impl GetSenderWait for NoWait { - const WAIT: bool = false; -} -pub type SenderRequestNoWait = SenderRequest; - -pub struct Wait; -impl GetSenderWait for Wait { - const WAIT: bool = true; -} -pub type SenderRequestWait = SenderRequest; - -impl MakeBrokerRequest for SenderRequest { - type Response = BrokerResult>; - - fn create(response_tx: oneshot::Sender) -> Box { - Box::new(Self { - msg_type: BasicMessageType::default(), - response_tx, - _wait: PhantomData, - }) - } -} - -impl BrokerRequestInternal for SenderRequest { - fn run_request(self: Box, registry: &mut Registry) { - let response = match registry.get_mpsc_sender::(&self.msg_type) { - Ok(sender) => Ok(if W::WAIT { - sender.clone_sender_or_wait(self.response_tx); - return; - } else { - sender.clone_sender() - }), - Err(err) => Err(err), - }; - let _ = self.response_tx.send(response); - } -}