From eff5e0c703de3e23bbd3a12971be1608bd5ac7f1 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Wed, 30 Sep 2020 15:55:59 -0600 Subject: [PATCH] Split out actors into sprinklers_actors --- Cargo.toml | 1 + sprinklers_actors/Cargo.toml | 36 +++++++++++++++++++ sprinklers_actors/src/lib.rs | 8 +++++ .../src/program_runner.rs | 2 +- .../src/section_runner.rs | 16 ++++----- .../src/trace_listeners.rs | 0 sprinklers_rs/Cargo.toml | 2 ++ sprinklers_rs/src/main.rs | 13 +++---- sprinklers_rs/src/mqtt/mod.rs | 6 +++- sprinklers_rs/src/mqtt/request/mod.rs | 2 +- sprinklers_rs/src/mqtt/request/programs.rs | 2 +- sprinklers_rs/src/mqtt/request/sections.rs | 2 +- .../src/{ => mqtt}/section_runner_json.rs | 2 +- .../src/{ => mqtt}/update_listener.rs | 5 +-- 14 files changed, 72 insertions(+), 25 deletions(-) create mode 100644 sprinklers_actors/Cargo.toml create mode 100644 sprinklers_actors/src/lib.rs rename {sprinklers_rs => sprinklers_actors}/src/program_runner.rs (99%) rename {sprinklers_rs => sprinklers_actors}/src/section_runner.rs (98%) rename {sprinklers_rs => sprinklers_actors}/src/trace_listeners.rs (100%) rename sprinklers_rs/src/{ => mqtt}/section_runner_json.rs (96%) rename sprinklers_rs/src/{ => mqtt}/update_listener.rs (98%) diff --git a/Cargo.toml b/Cargo.toml index ebb7fb5..bf725f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,5 +3,6 @@ members = [ "sprinklers_core", "sprinklers_database", + "sprinklers_actors", "sprinklers_rs" ] \ No newline at end of file diff --git a/sprinklers_actors/Cargo.toml b/sprinklers_actors/Cargo.toml new file mode 100644 index 0000000..b8b850d --- /dev/null +++ b/sprinklers_actors/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "sprinklers_actors" +version = "0.1.0" +authors = ["Alex Mikhalev "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +sprinklers_core = { path = "../sprinklers_core" } +actix = { version = "0.10.0", default-features = false } +thiserror = "1.0.20" +tracing = "0.1.19" +chrono = { version = "0.4.15" } +serde = { version = "1.0.116", features = ["derive"] } +im = "15.0.0" + +[dependencies.tokio] +version = "0.2.22" +default-features = false +features = [] + +[dependencies.futures-util] +version = "0.3.5" +default-features = false +features = ["std", "async-await", "sink"] + +[dev-dependencies] +actix-rt = "1.1.1" +tokio = { version = "0.2.22", features = ["test-util"] } +assert_matches = "1.3.0" + +[dev-dependencies.tracing-subscriber] +version = "0.2.11" +default-features = false +features = ["registry"] diff --git a/sprinklers_actors/src/lib.rs b/sprinklers_actors/src/lib.rs new file mode 100644 index 0000000..59edff5 --- /dev/null +++ b/sprinklers_actors/src/lib.rs @@ -0,0 +1,8 @@ +pub mod program_runner; +pub mod section_runner; + +#[cfg(test)] +mod trace_listeners; + +pub use program_runner::ProgramRunner; +pub use section_runner::SectionRunner; diff --git a/sprinklers_rs/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs similarity index 99% rename from sprinklers_rs/src/program_runner.rs rename to sprinklers_actors/src/program_runner.rs index bd13fdd..c5e1b53 100644 --- a/sprinklers_rs/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -485,7 +485,7 @@ mod test { async fn test_quit() { let quit_msg = EventListener::new( Filters::new() - .target("sprinklers_rs::program_runner") + .target("sprinklers_actors::program_runner") .message("program_runner stopped"), ); let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); diff --git a/sprinklers_rs/src/section_runner.rs b/sprinklers_actors/src/section_runner.rs similarity index 98% rename from sprinklers_rs/src/section_runner.rs rename to sprinklers_actors/src/section_runner.rs index ff972ee..00fb6a4 100644 --- a/sprinklers_rs/src/section_runner.rs +++ b/sprinklers_actors/src/section_runner.rs @@ -62,11 +62,11 @@ pub enum SecRunState { #[derive(Clone, Debug)] pub struct SecRun { - pub(crate) handle: SectionRunHandle, - pub(crate) section: SectionRef, - pub(crate) duration: Duration, - pub(crate) total_duration: Duration, - pub(crate) state: SecRunState, + pub handle: SectionRunHandle, + pub section: SectionRef, + pub duration: Duration, + pub total_duration: Duration, + pub state: SecRunState, } impl SecRun { @@ -94,8 +94,8 @@ pub type SecRunQueue = im::Vector>; #[derive(Clone, Debug)] pub struct SecRunnerState { - pub(crate) run_queue: SecRunQueue, - pub(crate) paused: bool, + pub run_queue: SecRunQueue, + pub paused: bool, } impl Default for SecRunnerState { @@ -633,7 +633,7 @@ mod test { async fn test_quit() { let quit_msg = EventListener::new( Filters::new() - .target("sprinklers_rs::section_runner") + .target("sprinklers_actors::section_runner") .message("section_runner stopped"), ); let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); diff --git a/sprinklers_rs/src/trace_listeners.rs b/sprinklers_actors/src/trace_listeners.rs similarity index 100% rename from sprinklers_rs/src/trace_listeners.rs rename to sprinklers_actors/src/trace_listeners.rs diff --git a/sprinklers_rs/Cargo.toml b/sprinklers_rs/Cargo.toml index 67b2180..6645750 100644 --- a/sprinklers_rs/Cargo.toml +++ b/sprinklers_rs/Cargo.toml @@ -9,6 +9,8 @@ edition = "2018" [dependencies] sprinklers_core = { path = "../sprinklers_core" } sprinklers_database = { path = "../sprinklers_database" } +sprinklers_actors = { path = "../sprinklers_actors" } + color-eyre = "0.5.1" eyre = "0.6.0" thiserror = "1.0.20" diff --git a/sprinklers_rs/src/main.rs b/sprinklers_rs/src/main.rs index 3981fe5..b1b52ec 100644 --- a/sprinklers_rs/src/main.rs +++ b/sprinklers_rs/src/main.rs @@ -3,16 +3,11 @@ mod mqtt; mod option_future; -mod program_runner; -mod section_runner; -mod section_runner_json; -#[cfg(test)] -mod trace_listeners; -mod update_listener; use sprinklers_core::section_interface::MockSectionInterface; use sprinklers_database as database; -use update_listener::UpdateListener; +use sprinklers_actors as actors; +use mqtt::UpdateListener; use eyre::Result; use std::sync::Arc; @@ -39,8 +34,8 @@ async fn main() -> Result<()> { // TODO: Section interface which actual does something. Preferrably selectable somehow let section_interface: Arc<_> = MockSectionInterface::new(6).into(); - let mut section_runner = section_runner::SectionRunner::new(section_interface); - let mut program_runner = program_runner::ProgramRunner::new(section_runner.clone()); + let mut section_runner = actors::SectionRunner::new(section_interface); + let mut program_runner = actors::ProgramRunner::new(section_runner.clone()); let programs = database::query_programs(&conn)?; diff --git a/sprinklers_rs/src/mqtt/mod.rs b/sprinklers_rs/src/mqtt/mod.rs index 623bb77..307a15b 100644 --- a/sprinklers_rs/src/mqtt/mod.rs +++ b/sprinklers_rs/src/mqtt/mod.rs @@ -1,12 +1,16 @@ mod actor; mod event_loop; mod request; +mod section_runner_json; mod topics; +mod update_listener; pub use request::RequestContext; +pub use update_listener::UpdateListener; use self::topics::Topics; -use crate::{section_runner::SecRunnerState, section_runner_json::SecRunnerStateJson}; +use section_runner_json::SecRunnerStateJson; +use sprinklers_actors::section_runner::SecRunnerState; use sprinklers_core::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; use actix::{Actor, Addr}; diff --git a/sprinklers_rs/src/mqtt/request/mod.rs b/sprinklers_rs/src/mqtt/request/mod.rs index 2b074db..95711ea 100644 --- a/sprinklers_rs/src/mqtt/request/mod.rs +++ b/sprinklers_rs/src/mqtt/request/mod.rs @@ -1,4 +1,4 @@ -use crate::{program_runner::ProgramRunner, section_runner::SectionRunner}; +use sprinklers_actors::{program_runner::ProgramRunner, section_runner::SectionRunner}; use sprinklers_core::model::Sections; use futures_util::ready; diff --git a/sprinklers_rs/src/mqtt/request/programs.rs b/sprinklers_rs/src/mqtt/request/programs.rs index 311489b..49db41e 100644 --- a/sprinklers_rs/src/mqtt/request/programs.rs +++ b/sprinklers_rs/src/mqtt/request/programs.rs @@ -1,5 +1,5 @@ use super::*; -use crate::program_runner::ProgramRunnerError; +use sprinklers_actors::program_runner::ProgramRunnerError; use sprinklers_core::model::ProgramId; use eyre::WrapErr; diff --git a/sprinklers_rs/src/mqtt/request/sections.rs b/sprinklers_rs/src/mqtt/request/sections.rs index 6a0aa97..23dcf41 100644 --- a/sprinklers_rs/src/mqtt/request/sections.rs +++ b/sprinklers_rs/src/mqtt/request/sections.rs @@ -1,5 +1,5 @@ use super::*; -use crate::section_runner::SectionRunHandle; +use sprinklers_actors::section_runner::SectionRunHandle; use sprinklers_core::model::{self, SectionRef}; use sprinklers_core::serde::duration_secs; diff --git a/sprinklers_rs/src/section_runner_json.rs b/sprinklers_rs/src/mqtt/section_runner_json.rs similarity index 96% rename from sprinklers_rs/src/section_runner_json.rs rename to sprinklers_rs/src/mqtt/section_runner_json.rs index 657f4e2..ada486c 100644 --- a/sprinklers_rs/src/section_runner_json.rs +++ b/sprinklers_rs/src/mqtt/section_runner_json.rs @@ -1,4 +1,4 @@ -use crate::section_runner::{SecRun, SecRunState, SecRunnerState}; +use sprinklers_actors::section_runner::{SecRun, SecRunState, SecRunnerState}; use sprinklers_core::model::SectionId; use chrono::{DateTime, Utc}; diff --git a/sprinklers_rs/src/update_listener.rs b/sprinklers_rs/src/mqtt/update_listener.rs similarity index 98% rename from sprinklers_rs/src/update_listener.rs rename to sprinklers_rs/src/mqtt/update_listener.rs index e28e746..b71c733 100644 --- a/sprinklers_rs/src/update_listener.rs +++ b/sprinklers_rs/src/mqtt/update_listener.rs @@ -1,8 +1,9 @@ -use crate::{ - mqtt::MqttInterface, +use crate::mqtt::MqttInterface; +use sprinklers_actors::{ program_runner::{ProgramEvent, ProgramEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, }; + use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use tokio::sync::broadcast; use tracing::{trace, warn};