Browse Source

Implement publishing program next run

master
Alex Mikhalev 4 years ago
parent
commit
9f37cd2805
  1. 4
      sprinklers_actors/src/program_runner.rs
  2. 11
      sprinklers_mqtt/src/lib.rs
  3. 5
      sprinklers_mqtt/src/topics.rs
  4. 29
      sprinklers_mqtt/src/update_listener.rs
  5. 5
      sprinklers_rs/src/main.rs

4
sprinklers_actors/src/program_runner.rs

@ -20,6 +20,7 @@ pub enum ProgramEvent { @@ -20,6 +20,7 @@ pub enum ProgramEvent {
RunStart(ProgramRef),
RunFinish(ProgramRef),
RunCancel(ProgramRef),
NextRun(ProgramRef, chrono::DateTime<chrono::Local>),
}
pub type ProgramEventRecv = broadcast::Receiver<ProgramEvent>;
@ -136,7 +137,7 @@ impl ProgramRunnerInner { @@ -136,7 +137,7 @@ impl ProgramRunnerInner {
fn update_schedules(&mut self, ctx: &mut actix::Context<ProgramRunnerActor>) {
let mut scheduled_run_queue = DelayQueue::with_capacity(self.programs.len());
for (_, prog) in &self.programs {
for (_, prog) in self.programs.clone() {
if !prog.enabled {
continue;
}
@ -148,6 +149,7 @@ impl ProgramRunnerInner { @@ -148,6 +149,7 @@ impl ProgramRunnerInner {
let delay = (next_run - ref_time).to_std().unwrap();
trace!("will run program in {:?}", delay);
scheduled_run_queue.insert(prog.clone(), delay);
self.send_event(ProgramEvent::NextRun(prog, next_run));
}
let fut = actix::fut::wrap_stream(scheduled_run_queue)
.map(|item, act: &mut ProgramRunnerActor, ctx| act.handle_scheduled_run(item, ctx))

11
sprinklers_mqtt/src/lib.rs

@ -154,6 +154,17 @@ impl MqttInterface { @@ -154,6 +154,17 @@ impl MqttInterface {
.wrap_err("failed to publish program running")
}
pub async fn publish_program_next_run(
&mut self,
program_id: ProgramId,
next_run: chrono::DateTime<chrono::Local>,
) -> eyre::Result<()> {
let payload = next_run.to_rfc3339();
self.publish_data(self.topics.program_next_run(program_id), &payload)
.await
.wrap_err("failed to publish program next run")
}
pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> {
let json: SecRunnerStateJson = sr_state.into();
self.publish_data(self.topics.section_runner(), &json)

5
sprinklers_mqtt/src/topics.rs

@ -44,6 +44,11 @@ where @@ -44,6 +44,11 @@ where
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id)
}
pub fn program_next_run(&self, program_id: ProgramId) -> String {
// TODO: reconcile naming convention
format!("{}/programs/{}/nextRun", self.prefix.as_ref(), program_id)
}
pub fn section_runner(&self) -> String {
format!("{}/section_runner", self.prefix.as_ref())
}

29
sprinklers_mqtt/src/update_listener.rs

@ -7,7 +7,6 @@ use sprinklers_actors::{ @@ -7,7 +7,6 @@ use sprinklers_actors::{
use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use futures_util::TryFutureExt;
use sprinklers_core::model::Programs;
use std::sync::Arc;
use tokio::sync::{broadcast, watch};
use tracing::{trace, warn};
@ -85,18 +84,38 @@ impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListene @@ -85,18 +84,38 @@ impl StreamHandler<Result<ProgramEvent, broadcast::RecvError>> for UpdateListene
return;
}
};
let (program_id, running) = match event {
ProgramEvent::RunStart(prog) => (prog.id, true),
ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false),
};
let mut mqtt_interface = self.mqtt_interface.clone();
let fut = async move {
enum Publish {
Running(bool),
NextRun(chrono::DateTime<chrono::Local>),
}
let (program_id, publish) = match event {
ProgramEvent::RunStart(prog) => (prog.id, Publish::Running(true)),
ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => {
(prog.id, Publish::Running(false))
}
ProgramEvent::NextRun(prog, next_run) => (prog.id, Publish::NextRun(next_run)),
};
match publish {
Publish::Running(running) => {
if let Err(err) = mqtt_interface
.publish_program_running(program_id, running)
.await
{
warn!("could not publish program running: {}", err);
}
}
Publish::NextRun(next_run) => {
if let Err(err) = mqtt_interface
.publish_program_next_run(program_id, next_run)
.await
{
warn!("could not publish program next run: {}", err);
}
}
}
};
ctx.spawn(wrap_future(fut));
}

5
sprinklers_rs/src/main.rs

@ -39,8 +39,6 @@ async fn main() -> Result<()> { @@ -39,8 +39,6 @@ async fn main() -> Result<()> {
let state_manager = crate::state_manager::StateManagerThread::start(db_conn);
program_runner.listen_programs(state_manager.get_programs());
let mqtt_options = mqtt::Options {
broker_host: "localhost".into(),
broker_port: 1883,
@ -62,6 +60,9 @@ async fn main() -> Result<()> { @@ -62,6 +60,9 @@ async fn main() -> Result<()> {
update_listener.listen_programs(state_manager.get_programs());
update_listener.listen_program_events(program_runner.subscribe().await?);
// Only listen to programs now so above subscriptions get events
program_runner.listen_programs(state_manager.get_programs());
program_runner.update_sections(sections.clone()).await?;
// TODO: update listener should probably do this
mqtt_interface.publish_sections(&sections).await?;

Loading…
Cancel
Save