Browse Source

Refactor broker_task::request

master
Alex Mikhalev 4 years ago
parent
commit
6e024a07e6
  1. 18
      src/broker_task/mod.rs
  2. 94
      src/broker_task/mpsc.rs
  3. 53
      src/broker_task/pub_sub.rs
  4. 138
      src/broker_task/request.rs

18
src/broker_task/mod.rs

@ -1,25 +1,27 @@ @@ -1,25 +1,27 @@
use log::trace;
use tokio::{sync::mpsc, task::JoinHandle};
use tokio::{sync::mpsc as tmpsc, task::JoinHandle};
mod error;
mod mpsc;
mod pub_sub;
mod registry;
mod request;
pub(crate) use error::{BrokerError, BrokerResult};
use registry::Registry;
pub(crate) use request::{
AdvertiseRequest, BrokerRequestBox, BrokerRequestSender, ClaimReceiverError,
ClaimReceiverRequest, MakeBrokerRequest, SenderRequestNoWait, SenderRequestWait,
SubscribeRequest,
pub(crate) use mpsc::{
ClaimReceiverError, ClaimReceiverRequest, SenderRequestNoWait, SenderRequestWait,
};
pub(crate) use pub_sub::{AdvertiseRequest, SubscribeRequest};
use registry::Registry;
pub(crate) use request::{BrokerRequestBox, BrokerRequestSender, MakeBrokerRequest};
pub(crate) struct BrokerTask {
request_rx: mpsc::Receiver<BrokerRequestBox>,
request_rx: tmpsc::Receiver<BrokerRequestBox>,
}
impl BrokerTask {
pub(crate) fn start() -> (BrokerRequestSender, JoinHandle<()>) {
let (request_tx, request_rx) = mpsc::channel(8);
let (request_tx, request_rx) = tmpsc::channel(8);
let broker = BrokerTask { request_rx };
let join_handle = tokio::spawn(broker.run());

94
src/broker_task/mpsc.rs

@ -0,0 +1,94 @@ @@ -0,0 +1,94 @@
use super::{
error::{BrokerError, BrokerResult},
registry::{BasicMessageType, Registry},
request::{BrokerRequest, BrokerRequestInternal, MakeBrokerRequest, RequestHandler},
};
use crate::{mpsc, Message};
use std::marker::PhantomData;
use tokio::sync::oneshot;
pub type ClaimReceiverRequest<T> = BrokerRequest<ClaimReceiver<T>>;
pub enum ClaimReceiverError {
AlreadyClaimed,
Broker(BrokerError),
}
pub struct ClaimReceiver<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for ClaimReceiver<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for ClaimReceiver<T> {
type Response = Result<mpsc::Receiver<T>, ClaimReceiverError>;
fn handle(self, registry: &mut Registry) -> Self::Response {
registry
.get_mpsc_sender::<T>(&self.msg_type)
.map_err(ClaimReceiverError::Broker)
.and_then(|sender| {
sender
.claim_receiver()
.ok_or(ClaimReceiverError::AlreadyClaimed)
})
.map(mpsc::Receiver::new)
}
}
pub struct SenderRequest<T, W> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<mpsc::Sender<T>>>,
_wait: PhantomData<W>,
}
pub trait GetSenderWait: Send {
const WAIT: bool;
}
pub struct NoWait;
impl GetSenderWait for NoWait {
const WAIT: bool = false;
}
pub type SenderRequestNoWait<T> = SenderRequest<T, NoWait>;
pub struct Wait;
impl GetSenderWait for Wait {
const WAIT: bool = true;
}
pub type SenderRequestWait<T> = SenderRequest<T, Wait>;
impl<T: Message, W: GetSenderWait> MakeBrokerRequest for SenderRequest<T, W> {
type Response = BrokerResult<mpsc::Sender<T>>;
fn create(response_tx: oneshot::Sender<Self::Response>) -> Box<Self> {
Box::new(Self {
msg_type: BasicMessageType::default(),
response_tx,
_wait: PhantomData,
})
}
}
impl<T: Message, W: GetSenderWait> BrokerRequestInternal for SenderRequest<T, W> {
fn run_request(self: Box<Self>, registry: &mut Registry) {
let response = match registry.get_mpsc_sender::<T>(&self.msg_type) {
Ok(sender) => Ok(if W::WAIT {
sender.clone_sender_or_wait(self.response_tx);
return;
} else {
sender.clone_sender()
}),
Err(err) => Err(err),
};
let _ = self.response_tx.send(response);
}
}

53
src/broker_task/pub_sub.rs

@ -0,0 +1,53 @@ @@ -0,0 +1,53 @@
use super::{
error::BrokerResult,
registry::{BasicMessageType, Registry},
request::{BrokerRequest, RequestHandler},
};
use crate::{pub_sub, Message};
pub type SubscribeRequest<T> = BrokerRequest<Subscribe<T>>;
pub struct Subscribe<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for Subscribe<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for Subscribe<T> {
type Response = BrokerResult<pub_sub::Subscription<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_broadcast_sender::<T>(&self.msg_type);
sender.map(|sender| {
let receiver = sender.subscribe();
pub_sub::Subscription::new(receiver)
})
}
}
pub type AdvertiseRequest<T> = BrokerRequest<Advertise<T>>;
pub struct Advertise<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for Advertise<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for Advertise<T> {
type Response = BrokerResult<pub_sub::Publication<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_broadcast_sender::<T>(&self.msg_type);
sender.map(|sender| pub_sub::Publication::new(sender.clone()))
}
}

138
src/broker_task/request.rs

@ -1,11 +1,6 @@ @@ -1,11 +1,6 @@
use std::marker::PhantomData;
use tokio::sync::{mpsc, oneshot};
use super::{
registry::{BasicMessageType, Registry},
BrokerError, BrokerResult,
};
use crate::{Message, Publication, Receiver, Sender, Subscription};
use super::registry::Registry;
pub trait BrokerRequestInternal: Send {
fn run_request(self: Box<Self>, registry: &mut Registry);
@ -53,134 +48,3 @@ impl<H: RequestHandler + Default> MakeBrokerRequest for BrokerRequest<H> { @@ -53,134 +48,3 @@ impl<H: RequestHandler + Default> MakeBrokerRequest for BrokerRequest<H> {
pub type BrokerRequestBox = Box<dyn BrokerRequestInternal>;
pub type BrokerRequestSender = mpsc::Sender<BrokerRequestBox>;
pub type SubscribeRequest<T> = BrokerRequest<Subscribe<T>>;
pub struct Subscribe<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for Subscribe<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for Subscribe<T> {
type Response = BrokerResult<Subscription<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_broadcast_sender::<T>(&self.msg_type);
sender.map(|sender| {
let receiver = sender.subscribe();
Subscription::new(receiver)
})
}
}
pub type AdvertiseRequest<T> = BrokerRequest<Advertise<T>>;
pub struct Advertise<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for Advertise<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for Advertise<T> {
type Response = BrokerResult<Publication<T>>;
fn handle(self, registry: &mut Registry) -> Self::Response {
let sender = registry.get_broadcast_sender::<T>(&self.msg_type);
sender.map(|sender| Publication::new(sender.clone()))
}
}
pub type ClaimReceiverRequest<T> = BrokerRequest<ClaimReceiver<T>>;
pub enum ClaimReceiverError {
AlreadyClaimed,
Broker(BrokerError),
}
pub struct ClaimReceiver<T: Message> {
msg_type: BasicMessageType<T>,
}
impl<T: Message> Default for ClaimReceiver<T> {
fn default() -> Self {
Self {
msg_type: BasicMessageType::<T>::default(),
}
}
}
impl<T: Message> RequestHandler for ClaimReceiver<T> {
type Response = Result<Receiver<T>, ClaimReceiverError>;
fn handle(self, registry: &mut Registry) -> Self::Response {
registry
.get_mpsc_sender::<T>(&self.msg_type)
.map_err(ClaimReceiverError::Broker)
.and_then(|sender| {
sender
.claim_receiver()
.ok_or(ClaimReceiverError::AlreadyClaimed)
})
.map(Receiver::new)
}
}
pub struct SenderRequest<T, W> {
msg_type: BasicMessageType<T>,
response_tx: oneshot::Sender<BrokerResult<Sender<T>>>,
_wait: PhantomData<W>,
}
pub trait GetSenderWait: Send {
const WAIT: bool;
}
pub struct NoWait;
impl GetSenderWait for NoWait {
const WAIT: bool = false;
}
pub type SenderRequestNoWait<T> = SenderRequest<T, NoWait>;
pub struct Wait;
impl GetSenderWait for Wait {
const WAIT: bool = true;
}
pub type SenderRequestWait<T> = SenderRequest<T, Wait>;
impl<T: Message, W: GetSenderWait> MakeBrokerRequest for SenderRequest<T, W> {
type Response = BrokerResult<Sender<T>>;
fn create(response_tx: oneshot::Sender<Self::Response>) -> Box<Self> {
Box::new(Self {
msg_type: BasicMessageType::default(),
response_tx,
_wait: PhantomData,
})
}
}
impl<T: Message, W: GetSenderWait> BrokerRequestInternal for SenderRequest<T, W> {
fn run_request(self: Box<Self>, registry: &mut Registry) {
let response = match registry.get_mpsc_sender::<T>(&self.msg_type) {
Ok(sender) => Ok(if W::WAIT {
sender.clone_sender_or_wait(self.response_tx);
return;
} else {
sender.clone_sender()
}),
Err(err) => Err(err),
};
let _ = self.response_tx.send(response);
}
}

Loading…
Cancel
Save