diff --git a/sprinklers_actors/src/lib.rs b/sprinklers_actors/src/lib.rs index 8dcc832..0959fab 100644 --- a/sprinklers_actors/src/lib.rs +++ b/sprinklers_actors/src/lib.rs @@ -1,10 +1,10 @@ pub mod program_runner; -pub mod section_runner; pub mod state_manager; +pub mod zone_runner; #[cfg(test)] mod trace_listeners; pub use program_runner::ProgramRunner; -pub use section_runner::SectionRunner; pub use state_manager::StateManager; +pub use zone_runner::ZoneRunner; diff --git a/sprinklers_actors/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs index b2f48cf..d3ab723 100644 --- a/sprinklers_actors/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -1,7 +1,7 @@ -use crate::section_runner::{ - Error as SectionRunnerError, SectionEvent, SectionEventRecv, SectionRunHandle, SectionRunner, +use crate::zone_runner::{ + Error as ZoneRunnerError, ZoneEvent, ZoneEventRecv, ZoneRunHandle, ZoneRunner, }; -use sprinklers_core::model::{ProgramId, ProgramRef, Programs, Sections}; +use sprinklers_core::model::{ProgramId, ProgramRef, Programs, Zones}; use actix::{ Actor, ActorContext, ActorFuture, ActorStream, Addr, AsyncContext, Handler, Message, @@ -40,7 +40,7 @@ enum RunState { struct ProgRun { program: ProgramRef, state: RunState, - sec_run_handles: Vec, + zone_run_handles: Vec, } impl ProgRun { @@ -48,7 +48,7 @@ impl ProgRun { Self { program, state: RunState::Waiting, - sec_run_handles: Vec::new(), + zone_run_handles: Vec::new(), } } } @@ -56,8 +56,8 @@ impl ProgRun { type RunQueue = VecDeque; struct ProgramRunnerInner { - section_runner: SectionRunner, - sections: Sections, + zone_runner: ZoneRunner, + zones: Zones, programs: Programs, event_send: Option, schedule_run_fut: Option, @@ -98,29 +98,29 @@ impl ProgramRunnerInner { .program .sequence .iter() - .filter_map(|item| match self.sections.get(&item.section_id) { - Some(sec) => Some((sec.clone(), item.duration)), + .filter_map(|item| match self.zones.get(&item.zone_id) { + Some(zone) => Some((zone.clone(), item.duration)), None => { warn!( program_id = run.program.id, - section_id = item.section_id, - "trying to run program with nonexistant section" + zone_id = item.zone_id, + "trying to run program with nonexistant zone" ); None } }) .collect(); if sequence.is_empty() { - warn!(program_id = run.program.id, "program has no valid sections"); + warn!(program_id = run.program.id, "program has no valid zones"); run.state = RunState::Finished; self.send_event(ProgramEvent::RunStart(run.program.clone())); self.send_event(ProgramEvent::RunFinish(run.program.clone())); return; } - run.sec_run_handles.reserve(sequence.len()); - for (section, duration) in sequence { - let handle = self.section_runner.do_queue_run(section, duration); - run.sec_run_handles.push(handle); + run.zone_run_handles.reserve(sequence.len()); + for (zone, duration) in sequence { + let handle = self.zone_runner.do_queue_run(zone, duration); + run.zone_run_handles.push(handle); } run.state = RunState::Running; self.send_event(ProgramEvent::RunStart(run.program.clone())); @@ -128,8 +128,8 @@ impl ProgramRunnerInner { } fn cancel_program_run(&mut self, run: &mut ProgRun) { - for handle in run.sec_run_handles.drain(..) { - self.section_runner.do_cancel_run(handle); + for handle in run.zone_run_handles.drain(..) { + self.zone_runner.do_cancel_run(handle); } debug!(program_id = run.program.id, "program run is cancelled"); self.send_event(ProgramEvent::RunCancel(run.program.clone())); @@ -171,16 +171,16 @@ impl Actor for ProgramRunnerActor { type Context = actix::Context; fn started(&mut self, ctx: &mut Self::Context) { - trace!("subscribing to SectionRunner events"); - let subscribe_fut = self.inner.section_runner.subscribe().into_actor(self).map( - |section_events: Result, + trace!("subscribing to ZoneRunner events"); + let subscribe_fut = self.inner.zone_runner.subscribe().into_actor(self).map( + |zone_events: Result, _act: &mut ProgramRunnerActor, ctx: &mut Self::Context| { - match section_events { - Ok(section_events) => { - ctx.add_stream(section_events.into_stream()); + match zone_events { + Ok(zone_events) => { + ctx.add_stream(zone_events.into_stream()); } - Err(err) => warn!("failed to subscribe to SectionRunner events: {}", err), + Err(err) => warn!("failed to subscribe to ZoneRunner events: {}", err), } }, ); @@ -193,22 +193,18 @@ impl Actor for ProgramRunnerActor { } } -impl StreamHandler> for ProgramRunnerActor { - fn handle( - &mut self, - item: Result, - ctx: &mut Self::Context, - ) { - let sec_event = match item { +impl StreamHandler> for ProgramRunnerActor { + fn handle(&mut self, item: Result, ctx: &mut Self::Context) { + let zone_event = match item { Ok(e) => e, Err(err) => { - warn!("failed to receive section event: {}", err); + warn!("failed to receive zone event: {}", err); return; } }; #[allow(clippy::single_match)] - match sec_event { - SectionEvent::RunFinish(finished_run, _) => { + match zone_event { + ZoneEvent::RunFinish(finished_run, _) => { self.handle_finished_run(finished_run, ctx); } _ => {} @@ -241,14 +237,14 @@ impl Handler for ProgramRunnerActor { #[derive(Message)] #[rtype(result = "()")] -struct UpdateSections(Sections); +struct UpdateZones(Zones); -impl Handler for ProgramRunnerActor { +impl Handler for ProgramRunnerActor { type Result = (); - fn handle(&mut self, msg: UpdateSections, _ctx: &mut Self::Context) -> Self::Result { - trace!("updating sections"); - let UpdateSections(new_sections) = msg; - self.inner.sections = new_sections; + fn handle(&mut self, msg: UpdateZones, _ctx: &mut Self::Context) -> Self::Result { + trace!("updating zones"); + let UpdateZones(new_zones) = msg; + self.inner.zones = new_zones; } } @@ -379,11 +375,11 @@ impl Handler for ProgramRunnerActor { } impl ProgramRunnerActor { - fn new(section_runner: SectionRunner) -> Self { + fn new(zone_runner: ZoneRunner) -> Self { Self { inner: ProgramRunnerInner { - section_runner, - sections: Sections::new(), + zone_runner, + zones: Zones::new(), programs: Programs::new(), event_send: None, schedule_run_fut: None, @@ -394,11 +390,11 @@ impl ProgramRunnerActor { fn handle_finished_run( &mut self, - finished_run: SectionRunHandle, + finished_run: ZoneRunHandle, ctx: &mut ::Context, ) -> Option<()> { let current_run = self.run_queue.front_mut()?; - let last_run_handle = current_run.sec_run_handles.last()?; + let last_run_handle = current_run.zone_run_handles.last()?; if finished_run == *last_run_handle { current_run.state = RunState::Finished; debug!( @@ -452,8 +448,8 @@ pub struct ProgramRunner { #[allow(dead_code)] impl ProgramRunner { - pub fn new(section_runner: SectionRunner) -> Self { - let addr = ProgramRunnerActor::new(section_runner).start(); + pub fn new(zone_runner: ZoneRunner) -> Self { + let addr = ProgramRunnerActor::new(zone_runner).start(); Self { addr } } @@ -462,9 +458,9 @@ impl ProgramRunner { Ok(()) } - pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { + pub async fn update_zones(&mut self, new_zones: Zones) -> Result<()> { self.addr - .send(UpdateSections(new_sections)) + .send(UpdateZones(new_zones)) .await .map_err(Error::from) } @@ -512,9 +508,9 @@ mod test { use super::*; use crate::trace_listeners::{EventListener, Filters}; use sprinklers_core::{ - model::{Program, ProgramItem, Section}, + model::{Program, ProgramItem, Zone}, schedule::{every_day, DateTimeBound, Schedule}, - section_interface::{MockSectionInterface, SectionInterface}, + zone_interface::{MockZoneInterface, ZoneInterface}, }; use assert_matches::assert_matches; @@ -533,33 +529,33 @@ mod test { let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let _sub = tracing::subscriber::set_default(subscriber); - let interface = MockSectionInterface::new(6); - let mut sec_runner = SectionRunner::new(Arc::new(interface)); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let interface = MockZoneInterface::new(6); + let mut zone_runner = ZoneRunner::new(Arc::new(interface)); + let mut runner = ProgramRunner::new(zone_runner.clone()); yield_now().await; runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); yield_now().await; assert_eq!(quit_msg.get_count(), 1); } - fn make_sections_and_runner() -> (Sections, SectionRunner, Arc) { - let interface = Arc::new(MockSectionInterface::new(2)); - let sections: Sections = ordmap![ - 1 => Section { + fn make_zones_and_runner() -> (Zones, ZoneRunner, Arc) { + let interface = Arc::new(MockZoneInterface::new(2)); + let zones: Zones = ordmap![ + 1 => Zone { id: 1, - name: "Section 1".into(), + name: "Zone 1".into(), interface_id: 0, }.into(), - 2 => Section { + 2 => Zone { id: 2, - name: "Section 2".into(), + name: "Zone 2".into(), interface_id: 1, }.into() ]; - let sec_runner = SectionRunner::new(interface.clone()); - (sections, sec_runner, interface) + let zone_runner = ZoneRunner::new(interface.clone()); + (zones, zone_runner, interface) } fn make_program(num: ProgramId, sequence: Vec) -> ProgramRef { @@ -584,26 +580,26 @@ mod test { #[actix_rt::test] async fn test_run_program() { - let (sections, mut sec_runner, interface) = make_sections_and_runner(); - let mut sec_events = sec_runner.subscribe().await.unwrap(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, interface) = make_zones_and_runner(); + let mut zone_events = zone_runner.subscribe().await.unwrap(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program( 1, vec![ ProgramItem { - section_id: 1, + zone_id: 1, duration: Duration::from_secs(10), }, ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }, ], ); - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.run_program(program).await.unwrap(); yield_now().await; @@ -612,48 +608,48 @@ mod test { Ok(ProgramEvent::RunStart(prog)) if prog.id == 1 ); - assert_matches!(sec_events.try_recv(), Ok(SectionEvent::RunStart(_, _))); - assert_eq!(interface.get_section_state(0), true); + assert_matches!(zone_events.try_recv(), Ok(ZoneEvent::RunStart(_, _))); + assert_eq!(interface.get_zone_state(0), true); tokio::time::pause(); - assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); - assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunStart(_, _))); - assert_eq!(interface.get_section_state(0), false); - assert_eq!(interface.get_section_state(1), true); - assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); + assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunFinish(_, _))); + assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunStart(_, _))); + assert_eq!(interface.get_zone_state(0), false); + assert_eq!(interface.get_zone_state(1), true); + assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunFinish(_, _))); assert_matches!(prog_events.recv().await, Ok(ProgramEvent::RunFinish(_))); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); yield_now().await; } #[actix_rt::test] - async fn test_run_nonexistant_section() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + async fn test_run_nonexistant_zone() { + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program1 = make_program( 1, vec![ProgramItem { - section_id: 3, + zone_id: 3, duration: Duration::from_secs(10), }], ); let program2 = make_program( 2, vec![ProgramItem { - section_id: 1, + zone_id: 1, duration: Duration::from_secs(10), }], ); - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.run_program(program1).await.unwrap(); yield_now().await; // Should immediately start and finish running program - // due to nonexistant section + // due to nonexistant zone assert_matches!( prog_events.recv().await, Ok(ProgramEvent::RunStart(prog)) @@ -681,18 +677,18 @@ mod test { ); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_close_event_chan() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program(1, vec![]); - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap(); prog_events.recv().await.unwrap(); @@ -704,32 +700,32 @@ mod test { yield_now().await; runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_run_program_id() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program1 = make_program( 1, vec![ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }], ); let program2 = make_program( 2, vec![ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }], ); let programs = ordmap![ 1 => program1, 2 => program2 ]; - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.update_programs(programs).await.unwrap(); // First try a non-existant program id @@ -767,32 +763,32 @@ mod test { ); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_queue_program() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program1 = make_program( 1, vec![ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }], ); let program2 = make_program( 2, vec![ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }], ); let programs = ordmap![ 1 => program1, 2 => program2 ]; - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.update_programs(programs).await.unwrap(); runner.run_program_id(1).await.unwrap(); @@ -822,31 +818,31 @@ mod test { ); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_cancel_program() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut sec_events = sec_runner.subscribe().await.unwrap(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut zone_events = zone_runner.subscribe().await.unwrap(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program( 1, vec![ ProgramItem { - section_id: 1, + zone_id: 1, duration: Duration::from_secs(10), }, ProgramItem { - section_id: 2, + zone_id: 2, duration: Duration::from_secs(10), }, ], ); - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap(); yield_now().await; @@ -855,7 +851,7 @@ mod test { Ok(ProgramEvent::RunStart(prog)) if prog.id == 1 ); - assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_, _)); + assert_matches!(zone_events.try_recv().unwrap(), ZoneEvent::RunStart(_, _)); runner.cancel_program(program.id).await.unwrap(); yield_now().await; @@ -864,16 +860,16 @@ mod test { Ok(ProgramEvent::RunCancel(prog)) if prog.id == 1 ); - assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunCancel(_, _))); + assert_matches!(zone_events.recv().await, Ok(ZoneEvent::RunCancel(_, _))); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_scheduled_run() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let make_programs = |num: ProgramId, enabled: bool| { @@ -888,7 +884,7 @@ mod test { let program1 = make_program_with_schedule( num, vec![ProgramItem { - section_id: 1, + zone_id: 1, duration: Duration::from_micros(100), }], enabled, @@ -898,7 +894,7 @@ mod test { programs }; - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner .update_programs(make_programs(1, false)) .await @@ -926,13 +922,13 @@ mod test { ); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_scheduled_run_twice() { - let (sections, mut sec_runner, _) = make_sections_and_runner(); - let mut runner = ProgramRunner::new(sec_runner.clone()); + let (zones, mut zone_runner, _) = make_zones_and_runner(); + let mut runner = ProgramRunner::new(zone_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let now = chrono::Local::now(); @@ -947,7 +943,7 @@ mod test { let program1 = make_program_with_schedule( 1, vec![ProgramItem { - section_id: 1, + zone_id: 1, duration: Duration::from_micros(10), }], true, @@ -955,7 +951,7 @@ mod test { ); let programs = ordmap![ 1 => program1 ]; - runner.update_sections(sections.clone()).await.unwrap(); + runner.update_zones(zones.clone()).await.unwrap(); runner.update_programs(programs).await.unwrap(); let fut = async move { @@ -997,6 +993,6 @@ mod test { .unwrap(); runner.quit().await.unwrap(); - sec_runner.quit().await.unwrap(); + zone_runner.quit().await.unwrap(); } } diff --git a/sprinklers_actors/src/state_manager.rs b/sprinklers_actors/src/state_manager.rs index e942716..0a5ecaf 100644 --- a/sprinklers_actors/src/state_manager.rs +++ b/sprinklers_actors/src/state_manager.rs @@ -54,7 +54,8 @@ impl StateManager { update, resp_tx, }) - .await.map_err(eyre::Report::from)?; + .await + .map_err(eyre::Report::from)?; resp_rx.await.map_err(eyre::Report::from)? } diff --git a/sprinklers_actors/src/section_runner.rs b/sprinklers_actors/src/zone_runner.rs similarity index 54% rename from sprinklers_actors/src/section_runner.rs rename to sprinklers_actors/src/zone_runner.rs index db219e7..4511acc 100644 --- a/sprinklers_actors/src/section_runner.rs +++ b/sprinklers_actors/src/zone_runner.rs @@ -1,5 +1,5 @@ -use sprinklers_core::model::{SectionId, SectionRef}; -use sprinklers_core::section_interface::SectionInterface; +use sprinklers_core::model::{ZoneId, ZoneRef}; +use sprinklers_core::zone_interface::ZoneInterface; use actix::{ Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle, @@ -22,32 +22,32 @@ use tokio::{ use tracing::{debug, trace, warn}; #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] -pub struct SectionRunHandle(i32); +pub struct ZoneRunHandle(i32); -impl SectionRunHandle { +impl ZoneRunHandle { pub fn into_inner(self) -> i32 { self.0 } } #[derive(Clone, Debug)] -pub enum SectionEvent { - RunStart(SectionRunHandle, SectionRef), - RunFinish(SectionRunHandle, SectionRef), - RunPause(SectionRunHandle, SectionRef), - RunUnpause(SectionRunHandle, SectionRef), - RunCancel(SectionRunHandle, SectionRef), +pub enum ZoneEvent { + RunStart(ZoneRunHandle, ZoneRef), + RunFinish(ZoneRunHandle, ZoneRef), + RunPause(ZoneRunHandle, ZoneRef), + RunUnpause(ZoneRunHandle, ZoneRef), + RunCancel(ZoneRunHandle, ZoneRef), RunnerPause, RunnerUnpause, } -pub type SectionEventRecv = broadcast::Receiver; -type SectionEventSend = broadcast::Sender; +pub type ZoneEventRecv = broadcast::Receiver; +type ZoneEventSend = broadcast::Sender; const EVENT_CAPACITY: usize = 8; #[derive(Clone, Debug, PartialEq)] -pub enum SecRunState { +pub enum ZoneRunState { Waiting, Running { start_time: Instant, @@ -61,64 +61,64 @@ pub enum SecRunState { } #[derive(Clone, Debug)] -pub struct SecRun { - pub handle: SectionRunHandle, - pub section: SectionRef, +pub struct ZoneRun { + pub handle: ZoneRunHandle, + pub zone: ZoneRef, pub duration: Duration, pub total_duration: Duration, - pub state: SecRunState, + pub state: ZoneRunState, } -impl SecRun { - fn new(handle: SectionRunHandle, section: SectionRef, duration: Duration) -> Self { +impl ZoneRun { + fn new(handle: ZoneRunHandle, zone: ZoneRef, duration: Duration) -> Self { Self { handle, - section, + zone, duration, total_duration: duration, - state: SecRunState::Waiting, + state: ZoneRunState::Waiting, } } pub fn is_running(&self) -> bool { - matches!(self.state, SecRunState::Running{..}) + matches!(self.state, ZoneRunState::Running{..}) } #[allow(dead_code)] pub fn is_paused(&self) -> bool { - matches!(self.state, SecRunState::Paused{..}) + matches!(self.state, ZoneRunState::Paused{..}) } } -pub type SecRunQueue = im::Vector>; +pub type ZoneRunQueue = im::Vector>; #[derive(Clone, Debug)] -pub struct SecRunnerState { - pub run_queue: SecRunQueue, +pub struct ZoneRunnerState { + pub run_queue: ZoneRunQueue, pub paused: bool, } -impl Default for SecRunnerState { +impl Default for ZoneRunnerState { fn default() -> Self { Self { - run_queue: SecRunQueue::default(), + run_queue: ZoneRunQueue::default(), paused: false, } } } -pub type SecRunnerStateRecv = watch::Receiver; +pub type ZoneRunnerStateRecv = watch::Receiver; -struct SectionRunnerInner { - interface: Arc, - event_send: Option, - state_send: watch::Sender, +struct ZoneRunnerInner { + interface: Arc, + event_send: Option, + state_send: watch::Sender, delay_future: Option, did_change: bool, } -impl SectionRunnerInner { - fn send_event(&mut self, event: SectionEvent) { +impl ZoneRunnerInner { + fn send_event(&mut self, event: ZoneEvent) { if let Some(event_send) = &mut self.event_send { match event_send.send(event) { Ok(_) => {} @@ -129,7 +129,7 @@ impl SectionRunnerInner { } } - fn subscribe_event(&mut self) -> SectionEventRecv { + fn subscribe_event(&mut self) -> ZoneEventRecv { match &mut self.event_send { Some(event_send) => event_send.subscribe(), None => { @@ -140,56 +140,44 @@ impl SectionRunnerInner { } } - fn start_run(&mut self, run: &mut Arc) { - use SecRunState::*; + fn start_run(&mut self, run: &mut Arc) { + use ZoneRunState::*; let run = Arc::make_mut(run); - debug!(section_id = run.section.id, "starting running section"); - self.interface - .set_section_state(run.section.interface_id, true); + debug!(zone_id = run.zone.id, "starting running zone"); + self.interface.set_zone_state(run.zone.interface_id, true); run.state = Running { start_time: Instant::now(), }; - self.send_event(SectionEvent::RunStart( - run.handle.clone(), - run.section.clone(), - )); + self.send_event(ZoneEvent::RunStart(run.handle.clone(), run.zone.clone())); self.did_change = true; } - fn finish_run(&mut self, run: &mut Arc) { + fn finish_run(&mut self, run: &mut Arc) { let run = Arc::make_mut(run); if run.is_running() { - debug!(section_id = run.section.id, "finished running section"); - self.interface - .set_section_state(run.section.interface_id, false); - run.state = SecRunState::Finished; - self.send_event(SectionEvent::RunFinish( - run.handle.clone(), - run.section.clone(), - )); + debug!(zone_id = run.zone.id, "finished running zone"); + self.interface.set_zone_state(run.zone.interface_id, false); + run.state = ZoneRunState::Finished; + self.send_event(ZoneEvent::RunFinish(run.handle.clone(), run.zone.clone())); self.did_change = true; } else { warn!( - section_id = run.section.id, + zone_id = run.zone.id, state = debug(&run.state), "cannot finish run which is not running" ); } } - fn cancel_run(&mut self, run: &mut Arc) -> bool { + fn cancel_run(&mut self, run: &mut Arc) -> bool { let run = Arc::make_mut(run); if run.is_running() { - debug!(section_id = run.section.id, "cancelling running section"); - self.interface - .set_section_state(run.section.interface_id, false); + debug!(zone_id = run.zone.id, "cancelling running zone"); + self.interface.set_zone_state(run.zone.interface_id, false); } - if run.state != SecRunState::Cancelled { - run.state = SecRunState::Cancelled; - self.send_event(SectionEvent::RunCancel( - run.handle.clone(), - run.section.clone(), - )); + if run.state != ZoneRunState::Cancelled { + run.state = ZoneRunState::Cancelled; + self.send_event(ZoneEvent::RunCancel(run.handle.clone(), run.zone.clone())); self.did_change = true; true } else { @@ -197,21 +185,20 @@ impl SectionRunnerInner { } } - fn pause_run(&mut self, run: &mut Arc) { - use SecRunState::*; + fn pause_run(&mut self, run: &mut Arc) { + use ZoneRunState::*; let run = Arc::make_mut(run); let new_state = match run.state { Running { start_time } => { - debug!(section_id = run.section.id, "pausing running section"); - self.interface - .set_section_state(run.section.interface_id, false); + debug!(zone_id = run.zone.id, "pausing running zone"); + self.interface.set_zone_state(run.zone.interface_id, false); Paused { start_time, pause_time: Instant::now(), } } Waiting => { - debug!(section_id = run.section.id, "pausing waiting section"); + debug!(zone_id = run.zone.id, "pausing waiting zone"); Paused { start_time: Instant::now(), pause_time: Instant::now(), @@ -222,39 +209,32 @@ impl SectionRunnerInner { } }; run.state = new_state; - self.send_event(SectionEvent::RunPause( - run.handle.clone(), - run.section.clone(), - )); + self.send_event(ZoneEvent::RunPause(run.handle.clone(), run.zone.clone())); self.did_change = true; } - fn unpause_run(&mut self, run: &mut Arc) { - use SecRunState::*; + fn unpause_run(&mut self, run: &mut Arc) { + use ZoneRunState::*; let run = Arc::make_mut(run); match run.state { Paused { start_time, pause_time, } => { - debug!(section_id = run.section.id, "unpausing section"); - self.interface - .set_section_state(run.section.interface_id, true); + debug!(zone_id = run.zone.id, "unpausing zone"); + self.interface.set_zone_state(run.zone.interface_id, true); run.state = Running { start_time: Instant::now(), }; let ran_for = pause_time - start_time; run.duration -= ran_for; - self.send_event(SectionEvent::RunUnpause( - run.handle.clone(), - run.section.clone(), - )); + self.send_event(ZoneEvent::RunUnpause(run.handle.clone(), run.zone.clone())); } Waiting | Finished | Cancelled | Running { .. } => { warn!( - section_id = run.section.id, + zone_id = run.zone.id, state = debug(&run.state), - "can only unpause paused section" + "can only unpause paused zone" ); } } @@ -264,7 +244,7 @@ impl SectionRunnerInner { fn process_after_delay( &mut self, after: Duration, - ctx: &mut ::Context, + ctx: &mut ::Context, ) { let delay_future = ctx.notify_later(Process, after); if let Some(old_future) = self.delay_future.replace(delay_future) { @@ -272,32 +252,32 @@ impl SectionRunnerInner { } } - fn cancel_process(&mut self, ctx: &mut ::Context) { + fn cancel_process(&mut self, ctx: &mut ::Context) { if let Some(old_future) = self.delay_future.take() { ctx.cancel_future(old_future); } } } -struct SectionRunnerActor { - state: SecRunnerState, - inner: SectionRunnerInner, +struct ZoneRunnerActor { + state: ZoneRunnerState, + inner: ZoneRunnerInner, } -impl Actor for SectionRunnerActor { +impl Actor for ZoneRunnerActor { type Context = actix::Context; fn started(&mut self, _ctx: &mut Self::Context) { - trace!("section_runner starting"); - for i in 0..self.inner.interface.num_sections() { - self.inner.interface.set_section_state(i, false); + trace!("zone_runner starting"); + for i in 0..self.inner.interface.num_zones() { + self.inner.interface.set_zone_state(i, false); } } fn stopped(&mut self, _ctx: &mut Self::Context) { - trace!("section_runner stopped"); - for i in 0..self.inner.interface.num_sections() { - self.inner.interface.set_section_state(i, false); + trace!("zone_runner stopped"); + for i in 0..self.inner.interface.num_zones() { + self.inner.interface.set_zone_state(i, false); } } } @@ -306,7 +286,7 @@ impl Actor for SectionRunnerActor { #[rtype(result = "()")] struct Quit; -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = (); fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result { @@ -316,15 +296,15 @@ impl Handler for SectionRunnerActor { #[derive(Message, Debug, Clone)] #[rtype(result = "()")] -struct QueueRun(SectionRunHandle, SectionRef, Duration); +struct QueueRun(ZoneRunHandle, ZoneRef, Duration); -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = (); fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result { - let QueueRun(handle, section, duration) = msg; + let QueueRun(handle, zone, duration) = msg; - let run: Arc = SecRun::new(handle, section, duration).into(); + let run: Arc = ZoneRun::new(handle, zone, duration).into(); self.state.run_queue.push_back(run); self.inner.did_change = true; @@ -334,9 +314,9 @@ impl Handler for SectionRunnerActor { #[derive(Message, Debug, Clone)] #[rtype(result = "bool")] -struct CancelRun(SectionRunHandle); +struct CancelRun(ZoneRunHandle); -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = bool; fn handle(&mut self, msg: CancelRun, ctx: &mut Self::Context) -> Self::Result { @@ -358,25 +338,21 @@ impl Handler for SectionRunnerActor { #[derive(Message, Debug, Clone)] #[rtype(result = "usize")] -struct CancelBySection(SectionId); +struct CancelByZone(ZoneId); -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = usize; - fn handle(&mut self, msg: CancelBySection, ctx: &mut Self::Context) -> Self::Result { - let CancelBySection(section_id) = msg; + fn handle(&mut self, msg: CancelByZone, ctx: &mut Self::Context) -> Self::Result { + let CancelByZone(zone_id) = msg; let mut count = 0_usize; for run in self .state .run_queue .iter_mut() - .filter(|run| run.section.id == section_id) + .filter(|run| run.zone.id == zone_id) { - trace!( - handle = run.handle.0, - section_id, - "cancelling run by section" - ); + trace!(handle = run.handle.0, zone_id, "cancelling run by zone"); if self.inner.cancel_run(run) { count += 1; } @@ -390,11 +366,11 @@ impl Handler for SectionRunnerActor { #[rtype(result = "usize")] struct CancelAll; -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = usize; fn handle(&mut self, _msg: CancelAll, ctx: &mut Self::Context) -> Self::Result { - let mut old_runs = SecRunQueue::new(); + let mut old_runs = ZoneRunQueue::new(); swap(&mut old_runs, &mut self.state.run_queue); trace!(count = old_runs.len(), "cancelling all runs"); let mut count = 0usize; @@ -412,7 +388,7 @@ impl Handler for SectionRunnerActor { #[rtype(result = "()")] struct SetPaused(bool); -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = (); fn handle(&mut self, msg: SetPaused, ctx: &mut Self::Context) -> Self::Result { @@ -420,10 +396,10 @@ impl Handler for SectionRunnerActor { if pause != self.state.paused { if pause { self.state.paused = true; - self.inner.send_event(SectionEvent::RunnerPause); + self.inner.send_event(ZoneEvent::RunnerPause); } else { self.state.paused = false; - self.inner.send_event(SectionEvent::RunnerUnpause); + self.inner.send_event(ZoneEvent::RunnerUnpause); } self.inner.did_change = true; ctx.notify(Process); @@ -432,10 +408,10 @@ impl Handler for SectionRunnerActor { } #[derive(Message, Debug, Clone)] -#[rtype(result = "SectionEventRecv")] +#[rtype(result = "ZoneEventRecv")] struct Subscribe; -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = MessageResult; fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result { @@ -448,7 +424,7 @@ impl Handler for SectionRunnerActor { #[rtype(result = "()")] struct Process; -impl Handler for SectionRunnerActor { +impl Handler for ZoneRunnerActor { type Result = (); fn handle(&mut self, _msg: Process, ctx: &mut Self::Context) -> Self::Result { @@ -456,14 +432,11 @@ impl Handler for SectionRunnerActor { } } -impl SectionRunnerActor { - fn new( - interface: Arc, - state_send: watch::Sender, - ) -> Self { +impl ZoneRunnerActor { + fn new(interface: Arc, state_send: watch::Sender) -> Self { Self { - state: SecRunnerState::default(), - inner: SectionRunnerInner { + state: ZoneRunnerState::default(), + inner: ZoneRunnerInner { interface, event_send: None, state_send, @@ -474,7 +447,7 @@ impl SectionRunnerActor { } fn process_queue(&mut self, ctx: &mut actix::Context) { - use SecRunState::*; + use ZoneRunState::*; let state = &mut self.state; while let Some(current_run) = state.run_queue.front_mut() { let run_finished = match (¤t_run.state, state.paused) { @@ -528,23 +501,23 @@ impl SectionRunnerActor { } #[derive(Debug, Clone, Error)] -#[error("error communicating with SectionRunner: {0}")] +#[error("error communicating with ZoneRunner: {0}")] pub struct Error(#[from] actix::MailboxError); pub type Result = std::result::Result; #[derive(Clone)] -pub struct SectionRunner { - state_recv: SecRunnerStateRecv, - addr: Addr, +pub struct ZoneRunner { + state_recv: ZoneRunnerStateRecv, + addr: Addr, next_run_id: Arc, } #[allow(dead_code)] -impl SectionRunner { - pub fn new(interface: Arc) -> Self { - let (state_send, state_recv) = watch::channel(SecRunnerState::default()); - let addr = SectionRunnerActor::new(interface, state_send).start(); +impl ZoneRunner { + pub fn new(interface: Arc) -> Self { + let (state_send, state_recv) = watch::channel(ZoneRunnerState::default()); + let addr = ZoneRunnerActor::new(interface, state_send).start(); Self { state_recv, addr, @@ -556,49 +529,40 @@ impl SectionRunner { self.addr.send(Quit).map_err(From::from) } - fn queue_run_inner( - &mut self, - section: SectionRef, - duration: Duration, - ) -> (QueueRun, SectionRunHandle) { + fn queue_run_inner(&mut self, zone: ZoneRef, duration: Duration) -> (QueueRun, ZoneRunHandle) { let run_id = self.next_run_id.fetch_add(1, Ordering::SeqCst); - let handle = SectionRunHandle(run_id); - (QueueRun(handle.clone(), section, duration), handle) + let handle = ZoneRunHandle(run_id); + (QueueRun(handle.clone(), zone, duration), handle) } - pub fn do_queue_run(&mut self, section: SectionRef, duration: Duration) -> SectionRunHandle { - let (queue_run, handle) = self.queue_run_inner(section, duration); + pub fn do_queue_run(&mut self, zone: ZoneRef, duration: Duration) -> ZoneRunHandle { + let (queue_run, handle) = self.queue_run_inner(zone, duration); self.addr.do_send(queue_run); handle } pub fn queue_run( &mut self, - section: SectionRef, + zone: ZoneRef, duration: Duration, - ) -> impl Future> { - let (queue_run, handle) = self.queue_run_inner(section, duration); + ) -> impl Future> { + let (queue_run, handle) = self.queue_run_inner(zone, duration); self.addr .send(queue_run) .map_err(From::from) .map_ok(move |_| handle) } - pub fn do_cancel_run(&mut self, handle: SectionRunHandle) { + pub fn do_cancel_run(&mut self, handle: ZoneRunHandle) { self.addr.do_send(CancelRun(handle)) } - pub fn cancel_run(&mut self, handle: SectionRunHandle) -> impl Future> { + pub fn cancel_run(&mut self, handle: ZoneRunHandle) -> impl Future> { self.addr.send(CancelRun(handle)).map_err(From::from) } - pub fn cancel_by_section( - &mut self, - section_id: SectionId, - ) -> impl Future> { - self.addr - .send(CancelBySection(section_id)) - .map_err(From::from) + pub fn cancel_by_zone(&mut self, zone_id: ZoneId) -> impl Future> { + self.addr.send(CancelByZone(zone_id)).map_err(From::from) } pub fn cancel_all(&mut self) -> impl Future> { @@ -613,11 +577,11 @@ impl SectionRunner { self.addr.send(SetPaused(false)).map_err(From::from) } - pub fn subscribe(&mut self) -> impl Future> { + pub fn subscribe(&mut self) -> impl Future> { self.addr.send(Subscribe).map_err(From::from) } - pub fn get_state_recv(&self) -> SecRunnerStateRecv { + pub fn get_state_recv(&self) -> ZoneRunnerStateRecv { self.state_recv.clone() } } @@ -627,8 +591,8 @@ mod test { use super::*; use crate::trace_listeners::{EventListener, Filters}; use sprinklers_core::{ - model::{Section, Sections}, - section_interface::MockSectionInterface, + model::{Zone, Zones}, + zone_interface::MockZoneInterface, }; use assert_matches::assert_matches; @@ -639,43 +603,43 @@ mod test { async fn test_quit() { let quit_msg = EventListener::new( Filters::new() - .target("sprinklers_actors::section_runner") - .message("section_runner stopped"), + .target("sprinklers_actors::zone_runner") + .message("zone_runner stopped"), ); let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let _sub = tracing::subscriber::set_default(subscriber); - let interface = MockSectionInterface::new(6); - let mut runner = SectionRunner::new(Arc::new(interface)); + let interface = MockZoneInterface::new(6); + let mut runner = ZoneRunner::new(Arc::new(interface)); tokio::task::yield_now().await; runner.quit().await.unwrap(); assert_eq!(quit_msg.get_count(), 1); } - fn make_sections_and_interface() -> (Sections, Arc) { - let interface = Arc::new(MockSectionInterface::new(2)); - let sections: Sections = ordmap![ - 1 => Section { + fn make_zones_and_interface() -> (Zones, Arc) { + let interface = Arc::new(MockZoneInterface::new(2)); + let zones: Zones = ordmap![ + 1 => Zone { id: 1, - name: "Section 1".into(), + name: "Zone 1".into(), interface_id: 0, }.into(), - 2 => Section { + 2 => Zone { id: 2, - name: "Section 2".into(), + name: "Zone 2".into(), interface_id: 1, }.into() ]; - (sections, interface) + (zones, interface) } - fn assert_section_states(interface: &MockSectionInterface, states: &[bool]) { + fn assert_zone_states(interface: &MockZoneInterface, states: &[bool]) { for (id, state) in states.iter().enumerate() { assert_eq!( - interface.get_section_state(id as u32), + interface.get_zone_state(id as u32), *state, - "section interface id {} did not match", + "zone interface id {} did not match", id ); } @@ -691,236 +655,236 @@ mod test { #[actix_rt::test] async fn test_queue() { - let (sections, interface) = make_sections_and_interface(); - let mut runner = SectionRunner::new(interface.clone()); + let (zones, interface) = make_zones_and_interface(); + let mut runner = ZoneRunner::new(interface.clone()); - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); - // Queue single section, make sure it runs + // Queue single zone, make sure it runs runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[true, false]); + assert_zone_states(&interface, &[true, false]); advance(Duration::from_secs(11)).await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); - // Queue two sections, make sure they run one at a time + // Queue two zones, make sure they run one at a time runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); advance(Duration::from_secs(11)).await; - assert_section_states(&interface, &[true, false]); + assert_zone_states(&interface, &[true, false]); advance(Duration::from_secs(10)).await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_cancel_run() { - let (sections, interface) = make_sections_and_interface(); - let mut runner = SectionRunner::new(interface.clone()); + let (zones, interface) = make_zones_and_interface(); + let mut runner = ZoneRunner::new(interface.clone()); let run1 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let _run2 = runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let run3 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); runner.cancel_run(run1).await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[true, false]); + assert_zone_states(&interface, &[true, false]); runner.cancel_run(run3).await.unwrap(); advance(Duration::from_secs(11)).await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_cancel_all() { - let (sections, interface) = make_sections_and_interface(); - let mut runner = SectionRunner::new(interface.clone()); + let (zones, interface) = make_zones_and_interface(); + let mut runner = ZoneRunner::new(interface.clone()); runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); runner.cancel_all().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.cancel_all().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_pause() { - let (sections, interface) = make_sections_and_interface(); - let mut runner = SectionRunner::new(interface.clone()); + let (zones, interface) = make_zones_and_interface(); + let mut runner = ZoneRunner::new(interface.clone()); let _run1 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let run2 = runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let _run3 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); runner.pause().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); advance(Duration::from_secs(10)).await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.unpause().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); advance(Duration::from_secs(8)).await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); advance(Duration::from_secs(2)).await; - assert_section_states(&interface, &[true, false]); + assert_zone_states(&interface, &[true, false]); runner.pause().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); // cancel paused run runner.cancel_run(run2).await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.unpause().await.unwrap(); tokio::task::yield_now().await; - assert_section_states(&interface, &[false, true]); + assert_zone_states(&interface, &[false, true]); advance(Duration::from_secs(11)).await; - assert_section_states(&interface, &[false, false]); + assert_zone_states(&interface, &[false, false]); runner.quit().await.unwrap(); } #[actix_rt::test] async fn test_event() { - let (sections, interface) = make_sections_and_interface(); - let mut runner = SectionRunner::new(interface.clone()); + let (zones, interface) = make_zones_and_interface(); + let mut runner = ZoneRunner::new(interface.clone()); let mut event_recv = runner.subscribe().await.unwrap(); let run1 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); let run2 = runner - .queue_run(sections[&1].clone(), Duration::from_secs(10)) + .queue_run(zones[&1].clone(), Duration::from_secs(10)) .await .unwrap(); let run3 = runner - .queue_run(sections[&2].clone(), Duration::from_secs(10)) + .queue_run(zones[&2].clone(), Duration::from_secs(10)) .await .unwrap(); assert_matches!( event_recv.recv().await, - Ok(SectionEvent::RunStart(handle, _)) + Ok(ZoneEvent::RunStart(handle, _)) if handle == run1 ); runner.pause().await.unwrap(); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run1); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerPause)); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run1); runner.unpause().await.unwrap(); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunUnpause(handle, _)) if handle == run1); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerUnpause)); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunUnpause(handle, _)) if handle == run1); advance(Duration::from_secs(11)).await; - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunFinish(handle, _)) if handle == run1); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunStart(handle, _)) if handle == run2); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunFinish(handle, _)) if handle == run1); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunStart(handle, _)) if handle == run2); runner.pause().await.unwrap(); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerPause)); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run2); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerPause)); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run2); // cancel paused run runner.cancel_run(run2.clone()).await.unwrap(); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunCancel(handle, _)) if handle == run2); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunPause(handle, _)) if handle == run3); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunCancel(handle, _)) if handle == run2); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunPause(handle, _)) if handle == run3); runner.unpause().await.unwrap(); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunnerUnpause)); - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunUnpause(handle, _)) if handle == run3); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunnerUnpause)); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunUnpause(handle, _)) if handle == run3); advance(Duration::from_secs(11)).await; - assert_matches!(event_recv.recv().await, Ok(SectionEvent::RunFinish(handle, _)) if handle == run3); + assert_matches!(event_recv.recv().await, Ok(ZoneEvent::RunFinish(handle, _)) if handle == run3); runner.quit().await.unwrap(); } diff --git a/sprinklers_core/src/lib.rs b/sprinklers_core/src/lib.rs index 9ecb760..659fb47 100644 --- a/sprinklers_core/src/lib.rs +++ b/sprinklers_core/src/lib.rs @@ -1,4 +1,4 @@ pub mod model; pub mod schedule; pub mod serde; -pub mod section_interface; +pub mod zone_interface; diff --git a/sprinklers_core/src/model/mod.rs b/sprinklers_core/src/model/mod.rs index 1b36f18..d0dab42 100644 --- a/sprinklers_core/src/model/mod.rs +++ b/sprinklers_core/src/model/mod.rs @@ -1,7 +1,7 @@ //! Domain specific data models mod program; -mod section; +mod zone; pub use program::*; -pub use section::*; +pub use zone::*; diff --git a/sprinklers_core/src/model/program.rs b/sprinklers_core/src/model/program.rs index 44dbd3f..f75051e 100644 --- a/sprinklers_core/src/model/program.rs +++ b/sprinklers_core/src/model/program.rs @@ -1,4 +1,4 @@ -use super::section::SectionId; +use super::zone::ZoneId; use crate::schedule::Schedule; use serde::{Deserialize, Serialize}; use std::{sync::Arc, time::Duration}; @@ -6,7 +6,9 @@ use std::{sync::Arc, time::Duration}; #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct ProgramItem { - pub section_id: SectionId, + // TODO: update nomenclature + #[serde(rename = "sectionId")] + pub zone_id: ZoneId, #[serde(with = "crate::serde::duration_secs")] pub duration: Duration, } diff --git a/sprinklers_core/src/model/section.rs b/sprinklers_core/src/model/section.rs deleted file mode 100644 index 7499365..0000000 --- a/sprinklers_core/src/model/section.rs +++ /dev/null @@ -1,27 +0,0 @@ -//! Data models for sprinklers sections -//! -//! A section represents a group of sprinkler heads actuated by a single -//! valve. Physically controllable (or virtual) valves are handled by implementations of -//! [SectionInterface](../../section_interface/trait.SectionInterface.html), but the model -//! describes a logical section and how it maps to a physical one. - -use crate::section_interface::SecId; -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -/// Identifying integer type for a Section -pub type SectionId = u32; - -/// A single logical section -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct Section { - pub id: SectionId, - pub name: String, - /// ID number of the corresponding physical section - pub interface_id: SecId, -} - -pub type SectionRef = Arc
; - -pub type Sections = im::OrdMap; diff --git a/sprinklers_core/src/model/zone.rs b/sprinklers_core/src/model/zone.rs new file mode 100644 index 0000000..daf94d4 --- /dev/null +++ b/sprinklers_core/src/model/zone.rs @@ -0,0 +1,27 @@ +//! Data models for sprinklers zones +//! +//! A zone represents a group of sprinkler heads actuated by a single +//! valve. Physically controllable (or virtual) valves are handled by implementations of +//! [ZoneInterface](../../zone_interface/trait.ZoneInterface.html), but the model +//! describes a logical zone and how it maps to a physical one. + +use crate::zone_interface::ZoneNum; +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +/// Identifying integer type for a Zone +pub type ZoneId = u32; + +/// A single logical zone +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Zone { + pub id: ZoneId, + pub name: String, + /// ID number of the corresponding physical zone + pub interface_id: ZoneNum, +} + +pub type ZoneRef = Arc; + +pub type Zones = im::OrdMap; diff --git a/sprinklers_core/src/schedule.rs b/sprinklers_core/src/schedule.rs index bcf3532..8ac1697 100644 --- a/sprinklers_core/src/schedule.rs +++ b/sprinklers_core/src/schedule.rs @@ -1,9 +1,6 @@ //! Scheduling for events to run at certain intervals in the future -use chrono::{ - Date, DateTime, Datelike, Local, NaiveDateTime, NaiveTime, TimeZone, - Weekday, -}; +use chrono::{Date, DateTime, Datelike, Local, NaiveDateTime, NaiveTime, TimeZone, Weekday}; use serde::{Deserialize, Serialize}; use std::cmp; use std::iter::FromIterator; diff --git a/sprinklers_core/src/section_interface.rs b/sprinklers_core/src/section_interface.rs deleted file mode 100644 index 59e84f1..0000000 --- a/sprinklers_core/src/section_interface.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::iter::repeat_with; -use std::sync::atomic::{AtomicBool, Ordering}; -use tracing::debug; - -pub type SecId = u32; - -pub trait SectionInterface: Send + Sync { - fn num_sections(&self) -> SecId; - fn set_section_state(&self, id: SecId, running: bool); - fn get_section_state(&self, id: SecId) -> bool; -} - -pub struct MockSectionInterface { - states: Vec, -} - -impl MockSectionInterface { - #[allow(dead_code)] - pub fn new(num_sections: SecId) -> Self { - Self { - states: repeat_with(|| AtomicBool::new(false)) - .take(num_sections as usize) - .collect(), - } - } -} - -impl SectionInterface for MockSectionInterface { - fn num_sections(&self) -> SecId { - self.states.len() as SecId - } - fn set_section_state(&self, id: SecId, running: bool) { - debug!(id, running, "setting section"); - self.states[id as usize].store(running, Ordering::SeqCst); - } - fn get_section_state(&self, id: SecId) -> bool { - self.states[id as usize].load(Ordering::SeqCst) - } -} - -#[cfg(test)] -mod test { - use super::*; - - #[test] - fn test_mock_section_interface() { - let iface = MockSectionInterface::new(6); - assert_eq!(iface.num_sections(), 6); - for i in 0..6u32 { - assert_eq!(iface.get_section_state(i), false); - } - for i in 0..6u32 { - iface.set_section_state(i, true); - } - for i in 0..6u32 { - assert_eq!(iface.get_section_state(i), true); - } - for i in 0..6u32 { - iface.set_section_state(i, false); - } - for i in 0..6u32 { - assert_eq!(iface.get_section_state(i), false); - } - } -} diff --git a/sprinklers_core/src/zone_interface.rs b/sprinklers_core/src/zone_interface.rs new file mode 100644 index 0000000..7f74c18 --- /dev/null +++ b/sprinklers_core/src/zone_interface.rs @@ -0,0 +1,65 @@ +use std::iter::repeat_with; +use std::sync::atomic::{AtomicBool, Ordering}; +use tracing::debug; + +pub type ZoneNum = u32; + +pub trait ZoneInterface: Send + Sync { + fn num_zones(&self) -> ZoneNum; + fn set_zone_state(&self, id: ZoneNum, running: bool); + fn get_zone_state(&self, id: ZoneNum) -> bool; +} + +pub struct MockZoneInterface { + states: Vec, +} + +impl MockZoneInterface { + #[allow(dead_code)] + pub fn new(num_zones: ZoneNum) -> Self { + Self { + states: repeat_with(|| AtomicBool::new(false)) + .take(num_zones as usize) + .collect(), + } + } +} + +impl ZoneInterface for MockZoneInterface { + fn num_zones(&self) -> ZoneNum { + self.states.len() as ZoneNum + } + fn set_zone_state(&self, id: ZoneNum, running: bool) { + debug!(id, running, "setting zone"); + self.states[id as usize].store(running, Ordering::SeqCst); + } + fn get_zone_state(&self, id: ZoneNum) -> bool { + self.states[id as usize].load(Ordering::SeqCst) + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_mock_zone_interface() { + let iface = MockZoneInterface::new(6); + assert_eq!(iface.num_zones(), 6); + for i in 0..6u32 { + assert_eq!(iface.get_zone_state(i), false); + } + for i in 0..6u32 { + iface.set_zone_state(i, true); + } + for i in 0..6u32 { + assert_eq!(iface.get_zone_state(i), true); + } + for i in 0..6u32 { + iface.set_zone_state(i, false); + } + for i in 0..6u32 { + assert_eq!(iface.get_zone_state(i), false); + } + } +} diff --git a/sprinklers_database/src/lib.rs b/sprinklers_database/src/lib.rs index 84dd6fb..c59cd61 100644 --- a/sprinklers_database/src/lib.rs +++ b/sprinklers_database/src/lib.rs @@ -1,8 +1,8 @@ mod migration; mod migrations; mod program; -mod section; mod sql_json; +mod zone; pub use migration::*; pub use migrations::create_migrations; @@ -10,7 +10,7 @@ pub use program::*; pub use rusqlite::Connection as DbConn; -use sprinklers_core::model::Sections; +use sprinklers_core::model::Zones; use eyre::Result; use rusqlite::NO_PARAMS; @@ -28,16 +28,16 @@ pub fn setup_db() -> Result { Ok(conn) } -pub fn query_sections(conn: &DbConn) -> Result { +pub fn query_zones(conn: &DbConn) -> Result { let mut statement = conn.prepare_cached( "SELECT s.id, s.name, s.interface_id \ FROM sections AS s;", )?; - let rows = statement.query_map(NO_PARAMS, section::from_sql)?; - let mut sections = Sections::new(); + let rows = statement.query_map(NO_PARAMS, zone::from_sql)?; + let mut zones = Zones::new(); for row in rows { - let section = row?; - sections.insert(section.id, section.into()); + let zone = row?; + zones.insert(zone.id, zone.into()); } - Ok(sections) + Ok(zones) } diff --git a/sprinklers_database/src/migration.rs b/sprinklers_database/src/migration.rs index dc3346c..e334909 100644 --- a/sprinklers_database/src/migration.rs +++ b/sprinklers_database/src/migration.rs @@ -3,7 +3,7 @@ use rusqlite::{params, Connection}; use std::collections::BTreeMap; use std::ops::Bound::{Excluded, Unbounded}; use thiserror::Error; -use tracing::{debug, trace, info}; +use tracing::{debug, info, trace}; #[derive(Debug, Error)] pub enum MigrationError { @@ -152,7 +152,11 @@ impl Migrations { last_ver = *ver; } if last_ver != NO_MIGRATIONS { - info!(old_version = db_version, new_version = last_ver, "applied database migrations"); + info!( + old_version = db_version, + new_version = last_ver, + "applied database migrations" + ); set_db_version(&trans, last_ver)?; } trans.commit()?; diff --git a/sprinklers_database/src/migrations/mod.rs b/sprinklers_database/src/migrations/mod.rs index 900c7d1..df8f209 100644 --- a/sprinklers_database/src/migrations/mod.rs +++ b/sprinklers_database/src/migrations/mod.rs @@ -20,4 +20,4 @@ pub fn create_migrations() -> Migrations { migs.add(include_file_migration!(6, "0006-fix_view_program_seq")); // INSERT MIGRATION ABOVE -- DO NOT EDIT THIS COMMENT migs -} \ No newline at end of file +} diff --git a/sprinklers_database/src/program.rs b/sprinklers_database/src/program.rs index 5d5d826..a8e1502 100644 --- a/sprinklers_database/src/program.rs +++ b/sprinklers_database/src/program.rs @@ -2,7 +2,7 @@ use super::sql_json::SqlJson; use super::DbConn; use sprinklers_core::{ model::{ - Program, ProgramId, ProgramItem, ProgramSequence, ProgramUpdateData, Programs, SectionId, + Program, ProgramId, ProgramItem, ProgramSequence, ProgramUpdateData, Programs, ZoneId, }, schedule::Schedule, }; @@ -58,7 +58,7 @@ fn update_as_sql(id: ProgramId, program: &ProgramUpdateData) -> SqlProgramUpdate struct SqlProgramItem { program_id: ProgramId, seq_num: isize, - section_id: SectionId, + zone_id: ZoneId, duration: f64, } @@ -70,7 +70,7 @@ impl<'a> IntoIterator for &'a SqlProgramItem { vec![ &self.program_id as &dyn ToSql, &self.seq_num, - &self.section_id, + &self.zone_id, &self.duration, ] .into_iter() @@ -85,7 +85,7 @@ fn item_as_sql( SqlProgramItem { program_id, seq_num: (seq_num + 1) as isize, - section_id: program_item.section_id, + zone_id: program_item.zone_id, duration: program_item.duration.as_secs_f64(), } } diff --git a/sprinklers_database/src/section.rs b/sprinklers_database/src/section.rs deleted file mode 100644 index f072fab..0000000 --- a/sprinklers_database/src/section.rs +++ /dev/null @@ -1,16 +0,0 @@ -use sprinklers_core::model::Section; - -use rusqlite::{Error as SqlError, Row as SqlRow, ToSql}; - -pub fn from_sql<'a>(row: &SqlRow<'a>) -> Result { - Ok(Section { - id: row.get(0)?, - name: row.get(1)?, - interface_id: row.get(2)?, - }) -} - -#[allow(dead_code)] -pub fn as_sql(section: &Section) -> Vec<&dyn ToSql> { - vec![§ion.id, §ion.name, §ion.interface_id] -} diff --git a/sprinklers_database/src/zone.rs b/sprinklers_database/src/zone.rs new file mode 100644 index 0000000..4c11643 --- /dev/null +++ b/sprinklers_database/src/zone.rs @@ -0,0 +1,16 @@ +use sprinklers_core::model::Zone; + +use rusqlite::{Error as SqlError, Row as SqlRow, ToSql}; + +pub fn from_sql<'a>(row: &SqlRow<'a>) -> Result { + Ok(Zone { + id: row.get(0)?, + name: row.get(1)?, + interface_id: row.get(2)?, + }) +} + +#[allow(dead_code)] +pub fn as_sql(zone: &Zone) -> Vec<&dyn ToSql> { + vec![&zone.id, &zone.name, &zone.interface_id] +} diff --git a/sprinklers_linux/src/lib.rs b/sprinklers_linux/src/lib.rs index 0ac0d1f..0adc29e 100644 --- a/sprinklers_linux/src/lib.rs +++ b/sprinklers_linux/src/lib.rs @@ -1,4 +1,4 @@ -use sprinklers_core::section_interface::{SecId, SectionInterface}; +use sprinklers_core::zone_interface::{ZoneInterface, ZoneNum}; use eyre::WrapErr; use gpio_cdev::{LineHandle, LineRequestFlags}; @@ -9,12 +9,12 @@ pub struct LinuxGpio { lines: Vec, } -impl SectionInterface for LinuxGpio { - fn num_sections(&self) -> SecId { - self.lines.len() as SecId +impl ZoneInterface for LinuxGpio { + fn num_zones(&self) -> ZoneNum { + self.lines.len() as ZoneNum } - fn set_section_state(&self, id: SecId, running: bool) { + fn set_zone_state(&self, id: ZoneNum, running: bool) { if let Some(line) = &self.lines.get(id as usize) { trace!( line = line.line().offset(), @@ -26,11 +26,11 @@ impl SectionInterface for LinuxGpio { error!("error setting GPIO line value: {}", err); } } else { - warn!("set_section_state: invalid section id: {}", id); + warn!("set_zone_state: invalid zone id: {}", id); } } - fn get_section_state(&self, id: SecId) -> bool { + fn get_zone_state(&self, id: ZoneNum) -> bool { if let Some(line) = &self.lines.get(id as usize) { match line.get_value() { Ok(active) => active != 0, @@ -40,7 +40,7 @@ impl SectionInterface for LinuxGpio { } } } else { - warn!("get_section_state: invalid section id: {}", id); + warn!("get_zone_state: invalid zone id: {}", id); false } } diff --git a/sprinklers_mqtt/src/actor.rs b/sprinklers_mqtt/src/actor.rs index 8df6871..649769b 100644 --- a/sprinklers_mqtt/src/actor.rs +++ b/sprinklers_mqtt/src/actor.rs @@ -1,6 +1,6 @@ use super::{event_loop::EventLoopTask, request, MqttInterface}; use actix::{Actor, ActorContext, ActorFuture, AsyncContext, Handler, WrapFuture}; -use request::{ErrorCode, RequestContext, RequestError, WithRequestId, Response}; +use request::{ErrorCode, RequestContext, RequestError, Response, WithRequestId}; use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; @@ -55,7 +55,7 @@ impl MqttActor { match &response { Response::Success(res) => { debug!(rid, response = display(res), "success response:"); - }, + } Response::Error(err) => { debug!(rid, "request error: {}", err); } diff --git a/sprinklers_mqtt/src/lib.rs b/sprinklers_mqtt/src/lib.rs index da03268..582f9b4 100644 --- a/sprinklers_mqtt/src/lib.rs +++ b/sprinklers_mqtt/src/lib.rs @@ -1,17 +1,17 @@ mod actor; mod event_loop; mod request; -mod section_runner_json; mod topics; mod update_listener; +mod zone_runner_json; pub use request::RequestContext; pub use update_listener::UpdateListener; use self::topics::Topics; -use section_runner_json::SecRunnerStateJson; -use sprinklers_actors::section_runner::SecRunnerState; -use sprinklers_core::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; +use sprinklers_actors::zone_runner::ZoneRunnerState; +use sprinklers_core::model::{Program, ProgramId, Programs, Zone, ZoneId, Zones}; +use zone_runner_json::ZoneRunnerStateJson; use actix::{Actor, Addr}; use eyre::WrapErr; @@ -73,32 +73,28 @@ impl MqttInterface { self.client.cancel().await } - pub async fn publish_sections(&mut self, sections: &Sections) -> eyre::Result<()> { - let section_ids: Vec<_> = sections.keys().cloned().collect(); - self.publish_data(self.topics.sections(), §ion_ids) + pub async fn publish_zones(&mut self, zones: &Zones) -> eyre::Result<()> { + let zone_ids: Vec<_> = zones.keys().cloned().collect(); + self.publish_data(self.topics.zones(), &zone_ids) .await - .wrap_err("failed to publish section ids")?; - for section in sections.values() { - self.publish_section(section).await?; + .wrap_err("failed to publish zone ids")?; + for zone in zones.values() { + self.publish_zone(zone).await?; } Ok(()) } - pub async fn publish_section(&mut self, section: &Section) -> eyre::Result<()> { - self.publish_data(self.topics.section_data(section.id), section) + pub async fn publish_zone(&mut self, zone: &Zone) -> eyre::Result<()> { + self.publish_data(self.topics.zone_data(zone.id), zone) .await - .wrap_err("failed to publish section") + .wrap_err("failed to publish zone") } - // Section state can be derived from section runner state... - pub async fn publish_section_state( - &mut self, - section_id: SectionId, - state: bool, - ) -> eyre::Result<()> { - self.publish_data(self.topics.section_state(section_id), &state) + // Zone state can be derived from zone runner state... + pub async fn publish_zone_state(&mut self, zone_id: ZoneId, state: bool) -> eyre::Result<()> { + self.publish_data(self.topics.zone_state(zone_id), &state) .await - .wrap_err("failed to publish section state") + .wrap_err("failed to publish zone state") } pub async fn publish_programs(&mut self, programs: &Programs) -> eyre::Result<()> { @@ -165,11 +161,11 @@ impl MqttInterface { .wrap_err("failed to publish program next run") } - pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> { - let json: SecRunnerStateJson = sr_state.into(); - self.publish_data(self.topics.section_runner(), &json) + pub async fn publish_zone_runner(&mut self, sr_state: &ZoneRunnerState) -> eyre::Result<()> { + let json: ZoneRunnerStateJson = sr_state.into(); + self.publish_data(self.topics.zone_runner(), &json) .await - .wrap_err("failed to publish section runner") + .wrap_err("failed to publish zone runner") } async fn publish_response(&mut self, resp: request::ResponseWithId) -> eyre::Result<()> { diff --git a/sprinklers_mqtt/src/request/mod.rs b/sprinklers_mqtt/src/request/mod.rs index 4532519..430acae 100644 --- a/sprinklers_mqtt/src/request/mod.rs +++ b/sprinklers_mqtt/src/request/mod.rs @@ -1,5 +1,5 @@ -use sprinklers_actors::{ProgramRunner, SectionRunner, StateManager}; -use sprinklers_core::model::Sections; +use sprinklers_actors::{ProgramRunner, StateManager, ZoneRunner}; +use sprinklers_core::model::Zones; use futures_util::{ready, FutureExt}; use num_derive::FromPrimitive; @@ -7,11 +7,11 @@ use serde::{Deserialize, Serialize}; use std::{fmt, future::Future, pin::Pin, task::Poll}; mod programs; -mod sections; +mod zones; pub struct RequestContext { - pub sections: Sections, - pub section_runner: SectionRunner, + pub zones: Zones, + pub zone_runner: ZoneRunner, pub program_runner: ProgramRunner, pub state_manager: StateManager, } @@ -31,8 +31,8 @@ pub enum ErrorCode { NoPermission = 107, NotFound = 109, // NotUnique = 110, - NoSuchSection = 120, - NoSuchSectionRun = 121, + NoSuchZone = 120, + NoSuchZoneRun = 121, NoSuchProgram = 122, Internal = 200, NotImplemented = 201, @@ -259,10 +259,15 @@ pub type ResponseWithId = WithRequestId; #[derive(Debug, Deserialize, Serialize)] #[serde(rename_all = "camelCase", tag = "type")] pub enum Request { - RunSection(sections::RunSectionRequest), - CancelSection(sections::CancelSectionRequest), - CancelSectionRunId(sections::CancelSectionRunIdRequest), - PauseSectionRunner(sections::PauseSectionRunnerRequest), + // TODO: update nomenclature + #[serde(rename = "runSection")] + RunZone(zones::RunZoneRequest), + #[serde(rename = "cancelSection")] + CancelZone(zones::CancelZoneRequest), + #[serde(rename = "cancelSectionRunId")] + CancelZoneRunId(zones::CancelZoneRunIdRequest), + #[serde(rename = "pauseSectionRunner")] + PauseZoneRunner(zones::PauseZoneRunnerRequest), RunProgram(programs::RunProgramRequest), CancelProgram(programs::CancelProgramRequest), UpdateProgram(programs::UpdateProgramRequest), @@ -273,10 +278,10 @@ impl IRequest for Request { fn exec(self, ctx: &mut RequestContext) -> RequestFuture { match self { - Request::RunSection(req) => req.exec_erased(ctx), - Request::CancelSection(req) => req.exec_erased(ctx), - Request::CancelSectionRunId(req) => req.exec_erased(ctx), - Request::PauseSectionRunner(req) => req.exec_erased(ctx), + Request::RunZone(req) => req.exec_erased(ctx), + Request::CancelZone(req) => req.exec_erased(ctx), + Request::CancelZoneRunId(req) => req.exec_erased(ctx), + Request::PauseZoneRunner(req) => req.exec_erased(ctx), Request::RunProgram(req) => req.exec_erased(ctx), Request::CancelProgram(req) => req.exec_erased(ctx), Request::UpdateProgram(req) => req.exec_erased(ctx), diff --git a/sprinklers_mqtt/src/request/sections.rs b/sprinklers_mqtt/src/request/sections.rs deleted file mode 100644 index f299321..0000000 --- a/sprinklers_mqtt/src/request/sections.rs +++ /dev/null @@ -1,161 +0,0 @@ -use super::*; -use sprinklers_actors::section_runner::SectionRunHandle; -use sprinklers_core::model::{self, SectionRef}; -use sprinklers_core::serde::duration_secs; - -use eyre::WrapErr; -use serde::{Deserialize, Serialize}; -use std::time::Duration; - -#[derive(Copy, Clone, Debug, Serialize, Deserialize)] -#[serde(transparent)] -pub struct SectionId(pub model::SectionId); - -impl SectionId { - fn get_section(self, sections: &Sections) -> Result { - sections.get(&self.0).cloned().ok_or_else(|| { - RequestError::with_name(ErrorCode::NoSuchSection, "no such section", "section") - }) - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct RunSectionRequest { - pub section_id: SectionId, - #[serde(with = "duration_secs")] - pub duration: Duration, -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct RunSectionResponse { - pub message: String, - pub run_id: SectionRunHandle, -} - -impl IRequest for RunSectionRequest { - type Response = RunSectionResponse; - fn exec(self, ctx: &mut RequestContext) -> RequestFuture { - let mut section_runner = ctx.section_runner.clone(); - let section = self.section_id.get_section(&ctx.sections); - let duration = self.duration; - Box::pin(async move { - let section = section?; - let handle = section_runner - .queue_run(section.clone(), duration) - .await - .wrap_err("could not queue run")?; - Ok(RunSectionResponse { - message: format!("running section '{}' for {:?}", §ion.name, duration), - run_id: handle, - }) - }) - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CancelSectionRequest { - pub section_id: SectionId, -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CancelSectionResponse { - pub message: String, - pub cancelled: usize, -} - -impl IRequest for CancelSectionRequest { - type Response = CancelSectionResponse; - fn exec(self, ctx: &mut RequestContext) -> RequestFuture { - let mut section_runner = ctx.section_runner.clone(); - let section = self.section_id.get_section(&ctx.sections); - Box::pin(async move { - let section = section?; - let cancelled = section_runner - .cancel_by_section(section.id) - .await - .wrap_err("could not cancel section")?; - Ok(CancelSectionResponse { - message: format!( - "cancelled {} runs for section '{}'", - cancelled, section.name - ), - cancelled, - }) - }) - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CancelSectionRunIdRequest { - pub run_id: SectionRunHandle, -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct CancelSectionRunIdResponse { - pub message: String, - pub cancelled: bool, -} - -impl IRequest for CancelSectionRunIdRequest { - type Response = ResponseMessage; - fn exec(self, ctx: &mut RequestContext) -> RequestFuture { - let mut section_runner = ctx.section_runner.clone(); - Box::pin(async move { - let cancelled = section_runner - .cancel_run(self.run_id) - .await - .wrap_err("could not cancel section run")?; - if cancelled { - Ok(ResponseMessage::new("cancelled section run")) - } else { - Err(RequestError::with_name( - ErrorCode::NoSuchSectionRun, - "no such section run", - "section run", - )) - } - }) - } -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct PauseSectionRunnerRequest { - pub paused: bool, -} - -#[derive(Debug, Deserialize, Serialize)] -#[serde(rename_all = "camelCase")] -pub struct PauseSectionRunnerResponse { - pub message: String, - pub paused: bool, -} - -impl IRequest for PauseSectionRunnerRequest { - type Response = PauseSectionRunnerResponse; - fn exec(self, ctx: &mut RequestContext) -> RequestFuture { - let mut section_runner = ctx.section_runner.clone(); - let paused = self.paused; - Box::pin(async move { - if paused { - section_runner.pause().await - } else { - section_runner.unpause().await - } - .wrap_err("could not pause/unpause section runner")?; - Ok(PauseSectionRunnerResponse { - message: format!( - "{} section runner", - if paused { "paused" } else { "unpaused" } - ), - paused, - }) - }) - } -} diff --git a/sprinklers_mqtt/src/request/zones.rs b/sprinklers_mqtt/src/request/zones.rs new file mode 100644 index 0000000..958a6ae --- /dev/null +++ b/sprinklers_mqtt/src/request/zones.rs @@ -0,0 +1,160 @@ +use super::*; +use sprinklers_actors::zone_runner::ZoneRunHandle; +use sprinklers_core::model::{self, ZoneRef}; +use sprinklers_core::serde::duration_secs; + +use eyre::WrapErr; +use serde::{Deserialize, Serialize}; +use std::time::Duration; + +#[derive(Copy, Clone, Debug, Serialize, Deserialize)] +#[serde(transparent)] +pub struct ZoneId(pub model::ZoneId); + +impl ZoneId { + fn get_zone(self, zones: &Zones) -> Result { + zones + .get(&self.0) + .cloned() + .ok_or_else(|| RequestError::with_name(ErrorCode::NoSuchZone, "no such zone", "zone")) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RunZoneRequest { + // TODO: update nomenclature + #[serde(rename = "sectionId")] + pub zone_id: ZoneId, + #[serde(with = "duration_secs")] + pub duration: Duration, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct RunZoneResponse { + pub message: String, + pub run_id: ZoneRunHandle, +} + +impl IRequest for RunZoneRequest { + type Response = RunZoneResponse; + fn exec(self, ctx: &mut RequestContext) -> RequestFuture { + let mut zone_runner = ctx.zone_runner.clone(); + let zone = self.zone_id.get_zone(&ctx.zones); + let duration = self.duration; + Box::pin(async move { + let zone = zone?; + let handle = zone_runner + .queue_run(zone.clone(), duration) + .await + .wrap_err("could not queue run")?; + Ok(RunZoneResponse { + message: format!("running zone '{}' for {:?}", &zone.name, duration), + run_id: handle, + }) + }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelZoneRequest { + // TODO: update nomenclature + #[serde(rename = "sectionId")] + pub zone_id: ZoneId, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelZoneResponse { + pub message: String, + pub cancelled: usize, +} + +impl IRequest for CancelZoneRequest { + type Response = CancelZoneResponse; + fn exec(self, ctx: &mut RequestContext) -> RequestFuture { + let mut zone_runner = ctx.zone_runner.clone(); + let zone = self.zone_id.get_zone(&ctx.zones); + Box::pin(async move { + let zone = zone?; + let cancelled = zone_runner + .cancel_by_zone(zone.id) + .await + .wrap_err("could not cancel zone")?; + Ok(CancelZoneResponse { + message: format!("cancelled {} runs for zone '{}'", cancelled, zone.name), + cancelled, + }) + }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelZoneRunIdRequest { + pub run_id: ZoneRunHandle, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct CancelZoneRunIdResponse { + pub message: String, + pub cancelled: bool, +} + +impl IRequest for CancelZoneRunIdRequest { + type Response = ResponseMessage; + fn exec(self, ctx: &mut RequestContext) -> RequestFuture { + let mut zone_runner = ctx.zone_runner.clone(); + Box::pin(async move { + let cancelled = zone_runner + .cancel_run(self.run_id) + .await + .wrap_err("could not cancel zone run")?; + if cancelled { + Ok(ResponseMessage::new("cancelled zone run")) + } else { + Err(RequestError::with_name( + ErrorCode::NoSuchZoneRun, + "no such zone run", + "zone run", + )) + } + }) + } +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PauseZoneRunnerRequest { + pub paused: bool, +} + +#[derive(Debug, Deserialize, Serialize)] +#[serde(rename_all = "camelCase")] +pub struct PauseZoneRunnerResponse { + pub message: String, + pub paused: bool, +} + +impl IRequest for PauseZoneRunnerRequest { + type Response = PauseZoneRunnerResponse; + fn exec(self, ctx: &mut RequestContext) -> RequestFuture { + let mut zone_runner = ctx.zone_runner.clone(); + let paused = self.paused; + Box::pin(async move { + if paused { + zone_runner.pause().await + } else { + zone_runner.unpause().await + } + .wrap_err("could not pause/unpause zone runner")?; + Ok(PauseZoneRunnerResponse { + message: format!("{} zone runner", if paused { "paused" } else { "unpaused" }), + paused, + }) + }) + } +} diff --git a/sprinklers_mqtt/src/topics.rs b/sprinklers_mqtt/src/topics.rs index 3fad15a..0312105 100644 --- a/sprinklers_mqtt/src/topics.rs +++ b/sprinklers_mqtt/src/topics.rs @@ -1,4 +1,4 @@ -use sprinklers_core::model::{ProgramId, SectionId}; +use sprinklers_core::model::{ProgramId, ZoneId}; #[derive(Clone, Debug)] pub struct Topics @@ -20,16 +20,19 @@ where format!("{}/connected", self.prefix.as_ref()) } - pub fn sections(&self) -> String { + pub fn zones(&self) -> String { + // TODO: change nomenclature format!("{}/sections", self.prefix.as_ref()) } - pub fn section_data(&self, section_id: SectionId) -> String { - format!("{}/sections/{}", self.prefix.as_ref(), section_id) + pub fn zone_data(&self, zone_id: ZoneId) -> String { + // TODO: change nomenclature + format!("{}/sections/{}", self.prefix.as_ref(), zone_id) } - pub fn section_state(&self, section_id: SectionId) -> String { - format!("{}/sections/{}/state", self.prefix.as_ref(), section_id) + pub fn zone_state(&self, zone_id: ZoneId) -> String { + // TODO: change nomenclature + format!("{}/sections/{}/state", self.prefix.as_ref(), zone_id) } pub fn programs(&self) -> String { @@ -49,7 +52,8 @@ where format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id) } - pub fn section_runner(&self) -> String { + pub fn zone_runner(&self) -> String { + // TODO: change nomenclature format!("{}/section_runner", self.prefix.as_ref()) } diff --git a/sprinklers_mqtt/src/update_listener.rs b/sprinklers_mqtt/src/update_listener.rs index bb3da63..67d3c0f 100644 --- a/sprinklers_mqtt/src/update_listener.rs +++ b/sprinklers_mqtt/src/update_listener.rs @@ -1,7 +1,7 @@ use super::MqttInterface; use sprinklers_actors::{ program_runner::{ProgramEvent, ProgramEventRecv}, - section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv}, + zone_runner::{ZoneEvent, ZoneEventRecv, ZoneRunnerState, ZoneRunnerStateRecv}, }; use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; @@ -36,33 +36,27 @@ impl Actor for UpdateListenerActor { } } -impl StreamHandler> for UpdateListenerActor { - fn handle( - &mut self, - event: Result, - ctx: &mut Self::Context, - ) { +impl StreamHandler> for UpdateListenerActor { + fn handle(&mut self, event: Result, ctx: &mut Self::Context) { let event = match event { Ok(ev) => ev, Err(broadcast::RecvError::Closed) => unreachable!(), Err(broadcast::RecvError::Lagged(n)) => { - warn!("section events lagged by {}", n); + warn!("zone events lagged by {}", n); return; } }; - if let Some((sec_id, state)) = match event { - SectionEvent::RunStart(_, sec) | SectionEvent::RunUnpause(_, sec) => { - Some((sec.id, true)) - } - SectionEvent::RunFinish(_, sec) - | SectionEvent::RunPause(_, sec) - | SectionEvent::RunCancel(_, sec) => Some((sec.id, false)), - SectionEvent::RunnerPause | SectionEvent::RunnerUnpause => None, + if let Some((zone_id, state)) = match event { + ZoneEvent::RunStart(_, zone) | ZoneEvent::RunUnpause(_, zone) => Some((zone.id, true)), + ZoneEvent::RunFinish(_, zone) + | ZoneEvent::RunPause(_, zone) + | ZoneEvent::RunCancel(_, zone) => Some((zone.id, false)), + ZoneEvent::RunnerPause | ZoneEvent::RunnerUnpause => None, } { let mut mqtt_interface = self.mqtt_interface.clone(); let fut = async move { - if let Err(err) = mqtt_interface.publish_section_state(sec_id, state).await { - warn!("could not publish section state: {}", err); + if let Err(err) = mqtt_interface.publish_zone_state(zone_id, state).await { + warn!("could not publish zone state: {}", err); } }; ctx.spawn(wrap_future(fut)); @@ -121,12 +115,12 @@ impl StreamHandler> for UpdateListene } } -impl StreamHandler for UpdateListenerActor { - fn handle(&mut self, state: SecRunnerState, ctx: &mut Self::Context) { +impl StreamHandler for UpdateListenerActor { + fn handle(&mut self, state: ZoneRunnerState, ctx: &mut Self::Context) { let mut mqtt_interface = self.mqtt_interface.clone(); let fut = async move { - if let Err(err) = mqtt_interface.publish_section_runner(&state).await { - warn!("could not publish section runner: {}", err); + if let Err(err) = mqtt_interface.publish_zone_runner(&state).await { + warn!("could not publish zone runner: {}", err); } }; ctx.spawn(wrap_future(fut)); @@ -202,13 +196,13 @@ where } } -impl Listenable for SectionEventRecv { +impl Listenable for ZoneEventRecv { fn listen(self, ctx: &mut ::Context) { ctx.add_stream(self); } } -impl Listenable for SecRunnerStateRecv { +impl Listenable for ZoneRunnerStateRecv { fn listen(self, ctx: &mut ::Context) { ctx.add_stream(self); } @@ -243,12 +237,12 @@ impl UpdateListener { self.addr.do_send(Listen(listener)); } - pub fn listen_section_events(&mut self, section_events: SectionEventRecv) { - self.listen(section_events); + pub fn listen_zone_events(&mut self, zone_events: ZoneEventRecv) { + self.listen(zone_events); } - pub fn listen_section_runner(&mut self, sec_runner_state_recv: SecRunnerStateRecv) { - self.listen(sec_runner_state_recv); + pub fn listen_zone_runner(&mut self, zone_runner_state_recv: ZoneRunnerStateRecv) { + self.listen(zone_runner_state_recv); } pub fn listen_programs(&mut self, programs: watch::Receiver) { diff --git a/sprinklers_mqtt/src/section_runner_json.rs b/sprinklers_mqtt/src/zone_runner_json.rs similarity index 63% rename from sprinklers_mqtt/src/section_runner_json.rs rename to sprinklers_mqtt/src/zone_runner_json.rs index ada486c..3be28fd 100644 --- a/sprinklers_mqtt/src/section_runner_json.rs +++ b/sprinklers_mqtt/src/zone_runner_json.rs @@ -1,5 +1,5 @@ -use sprinklers_actors::section_runner::{SecRun, SecRunState, SecRunnerState}; -use sprinklers_core::model::SectionId; +use sprinklers_actors::zone_runner::{ZoneRun, ZoneRunState, ZoneRunnerState}; +use sprinklers_core::model::ZoneId; use chrono::{DateTime, Utc}; use serde::Serialize; @@ -8,9 +8,11 @@ use tokio::time::Instant; #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] -pub struct SecRunJson { +pub struct ZoneRunJson { id: i32, - section: SectionId, + // TODO: change nomenclature + #[serde(rename = "section")] + zone: ZoneId, total_duration: f64, duration: f64, start_time: Option, @@ -18,19 +20,19 @@ pub struct SecRunJson { unpause_time: Option, } -impl SecRunJson { - fn from_run(run: &SecRun) -> Option { +impl ZoneRunJson { + fn from_run(run: &ZoneRun) -> Option { let (now, system_now) = (Instant::now(), SystemTime::now()); let instant_to_string = |instant: Instant| -> String { DateTime::::from(system_now - now.duration_since(instant)).to_rfc3339() }; let (start_time, pause_time) = match run.state { - SecRunState::Finished | SecRunState::Cancelled => { + ZoneRunState::Finished | ZoneRunState::Cancelled => { return None; } - SecRunState::Waiting => (None, None), - SecRunState::Running { start_time } => (Some(instant_to_string(start_time)), None), - SecRunState::Paused { + ZoneRunState::Waiting => (None, None), + ZoneRunState::Running { start_time } => (Some(instant_to_string(start_time)), None), + ZoneRunState::Paused { start_time, pause_time, } => ( @@ -40,7 +42,7 @@ impl SecRunJson { }; Some(Self { id: run.handle.clone().into_inner(), - section: run.section.id, + zone: run.zone.id, total_duration: run.total_duration.as_secs_f64(), duration: run.duration.as_secs_f64(), start_time, @@ -52,18 +54,18 @@ impl SecRunJson { #[derive(Clone, Debug, Serialize)] #[serde(rename_all = "camelCase")] -pub struct SecRunnerStateJson { - queue: Vec, - current: Option, +pub struct ZoneRunnerStateJson { + queue: Vec, + current: Option, paused: bool, } -impl From<&SecRunnerState> for SecRunnerStateJson { - fn from(state: &SecRunnerState) -> Self { +impl From<&ZoneRunnerState> for ZoneRunnerStateJson { + fn from(state: &ZoneRunnerState) -> Self { let mut run_queue = state.run_queue.iter(); - let current = run_queue.next().and_then(|run| SecRunJson::from_run(run)); + let current = run_queue.next().and_then(|run| ZoneRunJson::from_run(run)); let queue = run_queue - .filter_map(|run| SecRunJson::from_run(run)) + .filter_map(|run| ZoneRunJson::from_run(run)) .collect(); Self { queue, diff --git a/sprinklers_rs/sprinklers_rs.default.json b/sprinklers_rs/sprinklers_rs.default.json index 046fc9a..9083569 100644 --- a/sprinklers_rs/sprinklers_rs.default.json +++ b/sprinklers_rs/sprinklers_rs.default.json @@ -5,8 +5,8 @@ "client_id": "sprinklers_rs-0001", "device_id": "sprinklers_rs-0001" }, - "section_interface": { + "zone_interface": { "provider": "Mock", - "num_sections": 6 + "num_zones": 6 } } \ No newline at end of file diff --git a/sprinklers_rs/src/main.rs b/sprinklers_rs/src/main.rs index bc58d66..fea0316 100644 --- a/sprinklers_rs/src/main.rs +++ b/sprinklers_rs/src/main.rs @@ -2,9 +2,9 @@ #![warn(clippy::print_stdout)] // mod option_future; -mod section_interface; mod settings; mod state_manager; +mod zone_interface; use sprinklers_actors as actors; use sprinklers_database as database; @@ -31,43 +31,41 @@ async fn main() -> Result<()> { let db_conn = database::setup_db()?; - let sections = database::query_sections(&db_conn)?; - for sec in sections.values() { - debug!(section = debug(&sec), "read section"); + let zones = database::query_zones(&db_conn)?; + for zone in zones.values() { + debug!(zone = debug(&zone), "read zone"); } - let section_interface = settings.section_interface.build()?; - let mut section_runner = actors::SectionRunner::new(section_interface); - let mut program_runner = actors::ProgramRunner::new(section_runner.clone()); + let zone_interface = settings.zone_interface.build()?; + let mut zone_runner = actors::ZoneRunner::new(zone_interface); + let mut program_runner = actors::ProgramRunner::new(zone_runner.clone()); let state_manager = crate::state_manager::StateManagerThread::start(db_conn); let mqtt_options = settings.mqtt; - // TODO: have ability to update sections / other data + // TODO: have ability to update zones / other data let request_context = mqtt::RequestContext { - sections: sections.clone(), - section_runner: section_runner.clone(), + zones: zones.clone(), + zone_runner: zone_runner.clone(), program_runner: program_runner.clone(), state_manager: state_manager.clone(), }; let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context); let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone()); - update_listener.listen_section_events(section_runner.subscribe().await?); - update_listener.listen_section_runner(section_runner.get_state_recv()); + update_listener.listen_zone_events(zone_runner.subscribe().await?); + update_listener.listen_zone_runner(zone_runner.get_state_recv()); update_listener.listen_programs(state_manager.get_programs()); update_listener.listen_program_events(program_runner.subscribe().await?); // Only listen to programs now so above subscriptions get events program_runner.listen_programs(state_manager.get_programs()); - program_runner.update_sections(sections.clone()).await?; + program_runner.update_zones(zones.clone()).await?; // TODO: update listener should probably do this - mqtt_interface.publish_sections(§ions).await?; - for section_id in sections.keys() { - mqtt_interface - .publish_section_state(*section_id, false) - .await?; + mqtt_interface.publish_zones(&zones).await?; + for zone_id in zones.keys() { + mqtt_interface.publish_zone_state(*zone_id, false).await?; } info!("sprinklers_rs initialized"); @@ -79,7 +77,7 @@ async fn main() -> Result<()> { mqtt_interface.quit().await?; drop(state_manager); program_runner.quit().await?; - section_runner.quit().await?; + zone_runner.quit().await?; actix::System::current().stop(); Ok(()) diff --git a/sprinklers_rs/src/section_interface.rs b/sprinklers_rs/src/section_interface.rs deleted file mode 100644 index b965b6f..0000000 --- a/sprinklers_rs/src/section_interface.rs +++ /dev/null @@ -1,37 +0,0 @@ -use sprinklers_core::section_interface::{MockSectionInterface, SecId, SectionInterface}; - -#[cfg(feature = "sprinklers_linux")] -use sprinklers_linux::LinuxGpioConfig; - -use serde::{Deserialize, Serialize}; -use std::sync::Arc; - -#[derive(Debug, Serialize, Deserialize)] -#[serde(tag = "provider")] -pub enum SectionInterfaceConfig { - Mock { - num_sections: SecId, - }, - #[cfg(feature = "sprinklers_linux")] - LinuxGpio(LinuxGpioConfig), -} - -impl Default for SectionInterfaceConfig { - fn default() -> Self { - SectionInterfaceConfig::Mock { num_sections: 6 } - } -} - -impl SectionInterfaceConfig { - pub fn build(self) -> eyre::Result> { - Ok(match self { - SectionInterfaceConfig::Mock { num_sections } => { - Arc::new(MockSectionInterface::new(num_sections)) - } - #[cfg(feature = "sprinklers_linux")] - SectionInterfaceConfig::LinuxGpio(config) => { - Arc::new(config.build()?) - } - }) - } -} diff --git a/sprinklers_rs/src/settings.rs b/sprinklers_rs/src/settings.rs index 2b3c00c..d8da279 100644 --- a/sprinklers_rs/src/settings.rs +++ b/sprinklers_rs/src/settings.rs @@ -1,4 +1,4 @@ -use crate::section_interface::SectionInterfaceConfig; +use crate::zone_interface::ZoneInterfaceConfig; use serde::{Deserialize, Serialize}; use tracing::trace; @@ -17,7 +17,7 @@ pub struct Settings { #[serde(with = "MqttOptions")] pub mqtt: sprinklers_mqtt::Options, #[serde(default)] - pub section_interface: SectionInterfaceConfig, + pub zone_interface: ZoneInterfaceConfig, } impl Settings { diff --git a/sprinklers_rs/src/zone_interface.rs b/sprinklers_rs/src/zone_interface.rs new file mode 100644 index 0000000..e4040d4 --- /dev/null +++ b/sprinklers_rs/src/zone_interface.rs @@ -0,0 +1,33 @@ +use sprinklers_core::zone_interface::{MockZoneInterface, ZoneInterface, ZoneNum}; + +#[cfg(feature = "sprinklers_linux")] +use sprinklers_linux::LinuxGpioConfig; + +use serde::{Deserialize, Serialize}; +use std::sync::Arc; + +#[derive(Debug, Serialize, Deserialize)] +#[serde(tag = "provider")] +pub enum ZoneInterfaceConfig { + Mock { + num_zones: ZoneNum, + }, + #[cfg(feature = "sprinklers_linux")] + LinuxGpio(LinuxGpioConfig), +} + +impl Default for ZoneInterfaceConfig { + fn default() -> Self { + ZoneInterfaceConfig::Mock { num_zones: 6 } + } +} + +impl ZoneInterfaceConfig { + pub fn build(self) -> eyre::Result> { + Ok(match self { + ZoneInterfaceConfig::Mock { num_zones } => Arc::new(MockZoneInterface::new(num_zones)), + #[cfg(feature = "sprinklers_linux")] + ZoneInterfaceConfig::LinuxGpio(config) => Arc::new(config.build()?), + }) + } +}