Browse Source

Rename section to zone

master
Alex Mikhalev 4 years ago
parent
commit
47f563c0a9
  1. 4
      sprinklers_actors/src/lib.rs
  2. 240
      sprinklers_actors/src/program_runner.rs
  3. 3
      sprinklers_actors/src/state_manager.rs
  4. 448
      sprinklers_actors/src/zone_runner.rs
  5. 2
      sprinklers_core/src/lib.rs
  6. 4
      sprinklers_core/src/model/mod.rs
  7. 6
      sprinklers_core/src/model/program.rs
  8. 27
      sprinklers_core/src/model/section.rs
  9. 27
      sprinklers_core/src/model/zone.rs
  10. 5
      sprinklers_core/src/schedule.rs
  11. 65
      sprinklers_core/src/section_interface.rs
  12. 65
      sprinklers_core/src/zone_interface.rs
  13. 16
      sprinklers_database/src/lib.rs
  14. 8
      sprinklers_database/src/migration.rs
  15. 2
      sprinklers_database/src/migrations/mod.rs
  16. 8
      sprinklers_database/src/program.rs
  17. 16
      sprinklers_database/src/section.rs
  18. 16
      sprinklers_database/src/zone.rs
  19. 16
      sprinklers_linux/src/lib.rs
  20. 4
      sprinklers_mqtt/src/actor.rs
  21. 46
      sprinklers_mqtt/src/lib.rs
  22. 35
      sprinklers_mqtt/src/request/mod.rs
  23. 161
      sprinklers_mqtt/src/request/sections.rs
  24. 160
      sprinklers_mqtt/src/request/zones.rs
  25. 18
      sprinklers_mqtt/src/topics.rs
  26. 50
      sprinklers_mqtt/src/update_listener.rs
  27. 38
      sprinklers_mqtt/src/zone_runner_json.rs
  28. 4
      sprinklers_rs/sprinklers_rs.default.json
  29. 36
      sprinklers_rs/src/main.rs
  30. 37
      sprinklers_rs/src/section_interface.rs
  31. 4
      sprinklers_rs/src/settings.rs
  32. 33
      sprinklers_rs/src/zone_interface.rs

4
sprinklers_actors/src/lib.rs

@ -1,10 +1,10 @@
pub mod program_runner; pub mod program_runner;
pub mod section_runner;
pub mod state_manager; pub mod state_manager;
pub mod zone_runner;
#[cfg(test)] #[cfg(test)]
mod trace_listeners; mod trace_listeners;
pub use program_runner::ProgramRunner; pub use program_runner::ProgramRunner;
pub use section_runner::SectionRunner;
pub use state_manager::StateManager; pub use state_manager::StateManager;
pub use zone_runner::ZoneRunner;

240
sprinklers_actors/src/program_runner.rs

@ -1,7 +1,7 @@
use crate::section_runner::{ use crate::zone_runner::{
Error as SectionRunnerError, SectionEvent, SectionEventRecv, SectionRunHandle, SectionRunner, Error as ZoneRunnerError, ZoneEvent, ZoneEventRecv, ZoneRunHandle, ZoneRunner,
}; };
use sprinklers_core::model::{ProgramId, ProgramRef, Programs, Sections}; use sprinklers_core::model::{ProgramId, ProgramRef, Programs, Zones};
use actix::{ use actix::{
Actor, ActorContext, ActorFuture, ActorStream, Addr, AsyncContext, Handler, Message, Actor, ActorContext, ActorFuture, ActorStream, Addr, AsyncContext, Handler, Message,
@ -40,7 +40,7 @@ enum RunState {
struct ProgRun { struct ProgRun {
program: ProgramRef, program: ProgramRef,
state: RunState, state: RunState,
sec_run_handles: Vec<SectionRunHandle>, zone_run_handles: Vec<ZoneRunHandle>,
} }
impl ProgRun { impl ProgRun {
@ -48,7 +48,7 @@ impl ProgRun {
Self { Self {
program, program,
state: RunState::Waiting, state: RunState::Waiting,
sec_run_handles: Vec::new(), zone_run_handles: Vec::new(),
} }
} }
} }
@ -56,8 +56,8 @@ impl ProgRun {
type RunQueue = VecDeque<ProgRun>; type RunQueue = VecDeque<ProgRun>;
struct ProgramRunnerInner { struct ProgramRunnerInner {
section_runner: SectionRunner, zone_runner: ZoneRunner,
sections: Sections, zones: Zones,
programs: Programs, programs: Programs,
event_send: Option<ProgramEventSend>, event_send: Option<ProgramEventSend>,
schedule_run_fut: Option<SpawnHandle>, schedule_run_fut: Option<SpawnHandle>,
@ -98,29 +98,29 @@ impl ProgramRunnerInner {
.program .program
.sequence .sequence
.iter() .iter()
.filter_map(|item| match self.sections.get(&item.section_id) { .filter_map(|item| match self.zones.get(&item.zone_id) {
Some(sec) => Some((sec.clone(), item.duration)), Some(zone) => Some((zone.clone(), item.duration)),
None => { None => {
warn!( warn!(
program_id = run.program.id, program_id = run.program.id,
section_id = item.section_id, zone_id = item.zone_id,
"trying to run program with nonexistant section" "trying to run program with nonexistant zone"
); );
None None
} }
}) })
.collect(); .collect();
if sequence.is_empty() { if sequence.is_empty() {
warn!(program_id = run.program.id, "program has no valid sections"); warn!(program_id = run.program.id, "program has no valid zones");
run.state = RunState::Finished; run.state = RunState::Finished;
self.send_event(ProgramEvent::RunStart(run.program.clone())); self.send_event(ProgramEvent::RunStart(run.program.clone()));
self.send_event(ProgramEvent::RunFinish(run.program.clone())); self.send_event(ProgramEvent::RunFinish(run.program.clone()));
return; return;
} }
run.sec_run_handles.reserve(sequence.len()); run.zone_run_handles.reserve(sequence.len());
for (section, duration) in sequence { for (zone, duration) in sequence {
let handle = self.section_runner.do_queue_run(section, duration); let handle = self.zone_runner.do_queue_run(zone, duration);
run.sec_run_handles.push(handle); run.zone_run_handles.push(handle);
} }
run.state = RunState::Running; run.state = RunState::Running;
self.send_event(ProgramEvent::RunStart(run.program.clone())); self.send_event(ProgramEvent::RunStart(run.program.clone()));
@ -128,8 +128,8 @@ impl ProgramRunnerInner {
} }
fn cancel_program_run(&mut self, run: &mut ProgRun) { fn cancel_program_run(&mut self, run: &mut ProgRun) {
for handle in run.sec_run_handles.drain(..) { for handle in run.zone_run_handles.drain(..) {
self.section_runner.do_cancel_run(handle); self.zone_runner.do_cancel_run(handle);
} }
debug!(program_id = run.program.id, "program run is cancelled"); debug!(program_id = run.program.id, "program run is cancelled");
self.send_event(ProgramEvent::RunCancel(run.program.clone())); self.send_event(ProgramEvent::RunCancel(run.program.clone()));
@ -171,16 +171,16 @@ impl Actor for ProgramRunnerActor {
type Context = actix::Context<Self>; type Context = actix::Context<Self>;
fn started(&mut self, ctx: &mut Self::Context) { fn started(&mut self, ctx: &mut Self::Context) {
trace!("subscribing to SectionRunner events"); trace!("subscribing to ZoneRunner events");
let subscribe_fut = self.inner.section_runner.subscribe().into_actor(self).map( let subscribe_fut = self.inner.zone_runner.subscribe().into_actor(self).map(
|section_events: Result<SectionEventRecv, SectionRunnerError>, |zone_events: Result<ZoneEventRecv, ZoneRunnerError>,
_act: &mut ProgramRunnerActor, _act: &mut ProgramRunnerActor,
ctx: &mut Self::Context| { ctx: &mut Self::Context| {
match section_events { match zone_events {
Ok(section_events) => { Ok(zone_events) => {
ctx.add_stream(section_events.into_stream()); ctx.add_stream(zone_events.into_stream());
} }
Err(err) => warn!("failed to subscribe to SectionRunner events: {}", err), Err(err) => warn!("failed to subscribe to ZoneRunner events: {}", err),
} }
}, },
); );
@ -193,22 +193,18 @@ impl Actor for ProgramRunnerActor {
} }
} }
impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for ProgramRunnerActor { impl StreamHandler<Result<ZoneEvent, broadcast::RecvError>> for ProgramRunnerActor {
fn handle( fn handle(&mut self, item: Result<ZoneEvent, broadcast::RecvError>, ctx: &mut Self::Context) {
&mut self, let zone_event = match item {
item: Result<SectionEvent, broadcast::RecvError>,
ctx: &mut Self::Context,
) {
let sec_event = match item {
Ok(e) => e, Ok(e) => e,
Err(err) => { Err(err) => {
warn!("failed to receive section event: {}", err); warn!("failed to receive zone event: {}", err);
return; return;
} }
}; };
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
match sec_event { match zone_event {
SectionEvent::RunFinish(finished_run, _) => { ZoneEvent::RunFinish(finished_run, _) => {
self.handle_finished_run(finished_run, ctx); self.handle_finished_run(finished_run, ctx);
} }
_ => {} _ => {}
@ -241,14 +237,14 @@ impl Handler<Subscribe> for ProgramRunnerActor {
#[derive(Message)] #[derive(Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
struct UpdateSections(Sections); struct UpdateZones(Zones);
impl Handler<UpdateSections> for ProgramRunnerActor { impl Handler<UpdateZones> for ProgramRunnerActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: UpdateSections, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: UpdateZones, _ctx: &mut Self::Context) -> Self::Result {
trace!("updating sections"); trace!("updating zones");
let UpdateSections(new_sections) = msg; let UpdateZones(new_zones) = msg;
self.inner.sections = new_sections; self.inner.zones = new_zones;
} }
} }
@ -379,11 +375,11 @@ impl Handler<UpdateSchedules> for ProgramRunnerActor {
} }
impl ProgramRunnerActor { impl ProgramRunnerActor {
fn new(section_runner: SectionRunner) -> Self { fn new(zone_runner: ZoneRunner) -> Self {
Self { Self {
inner: ProgramRunnerInner { inner: ProgramRunnerInner {
section_runner, zone_runner,
sections: Sections::new(), zones: Zones::new(),
programs: Programs::new(), programs: Programs::new(),
event_send: None, event_send: None,
schedule_run_fut: None, schedule_run_fut: None,
@ -394,11 +390,11 @@ impl ProgramRunnerActor {
fn handle_finished_run( fn handle_finished_run(
&mut self, &mut self,
finished_run: SectionRunHandle, finished_run: ZoneRunHandle,
ctx: &mut <ProgramRunnerActor as Actor>::Context, ctx: &mut <ProgramRunnerActor as Actor>::Context,
) -> Option<()> { ) -> Option<()> {
let current_run = self.run_queue.front_mut()?; let current_run = self.run_queue.front_mut()?;
let last_run_handle = current_run.sec_run_handles.last()?; let last_run_handle = current_run.zone_run_handles.last()?;
if finished_run == *last_run_handle { if finished_run == *last_run_handle {
current_run.state = RunState::Finished; current_run.state = RunState::Finished;
debug!( debug!(
@ -452,8 +448,8 @@ pub struct ProgramRunner {
#[allow(dead_code)] #[allow(dead_code)]
impl ProgramRunner { impl ProgramRunner {
pub fn new(section_runner: SectionRunner) -> Self { pub fn new(zone_runner: ZoneRunner) -> Self {
let addr = ProgramRunnerActor::new(section_runner).start(); let addr = ProgramRunnerActor::new(zone_runner).start();
Self { addr } Self { addr }
} }
@ -462,9 +458,9 @@ impl ProgramRunner {
Ok(()) Ok(())
} }
pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { pub async fn update_zones(&mut self, new_zones: Zones) -> Result<()> {
self.addr self.addr
.send(UpdateSections(new_sections)) .send(UpdateZones(new_zones))
.await .await
.map_err(Error::from) .map_err(Error::from)
} }
@ -512,9 +508,9 @@ mod test {
use super::*; use super::*;
use crate::trace_listeners::{EventListener, Filters}; use crate::trace_listeners::{EventListener, Filters};
use sprinklers_core::{ use sprinklers_core::{
model::{Program, ProgramItem, Section}, model::{Program, ProgramItem, Zone},
schedule::{every_day, DateTimeBound, Schedule}, schedule::{every_day, DateTimeBound, Schedule},
section_interface::{MockSectionInterface, SectionInterface}, zone_interface::{MockZoneInterface, ZoneInterface},
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
@ -533,33 +529,33 @@ mod test {
let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let subscriber = tracing_subscriber::registry().with(quit_msg.clone());
let _sub = tracing::subscriber::set_default(subscriber); let _sub = tracing::subscriber::set_default(subscriber);
let interface = MockSectionInterface::new(6); let interface = MockZoneInterface::new(6);
let mut sec_runner = SectionRunner::new(Arc::new(interface)); let mut zone_runner = ZoneRunner::new(Arc::new(interface));
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
yield_now().await; yield_now().await;
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
yield_now().await; yield_now().await;
assert_eq!(quit_msg.get_count(), 1); assert_eq!(quit_msg.get_count(), 1);
} }
fn make_sections_and_runner() -> (Sections, SectionRunner, Arc<MockSectionInterface>) { fn make_zones_and_runner() -> (Zones, ZoneRunner, Arc<MockZoneInterface>) {
let interface = Arc::new(MockSectionInterface::new(2)); let interface = Arc::new(MockZoneInterface::new(2));
let sections: Sections = ordmap![ let zones: Zones = ordmap![
1 => Section { 1 => Zone {
id: 1, id: 1,
name: "Section 1".into(), name: "Zone 1".into(),
interface_id: 0, interface_id: 0,
}.into(), }.into(),
2 => Section { 2 => Zone {
id: 2, id: 2,
name: "Section 2".into(), name: "Zone 2".into(),
interface_id: 1, interface_id: 1,
}.into() }.into()
]; ];
let sec_runner = SectionRunner::new(interface.clone()); let zone_runner = ZoneRunner::new(interface.clone());
(sections, sec_runner, interface) (zones, zone_runner, interface)
} }
fn make_program(num: ProgramId, sequence: Vec<ProgramItem>) -> ProgramRef { fn make_program(num: ProgramId, sequence: Vec<ProgramItem>) -> ProgramRef {
@ -584,26 +580,26 @@ mod test {
#[actix_rt::test] #[actix_rt::test]
async fn test_run_program() { async fn test_run_program() {
let (sections, mut sec_runner, interface) = make_sections_and_runner(); let (zones, mut zone_runner, interface) = make_zones_and_runner();
let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut zone_events = zone_runner.subscribe().await.unwrap();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program = make_program( let program = make_program(
1, 1,
vec![ vec![
ProgramItem { ProgramItem {
section_id: 1, zone_id: 1,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}, },
ProgramItem { ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}, },
], ],
); );
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.run_program(program).await.unwrap(); runner.run_program(program).await.unwrap();
yield_now().await; yield_now().await;
@ -612,48 +608,48 @@ mod test {
Ok(ProgramEvent::RunStart(prog)) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.try_recv(), Ok(SectionEvent::RunStart(_, _))); assert_matches!(zone_events.try_recv(), Ok(ZoneEvent::RunStart(_, _)));
assert_eq!(interface.get_section_state(0), true); assert_eq!(interface.get_zone_state(0), true);
tokio::time::pause(); tokio::time::pause();
assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunFinish(_, _)));
assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunStart(_, _))); assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunStart(_, _)));
assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_zone_state(0), false);
assert_eq!(interface.get_section_state(1), true); assert_eq!(interface.get_zone_state(1), true);
assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunFinish(_, _)));
assert_matches!(prog_events.recv().await, Ok(ProgramEvent::RunFinish(_))); assert_matches!(prog_events.recv().await, Ok(ProgramEvent::RunFinish(_)));
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
yield_now().await; yield_now().await;
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_run_nonexistant_section() { async fn test_run_nonexistant_zone() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program1 = make_program( let program1 = make_program(
1, 1,
vec![ProgramItem { vec![ProgramItem {
section_id: 3, zone_id: 3,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
let program2 = make_program( let program2 = make_program(
2, 2,
vec![ProgramItem { vec![ProgramItem {
section_id: 1, zone_id: 1,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.run_program(program1).await.unwrap(); runner.run_program(program1).await.unwrap();
yield_now().await; yield_now().await;
// Should immediately start and finish running program // Should immediately start and finish running program
// due to nonexistant section // due to nonexistant zone
assert_matches!( assert_matches!(
prog_events.recv().await, prog_events.recv().await,
Ok(ProgramEvent::RunStart(prog)) Ok(ProgramEvent::RunStart(prog))
@ -681,18 +677,18 @@ mod test {
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_close_event_chan() { async fn test_close_event_chan() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program = make_program(1, vec![]); let program = make_program(1, vec![]);
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.run_program(program.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap();
prog_events.recv().await.unwrap(); prog_events.recv().await.unwrap();
@ -704,32 +700,32 @@ mod test {
yield_now().await; yield_now().await;
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_run_program_id() { async fn test_run_program_id() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program1 = make_program( let program1 = make_program(
1, 1,
vec![ProgramItem { vec![ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
let program2 = make_program( let program2 = make_program(
2, 2,
vec![ProgramItem { vec![ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
let programs = ordmap![ 1 => program1, 2 => program2 ]; let programs = ordmap![ 1 => program1, 2 => program2 ];
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.update_programs(programs).await.unwrap(); runner.update_programs(programs).await.unwrap();
// First try a non-existant program id // First try a non-existant program id
@ -767,32 +763,32 @@ mod test {
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_queue_program() { async fn test_queue_program() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program1 = make_program( let program1 = make_program(
1, 1,
vec![ProgramItem { vec![ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
let program2 = make_program( let program2 = make_program(
2, 2,
vec![ProgramItem { vec![ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}], }],
); );
let programs = ordmap![ 1 => program1, 2 => program2 ]; let programs = ordmap![ 1 => program1, 2 => program2 ];
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.update_programs(programs).await.unwrap(); runner.update_programs(programs).await.unwrap();
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();
@ -822,31 +818,31 @@ mod test {
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_cancel_program() { async fn test_cancel_program() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut zone_events = zone_runner.subscribe().await.unwrap();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let program = make_program( let program = make_program(
1, 1,
vec![ vec![
ProgramItem { ProgramItem {
section_id: 1, zone_id: 1,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}, },
ProgramItem { ProgramItem {
section_id: 2, zone_id: 2,
duration: Duration::from_secs(10), duration: Duration::from_secs(10),
}, },
], ],
); );
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.run_program(program.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap();
yield_now().await; yield_now().await;
@ -855,7 +851,7 @@ mod test {
Ok(ProgramEvent::RunStart(prog)) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_, _)); assert_matches!(zone_events.try_recv().unwrap(), ZoneEvent::RunStart(_, _));
runner.cancel_program(program.id).await.unwrap(); runner.cancel_program(program.id).await.unwrap();
yield_now().await; yield_now().await;
@ -864,16 +860,16 @@ mod test {
Ok(ProgramEvent::RunCancel(prog)) Ok(ProgramEvent::RunCancel(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunCancel(_, _))); assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunCancel(_, _)));
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_scheduled_run() { async fn test_scheduled_run() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let make_programs = |num: ProgramId, enabled: bool| { let make_programs = |num: ProgramId, enabled: bool| {
@ -888,7 +884,7 @@ mod test {
let program1 = make_program_with_schedule( let program1 = make_program_with_schedule(
num, num,
vec![ProgramItem { vec![ProgramItem {
section_id: 1, zone_id: 1,
duration: Duration::from_micros(100), duration: Duration::from_micros(100),
}], }],
enabled, enabled,
@ -898,7 +894,7 @@ mod test {
programs programs
}; };
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner runner
.update_programs(make_programs(1, false)) .update_programs(make_programs(1, false))
.await .await
@ -926,13 +922,13 @@ mod test {
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_scheduled_run_twice() { async fn test_scheduled_run_twice() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (zones, mut zone_runner, _) = make_zones_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(zone_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let now = chrono::Local::now(); let now = chrono::Local::now();
@ -947,7 +943,7 @@ mod test {
let program1 = make_program_with_schedule( let program1 = make_program_with_schedule(
1, 1,
vec![ProgramItem { vec![ProgramItem {
section_id: 1, zone_id: 1,
duration: Duration::from_micros(10), duration: Duration::from_micros(10),
}], }],
true, true,
@ -955,7 +951,7 @@ mod test {
); );
let programs = ordmap![ 1 => program1 ]; let programs = ordmap![ 1 => program1 ];
runner.update_sections(sections.clone()).await.unwrap(); runner.update_zones(zones.clone()).await.unwrap();
runner.update_programs(programs).await.unwrap(); runner.update_programs(programs).await.unwrap();
let fut = async move { let fut = async move {
@ -997,6 +993,6 @@ mod test {
.unwrap(); .unwrap();
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); zone_runner.quit().await.unwrap();
} }
} }

3
sprinklers_actors/src/state_manager.rs

@ -54,7 +54,8 @@ impl StateManager {
update, update,
resp_tx, resp_tx,
}) })
.await.map_err(eyre::Report::from)?; .await
.map_err(eyre::Report::from)?;
resp_rx.await.map_err(eyre::Report::from)? resp_rx.await.map_err(eyre::Report::from)?
} }

448
sprinklers_actors/src/section_runner.rs → sprinklers_actors/src/zone_runner.rs

@ -1,5 +1,5 @@
use sprinklers_core::model::{SectionId, SectionRef}; use sprinklers_core::model::{ZoneId, ZoneRef};
use sprinklers_core::section_interface::SectionInterface; use sprinklers_core::zone_interface::ZoneInterface;
use actix::{ use actix::{
Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle, Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle,
@ -22,32 +22,32 @@ use tokio::{
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
pub struct SectionRunHandle(i32); pub struct ZoneRunHandle(i32);
impl SectionRunHandle { impl ZoneRunHandle {
pub fn into_inner(self) -> i32 { pub fn into_inner(self) -> i32 {
self.0 self.0
} }
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum SectionEvent { pub enum ZoneEvent {
RunStart(SectionRunHandle, SectionRef), RunStart(ZoneRunHandle, ZoneRef),
RunFinish(SectionRunHandle, SectionRef), RunFinish(ZoneRunHandle, ZoneRef),
RunPause(SectionRunHandle, SectionRef), RunPause(ZoneRunHandle, ZoneRef),
RunUnpause(SectionRunHandle, SectionRef), RunUnpause(ZoneRunHandle, ZoneRef),
RunCancel(SectionRunHandle, SectionRef), RunCancel(ZoneRunHandle, ZoneRef),
RunnerPause, RunnerPause,
RunnerUnpause, RunnerUnpause,
} }
pub type SectionEventRecv = broadcast::Receiver<SectionEvent>; pub type ZoneEventRecv = broadcast::Receiver<ZoneEvent>;
type SectionEventSend = broadcast::Sender<SectionEvent>; type ZoneEventSend = broadcast::Sender<ZoneEvent>;
const EVENT_CAPACITY: usize = 8; const EVENT_CAPACITY: usize = 8;
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]
pub enum SecRunState { pub enum ZoneRunState {
Waiting, Waiting,
Running { Running {
start_time: Instant, start_time: Instant,
@ -61,64 +61,64 @@ pub enum SecRunState {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SecRun { pub struct ZoneRun {
pub handle: SectionRunHandle, pub handle: ZoneRunHandle,
pub section: SectionRef, pub zone: ZoneRef,
pub duration: Duration, pub duration: Duration,
pub total_duration: Duration, pub total_duration: Duration,
pub state: SecRunState, pub state: ZoneRunState,
} }
impl SecRun { impl ZoneRun {
fn new(handle: SectionRunHandle, section: SectionRef, duration: Duration) -> Self { fn new(handle: ZoneRunHandle, zone: ZoneRef, duration: Duration) -> Self {
Self { Self {
handle, handle,
section, zone,
duration, duration,
total_duration: duration, total_duration: duration,
state: SecRunState::Waiting, state: ZoneRunState::Waiting,
} }
} }
pub fn is_running(&self) -> bool { pub fn is_running(&self) -> bool {
matches!(self.state, SecRunState::Running{..}) matches!(self.state, ZoneRunState::Running{..})
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn is_paused(&self) -> bool { pub fn is_paused(&self) -> bool {
matches!(self.state, SecRunState::Paused{..}) matches!(self.state, ZoneRunState::Paused{..})
} }
} }
pub type SecRunQueue = im::Vector<Arc<SecRun>>; pub type ZoneRunQueue = im::Vector<Arc<ZoneRun>>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SecRunnerState { pub struct ZoneRunnerState {
pub run_queue: SecRunQueue, pub run_queue: ZoneRunQueue,
pub paused: bool, pub paused: bool,
} }
impl Default for SecRunnerState { impl Default for ZoneRunnerState {
fn default() -> Self { fn default() -> Self {
Self { Self {
run_queue: SecRunQueue::default(), run_queue: ZoneRunQueue::default(),
paused: false, paused: false,
} }
} }
} }
pub type SecRunnerStateRecv = watch::Receiver<SecRunnerState>; pub type ZoneRunnerStateRecv = watch::Receiver<ZoneRunnerState>;
struct SectionRunnerInner { struct ZoneRunnerInner {
interface: Arc<dyn SectionInterface>, interface: Arc<dyn ZoneInterface>,
event_send: Option<SectionEventSend>, event_send: Option<ZoneEventSend>,
state_send: watch::Sender<SecRunnerState>, state_send: watch::Sender<ZoneRunnerState>,
delay_future: Option<SpawnHandle>, delay_future: Option<SpawnHandle>,
did_change: bool, did_change: bool,
} }
impl SectionRunnerInner { impl ZoneRunnerInner {
fn send_event(&mut self, event: SectionEvent) { fn send_event(&mut self, event: ZoneEvent) {
if let Some(event_send) = &mut self.event_send { if let Some(event_send) = &mut self.event_send {
match event_send.send(event) { match event_send.send(event) {
Ok(_) => {} Ok(_) => {}
@ -129,7 +129,7 @@ impl SectionRunnerInner {
} }
} }
fn subscribe_event(&mut self) -> SectionEventRecv { fn subscribe_event(&mut self) -> ZoneEventRecv {
match &mut self.event_send { match &mut self.event_send {
Some(event_send) => event_send.subscribe(), Some(event_send) => event_send.subscribe(),
None => { None => {
@ -140,56 +140,44 @@ impl SectionRunnerInner {
} }
} }
fn start_run(&mut self, run: &mut Arc<SecRun>) { fn start_run(&mut self, run: &mut Arc<ZoneRun>) {
use SecRunState::*; use ZoneRunState::*;
let run = Arc::make_mut(run); let run = Arc::make_mut(run);
debug!(section_id = run.section.id, "starting running section"); debug!(zone_id = run.zone.id, "starting running zone");
self.interface self.interface.set_zone_state(run.zone.interface_id, true);
.set_section_state(run.section.interface_id, true);
run.state = Running { run.state = Running {
start_time: Instant::now(), start_time: Instant::now(),
}; };
self.send_event(SectionEvent::RunStart( self.send_event(ZoneEvent::RunStart(run.handle.clone(), run.zone.clone()));
run.handle.clone(),
run.section.clone(),
));
self.did_change = true; self.did_change = true;
} }
fn finish_run(&mut self, run: &mut Arc<SecRun>) { fn finish_run(&mut self, run: &mut Arc<ZoneRun>) {
let run = Arc::make_mut(run); let run = Arc::make_mut(run);
if run.is_running() { if run.is_running() {
debug!(section_id = run.section.id, "finished running section"); debug!(zone_id = run.zone.id, "finished running zone");
self.interface self.interface.set_zone_state(run.zone.interface_id, false);
.set_section_state(run.section.interface_id, false); run.state = ZoneRunState::Finished;
run.state = SecRunState::Finished; self.send_event(ZoneEvent::RunFinish(run.handle.clone(), run.zone.clone()));
self.send_event(SectionEvent::RunFinish(
run.handle.clone(),
run.section.clone(),
));
self.did_change = true; self.did_change = true;
} else { } else {
warn!( warn!(
section_id = run.section.id, zone_id = run.zone.id,
state = debug(&run.state), state = debug(&run.state),
"cannot finish run which is not running" "cannot finish run which is not running"
); );
} }
} }
fn cancel_run(&mut self, run: &mut Arc<SecRun>) -> bool { fn cancel_run(&mut self, run: &mut Arc<ZoneRun>) -> bool {
let run = Arc::make_mut(run); let run = Arc::make_mut(run);
if run.is_running() { if run.is_running() {
debug!(section_id = run.section.id, "cancelling running section"); debug!(zone_id = run.zone.id, "cancelling running zone");
self.interface self.interface.set_zone_state(run.zone.interface_id, false);
.set_section_state(run.section.interface_id, false);
} }
if run.state != SecRunState::Cancelled { if run.state != ZoneRunState::Cancelled {
run.state = SecRunState::Cancelled; run.state = ZoneRunState::Cancelled;
self.send_event(SectionEvent::RunCancel( self.send_event(ZoneEvent::RunCancel(run.handle.clone(), run.zone.clone()));
run.handle.clone(),
run.section.clone(),
));
self.did_change = true; self.did_change = true;
true true
} else { } else {
@ -197,21 +185,20 @@ impl SectionRunnerInner {
} }
} }
fn pause_run(&mut self, run: &mut Arc<SecRun>) { fn pause_run(&mut self, run: &mut Arc<ZoneRun>) {
use SecRunState::*; use ZoneRunState::*;
let run = Arc::make_mut(run); let run = Arc::make_mut(run);
let new_state = match run.state { let new_state = match run.state {
Running { start_time } => { Running { start_time } => {
debug!(section_id = run.section.id, "pausing running section"); debug!(zone_id = run.zone.id, "pausing running zone");
self.interface self.interface.set_zone_state(run.zone.interface_id, false);
.set_section_state(run.section.interface_id, false);
Paused { Paused {
start_time, start_time,
pause_time: Instant::now(), pause_time: Instant::now(),
} }
} }
Waiting => { Waiting => {
debug!(section_id = run.section.id, "pausing waiting section"); debug!(zone_id = run.zone.id, "pausing waiting zone");
Paused { Paused {
start_time: Instant::now(), start_time: Instant::now(),
pause_time: Instant::now(), pause_time: Instant::now(),
@ -222,39 +209,32 @@ impl SectionRunnerInner {
} }
}; };
run.state = new_state; run.state = new_state;
self.send_event(SectionEvent::RunPause( self.send_event(ZoneEvent::RunPause(run.handle.clone(), run.zone.clone()));
run.handle.clone(),
run.section.clone(),
));
self.did_change = true; self.did_change = true;
} }
fn unpause_run(&mut self, run: &mut Arc<SecRun>) { fn unpause_run(&mut self, run: &mut Arc<ZoneRun>) {
use SecRunState::*; use ZoneRunState::*;
let run = Arc::make_mut(run); let run = Arc::make_mut(run);
match run.state { match run.state {
Paused { Paused {
start_time, start_time,
pause_time, pause_time,
} => { } => {
debug!(section_id = run.section.id, "unpausing section"); debug!(zone_id = run.zone.id, "unpausing zone");
self.interface self.interface.set_zone_state(run.zone.interface_id, true);
.set_section_state(run.section.interface_id, true);
run.state = Running { run.state = Running {
start_time: Instant::now(), start_time: Instant::now(),
}; };
let ran_for = pause_time - start_time; let ran_for = pause_time - start_time;
run.duration -= ran_for; run.duration -= ran_for;
self.send_event(SectionEvent::RunUnpause( self.send_event(ZoneEvent::RunUnpause(run.handle.clone(), run.zone.clone()));
run.handle.clone(),
run.section.clone(),
));
} }
Waiting | Finished | Cancelled | Running { .. } => { Waiting | Finished | Cancelled | Running { .. } => {
warn!( warn!(
section_id = run.section.id, zone_id = run.zone.id,
state = debug(&run.state), state = debug(&run.state),
"can only unpause paused section" "can only unpause paused zone"
); );
} }
} }
@ -264,7 +244,7 @@ impl SectionRunnerInner {
fn process_after_delay( fn process_after_delay(
&mut self, &mut self,
after: Duration, after: Duration,
ctx: &mut <SectionRunnerActor as Actor>::Context, ctx: &mut <ZoneRunnerActor as Actor>::Context,
) { ) {
let delay_future = ctx.notify_later(Process, after); let delay_future = ctx.notify_later(Process, after);
if let Some(old_future) = self.delay_future.replace(delay_future) { if let Some(old_future) = self.delay_future.replace(delay_future) {
@ -272,32 +252,32 @@ impl SectionRunnerInner {
} }
} }
fn cancel_process(&mut self, ctx: &mut <SectionRunnerActor as Actor>::Context) { fn cancel_process(&mut self, ctx: &mut <ZoneRunnerActor as Actor>::Context) {
if let Some(old_future) = self.delay_future.take() { if let Some(old_future) = self.delay_future.take() {
ctx.cancel_future(old_future); ctx.cancel_future(old_future);
} }
} }
} }
struct SectionRunnerActor { struct ZoneRunnerActor {
state: SecRunnerState, state: ZoneRunnerState,
inner: SectionRunnerInner, inner: ZoneRunnerInner,
} }
impl Actor for SectionRunnerActor { impl Actor for ZoneRunnerActor {
type Context = actix::Context<Self>; type Context = actix::Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) { fn started(&mut self, _ctx: &mut Self::Context) {
trace!("section_runner starting"); trace!("zone_runner starting");
for i in 0..self.inner.interface.num_sections() { for i in 0..self.inner.interface.num_zones() {
self.inner.interface.set_section_state(i, false); self.inner.interface.set_zone_state(i, false);
} }
} }
fn stopped(&mut self, _ctx: &mut Self::Context) { fn stopped(&mut self, _ctx: &mut Self::Context) {
trace!("section_runner stopped"); trace!("zone_runner stopped");
for i in 0..self.inner.interface.num_sections() { for i in 0..self.inner.interface.num_zones() {
self.inner.interface.set_section_state(i, false); self.inner.interface.set_zone_state(i, false);
} }
} }
} }
@ -306,7 +286,7 @@ impl Actor for SectionRunnerActor {
#[rtype(result = "()")] #[rtype(result = "()")]
struct Quit; struct Quit;
impl Handler<Quit> for SectionRunnerActor { impl Handler<Quit> for ZoneRunnerActor {
type Result = (); type Result = ();
fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result {
@ -316,15 +296,15 @@ impl Handler<Quit> for SectionRunnerActor {
#[derive(Message, Debug, Clone)] #[derive(Message, Debug, Clone)]
#[rtype(result = "()")] #[rtype(result = "()")]
struct QueueRun(SectionRunHandle, SectionRef, Duration); struct QueueRun(ZoneRunHandle, ZoneRef, Duration);
impl Handler<QueueRun> for SectionRunnerActor { impl Handler<QueueRun> for ZoneRunnerActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result {
let QueueRun(handle, section, duration) = msg; let QueueRun(handle, zone, duration) = msg;
let run: Arc<SecRun> = SecRun::new(handle, section, duration).into(); let run: Arc<ZoneRun> = ZoneRun::new(handle, zone, duration).into();
self.state.run_queue.push_back(run); self.state.run_queue.push_back(run);
self.inner.did_change = true; self.inner.did_change = true;
@ -334,9 +314,9 @@ impl Handler<QueueRun> for SectionRunnerActor {
#[derive(Message, Debug, Clone)] #[derive(Message, Debug, Clone)]
#[rtype(result = "bool")] #[rtype(result = "bool")]
struct CancelRun(SectionRunHandle); struct CancelRun(ZoneRunHandle);
impl Handler<CancelRun> for SectionRunnerActor { impl Handler<CancelRun> for ZoneRunnerActor {
type Result = bool; type Result = bool;
fn handle(&mut self, msg: CancelRun, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: CancelRun, ctx: &mut Self::Context) -> Self::Result {
@ -358,25 +338,21 @@ impl Handler<CancelRun> for SectionRunnerActor {
#[derive(Message, Debug, Clone)] #[derive(Message, Debug, Clone)]
#[rtype(result = "usize")] #[rtype(result = "usize")]
struct CancelBySection(SectionId); struct CancelByZone(ZoneId);
impl Handler<CancelBySection> for SectionRunnerActor { impl Handler<CancelByZone> for ZoneRunnerActor {
type Result = usize; type Result = usize;
fn handle(&mut self, msg: CancelBySection, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: CancelByZone, ctx: &mut Self::Context) -> Self::Result {
let CancelBySection(section_id) = msg; let CancelByZone(zone_id) = msg;
let mut count = 0_usize; let mut count = 0_usize;
for run in self for run in self
.state .state
.run_queue .run_queue
.iter_mut() .iter_mut()
.filter(|run| run.section.id == section_id) .filter(|run| run.zone.id == zone_id)
{ {
trace!( trace!(handle = run.handle.0, zone_id, "cancelling run by zone");
handle = run.handle.0,
section_id,
"cancelling run by section"
);
if self.inner.cancel_run(run) { if self.inner.cancel_run(run) {
count += 1; count += 1;
} }
@ -390,11 +366,11 @@ impl Handler<CancelBySection> for SectionRunnerActor {
#[rtype(result = "usize")] #[rtype(result = "usize")]
struct CancelAll; struct CancelAll;
impl Handler<CancelAll> for SectionRunnerActor { impl Handler<CancelAll> for ZoneRunnerActor {
type Result = usize; type Result = usize;
fn handle(&mut self, _msg: CancelAll, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: CancelAll, ctx: &mut Self::Context) -> Self::Result {
let mut old_runs = SecRunQueue::new(); let mut old_runs = ZoneRunQueue::new();
swap(&mut old_runs, &mut self.state.run_queue); swap(&mut old_runs, &mut self.state.run_queue);
trace!(count = old_runs.len(), "cancelling all runs"); trace!(count = old_runs.len(), "cancelling all runs");
let mut count = 0usize; let mut count = 0usize;
@ -412,7 +388,7 @@ impl Handler<CancelAll> for SectionRunnerActor {
#[rtype(result = "()")] #[rtype(result = "()")]
struct SetPaused(bool); struct SetPaused(bool);
impl Handler<SetPaused> for SectionRunnerActor { impl Handler<SetPaused> for ZoneRunnerActor {
type Result = (); type Result = ();
fn handle(&mut self, msg: SetPaused, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, msg: SetPaused, ctx: &mut Self::Context) -> Self::Result {
@ -420,10 +396,10 @@ impl Handler<SetPaused> for SectionRunnerActor {
if pause != self.state.paused { if pause != self.state.paused {
if pause { if pause {
self.state.paused = true; self.state.paused = true;
self.inner.send_event(SectionEvent::RunnerPause); self.inner.send_event(ZoneEvent::RunnerPause);
} else { } else {
self.state.paused = false; self.state.paused = false;
self.inner.send_event(SectionEvent::RunnerUnpause); self.inner.send_event(ZoneEvent::RunnerUnpause);
} }
self.inner.did_change = true; self.inner.did_change = true;
ctx.notify(Process); ctx.notify(Process);
@ -432,10 +408,10 @@ impl Handler<SetPaused> for SectionRunnerActor {
} }
#[derive(Message, Debug, Clone)] #[derive(Message, Debug, Clone)]
#[rtype(result = "SectionEventRecv")] #[rtype(result = "ZoneEventRecv")]
struct Subscribe; struct Subscribe;
impl Handler<Subscribe> for SectionRunnerActor { impl Handler<Subscribe> for ZoneRunnerActor {
type Result = MessageResult<Subscribe>; type Result = MessageResult<Subscribe>;
fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
@ -448,7 +424,7 @@ impl Handler<Subscribe> for SectionRunnerActor {
#[rtype(result = "()")] #[rtype(result = "()")]
struct Process; struct Process;
impl Handler<Process> for SectionRunnerActor { impl Handler<Process> for ZoneRunnerActor {
type Result = (); type Result = ();
fn handle(&mut self, _msg: Process, ctx: &mut Self::Context) -> Self::Result { fn handle(&mut self, _msg: Process, ctx: &mut Self::Context) -> Self::Result {
@ -456,14 +432,11 @@ impl Handler<Process> for SectionRunnerActor {
} }
} }
impl SectionRunnerActor { impl ZoneRunnerActor {
fn new( fn new(interface: Arc<dyn ZoneInterface>, state_send: watch::Sender<ZoneRunnerState>) -> Self {
interface: Arc<dyn SectionInterface>,
state_send: watch::Sender<SecRunnerState>,
) -> Self {
Self { Self {
state: SecRunnerState::default(), state: ZoneRunnerState::default(),
inner: SectionRunnerInner { inner: ZoneRunnerInner {
interface, interface,
event_send: None, event_send: None,
state_send, state_send,
@ -474,7 +447,7 @@ impl SectionRunnerActor {
} }
fn process_queue(&mut self, ctx: &mut actix::Context<Self>) { fn process_queue(&mut self, ctx: &mut actix::Context<Self>) {
use SecRunState::*; use ZoneRunState::*;
let state = &mut self.state; let state = &mut self.state;
while let Some(current_run) = state.run_queue.front_mut() { while let Some(current_run) = state.run_queue.front_mut() {
let run_finished = match (&current_run.state, state.paused) { let run_finished = match (&current_run.state, state.paused) {
@ -528,23 +501,23 @@ impl SectionRunnerActor {
} }
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
#[error("error communicating with SectionRunner: {0}")] #[error("error communicating with ZoneRunner: {0}")]
pub struct Error(#[from] actix::MailboxError); pub struct Error(#[from] actix::MailboxError);
pub type Result<T, E = Error> = std::result::Result<T, E>; pub type Result<T, E = Error> = std::result::Result<T, E>;
#[derive(Clone)] #[derive(Clone)]
pub struct SectionRunner { pub struct ZoneRunner {
state_recv: SecRunnerStateRecv, state_recv: ZoneRunnerStateRecv,
addr: Addr<SectionRunnerActor>, addr: Addr<ZoneRunnerActor>,
next_run_id: Arc<AtomicI32>, next_run_id: Arc<AtomicI32>,
} }
#[allow(dead_code)] #[allow(dead_code)]
impl SectionRunner { impl ZoneRunner {
pub fn new(interface: Arc<dyn SectionInterface>) -> Self { pub fn new(interface: Arc<dyn ZoneInterface>) -> Self {
let (state_send, state_recv) = watch::channel(SecRunnerState::default()); let (state_send, state_recv) = watch::channel(ZoneRunnerState::default());
let addr = SectionRunnerActor::new(interface, state_send).start(); let addr = ZoneRunnerActor::new(interface, state_send).start();
Self { Self {
state_recv, state_recv,
addr, addr,
@ -556,49 +529,40 @@ impl SectionRunner {
self.addr.send(Quit).map_err(From::from) self.addr.send(Quit).map_err(From::from)
} }
fn queue_run_inner( fn queue_run_inner(&mut self, zone: ZoneRef, duration: Duration) -> (QueueRun, ZoneRunHandle) {
&mut self,
section: SectionRef,
duration: Duration,
) -> (QueueRun, SectionRunHandle) {
let run_id = self.next_run_id.fetch_add(1, Ordering::SeqCst); let run_id = self.next_run_id.fetch_add(1, Ordering::SeqCst);
let handle = SectionRunHandle(run_id); let handle = ZoneRunHandle(run_id);
(QueueRun(handle.clone(), section, duration), handle) (QueueRun(handle.clone(), zone, duration), handle)
} }
pub fn do_queue_run(&mut self, section: SectionRef, duration: Duration) -> SectionRunHandle { pub fn do_queue_run(&mut self, zone: ZoneRef, duration: Duration) -> ZoneRunHandle {
let (queue_run, handle) = self.queue_run_inner(section, duration); let (queue_run, handle) = self.queue_run_inner(zone, duration);
self.addr.do_send(queue_run); self.addr.do_send(queue_run);
handle handle
} }
pub fn queue_run( pub fn queue_run(
&mut self, &mut self,
section: SectionRef, zone: ZoneRef,
duration: Duration, duration: Duration,
) -> impl Future<Output = Result<SectionRunHandle>> { ) -> impl Future<Output = Result<ZoneRunHandle>> {
let (queue_run, handle) = self.queue_run_inner(section, duration); let (queue_run, handle) = self.queue_run_inner(zone, duration);
self.addr self.addr
.send(queue_run) .send(queue_run)
.map_err(From::from) .map_err(From::from)
.map_ok(move |_| handle) .map_ok(move |_| handle)
} }
pub fn do_cancel_run(&mut self, handle: SectionRunHandle) { pub fn do_cancel_run(&mut self, handle: ZoneRunHandle) {
self.addr.do_send(CancelRun(handle)) self.addr.do_send(CancelRun(handle))
} }
pub fn cancel_run(&mut self, handle: SectionRunHandle) -> impl Future<Output = Result<bool>> { pub fn cancel_run(&mut self, handle: ZoneRunHandle) -> impl Future<Output = Result<bool>> {
self.addr.send(CancelRun(handle)).map_err(From::from) self.addr.send(CancelRun(handle)).map_err(From::from)
} }
pub fn cancel_by_section( pub fn cancel_by_zone(&mut self, zone_id: ZoneId) -> impl Future<Output = Result<usize>> {
&mut self, self.addr.send(CancelByZone(zone_id)).map_err(From::from)
section_id: SectionId,
) -> impl Future<Output = Result<usize>> {
self.addr
.send(CancelBySection(section_id))
.map_err(From::from)
} }
pub fn cancel_all(&mut self) -> impl Future<Output = Result<usize>> { pub fn cancel_all(&mut self) -> impl Future<Output = Result<usize>> {
@ -613,11 +577,11 @@ impl SectionRunner {
self.addr.send(SetPaused(false)).map_err(From::from) self.addr.send(SetPaused(false)).map_err(From::from)
} }
pub fn subscribe(&mut self) -> impl Future<Output = Result<SectionEventRecv>> { pub fn subscribe(&mut self) -> impl Future<Output = Result<ZoneEventRecv>> {
self.addr.send(Subscribe).map_err(From::from) self.addr.send(Subscribe).map_err(From::from)
} }
pub fn get_state_recv(&self) -> SecRunnerStateRecv { pub fn get_state_recv(&self) -> ZoneRunnerStateRecv {
self.state_recv.clone() self.state_recv.clone()
} }
} }
@ -627,8 +591,8 @@ mod test {
use super::*; use super::*;
use crate::trace_listeners::{EventListener, Filters}; use crate::trace_listeners::{EventListener, Filters};
use sprinklers_core::{ use sprinklers_core::{
model::{Section, Sections}, model::{Zone, Zones},
section_interface::MockSectionInterface, zone_interface::MockZoneInterface,
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
@ -639,43 +603,43 @@ mod test {
async fn test_quit() { async fn test_quit() {
let quit_msg = EventListener::new( let quit_msg = EventListener::new(
Filters::new() Filters::new()
.target("sprinklers_actors::section_runner") .target("sprinklers_actors::zone_runner")
.message("section_runner stopped"), .message("zone_runner stopped"),
); );
let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let subscriber = tracing_subscriber::registry().with(quit_msg.clone());
let _sub = tracing::subscriber::set_default(subscriber); let _sub = tracing::subscriber::set_default(subscriber);
let interface = MockSectionInterface::new(6); let interface = MockZoneInterface::new(6);
let mut runner = SectionRunner::new(Arc::new(interface)); let mut runner = ZoneRunner::new(Arc::new(interface));
tokio::task::yield_now().await; tokio::task::yield_now().await;
runner.quit().await.unwrap(); runner.quit().await.unwrap();
assert_eq!(quit_msg.get_count(), 1); assert_eq!(quit_msg.get_count(), 1);
} }
fn make_sections_and_interface() -> (Sections, Arc<MockSectionInterface>) { fn make_zones_and_interface() -> (Zones, Arc<MockZoneInterface>) {
let interface = Arc::new(MockSectionInterface::new(2)); let interface = Arc::new(MockZoneInterface::new(2));
let sections: Sections = ordmap![ let zones: Zones = ordmap![
1 => Section { 1 => Zone {
id: 1, id: 1,
name: "Section 1".into(), name: "Zone 1".into(),
interface_id: 0, interface_id: 0,
}.into(), }.into(),
2 => Section { 2 => Zone {
id: 2, id: 2,
name: "Section 2".into(), name: "Zone 2".into(),
interface_id: 1, interface_id: 1,
}.into() }.into()
]; ];
(sections, interface) (zones, interface)
} }
fn assert_section_states(interface: &MockSectionInterface, states: &[bool]) { fn assert_zone_states(interface: &MockZoneInterface, states: &[bool]) {
for (id, state) in states.iter().enumerate() { for (id, state) in states.iter().enumerate() {
assert_eq!( assert_eq!(
interface.get_section_state(id as u32), interface.get_zone_state(id as u32),
*state, *state,
"section interface id {} did not match", "zone interface id {} did not match",
id id
); );
} }
@ -691,236 +655,236 @@ mod test {
#[actix_rt::test] #[actix_rt::test]
async fn test_queue() { async fn test_queue() {
let (sections, interface) = make_sections_and_interface(); let (zones, interface) = make_zones_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = ZoneRunner::new(interface.clone());
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
// Queue single section, make sure it runs // Queue single zone, make sure it runs
runner runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[true, false]); assert_zone_states(&interface, &[true, false]);
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
// Queue two sections, make sure they run one at a time // Queue two zones, make sure they run one at a time
runner runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[true, false]); assert_zone_states(&interface, &[true, false]);
advance(Duration::from_secs(10)).await; advance(Duration::from_secs(10)).await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_cancel_run() { async fn test_cancel_run() {
let (sections, interface) = make_sections_and_interface(); let (zones, interface) = make_zones_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = ZoneRunner::new(interface.clone());
let run1 = runner let run1 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let _run2 = runner let _run2 = runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run3 = runner let run3 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
runner.cancel_run(run1).await.unwrap(); runner.cancel_run(run1).await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[true, false]); assert_zone_states(&interface, &[true, false]);
runner.cancel_run(run3).await.unwrap(); runner.cancel_run(run3).await.unwrap();
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_cancel_all() { async fn test_cancel_all() {
let (sections, interface) = make_sections_and_interface(); let (zones, interface) = make_zones_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = ZoneRunner::new(interface.clone());
runner runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
runner runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
runner.cancel_all().await.unwrap(); runner.cancel_all().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.cancel_all().await.unwrap(); runner.cancel_all().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_pause() { async fn test_pause() {
let (sections, interface) = make_sections_and_interface(); let (zones, interface) = make_zones_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = ZoneRunner::new(interface.clone());
let _run1 = runner let _run1 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run2 = runner let run2 = runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let _run3 = runner let _run3 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
runner.pause().await.unwrap(); runner.pause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
advance(Duration::from_secs(10)).await; advance(Duration::from_secs(10)).await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
advance(Duration::from_secs(8)).await; advance(Duration::from_secs(8)).await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
advance(Duration::from_secs(2)).await; advance(Duration::from_secs(2)).await;
assert_section_states(&interface, &[true, false]); assert_zone_states(&interface, &[true, false]);
runner.pause().await.unwrap(); runner.pause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
// cancel paused run // cancel paused run
runner.cancel_run(run2).await.unwrap(); runner.cancel_run(run2).await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
tokio::task::yield_now().await; tokio::task::yield_now().await;
assert_section_states(&interface, &[false, true]); assert_zone_states(&interface, &[false, true]);
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_section_states(&interface, &[false, false]); assert_zone_states(&interface, &[false, false]);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[actix_rt::test] #[actix_rt::test]
async fn test_event() { async fn test_event() {
let (sections, interface) = make_sections_and_interface(); let (zones, interface) = make_zones_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = ZoneRunner::new(interface.clone());
let mut event_recv = runner.subscribe().await.unwrap(); let mut event_recv = runner.subscribe().await.unwrap();
let run1 = runner let run1 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run2 = runner let run2 = runner
.queue_run(sections[&1].clone(), Duration::from_secs(10)) .queue_run(zones[&1].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
let run3 = runner let run3 = runner
.queue_run(sections[&2].clone(), Duration::from_secs(10)) .queue_run(zones[&2].clone(), Duration::from_secs(10))
.await .await
.unwrap(); .unwrap();
assert_matches!( assert_matches!(
event_recv.recv().await, event_recv.recv().await,
Ok(SectionEvent::RunStart(handle, _)) Ok(ZoneEvent::RunStart(handle, _))
if handle == run1 if handle == run1
); );
runner.pause().await.unwrap(); runner.pause().await.unwrap();
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerPause));
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run1); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run1);
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerUnpause));
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunUnpause(handle, _)) if handle == run1); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunUnpause(handle, _)) if handle == run1);
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunFinish(handle, _)) if handle == run1); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunFinish(handle, _)) if handle == run1);
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunStart(handle, _)) if handle == run2); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunStart(handle, _)) if handle == run2);
runner.pause().await.unwrap(); runner.pause().await.unwrap();
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerPause));
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run2); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run2);
// cancel paused run // cancel paused run
runner.cancel_run(run2.clone()).await.unwrap(); runner.cancel_run(run2.clone()).await.unwrap();
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunCancel(handle, _)) if handle == run2); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunCancel(handle, _)) if handle == run2);
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run3); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run3);
runner.unpause().await.unwrap(); runner.unpause().await.unwrap();
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerUnpause));
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunUnpause(handle, _)) if handle == run3); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunUnpause(handle, _)) if handle == run3);
advance(Duration::from_secs(11)).await; advance(Duration::from_secs(11)).await;
assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunFinish(handle, _)) if handle == run3); assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunFinish(handle, _)) if handle == run3);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }

2
sprinklers_core/src/lib.rs

@ -1,4 +1,4 @@
pub mod model; pub mod model;
pub mod schedule; pub mod schedule;
pub mod serde; pub mod serde;
pub mod section_interface; pub mod zone_interface;

4
sprinklers_core/src/model/mod.rs

@ -1,7 +1,7 @@
//! Domain specific data models //! Domain specific data models
mod program; mod program;
mod section; mod zone;
pub use program::*; pub use program::*;
pub use section::*; pub use zone::*;

6
sprinklers_core/src/model/program.rs

@ -1,4 +1,4 @@
use super::section::SectionId; use super::zone::ZoneId;
use crate::schedule::Schedule; use crate::schedule::Schedule;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
@ -6,7 +6,9 @@ use std::{sync::Arc, time::Duration};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ProgramItem { pub struct ProgramItem {
pub section_id: SectionId, // TODO: update nomenclature
#[serde(rename = "sectionId")]
pub zone_id: ZoneId,
#[serde(with = "crate::serde::duration_secs")] #[serde(with = "crate::serde::duration_secs")]
pub duration: Duration, pub duration: Duration,
} }

27
sprinklers_core/src/model/section.rs

@ -1,27 +0,0 @@
//! Data models for sprinklers sections
//!
//! A section represents a group of sprinkler heads actuated by a single
//! valve. Physically controllable (or virtual) valves are handled by implementations of
//! [SectionInterface](../../section_interface/trait.SectionInterface.html), but the model
//! describes a logical section and how it maps to a physical one.
use crate::section_interface::SecId;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Identifying integer type for a Section
pub type SectionId = u32;
/// A single logical section
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Section {
pub id: SectionId,
pub name: String,
/// ID number of the corresponding physical section
pub interface_id: SecId,
}
pub type SectionRef = Arc<Section>;
pub type Sections = im::OrdMap<SectionId, SectionRef>;

27
sprinklers_core/src/model/zone.rs

@ -0,0 +1,27 @@
//! Data models for sprinklers zones
//!
//! A zone represents a group of sprinkler heads actuated by a single
//! valve. Physically controllable (or virtual) valves are handled by implementations of
//! [ZoneInterface](../../zone_interface/trait.ZoneInterface.html), but the model
//! describes a logical zone and how it maps to a physical one.
use crate::zone_interface::ZoneNum;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// Identifying integer type for a Zone
pub type ZoneId = u32;
/// A single logical zone
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Zone {
pub id: ZoneId,
pub name: String,
/// ID number of the corresponding physical zone
pub interface_id: ZoneNum,
}
pub type ZoneRef = Arc<Zone>;
pub type Zones = im::OrdMap<ZoneId, ZoneRef>;

5
sprinklers_core/src/schedule.rs

@ -1,9 +1,6 @@
//! Scheduling for events to run at certain intervals in the future //! Scheduling for events to run at certain intervals in the future
use chrono::{ use chrono::{Date, DateTime, Datelike, Local, NaiveDateTime, NaiveTime, TimeZone, Weekday};
Date, DateTime, Datelike, Local, NaiveDateTime, NaiveTime, TimeZone,
Weekday,
};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp; use std::cmp;
use std::iter::FromIterator; use std::iter::FromIterator;

65
sprinklers_core/src/section_interface.rs

@ -1,65 +0,0 @@
use std::iter::repeat_with;
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::debug;
pub type SecId = u32;
pub trait SectionInterface: Send + Sync {
fn num_sections(&self) -> SecId;
fn set_section_state(&self, id: SecId, running: bool);
fn get_section_state(&self, id: SecId) -> bool;
}
pub struct MockSectionInterface {
states: Vec<AtomicBool>,
}
impl MockSectionInterface {
#[allow(dead_code)]
pub fn new(num_sections: SecId) -> Self {
Self {
states: repeat_with(|| AtomicBool::new(false))
.take(num_sections as usize)
.collect(),
}
}
}
impl SectionInterface for MockSectionInterface {
fn num_sections(&self) -> SecId {
self.states.len() as SecId
}
fn set_section_state(&self, id: SecId, running: bool) {
debug!(id, running, "setting section");
self.states[id as usize].store(running, Ordering::SeqCst);
}
fn get_section_state(&self, id: SecId) -> bool {
self.states[id as usize].load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_mock_section_interface() {
let iface = MockSectionInterface::new(6);
assert_eq!(iface.num_sections(), 6);
for i in 0..6u32 {
assert_eq!(iface.get_section_state(i), false);
}
for i in 0..6u32 {
iface.set_section_state(i, true);
}
for i in 0..6u32 {
assert_eq!(iface.get_section_state(i), true);
}
for i in 0..6u32 {
iface.set_section_state(i, false);
}
for i in 0..6u32 {
assert_eq!(iface.get_section_state(i), false);
}
}
}

65
sprinklers_core/src/zone_interface.rs

@ -0,0 +1,65 @@
use std::iter::repeat_with;
use std::sync::atomic::{AtomicBool, Ordering};
use tracing::debug;
pub type ZoneNum = u32;
pub trait ZoneInterface: Send + Sync {
fn num_zones(&self) -> ZoneNum;
fn set_zone_state(&self, id: ZoneNum, running: bool);
fn get_zone_state(&self, id: ZoneNum) -> bool;
}
pub struct MockZoneInterface {
states: Vec<AtomicBool>,
}
impl MockZoneInterface {
#[allow(dead_code)]
pub fn new(num_zones: ZoneNum) -> Self {
Self {
states: repeat_with(|| AtomicBool::new(false))
.take(num_zones as usize)
.collect(),
}
}
}
impl ZoneInterface for MockZoneInterface {
fn num_zones(&self) -> ZoneNum {
self.states.len() as ZoneNum
}
fn set_zone_state(&self, id: ZoneNum, running: bool) {
debug!(id, running, "setting zone");
self.states[id as usize].store(running, Ordering::SeqCst);
}
fn get_zone_state(&self, id: ZoneNum) -> bool {
self.states[id as usize].load(Ordering::SeqCst)
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn test_mock_zone_interface() {
let iface = MockZoneInterface::new(6);
assert_eq!(iface.num_zones(), 6);
for i in 0..6u32 {
assert_eq!(iface.get_zone_state(i), false);
}
for i in 0..6u32 {
iface.set_zone_state(i, true);
}
for i in 0..6u32 {
assert_eq!(iface.get_zone_state(i), true);
}
for i in 0..6u32 {
iface.set_zone_state(i, false);
}
for i in 0..6u32 {
assert_eq!(iface.get_zone_state(i), false);
}
}
}

16
sprinklers_database/src/lib.rs

@ -1,8 +1,8 @@
mod migration; mod migration;
mod migrations; mod migrations;
mod program; mod program;
mod section;
mod sql_json; mod sql_json;
mod zone;
pub use migration::*; pub use migration::*;
pub use migrations::create_migrations; pub use migrations::create_migrations;
@ -10,7 +10,7 @@ pub use program::*;
pub use rusqlite::Connection as DbConn; pub use rusqlite::Connection as DbConn;
use sprinklers_core::model::Sections; use sprinklers_core::model::Zones;
use eyre::Result; use eyre::Result;
use rusqlite::NO_PARAMS; use rusqlite::NO_PARAMS;
@ -28,16 +28,16 @@ pub fn setup_db() -> Result<DbConn> {
Ok(conn) Ok(conn)
} }
pub fn query_sections(conn: &DbConn) -> Result<Sections> { pub fn query_zones(conn: &DbConn) -> Result<Zones> {
let mut statement = conn.prepare_cached( let mut statement = conn.prepare_cached(
"SELECT s.id, s.name, s.interface_id \ "SELECT s.id, s.name, s.interface_id \
FROM sections AS s;", FROM sections AS s;",
)?; )?;
let rows = statement.query_map(NO_PARAMS, section::from_sql)?; let rows = statement.query_map(NO_PARAMS, zone::from_sql)?;
let mut sections = Sections::new(); let mut zones = Zones::new();
for row in rows { for row in rows {
let section = row?; let zone = row?;
sections.insert(section.id, section.into()); zones.insert(zone.id, zone.into());
} }
Ok(sections) Ok(zones)
} }

8
sprinklers_database/src/migration.rs

@ -3,7 +3,7 @@ use rusqlite::{params, Connection};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::ops::Bound::{Excluded, Unbounded}; use std::ops::Bound::{Excluded, Unbounded};
use thiserror::Error; use thiserror::Error;
use tracing::{debug, trace, info}; use tracing::{debug, info, trace};
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum MigrationError { pub enum MigrationError {
@ -152,7 +152,11 @@ impl Migrations {
last_ver = *ver; last_ver = *ver;
} }
if last_ver != NO_MIGRATIONS { if last_ver != NO_MIGRATIONS {
info!(old_version = db_version, new_version = last_ver, "applied database migrations"); info!(
old_version = db_version,
new_version = last_ver,
"applied database migrations"
);
set_db_version(&trans, last_ver)?; set_db_version(&trans, last_ver)?;
} }
trans.commit()?; trans.commit()?;

2
sprinklers_database/src/migrations/mod.rs

@ -20,4 +20,4 @@ pub fn create_migrations() -> Migrations {
migs.add(include_file_migration!(6, "0006-fix_view_program_seq")); migs.add(include_file_migration!(6, "0006-fix_view_program_seq"));
// INSERT MIGRATION ABOVE -- DO NOT EDIT THIS COMMENT // INSERT MIGRATION ABOVE -- DO NOT EDIT THIS COMMENT
migs migs
} }

8
sprinklers_database/src/program.rs

@ -2,7 +2,7 @@ use super::sql_json::SqlJson;
use super::DbConn; use super::DbConn;
use sprinklers_core::{ use sprinklers_core::{
model::{ model::{
Program, ProgramId, ProgramItem, ProgramSequence, ProgramUpdateData, Programs, SectionId, Program, ProgramId, ProgramItem, ProgramSequence, ProgramUpdateData, Programs, ZoneId,
}, },
schedule::Schedule, schedule::Schedule,
}; };
@ -58,7 +58,7 @@ fn update_as_sql(id: ProgramId, program: &ProgramUpdateData) -> SqlProgramUpdate
struct SqlProgramItem { struct SqlProgramItem {
program_id: ProgramId, program_id: ProgramId,
seq_num: isize, seq_num: isize,
section_id: SectionId, zone_id: ZoneId,
duration: f64, duration: f64,
} }
@ -70,7 +70,7 @@ impl<'a> IntoIterator for &'a SqlProgramItem {
vec![ vec![
&self.program_id as &dyn ToSql, &self.program_id as &dyn ToSql,
&self.seq_num, &self.seq_num,
&self.section_id, &self.zone_id,
&self.duration, &self.duration,
] ]
.into_iter() .into_iter()
@ -85,7 +85,7 @@ fn item_as_sql(
SqlProgramItem { SqlProgramItem {
program_id, program_id,
seq_num: (seq_num + 1) as isize, seq_num: (seq_num + 1) as isize,
section_id: program_item.section_id, zone_id: program_item.zone_id,
duration: program_item.duration.as_secs_f64(), duration: program_item.duration.as_secs_f64(),
} }
} }

16
sprinklers_database/src/section.rs

@ -1,16 +0,0 @@
use sprinklers_core::model::Section;
use rusqlite::{Error as SqlError, Row as SqlRow, ToSql};
pub fn from_sql<'a>(row: &SqlRow<'a>) -> Result<Section, SqlError> {
Ok(Section {
id: row.get(0)?,
name: row.get(1)?,
interface_id: row.get(2)?,
})
}
#[allow(dead_code)]
pub fn as_sql(section: &Section) -> Vec<&dyn ToSql> {
vec![&section.id, &section.name, &section.interface_id]
}

16
sprinklers_database/src/zone.rs

@ -0,0 +1,16 @@
use sprinklers_core::model::Zone;
use rusqlite::{Error as SqlError, Row as SqlRow, ToSql};
pub fn from_sql<'a>(row: &SqlRow<'a>) -> Result<Zone, SqlError> {
Ok(Zone {
id: row.get(0)?,
name: row.get(1)?,
interface_id: row.get(2)?,
})
}
#[allow(dead_code)]
pub fn as_sql(zone: &Zone) -> Vec<&dyn ToSql> {
vec![&zone.id, &zone.name, &zone.interface_id]
}

16
sprinklers_linux/src/lib.rs

@ -1,4 +1,4 @@
use sprinklers_core::section_interface::{SecId, SectionInterface}; use sprinklers_core::zone_interface::{ZoneInterface, ZoneNum};
use eyre::WrapErr; use eyre::WrapErr;
use gpio_cdev::{LineHandle, LineRequestFlags}; use gpio_cdev::{LineHandle, LineRequestFlags};
@ -9,12 +9,12 @@ pub struct LinuxGpio {
lines: Vec<LineHandle>, lines: Vec<LineHandle>,
} }
impl SectionInterface for LinuxGpio { impl ZoneInterface for LinuxGpio {
fn num_sections(&self) -> SecId { fn num_zones(&self) -> ZoneNum {
self.lines.len() as SecId self.lines.len() as ZoneNum
} }
fn set_section_state(&self, id: SecId, running: bool) { fn set_zone_state(&self, id: ZoneNum, running: bool) {
if let Some(line) = &self.lines.get(id as usize) { if let Some(line) = &self.lines.get(id as usize) {
trace!( trace!(
line = line.line().offset(), line = line.line().offset(),
@ -26,11 +26,11 @@ impl SectionInterface for LinuxGpio {
error!("error setting GPIO line value: {}", err); error!("error setting GPIO line value: {}", err);
} }
} else { } else {
warn!("set_section_state: invalid section id: {}", id); warn!("set_zone_state: invalid zone id: {}", id);
} }
} }
fn get_section_state(&self, id: SecId) -> bool { fn get_zone_state(&self, id: ZoneNum) -> bool {
if let Some(line) = &self.lines.get(id as usize) { if let Some(line) = &self.lines.get(id as usize) {
match line.get_value() { match line.get_value() {
Ok(active) => active != 0, Ok(active) => active != 0,
@ -40,7 +40,7 @@ impl SectionInterface for LinuxGpio {
} }
} }
} else { } else {
warn!("get_section_state: invalid section id: {}", id); warn!("get_zone_state: invalid zone id: {}", id);
false false
} }
} }

4
sprinklers_mqtt/src/actor.rs

@ -1,6 +1,6 @@
use super::{event_loop::EventLoopTask, request, MqttInterface}; use super::{event_loop::EventLoopTask, request, MqttInterface};
use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture};
use request::{ErrorCode, RequestContext, RequestError, WithRequestId, Response}; use request::{ErrorCode, RequestContext, RequestError, Response, WithRequestId};
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tracing::{debug, error, info, trace, warn}; use tracing::{debug, error, info, trace, warn};
@ -55,7 +55,7 @@ impl MqttActor {
match &response { match &response {
Response::Success(res) => { Response::Success(res) => {
debug!(rid, response = display(res), "success response:"); debug!(rid, response = display(res), "success response:");
}, }
Response::Error(err) => { Response::Error(err) => {
debug!(rid, "request error: {}", err); debug!(rid, "request error: {}", err);
} }

46
sprinklers_mqtt/src/lib.rs

@ -1,17 +1,17 @@
mod actor; mod actor;
mod event_loop; mod event_loop;
mod request; mod request;
mod section_runner_json;
mod topics; mod topics;
mod update_listener; mod update_listener;
mod zone_runner_json;
pub use request::RequestContext; pub use request::RequestContext;
pub use update_listener::UpdateListener; pub use update_listener::UpdateListener;
use self::topics::Topics; use self::topics::Topics;
use section_runner_json::SecRunnerStateJson; use sprinklers_actors::zone_runner::ZoneRunnerState;
use sprinklers_actors::section_runner::SecRunnerState; use sprinklers_core::model::{Program, ProgramId, Programs, Zone, ZoneId, Zones};
use sprinklers_core::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; use zone_runner_json::ZoneRunnerStateJson;
use actix::{Actor, Addr}; use actix::{Actor, Addr};
use eyre::WrapErr; use eyre::WrapErr;
@ -73,32 +73,28 @@ impl MqttInterface {
self.client.cancel().await self.client.cancel().await
} }
pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> {
let section_ids: Vec<_> = sections.keys().cloned().collect(); let zone_ids: Vec<_> = zones.keys().cloned().collect();
self.publish_data(self.topics.sections(), &section_ids) self.publish_data(self.topics.zones(), &zone_ids)
.await .await
.wrap_err("failed to publish section ids")?; .wrap_err("failed to publish zone ids")?;
for section in sections.values() { for zone in zones.values() {
self.publish_section(section).await?; self.publish_zone(zone).await?;
} }
Ok(()) Ok(())
} }
pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> {
self.publish_data(self.topics.section_data(section.id), section) self.publish_data(self.topics.zone_data(zone.id), zone)
.await .await
.wrap_err("failed to publish section") .wrap_err("failed to publish zone")
} }
// Section state can be derived from section runner state... // Zone state can be derived from zone runner state...
pub async fn publish_section_state( pub async fn publish_zone_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> {
&mut self, self.publish_data(self.topics.zone_state(zone_id), &state)
section_id: SectionId,
state: bool,
) -> eyre::Result<()> {
self.publish_data(self.topics.section_state(section_id), &state)
.await .await
.wrap_err("failed to publish section state") .wrap_err("failed to publish zone state")
} }
pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> {
@ -165,11 +161,11 @@ impl MqttInterface {
.wrap_err("failed to publish program next run") .wrap_err("failed to publish program next run")
} }
pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { pub async fn publish_zone_runner(&mut self, sr_state: &ZoneRunnerState) -> eyre::Result<()> {
let json: SecRunnerStateJson = sr_state.into(); let json: ZoneRunnerStateJson = sr_state.into();
self.publish_data(self.topics.section_runner(), &json) self.publish_data(self.topics.zone_runner(), &json)
.await .await
.wrap_err("failed to publish section runner") .wrap_err("failed to publish zone runner")
} }
async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> { async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> {

35
sprinklers_mqtt/src/request/mod.rs

@ -1,5 +1,5 @@
use sprinklers_actors::{ProgramRunner, SectionRunner, StateManager}; use sprinklers_actors::{ProgramRunner, StateManager, ZoneRunner};
use sprinklers_core::model::Sections; use sprinklers_core::model::Zones;
use futures_util::{ready, FutureExt}; use futures_util::{ready, FutureExt};
use num_derive::FromPrimitive; use num_derive::FromPrimitive;
@ -7,11 +7,11 @@ use serde::{Deserialize, Serialize};
use std::{fmt, future::Future, pin::Pin, task::Poll}; use std::{fmt, future::Future, pin::Pin, task::Poll};
mod programs; mod programs;
mod sections; mod zones;
pub struct RequestContext { pub struct RequestContext {
pub sections: Sections, pub zones: Zones,
pub section_runner: SectionRunner, pub zone_runner: ZoneRunner,
pub program_runner: ProgramRunner, pub program_runner: ProgramRunner,
pub state_manager: StateManager, pub state_manager: StateManager,
} }
@ -31,8 +31,8 @@ pub enum ErrorCode {
NoPermission = 107, NoPermission = 107,
NotFound = 109, NotFound = 109,
// NotUnique = 110, // NotUnique = 110,
NoSuchSection = 120, NoSuchZone = 120,
NoSuchSectionRun = 121, NoSuchZoneRun = 121,
NoSuchProgram = 122, NoSuchProgram = 122,
Internal = 200, Internal = 200,
NotImplemented = 201, NotImplemented = 201,
@ -259,10 +259,15 @@ pub type ResponseWithId = WithRequestId<Response>;
#[derive(Debug, Deserialize, Serialize)] #[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase", tag = "type")] #[serde(rename_all = "camelCase", tag = "type")]
pub enum Request { pub enum Request {
RunSection(sections::RunSectionRequest), // TODO: update nomenclature
CancelSection(sections::CancelSectionRequest), #[serde(rename = "runSection")]
CancelSectionRunId(sections::CancelSectionRunIdRequest), RunZone(zones::RunZoneRequest),
PauseSectionRunner(sections::PauseSectionRunnerRequest), #[serde(rename = "cancelSection")]
CancelZone(zones::CancelZoneRequest),
#[serde(rename = "cancelSectionRunId")]
CancelZoneRunId(zones::CancelZoneRunIdRequest),
#[serde(rename = "pauseSectionRunner")]
PauseZoneRunner(zones::PauseZoneRunnerRequest),
RunProgram(programs::RunProgramRequest), RunProgram(programs::RunProgramRequest),
CancelProgram(programs::CancelProgramRequest), CancelProgram(programs::CancelProgramRequest),
UpdateProgram(programs::UpdateProgramRequest), UpdateProgram(programs::UpdateProgramRequest),
@ -273,10 +278,10 @@ impl IRequest for Request {
fn exec(self, ctx: &mut RequestContext) -> RequestFuture { fn exec(self, ctx: &mut RequestContext) -> RequestFuture {
match self { match self {
Request::RunSection(req) => req.exec_erased(ctx), Request::RunZone(req) => req.exec_erased(ctx),
Request::CancelSection(req) => req.exec_erased(ctx), Request::CancelZone(req) => req.exec_erased(ctx),
Request::CancelSectionRunId(req) => req.exec_erased(ctx), Request::CancelZoneRunId(req) => req.exec_erased(ctx),
Request::PauseSectionRunner(req) => req.exec_erased(ctx), Request::PauseZoneRunner(req) => req.exec_erased(ctx),
Request::RunProgram(req) => req.exec_erased(ctx), Request::RunProgram(req) => req.exec_erased(ctx),
Request::CancelProgram(req) => req.exec_erased(ctx), Request::CancelProgram(req) => req.exec_erased(ctx),
Request::UpdateProgram(req) => req.exec_erased(ctx), Request::UpdateProgram(req) => req.exec_erased(ctx),

161
sprinklers_mqtt/src/request/sections.rs

@ -1,161 +0,0 @@
use super::*;
use sprinklers_actors::section_runner::SectionRunHandle;
use sprinklers_core::model::{self, SectionRef};
use sprinklers_core::serde::duration_secs;
use eyre::WrapErr;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct SectionId(pub model::SectionId);
impl SectionId {
fn get_section(self, sections: &Sections) -> Result<SectionRef, RequestError> {
sections.get(&self.0).cloned().ok_or_else(|| {
RequestError::with_name(ErrorCode::NoSuchSection, "no such section", "section")
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RunSectionRequest {
pub section_id: SectionId,
#[serde(with = "duration_secs")]
pub duration: Duration,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RunSectionResponse {
pub message: String,
pub run_id: SectionRunHandle,
}
impl IRequest for RunSectionRequest {
type Response = RunSectionResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone();
let section = self.section_id.get_section(&ctx.sections);
let duration = self.duration;
Box::pin(async move {
let section = section?;
let handle = section_runner
.queue_run(section.clone(), duration)
.await
.wrap_err("could not queue run")?;
Ok(RunSectionResponse {
message: format!("running section '{}' for {:?}", &section.name, duration),
run_id: handle,
})
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionRequest {
pub section_id: SectionId,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionResponse {
pub message: String,
pub cancelled: usize,
}
impl IRequest for CancelSectionRequest {
type Response = CancelSectionResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone();
let section = self.section_id.get_section(&ctx.sections);
Box::pin(async move {
let section = section?;
let cancelled = section_runner
.cancel_by_section(section.id)
.await
.wrap_err("could not cancel section")?;
Ok(CancelSectionResponse {
message: format!(
"cancelled {} runs for section '{}'",
cancelled, section.name
),
cancelled,
})
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionRunIdRequest {
pub run_id: SectionRunHandle,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelSectionRunIdResponse {
pub message: String,
pub cancelled: bool,
}
impl IRequest for CancelSectionRunIdRequest {
type Response = ResponseMessage;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone();
Box::pin(async move {
let cancelled = section_runner
.cancel_run(self.run_id)
.await
.wrap_err("could not cancel section run")?;
if cancelled {
Ok(ResponseMessage::new("cancelled section run"))
} else {
Err(RequestError::with_name(
ErrorCode::NoSuchSectionRun,
"no such section run",
"section run",
))
}
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PauseSectionRunnerRequest {
pub paused: bool,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PauseSectionRunnerResponse {
pub message: String,
pub paused: bool,
}
impl IRequest for PauseSectionRunnerRequest {
type Response = PauseSectionRunnerResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone();
let paused = self.paused;
Box::pin(async move {
if paused {
section_runner.pause().await
} else {
section_runner.unpause().await
}
.wrap_err("could not pause/unpause section runner")?;
Ok(PauseSectionRunnerResponse {
message: format!(
"{} section runner",
if paused { "paused" } else { "unpaused" }
),
paused,
})
})
}
}

160
sprinklers_mqtt/src/request/zones.rs

@ -0,0 +1,160 @@
use super::*;
use sprinklers_actors::zone_runner::ZoneRunHandle;
use sprinklers_core::model::{self, ZoneRef};
use sprinklers_core::serde::duration_secs;
use eyre::WrapErr;
use serde::{Deserialize, Serialize};
use std::time::Duration;
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ZoneId(pub model::ZoneId);
impl ZoneId {
fn get_zone(self, zones: &Zones) -> Result<ZoneRef, RequestError> {
zones
.get(&self.0)
.cloned()
.ok_or_else(|| RequestError::with_name(ErrorCode::NoSuchZone, "no such zone", "zone"))
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RunZoneRequest {
// TODO: update nomenclature
#[serde(rename = "sectionId")]
pub zone_id: ZoneId,
#[serde(with = "duration_secs")]
pub duration: Duration,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct RunZoneResponse {
pub message: String,
pub run_id: ZoneRunHandle,
}
impl IRequest for RunZoneRequest {
type Response = RunZoneResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone();
let zone = self.zone_id.get_zone(&ctx.zones);
let duration = self.duration;
Box::pin(async move {
let zone = zone?;
let handle = zone_runner
.queue_run(zone.clone(), duration)
.await
.wrap_err("could not queue run")?;
Ok(RunZoneResponse {
message: format!("running zone '{}' for {:?}", &zone.name, duration),
run_id: handle,
})
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelZoneRequest {
// TODO: update nomenclature
#[serde(rename = "sectionId")]
pub zone_id: ZoneId,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelZoneResponse {
pub message: String,
pub cancelled: usize,
}
impl IRequest for CancelZoneRequest {
type Response = CancelZoneResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone();
let zone = self.zone_id.get_zone(&ctx.zones);
Box::pin(async move {
let zone = zone?;
let cancelled = zone_runner
.cancel_by_zone(zone.id)
.await
.wrap_err("could not cancel zone")?;
Ok(CancelZoneResponse {
message: format!("cancelled {} runs for zone '{}'", cancelled, zone.name),
cancelled,
})
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelZoneRunIdRequest {
pub run_id: ZoneRunHandle,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CancelZoneRunIdResponse {
pub message: String,
pub cancelled: bool,
}
impl IRequest for CancelZoneRunIdRequest {
type Response = ResponseMessage;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone();
Box::pin(async move {
let cancelled = zone_runner
.cancel_run(self.run_id)
.await
.wrap_err("could not cancel zone run")?;
if cancelled {
Ok(ResponseMessage::new("cancelled zone run"))
} else {
Err(RequestError::with_name(
ErrorCode::NoSuchZoneRun,
"no such zone run",
"zone run",
))
}
})
}
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PauseZoneRunnerRequest {
pub paused: bool,
}
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PauseZoneRunnerResponse {
pub message: String,
pub paused: bool,
}
impl IRequest for PauseZoneRunnerRequest {
type Response = PauseZoneRunnerResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut zone_runner = ctx.zone_runner.clone();
let paused = self.paused;
Box::pin(async move {
if paused {
zone_runner.pause().await
} else {
zone_runner.unpause().await
}
.wrap_err("could not pause/unpause zone runner")?;
Ok(PauseZoneRunnerResponse {
message: format!("{} zone runner", if paused { "paused" } else { "unpaused" }),
paused,
})
})
}
}

18
sprinklers_mqtt/src/topics.rs

@ -1,4 +1,4 @@
use sprinklers_core::model::{ProgramId, SectionId}; use sprinklers_core::model::{ProgramId, ZoneId};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct Topics<T> pub struct Topics<T>
@ -20,16 +20,19 @@ where
format!("{}/connected", self.prefix.as_ref()) format!("{}/connected", self.prefix.as_ref())
} }
pub fn sections(&self) -> String { pub fn zones(&self) -> String {
// TODO: change nomenclature
format!("{}/sections", self.prefix.as_ref()) format!("{}/sections", self.prefix.as_ref())
} }
pub fn section_data(&self, section_id: SectionId) -> String { pub fn zone_data(&self, zone_id: ZoneId) -> String {
format!("{}/sections/{}", self.prefix.as_ref(), section_id) // TODO: change nomenclature
format!("{}/sections/{}", self.prefix.as_ref(), zone_id)
} }
pub fn section_state(&self, section_id: SectionId) -> String { pub fn zone_state(&self, zone_id: ZoneId) -> String {
format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) // TODO: change nomenclature
format!("{}/sections/{}/state", self.prefix.as_ref(), zone_id)
} }
pub fn programs(&self) -> String { pub fn programs(&self) -> String {
@ -49,7 +52,8 @@ where
format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id) format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id)
} }
pub fn section_runner(&self) -> String { pub fn zone_runner(&self) -> String {
// TODO: change nomenclature
format!("{}/section_runner", self.prefix.as_ref()) format!("{}/section_runner", self.prefix.as_ref())
} }

50
sprinklers_mqtt/src/update_listener.rs

@ -1,7 +1,7 @@
use super::MqttInterface; use super::MqttInterface;
use sprinklers_actors::{ use sprinklers_actors::{
program_runner::{ProgramEvent, ProgramEventRecv}, program_runner::{ProgramEvent, ProgramEventRecv},
section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, zone_runner::{ZoneEvent, ZoneEventRecv, ZoneRunnerState, ZoneRunnerStateRecv},
}; };
use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
@ -36,33 +36,27 @@ impl Actor for UpdateListenerActor {
} }
} }
impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for UpdateListenerActor { impl StreamHandler<Result<ZoneEvent, broadcast::RecvError>> for UpdateListenerActor {
fn handle( fn handle(&mut self, event: Result<ZoneEvent, broadcast::RecvError>, ctx: &mut Self::Context) {
&mut self,
event: Result<SectionEvent, broadcast::RecvError>,
ctx: &mut Self::Context,
) {
let event = match event { let event = match event {
Ok(ev) => ev, Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => unreachable!(), Err(broadcast::RecvError::Closed) => unreachable!(),
Err(broadcast::RecvError::Lagged(n)) => { Err(broadcast::RecvError::Lagged(n)) => {
warn!("section events lagged by {}", n); warn!("zone events lagged by {}", n);
return; return;
} }
}; };
if let Some((sec_id, state)) = match event { if let Some((zone_id, state)) = match event {
SectionEvent::RunStart(_, sec) | SectionEvent::RunUnpause(_, sec) => { ZoneEvent::RunStart(_, zone) | ZoneEvent::RunUnpause(_, zone) => Some((zone.id, true)),
Some((sec.id, true)) ZoneEvent::RunFinish(_, zone)
} | ZoneEvent::RunPause(_, zone)
SectionEvent::RunFinish(_, sec) | ZoneEvent::RunCancel(_, zone) => Some((zone.id, false)),
| SectionEvent::RunPause(_, sec) ZoneEvent::RunnerPause | ZoneEvent::RunnerUnpause => None,
| SectionEvent::RunCancel(_, sec) => Some((sec.id, false)),
SectionEvent::RunnerPause | SectionEvent::RunnerUnpause => None,
} { } {
let mut mqtt_interface = self.mqtt_interface.clone(); let mut mqtt_interface = self.mqtt_interface.clone();
let fut = async move { let fut = async move {
if let Err(err) = mqtt_interface.publish_section_state(sec_id, state).await { if let Err(err) = mqtt_interface.publish_zone_state(zone_id, state).await {
warn!("could not publish section state: {}", err); warn!("could not publish zone state: {}", err);
} }
}; };
ctx.spawn(wrap_future(fut)); ctx.spawn(wrap_future(fut));
@ -121,12 +115,12 @@ impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListene
} }
} }
impl StreamHandler<SecRunnerState> for UpdateListenerActor { impl StreamHandler<ZoneRunnerState> for UpdateListenerActor {
fn handle(&mut self, state: SecRunnerState, ctx: &mut Self::Context) { fn handle(&mut self, state: ZoneRunnerState, ctx: &mut Self::Context) {
let mut mqtt_interface = self.mqtt_interface.clone(); let mut mqtt_interface = self.mqtt_interface.clone();
let fut = async move { let fut = async move {
if let Err(err) = mqtt_interface.publish_section_runner(&state).await { if let Err(err) = mqtt_interface.publish_zone_runner(&state).await {
warn!("could not publish section runner: {}", err); warn!("could not publish zone runner: {}", err);
} }
}; };
ctx.spawn(wrap_future(fut)); ctx.spawn(wrap_future(fut));
@ -202,13 +196,13 @@ where
} }
} }
impl Listenable<UpdateListenerActor> for SectionEventRecv { impl Listenable<UpdateListenerActor> for ZoneEventRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) { fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self); ctx.add_stream(self);
} }
} }
impl Listenable<UpdateListenerActor> for SecRunnerStateRecv { impl Listenable<UpdateListenerActor> for ZoneRunnerStateRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) { fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self); ctx.add_stream(self);
} }
@ -243,12 +237,12 @@ impl UpdateListener {
self.addr.do_send(Listen(listener)); self.addr.do_send(Listen(listener));
} }
pub fn listen_section_events(&mut self, section_events: SectionEventRecv) { pub fn listen_zone_events(&mut self, zone_events: ZoneEventRecv) {
self.listen(section_events); self.listen(zone_events);
} }
pub fn listen_section_runner(&mut self, sec_runner_state_recv: SecRunnerStateRecv) { pub fn listen_zone_runner(&mut self, zone_runner_state_recv: ZoneRunnerStateRecv) {
self.listen(sec_runner_state_recv); self.listen(zone_runner_state_recv);
} }
pub fn listen_programs(&mut self, programs: watch::Receiver<Programs>) { pub fn listen_programs(&mut self, programs: watch::Receiver<Programs>) {

38
sprinklers_mqtt/src/section_runner_json.rs → sprinklers_mqtt/src/zone_runner_json.rs

@ -1,5 +1,5 @@
use sprinklers_actors::section_runner::{SecRun, SecRunState, SecRunnerState}; use sprinklers_actors::zone_runner::{ZoneRun, ZoneRunState, ZoneRunnerState};
use sprinklers_core::model::SectionId; use sprinklers_core::model::ZoneId;
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use serde::Serialize; use serde::Serialize;
@ -8,9 +8,11 @@ use tokio::time::Instant;
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SecRunJson { pub struct ZoneRunJson {
id: i32, id: i32,
section: SectionId, // TODO: change nomenclature
#[serde(rename = "section")]
zone: ZoneId,
total_duration: f64, total_duration: f64,
duration: f64, duration: f64,
start_time: Option<String>, start_time: Option<String>,
@ -18,19 +20,19 @@ pub struct SecRunJson {
unpause_time: Option<String>, unpause_time: Option<String>,
} }
impl SecRunJson { impl ZoneRunJson {
fn from_run(run: &SecRun) -> Option<Self> { fn from_run(run: &ZoneRun) -> Option<Self> {
let (now, system_now) = (Instant::now(), SystemTime::now()); let (now, system_now) = (Instant::now(), SystemTime::now());
let instant_to_string = |instant: Instant| -> String { let instant_to_string = |instant: Instant| -> String {
DateTime::<Utc>::from(system_now - now.duration_since(instant)).to_rfc3339() DateTime::<Utc>::from(system_now - now.duration_since(instant)).to_rfc3339()
}; };
let (start_time, pause_time) = match run.state { let (start_time, pause_time) = match run.state {
SecRunState::Finished | SecRunState::Cancelled => { ZoneRunState::Finished | ZoneRunState::Cancelled => {
return None; return None;
} }
SecRunState::Waiting => (None, None), ZoneRunState::Waiting => (None, None),
SecRunState::Running { start_time } => (Some(instant_to_string(start_time)), None), ZoneRunState::Running { start_time } => (Some(instant_to_string(start_time)), None),
SecRunState::Paused { ZoneRunState::Paused {
start_time, start_time,
pause_time, pause_time,
} => ( } => (
@ -40,7 +42,7 @@ impl SecRunJson {
}; };
Some(Self { Some(Self {
id: run.handle.clone().into_inner(), id: run.handle.clone().into_inner(),
section: run.section.id, zone: run.zone.id,
total_duration: run.total_duration.as_secs_f64(), total_duration: run.total_duration.as_secs_f64(),
duration: run.duration.as_secs_f64(), duration: run.duration.as_secs_f64(),
start_time, start_time,
@ -52,18 +54,18 @@ impl SecRunJson {
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct SecRunnerStateJson { pub struct ZoneRunnerStateJson {
queue: Vec<SecRunJson>, queue: Vec<ZoneRunJson>,
current: Option<SecRunJson>, current: Option<ZoneRunJson>,
paused: bool, paused: bool,
} }
impl From<&SecRunnerState> for SecRunnerStateJson { impl From<&ZoneRunnerState> for ZoneRunnerStateJson {
fn from(state: &SecRunnerState) -> Self { fn from(state: &ZoneRunnerState) -> Self {
let mut run_queue = state.run_queue.iter(); let mut run_queue = state.run_queue.iter();
let current = run_queue.next().and_then(|run| SecRunJson::from_run(run)); let current = run_queue.next().and_then(|run| ZoneRunJson::from_run(run));
let queue = run_queue let queue = run_queue
.filter_map(|run| SecRunJson::from_run(run)) .filter_map(|run| ZoneRunJson::from_run(run))
.collect(); .collect();
Self { Self {
queue, queue,

4
sprinklers_rs/sprinklers_rs.default.json

@ -5,8 +5,8 @@
"client_id": "sprinklers_rs-0001", "client_id": "sprinklers_rs-0001",
"device_id": "sprinklers_rs-0001" "device_id": "sprinklers_rs-0001"
}, },
"section_interface": { "zone_interface": {
"provider": "Mock", "provider": "Mock",
"num_sections": 6 "num_zones": 6
} }
} }

36
sprinklers_rs/src/main.rs

@ -2,9 +2,9 @@
#![warn(clippy::print_stdout)] #![warn(clippy::print_stdout)]
// mod option_future; // mod option_future;
mod section_interface;
mod settings; mod settings;
mod state_manager; mod state_manager;
mod zone_interface;
use sprinklers_actors as actors; use sprinklers_actors as actors;
use sprinklers_database as database; use sprinklers_database as database;
@ -31,43 +31,41 @@ async fn main() -> Result<()> {
let db_conn = database::setup_db()?; let db_conn = database::setup_db()?;
let sections = database::query_sections(&db_conn)?; let zones = database::query_zones(&db_conn)?;
for sec in sections.values() { for zone in zones.values() {
debug!(section = debug(&sec), "read section"); debug!(zone = debug(&zone), "read zone");
} }
let section_interface = settings.section_interface.build()?; let zone_interface = settings.zone_interface.build()?;
let mut section_runner = actors::SectionRunner::new(section_interface); let mut zone_runner = actors::ZoneRunner::new(zone_interface);
let mut program_runner = actors::ProgramRunner::new(section_runner.clone()); let mut program_runner = actors::ProgramRunner::new(zone_runner.clone());
let state_manager = crate::state_manager::StateManagerThread::start(db_conn); let state_manager = crate::state_manager::StateManagerThread::start(db_conn);
let mqtt_options = settings.mqtt; let mqtt_options = settings.mqtt;
// TODO: have ability to update sections / other data // TODO: have ability to update zones / other data
let request_context = mqtt::RequestContext { let request_context = mqtt::RequestContext {
sections: sections.clone(), zones: zones.clone(),
section_runner: section_runner.clone(), zone_runner: zone_runner.clone(),
program_runner: program_runner.clone(), program_runner: program_runner.clone(),
state_manager: state_manager.clone(), state_manager: state_manager.clone(),
}; };
let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context); let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context);
let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone()); let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone());
update_listener.listen_section_events(section_runner.subscribe().await?); update_listener.listen_zone_events(zone_runner.subscribe().await?);
update_listener.listen_section_runner(section_runner.get_state_recv()); update_listener.listen_zone_runner(zone_runner.get_state_recv());
update_listener.listen_programs(state_manager.get_programs()); update_listener.listen_programs(state_manager.get_programs());
update_listener.listen_program_events(program_runner.subscribe().await?); update_listener.listen_program_events(program_runner.subscribe().await?);
// Only listen to programs now so above subscriptions get events // Only listen to programs now so above subscriptions get events
program_runner.listen_programs(state_manager.get_programs()); program_runner.listen_programs(state_manager.get_programs());
program_runner.update_sections(sections.clone()).await?; program_runner.update_zones(zones.clone()).await?;
// TODO: update listener should probably do this // TODO: update listener should probably do this
mqtt_interface.publish_sections(&sections).await?; mqtt_interface.publish_zones(&zones).await?;
for section_id in sections.keys() { for zone_id in zones.keys() {
mqtt_interface mqtt_interface.publish_zone_state(*zone_id, false).await?;
.publish_section_state(*section_id, false)
.await?;
} }
info!("sprinklers_rs initialized"); info!("sprinklers_rs initialized");
@ -79,7 +77,7 @@ async fn main() -> Result<()> {
mqtt_interface.quit().await?; mqtt_interface.quit().await?;
drop(state_manager); drop(state_manager);
program_runner.quit().await?; program_runner.quit().await?;
section_runner.quit().await?; zone_runner.quit().await?;
actix::System::current().stop(); actix::System::current().stop();
Ok(()) Ok(())

37
sprinklers_rs/src/section_interface.rs

@ -1,37 +0,0 @@
use sprinklers_core::section_interface::{MockSectionInterface, SecId, SectionInterface};
#[cfg(feature = "sprinklers_linux")]
use sprinklers_linux::LinuxGpioConfig;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "provider")]
pub enum SectionInterfaceConfig {
Mock {
num_sections: SecId,
},
#[cfg(feature = "sprinklers_linux")]
LinuxGpio(LinuxGpioConfig),
}
impl Default for SectionInterfaceConfig {
fn default() -> Self {
SectionInterfaceConfig::Mock { num_sections: 6 }
}
}
impl SectionInterfaceConfig {
pub fn build(self) -> eyre::Result<Arc<dyn SectionInterface>> {
Ok(match self {
SectionInterfaceConfig::Mock { num_sections } => {
Arc::new(MockSectionInterface::new(num_sections))
}
#[cfg(feature = "sprinklers_linux")]
SectionInterfaceConfig::LinuxGpio(config) => {
Arc::new(config.build()?)
}
})
}
}

4
sprinklers_rs/src/settings.rs

@ -1,4 +1,4 @@
use crate::section_interface::SectionInterfaceConfig; use crate::zone_interface::ZoneInterfaceConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::trace; use tracing::trace;
@ -17,7 +17,7 @@ pub struct Settings {
#[serde(with = "MqttOptions")] #[serde(with = "MqttOptions")]
pub mqtt: sprinklers_mqtt::Options, pub mqtt: sprinklers_mqtt::Options,
#[serde(default)] #[serde(default)]
pub section_interface: SectionInterfaceConfig, pub zone_interface: ZoneInterfaceConfig,
} }
impl Settings { impl Settings {

33
sprinklers_rs/src/zone_interface.rs

@ -0,0 +1,33 @@
use sprinklers_core::zone_interface::{MockZoneInterface, ZoneInterface, ZoneNum};
#[cfg(feature = "sprinklers_linux")]
use sprinklers_linux::LinuxGpioConfig;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "provider")]
pub enum ZoneInterfaceConfig {
Mock {
num_zones: ZoneNum,
},
#[cfg(feature = "sprinklers_linux")]
LinuxGpio(LinuxGpioConfig),
}
impl Default for ZoneInterfaceConfig {
fn default() -> Self {
ZoneInterfaceConfig::Mock { num_zones: 6 }
}
}
impl ZoneInterfaceConfig {
pub fn build(self) -> eyre::Result<Arc<dyn ZoneInterface>> {
Ok(match self {
ZoneInterfaceConfig::Mock { num_zones } => Arc::new(MockZoneInterface::new(num_zones)),
#[cfg(feature = "sprinklers_linux")]
ZoneInterfaceConfig::LinuxGpio(config) => Arc::new(config.build()?),
})
}
}
Loading…
Cancel
Save