Split remaining mqtt stuff into sprinklers_mqtt
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
Alex Mikhalev 2020-09-30 16:16:31 -06:00
parent eff5e0c703
commit 6e596a2ef3
13 changed files with 40 additions and 26 deletions

View File

@ -4,5 +4,6 @@ members = [
"sprinklers_core", "sprinklers_core",
"sprinklers_database", "sprinklers_database",
"sprinklers_actors", "sprinklers_actors",
"sprinklers_mqtt",
"sprinklers_rs" "sprinklers_rs"
] ]

View File

@ -0,0 +1,27 @@
[package]
name = "sprinklers_mqtt"
version = "0.1.0"
authors = ["Alex Mikhalev <alexmikhalevalex@gmail.com>"]
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
sprinklers_core = { path = "../sprinklers_core" }
sprinklers_actors = { path = "../sprinklers_actors" }
actix = { version = "0.10.0", default-features = false }
eyre = "0.6.0"
rumqttc = "0.1.0"
tracing = "0.1.19"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
chrono = "0.4.15"
num-traits = "0.2.12"
num-derive = "0.3.2"
futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] }
[dependencies.tokio]
version = "0.2.22"
default-features = false
features = []

View File

@ -63,13 +63,13 @@ impl MqttInterface {
Ok(()) Ok(())
} }
pub(super) async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> { async fn publish_connected(&mut self, connected: bool) -> eyre::Result<()> {
self.publish_data(self.topics.connected(), &connected) self.publish_data(self.topics.connected(), &connected)
.await .await
.wrap_err("failed to publish connected topic") .wrap_err("failed to publish connected topic")
} }
pub(super) async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> { async fn cancel(&mut self) -> Result<(), rumqttc::ClientError> {
self.client.cancel().await self.client.cancel().await
} }
@ -135,7 +135,7 @@ impl MqttInterface {
.wrap_err("failed to publish section runner") .wrap_err("failed to publish section runner")
} }
pub async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> { async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> {
let payload_vec = let payload_vec =
serde_json::to_vec(&resp).wrap_err("failed to serialize request response")?; serde_json::to_vec(&resp).wrap_err("failed to serialize request response")?;
// TODO: if couldn't serialize, just in case can have a static response // TODO: if couldn't serialize, just in case can have a static response
@ -146,7 +146,7 @@ impl MqttInterface {
Ok(()) Ok(())
} }
pub async fn subscribe_requests(&mut self) -> eyre::Result<()> { async fn subscribe_requests(&mut self) -> eyre::Result<()> {
self.client self.client
.subscribe(self.topics.requests(), QoS::ExactlyOnce) .subscribe(self.topics.requests(), QoS::ExactlyOnce)
.await?; .await?;

View File

@ -1,8 +1,7 @@
use sprinklers_actors::{program_runner::ProgramRunner, section_runner::SectionRunner}; use sprinklers_actors::{program_runner::ProgramRunner, section_runner::SectionRunner};
use sprinklers_core::model::Sections; use sprinklers_core::model::Sections;
use futures_util::ready; use futures_util::{ready, FutureExt};
use futures_util::FutureExt;
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt, future::Future, pin::Pin, task::Poll}; use std::{fmt, future::Future, pin::Pin, task::Poll};

View File

@ -1,4 +1,4 @@
use crate::mqtt::MqttInterface; use super::MqttInterface;
use sprinklers_actors::{ use sprinklers_actors::{
program_runner::{ProgramEvent, ProgramEventRecv}, program_runner::{ProgramEvent, ProgramEventRecv},
section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv},

View File

@ -10,27 +10,14 @@ edition = "2018"
sprinklers_core = { path = "../sprinklers_core" } sprinklers_core = { path = "../sprinklers_core" }
sprinklers_database = { path = "../sprinklers_database" } sprinklers_database = { path = "../sprinklers_database" }
sprinklers_actors = { path = "../sprinklers_actors" } sprinklers_actors = { path = "../sprinklers_actors" }
sprinklers_mqtt = { path = "../sprinklers_mqtt" }
color-eyre = "0.5.1" color-eyre = "0.5.1"
eyre = "0.6.0" eyre = "0.6.0"
thiserror = "1.0.20" tokio = "0.2.22"
tokio = { version = "0.2.22", features = ["rt-core", "time", "stream", "sync", "signal", "macros", "test-util"] }
tracing = { version = "0.1.19", features = ["log"] } tracing = { version = "0.1.19", features = ["log"] }
tracing-futures = "0.2.4"
pin-project = "0.4.23"
im = "15.0.0"
chrono = { version = "0.4.15" }
assert_matches = "1.3.0"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
actix = { version = "0.10.0", default-features = false } actix = { version = "0.10.0", default-features = false }
actix-rt = "1.1.1" actix-rt = "1.1.1"
futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await", "sink"] }
num-traits = "0.2.12"
num-derive = "0.3.2"
[dependencies.rumqttc]
version = "0.1.0"
[dependencies.tracing-subscriber] [dependencies.tracing-subscriber]
version = "0.2.11" version = "0.2.11"

View File

@ -1,13 +1,12 @@
#![warn(clippy::all)] #![warn(clippy::all)]
#![warn(clippy::print_stdout)] #![warn(clippy::print_stdout)]
mod mqtt; // mod option_future;
mod option_future;
use sprinklers_core::section_interface::MockSectionInterface; use sprinklers_core::section_interface::MockSectionInterface;
use sprinklers_database as database; use sprinklers_database as database;
use sprinklers_actors as actors; use sprinklers_actors as actors;
use mqtt::UpdateListener; use sprinklers_mqtt as mqtt;
use eyre::Result; use eyre::Result;
use std::sync::Arc; use std::sync::Arc;
@ -61,7 +60,7 @@ async fn main() -> Result<()> {
let section_events = section_runner.subscribe().await?; let section_events = section_runner.subscribe().await?;
let program_events = program_runner.subscribe().await?; let program_events = program_runner.subscribe().await?;
let sec_runner_state = section_runner.state_receiver(); let sec_runner_state = section_runner.state_receiver();
UpdateListener::start( mqtt::UpdateListener::start(
section_events, section_events,
program_events, program_events,
sec_runner_state, sec_runner_state,
@ -70,6 +69,7 @@ async fn main() -> Result<()> {
}; };
program_runner.update_sections(sections.clone()).await?; program_runner.update_sections(sections.clone()).await?;
// TODO: update listener should probably do this
mqtt_interface.publish_sections(&sections).await?; mqtt_interface.publish_sections(&sections).await?;
for section_id in sections.keys() { for section_id in sections.keys() {
mqtt_interface mqtt_interface