Browse Source

Move zone handling to state manager

And zone publishing to update listener
master
Alex Mikhalev 4 years ago
parent
commit
4eb2043ad7
  1. 23
      sprinklers_actors/src/program_runner.rs
  2. 9
      sprinklers_actors/src/state_manager.rs
  3. 33
      sprinklers_mqtt/src/lib.rs
  4. 3
      sprinklers_mqtt/src/request/mod.rs
  5. 4
      sprinklers_mqtt/src/request/zones.rs
  6. 50
      sprinklers_mqtt/src/update_listener.rs
  7. 20
      sprinklers_rs/src/main.rs
  8. 25
      sprinklers_rs/src/state_manager.rs

23
sprinklers_actors/src/program_runner.rs

@ -316,6 +316,24 @@ impl Handler<CancelProgram> for ProgramRunnerActor {
} }
} }
impl StreamHandler<Zones> for ProgramRunnerActor {
fn handle(&mut self, item: Zones, ctx: &mut Self::Context) {
ctx.notify(UpdateZones(item))
}
}
#[derive(Message)]
#[rtype(result = "()")]
struct ListenZones(watch::Receiver<Zones>);
impl Handler<ListenZones> for ProgramRunnerActor {
type Result = ();
fn handle(&mut self, msg: ListenZones, ctx: &mut Self::Context) -> Self::Result {
ctx.add_stream(msg.0);
}
}
impl StreamHandler<Programs> for ProgramRunnerActor { impl StreamHandler<Programs> for ProgramRunnerActor {
fn handle(&mut self, item: Programs, ctx: &mut Self::Context) { fn handle(&mut self, item: Programs, ctx: &mut Self::Context) {
ctx.notify(UpdatePrograms(item)) ctx.notify(UpdatePrograms(item))
@ -498,6 +516,11 @@ impl ProgramRunner {
Ok(event_recv) Ok(event_recv)
} }
pub fn listen_zones(&mut self, zones_watch: watch::Receiver<Zones>) {
// TODO: should this adopt a similar pattern to update_listener?
self.addr.do_send(ListenZones(zones_watch))
}
pub fn listen_programs(&mut self, programs_watch: watch::Receiver<Programs>) { pub fn listen_programs(&mut self, programs_watch: watch::Receiver<Programs>) {
self.addr.do_send(ListenPrograms(programs_watch)) self.addr.do_send(ListenPrograms(programs_watch))
} }

9
sprinklers_actors/src/state_manager.rs

@ -1,4 +1,4 @@
use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData, Programs}; use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData, Programs, Zones};
use thiserror::Error; use thiserror::Error;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
@ -14,6 +14,7 @@ pub enum Request {
#[derive(Clone)] #[derive(Clone)]
pub struct StateManager { pub struct StateManager {
request_tx: mpsc::Sender<Request>, request_tx: mpsc::Sender<Request>,
zones_watch: watch::Receiver<Zones>,
programs_watch: watch::Receiver<Programs>, programs_watch: watch::Receiver<Programs>,
} }
@ -34,10 +35,12 @@ pub type Result<T, E = StateError> = std::result::Result<T, E>;
impl StateManager { impl StateManager {
pub fn new( pub fn new(
request_tx: mpsc::Sender<Request>, request_tx: mpsc::Sender<Request>,
zones_watch: watch::Receiver<Zones>,
programs_watch: watch::Receiver<Programs>, programs_watch: watch::Receiver<Programs>,
) -> Self { ) -> Self {
Self { Self {
request_tx, request_tx,
zones_watch,
programs_watch, programs_watch,
} }
} }
@ -59,6 +62,10 @@ impl StateManager {
resp_rx.await.map_err(eyre::Report::from)? resp_rx.await.map_err(eyre::Report::from)?
} }
pub fn get_zones(&self) -> watch::Receiver<Zones> {
self.zones_watch.clone()
}
pub fn get_programs(&self) -> watch::Receiver<Programs> { pub fn get_programs(&self) -> watch::Receiver<Programs> {
self.programs_watch.clone() self.programs_watch.clone()
} }

33
sprinklers_mqtt/src/lib.rs

@ -75,15 +75,42 @@ impl MqttInterface {
pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> { pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> {
let zone_ids: Vec<_> = zones.keys().cloned().collect(); let zone_ids: Vec<_> = zones.keys().cloned().collect();
self.publish_data(self.topics.zones(), &zone_ids) self.publish_zone_ids(&zone_ids).await?;
.await
.wrap_err("failed to publish zone ids")?;
for zone in zones.values() { for zone in zones.values() {
self.publish_zone(zone).await?; self.publish_zone(zone).await?;
} }
Ok(()) Ok(())
} }
// TODO: figure out how to share logic with publish_programs_diff and publish_zones
pub async fn publish_zones_diff(
&mut self,
old_zones: &Zones,
zones: &Zones,
) -> eyre::Result<()> {
for (id, zone) in zones {
let publish = match old_zones.get(id) {
Some(old_zone) => !Arc::ptr_eq(old_zone, zone),
None => {
let zone_ids: Vec<_> = zones.keys().cloned().collect();
self.publish_zone_ids(&zone_ids).await?;
true
}
};
if publish {
self.publish_zone(zone).await?;
}
}
Ok(())
}
pub async fn publish_zone_ids(&mut self, zone_ids: &[ZoneId]) -> eyre::Result<()> {
self.publish_data(self.topics.zones(), &zone_ids)
.await
.wrap_err("failed to publish zone ids")?;
Ok(())
}
pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> { pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> {
self.publish_data(self.topics.zone_data(zone.id), zone) self.publish_data(self.topics.zone_data(zone.id), zone)
.await .await

3
sprinklers_mqtt/src/request/mod.rs

@ -5,12 +5,13 @@ use futures_util::{ready, FutureExt};
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt, future::Future, pin::Pin, task::Poll}; use std::{fmt, future::Future, pin::Pin, task::Poll};
use tokio::sync::watch;
mod programs; mod programs;
mod zones; mod zones;
pub struct RequestContext { pub struct RequestContext {
pub zones: Zones, pub zones: watch::Receiver<Zones>,
pub zone_runner: ZoneRunner, pub zone_runner: ZoneRunner,
pub program_runner: ProgramRunner, pub program_runner: ProgramRunner,
pub state_manager: StateManager, pub state_manager: StateManager,

4
sprinklers_mqtt/src/request/zones.rs

@ -41,7 +41,7 @@ impl IRequest for RunZoneRequest {
type Response = RunZoneResponse; type Response = RunZoneResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone(); let mut zone_runner = ctx.zone_runner.clone();
let zone = self.zone_id.get_zone(&ctx.zones); let zone = self.zone_id.get_zone(&*ctx.zones.borrow());
let duration = self.duration; let duration = self.duration;
Box::pin(async move { Box::pin(async move {
let zone = zone?; let zone = zone?;
@ -76,7 +76,7 @@ impl IRequest for CancelZoneRequest {
type Response = CancelZoneResponse; type Response = CancelZoneResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone(); let mut zone_runner = ctx.zone_runner.clone();
let zone = self.zone_id.get_zone(&ctx.zones); let zone = self.zone_id.get_zone(&*ctx.zones.borrow());
Box::pin(async move { Box::pin(async move {
let zone = zone?; let zone = zone?;
let cancelled = zone_runner let cancelled = zone_runner

50
sprinklers_mqtt/src/update_listener.rs

@ -6,12 +6,13 @@ use sprinklers_actors::{
use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use futures_util::TryFutureExt; use futures_util::TryFutureExt;
use sprinklers_core::model::Programs; use sprinklers_core::model::{Programs, Zones};
use tokio::sync::{broadcast, watch}; use tokio::sync::{broadcast, watch};
use tracing::{trace, warn}; use tracing::{trace, warn};
struct UpdateListenerActor { struct UpdateListenerActor {
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
old_zones: Option<Zones>,
old_programs: Option<Programs>, old_programs: Option<Programs>,
} }
@ -19,6 +20,7 @@ impl UpdateListenerActor {
fn new(mqtt_interface: MqttInterface) -> Self { fn new(mqtt_interface: MqttInterface) -> Self {
Self { Self {
mqtt_interface, mqtt_interface,
old_zones: None,
old_programs: None, old_programs: None,
} }
} }
@ -36,6 +38,42 @@ impl Actor for UpdateListenerActor {
} }
} }
impl StreamHandler<Zones> for UpdateListenerActor {
fn handle(&mut self, zones: Zones, ctx: &mut Self::Context) {
let mut mqtt_interface = self.mqtt_interface.clone();
let old_zones = self.old_zones.replace(zones.clone());
let fut = async move {
mqtt_interface.publish_zones(&zones).await?;
for zone_id in zones.keys() {
mqtt_interface.publish_zone_state(*zone_id, false).await?;
}
match old_zones {
None => {
mqtt_interface.publish_zones(&zones).await?;
// Some what of a hack
// Initialize zone running states to false the first time we
// receive zones
for zone_id in zones.keys() {
mqtt_interface.publish_zone_state(*zone_id, false).await?;
}
}
Some(old_zones) => {
mqtt_interface
.publish_zones_diff(&old_zones, &zones)
.await?;
}
}
Ok(())
}
.unwrap_or_else(|err: eyre::Report| warn!("could not publish programs: {:?}", err));
ctx.spawn(wrap_future(fut));
}
}
impl StreamHandler<Result<ZoneEvent, broadcast::RecvError>> for UpdateListenerActor { impl StreamHandler<Result<ZoneEvent, broadcast::RecvError>> for UpdateListenerActor {
fn handle(&mut self, event: Result<ZoneEvent, broadcast::RecvError>, ctx: &mut Self::Context) { fn handle(&mut self, event: Result<ZoneEvent, broadcast::RecvError>, ctx: &mut Self::Context) {
let event = match event { let event = match event {
@ -196,6 +234,12 @@ where
} }
} }
impl Listenable<UpdateListenerActor> for watch::Receiver<Zones> {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self);
}
}
impl Listenable<UpdateListenerActor> for ZoneEventRecv { impl Listenable<UpdateListenerActor> for ZoneEventRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) { fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self); ctx.add_stream(self);
@ -237,6 +281,10 @@ impl UpdateListener {
self.addr.do_send(Listen(listener)); self.addr.do_send(Listen(listener));
} }
pub fn listen_zones(&mut self, zones: watch::Receiver<Zones>) {
self.listen(zones);
}
pub fn listen_zone_events(&mut self, zone_events: ZoneEventRecv) { pub fn listen_zone_events(&mut self, zone_events: ZoneEventRecv) {
self.listen(zone_events); self.listen(zone_events);
} }

20
sprinklers_rs/src/main.rs

@ -12,7 +12,7 @@ use sprinklers_mqtt as mqtt;
use eyre::{Result, WrapErr}; use eyre::{Result, WrapErr};
use settings::Settings; use settings::Settings;
use tracing::{debug, info}; use tracing::info;
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
#[actix_rt::main] #[actix_rt::main]
@ -31,11 +31,6 @@ async fn main() -> Result<()> {
let db_conn = database::setup_db()?; let db_conn = database::setup_db()?;
let zones = database::query_zones(&db_conn)?;
for zone in zones.values() {
debug!(zone = debug(&zone), "read zone");
}
let zone_interface = settings.zone_interface.build()?; let zone_interface = settings.zone_interface.build()?;
let mut zone_runner = actors::ZoneRunner::new(zone_interface); let mut zone_runner = actors::ZoneRunner::new(zone_interface);
let mut program_runner = actors::ProgramRunner::new(zone_runner.clone()); let mut program_runner = actors::ProgramRunner::new(zone_runner.clone());
@ -45,29 +40,24 @@ async fn main() -> Result<()> {
let mqtt_options = settings.mqtt; let mqtt_options = settings.mqtt;
// TODO: have ability to update zones / other data // TODO: have ability to update zones / other data
let request_context = mqtt::RequestContext { let request_context = mqtt::RequestContext {
zones: zones.clone(), zones: state_manager.get_zones(),
zone_runner: zone_runner.clone(), zone_runner: zone_runner.clone(),
program_runner: program_runner.clone(), program_runner: program_runner.clone(),
state_manager: state_manager.clone(), state_manager: state_manager.clone(),
}; };
let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context); let mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context);
let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone()); let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone());
update_listener.listen_zones(state_manager.get_zones());
update_listener.listen_zone_events(zone_runner.subscribe().await?); update_listener.listen_zone_events(zone_runner.subscribe().await?);
update_listener.listen_zone_runner(zone_runner.get_state_recv()); update_listener.listen_zone_runner(zone_runner.get_state_recv());
update_listener.listen_programs(state_manager.get_programs()); update_listener.listen_programs(state_manager.get_programs());
update_listener.listen_program_events(program_runner.subscribe().await?); update_listener.listen_program_events(program_runner.subscribe().await?);
// Only listen to programs now so above subscriptions get events // Only listen to programs now so above subscriptions get events
program_runner.listen_zones(state_manager.get_zones());
program_runner.listen_programs(state_manager.get_programs()); program_runner.listen_programs(state_manager.get_programs());
program_runner.update_zones(zones.clone()).await?;
// TODO: update listener should probably do this
mqtt_interface.publish_zones(&zones).await?;
for zone_id in zones.keys() {
mqtt_interface.publish_zone_state(*zone_id, false).await?;
}
info!("sprinklers_rs initialized"); info!("sprinklers_rs initialized");
tokio::signal::ctrl_c().await?; tokio::signal::ctrl_c().await?;

25
sprinklers_rs/src/state_manager.rs

@ -2,30 +2,34 @@ use sprinklers_actors::{state_manager, StateManager};
use sprinklers_database::{self as database, DbConn}; use sprinklers_database::{self as database, DbConn};
use eyre::{eyre, WrapErr}; use eyre::{eyre, WrapErr};
use sprinklers_core::model::{ProgramRef, Programs}; use sprinklers_core::model::{ProgramRef, Programs, Zones};
use tokio::{ use tokio::{
runtime, runtime,
sync::{mpsc, watch}, sync::{mpsc, watch},
}; };
use tracing::warn; use tracing::{trace, warn};
pub struct StateManagerThread { pub struct StateManagerThread {
db_conn: DbConn, db_conn: DbConn,
request_rx: mpsc::Receiver<state_manager::Request>, request_rx: mpsc::Receiver<state_manager::Request>,
zones_tx: watch::Sender<Zones>,
programs_tx: watch::Sender<Programs>, programs_tx: watch::Sender<Programs>,
} }
struct State { struct State {
zones: Zones,
programs: Programs, programs: Programs,
} }
impl StateManagerThread { impl StateManagerThread {
pub fn start(db_conn: DbConn) -> StateManager { pub fn start(db_conn: DbConn) -> StateManager {
let (request_tx, request_rx) = mpsc::channel(8); let (request_tx, request_rx) = mpsc::channel(8);
let (zones_tx, zones_rx) = watch::channel(Zones::default());
let (programs_tx, programs_rx) = watch::channel(Programs::default()); let (programs_tx, programs_rx) = watch::channel(Programs::default());
let task = StateManagerThread { let task = StateManagerThread {
db_conn, db_conn,
request_rx, request_rx,
zones_tx,
programs_tx, programs_tx,
}; };
let runtime_handle = runtime::Handle::current(); let runtime_handle = runtime::Handle::current();
@ -33,7 +37,13 @@ impl StateManagerThread {
.name("sprinklers_rs::state_manager".into()) .name("sprinklers_rs::state_manager".into())
.spawn(move || task.run(runtime_handle)) .spawn(move || task.run(runtime_handle))
.expect("could not start state_manager thread"); .expect("could not start state_manager thread");
StateManager::new(request_tx, programs_rx) StateManager::new(request_tx, zones_rx, programs_rx)
}
fn broadcast_zones(&mut self, zones: Zones) {
if let Err(err) = self.zones_tx.broadcast(zones) {
warn!("could not broadcast zones: {}", err);
}
} }
fn broadcast_programs(&mut self, programs: Programs) { fn broadcast_programs(&mut self, programs: Programs) {
@ -83,15 +93,22 @@ impl StateManagerThread {
} }
fn load_state(&mut self) -> eyre::Result<State> { fn load_state(&mut self) -> eyre::Result<State> {
let zones = database::query_zones(&self.db_conn)?;
for zone in zones.values() {
trace!(zone = debug(&zone), "read zone");
}
let programs = let programs =
database::query_programs(&self.db_conn).wrap_err("could not query programs")?; database::query_programs(&self.db_conn).wrap_err("could not query programs")?;
Ok(State { programs }) Ok(State { zones, programs })
} }
fn run(mut self, runtime_handle: runtime::Handle) { fn run(mut self, runtime_handle: runtime::Handle) {
let mut state = self.load_state().expect("could not load initial state"); let mut state = self.load_state().expect("could not load initial state");
self.broadcast_zones(state.zones.clone());
self.broadcast_programs(state.programs.clone()); self.broadcast_programs(state.programs.clone());
while let Some(request) = runtime_handle.block_on(self.request_rx.recv()) { while let Some(request) = runtime_handle.block_on(self.request_rx.recv()) {

Loading…
Cancel
Save