From 6e596a2ef3c1b5d53f2c8a23be780eb515a4e87b Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Wed, 30 Sep 2020 16:16:31 -0600 Subject: [PATCH] Split remaining mqtt stuff into sprinklers_mqtt --- Cargo.toml | 1 + sprinklers_mqtt/Cargo.toml | 27 +++++++++++++++++++ .../src/mqtt => sprinklers_mqtt/src}/actor.rs | 0 .../src}/event_loop.rs | 0 .../mqtt/mod.rs => sprinklers_mqtt/src/lib.rs | 8 +++--- .../src}/request/mod.rs | 3 +-- .../src}/request/programs.rs | 0 .../src}/request/sections.rs | 0 .../src}/section_runner_json.rs | 0 .../mqtt => sprinklers_mqtt/src}/topics.rs | 0 .../src}/update_listener.rs | 2 +- sprinklers_rs/Cargo.toml | 17 ++---------- sprinklers_rs/src/main.rs | 8 +++--- 13 files changed, 40 insertions(+), 26 deletions(-) create mode 100644 sprinklers_mqtt/Cargo.toml rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/actor.rs (100%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/event_loop.rs (100%) rename sprinklers_rs/src/mqtt/mod.rs => sprinklers_mqtt/src/lib.rs (94%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/request/mod.rs (99%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/request/programs.rs (100%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/request/sections.rs (100%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/section_runner_json.rs (100%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/topics.rs (100%) rename {sprinklers_rs/src/mqtt => sprinklers_mqtt/src}/update_listener.rs (99%) diff --git a/Cargo.toml b/Cargo.toml index bf725f1..b0b3dca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,5 +4,6 @@ members = [ "sprinklers_core", "sprinklers_database", "sprinklers_actors", + "sprinklers_mqtt", "sprinklers_rs" ] \ No newline at end of file diff --git a/sprinklers_mqtt/Cargo.toml b/sprinklers_mqtt/Cargo.toml new file mode 100644 index 0000000..bbe6027 --- /dev/null +++ b/sprinklers_mqtt/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "sprinklers_mqtt" +version = "0.1.0" +authors = ["Alex Mikhalev "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +sprinklers_core = { path = "../sprinklers_core" } +sprinklers_actors = { path = "../sprinklers_actors" } + +actix = { version = "0.10.0", default-features = false } +eyre = "0.6.0" +rumqttc = "0.1.0" +tracing = "0.1.19" +serde = { version = "1.0.116", features = ["derive"] } +serde_json = "1.0.57" +chrono = "0.4.15" +num-traits = "0.2.12" +num-derive = "0.3.2" +futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] } + +[dependencies.tokio] +version = "0.2.22" +default-features = false +features = [] diff --git a/sprinklers_rs/src/mqtt/actor.rs b/sprinklers_mqtt/src/actor.rs similarity index 100% rename from sprinklers_rs/src/mqtt/actor.rs rename to sprinklers_mqtt/src/actor.rs diff --git a/sprinklers_rs/src/mqtt/event_loop.rs b/sprinklers_mqtt/src/event_loop.rs similarity index 100% rename from sprinklers_rs/src/mqtt/event_loop.rs rename to sprinklers_mqtt/src/event_loop.rs diff --git a/sprinklers_rs/src/mqtt/mod.rs b/sprinklers_mqtt/src/lib.rs similarity index 94% rename from sprinklers_rs/src/mqtt/mod.rs rename to sprinklers_mqtt/src/lib.rs index 307a15b..58649c3 100644 --- a/sprinklers_rs/src/mqtt/mod.rs +++ b/sprinklers_mqtt/src/lib.rs @@ -63,13 +63,13 @@ impl MqttInterface { Ok(()) } - pub(super) async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { + async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { self.publish_data(self.topics.connected(), &connected) .await .wrap_err("failed to publish connected topic") } - pub(super) async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { + async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { self.client.cancel().await } @@ -135,7 +135,7 @@ impl MqttInterface { .wrap_err("failed to publish section runner") } - pub async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> { + async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> { let payload_vec = serde_json::to_vec(&resp).wrap_err("failed to serialize request response")?; // TODO: if couldn't serialize, just in case can have a static response @@ -146,7 +146,7 @@ impl MqttInterface { Ok(()) } - pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { + async fn subscribe_requests(&mut self) -> eyre::Result<()> { self.client .subscribe(self.topics.requests(), QoS::ExactlyOnce) .await?; diff --git a/sprinklers_rs/src/mqtt/request/mod.rs b/sprinklers_mqtt/src/request/mod.rs similarity index 99% rename from sprinklers_rs/src/mqtt/request/mod.rs rename to sprinklers_mqtt/src/request/mod.rs index 95711ea..23829f5 100644 --- a/sprinklers_rs/src/mqtt/request/mod.rs +++ b/sprinklers_mqtt/src/request/mod.rs @@ -1,8 +1,7 @@ use sprinklers_actors::{program_runner::ProgramRunner, section_runner::SectionRunner}; use sprinklers_core::model::Sections; -use futures_util::ready; -use futures_util::FutureExt; +use futures_util::{ready, FutureExt}; use num_derive::FromPrimitive; use serde::{Deserialize, Serialize}; use std::{fmt, future::Future, pin::Pin, task::Poll}; diff --git a/sprinklers_rs/src/mqtt/request/programs.rs b/sprinklers_mqtt/src/request/programs.rs similarity index 100% rename from sprinklers_rs/src/mqtt/request/programs.rs rename to sprinklers_mqtt/src/request/programs.rs diff --git a/sprinklers_rs/src/mqtt/request/sections.rs b/sprinklers_mqtt/src/request/sections.rs similarity index 100% rename from sprinklers_rs/src/mqtt/request/sections.rs rename to sprinklers_mqtt/src/request/sections.rs diff --git a/sprinklers_rs/src/mqtt/section_runner_json.rs b/sprinklers_mqtt/src/section_runner_json.rs similarity index 100% rename from sprinklers_rs/src/mqtt/section_runner_json.rs rename to sprinklers_mqtt/src/section_runner_json.rs diff --git a/sprinklers_rs/src/mqtt/topics.rs b/sprinklers_mqtt/src/topics.rs similarity index 100% rename from sprinklers_rs/src/mqtt/topics.rs rename to sprinklers_mqtt/src/topics.rs diff --git a/sprinklers_rs/src/mqtt/update_listener.rs b/sprinklers_mqtt/src/update_listener.rs similarity index 99% rename from sprinklers_rs/src/mqtt/update_listener.rs rename to sprinklers_mqtt/src/update_listener.rs index b71c733..68fe6ab 100644 --- a/sprinklers_rs/src/mqtt/update_listener.rs +++ b/sprinklers_mqtt/src/update_listener.rs @@ -1,4 +1,4 @@ -use crate::mqtt::MqttInterface; +use super::MqttInterface; use sprinklers_actors::{ program_runner::{ProgramEvent, ProgramEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, diff --git a/sprinklers_rs/Cargo.toml b/sprinklers_rs/Cargo.toml index 6645750..0cf05b7 100644 --- a/sprinklers_rs/Cargo.toml +++ b/sprinklers_rs/Cargo.toml @@ -10,27 +10,14 @@ edition = "2018" sprinklers_core = { path = "../sprinklers_core" } sprinklers_database = { path = "../sprinklers_database" } sprinklers_actors = { path = "../sprinklers_actors" } +sprinklers_mqtt = { path = "../sprinklers_mqtt" } color-eyre = "0.5.1" eyre = "0.6.0" -thiserror = "1.0.20" -tokio = { version = "0.2.22", features = ["rt-core", "time", "stream", "sync", "signal", "macros", "test-util"] } +tokio = "0.2.22" tracing = { version = "0.1.19", features = ["log"] } -tracing-futures = "0.2.4" -pin-project = "0.4.23" -im = "15.0.0" -chrono = { version = "0.4.15" } -assert_matches = "1.3.0" -serde = { version = "1.0.116", features = ["derive"] } -serde_json = "1.0.57" actix = { version = "0.10.0", default-features = false } actix-rt = "1.1.1" -futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] } -num-traits = "0.2.12" -num-derive = "0.3.2" - -[dependencies.rumqttc] -version = "0.1.0" [dependencies.tracing-subscriber] version = "0.2.11" diff --git a/sprinklers_rs/src/main.rs b/sprinklers_rs/src/main.rs index b1b52ec..00300cc 100644 --- a/sprinklers_rs/src/main.rs +++ b/sprinklers_rs/src/main.rs @@ -1,13 +1,12 @@ #![warn(clippy::all)] #![warn(clippy::print_stdout)] -mod mqtt; -mod option_future; +// mod option_future; use sprinklers_core::section_interface::MockSectionInterface; use sprinklers_database as database; use sprinklers_actors as actors; -use mqtt::UpdateListener; +use sprinklers_mqtt as mqtt; use eyre::Result; use std::sync::Arc; @@ -61,7 +60,7 @@ async fn main() -> Result<()> { let section_events = section_runner.subscribe().await?; let program_events = program_runner.subscribe().await?; let sec_runner_state = section_runner.state_receiver(); - UpdateListener::start( + mqtt::UpdateListener::start( section_events, program_events, sec_runner_state, @@ -70,6 +69,7 @@ async fn main() -> Result<()> { }; program_runner.update_sections(sections.clone()).await?; + // TODO: update listener should probably do this mqtt_interface.publish_sections(§ions).await?; for section_id in sections.keys() { mqtt_interface