Browse Source

Publish program running state

master
Alex Mikhalev 4 years ago
parent
commit
cc88083b19
  1. 20
      src/main.rs
  2. 27
      src/mqtt_interface.rs
  3. 58
      src/update_listener.rs

20
src/main.rs

@ -15,7 +15,7 @@ mod update_listener;
use eyre::Result; use eyre::Result;
use std::sync::Arc; use std::sync::Arc;
use tracing::info; use tracing::{debug, info};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use update_listener::UpdateListener; use update_listener::UpdateListener;
@ -34,7 +34,7 @@ async fn main() -> Result<()> {
let sections = database::query_sections(&conn)?; let sections = database::query_sections(&conn)?;
for sec in sections.values() { for sec in sections.values() {
info!(section = debug(&sec), "read section"); debug!(section = debug(&sec), "read section");
} }
// TODO: Section interface which actual does something. Preferrably selectable somehow // TODO: Section interface which actual does something. Preferrably selectable somehow
@ -53,7 +53,7 @@ async fn main() -> Result<()> {
} }
for prog in programs.values() { for prog in programs.values() {
info!(program = debug(&prog), "read program"); debug!(program = debug(&prog), "read program");
} }
let mqtt_options = mqtt_interface::Options { let mqtt_options = mqtt_interface::Options {
@ -64,10 +64,11 @@ async fn main() -> Result<()> {
}; };
let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?; let mut mqtt_interface = mqtt_interface::MqttInterfaceTask::start(mqtt_options).await?;
let update_listener = UpdateListener::start( let update_listener = {
section_runner.subscribe().await?, let section_events = section_runner.subscribe().await?;
mqtt_interface.clone(), let program_events = program_runner.subscribe().await?;
); UpdateListener::start(section_events, program_events, mqtt_interface.clone())
};
program_runner.update_sections(sections.clone()).await?; program_runner.update_sections(sections.clone()).await?;
mqtt_interface.publish_sections(&sections).await?; mqtt_interface.publish_sections(&sections).await?;
@ -77,6 +78,11 @@ async fn main() -> Result<()> {
.await?; .await?;
} }
program_runner.update_programs(programs.clone()).await?; program_runner.update_programs(programs.clone()).await?;
for program_id in programs.keys() {
mqtt_interface
.publish_program_running(*program_id, false)
.await?;
}
mqtt_interface.publish_programs(&programs).await?; mqtt_interface.publish_programs(&programs).await?;
info!("sprinklers_rs initialized"); info!("sprinklers_rs initialized");

27
src/mqtt_interface.rs

@ -48,6 +48,10 @@ where
fn program_data(&self, program_id: ProgramId) -> String { fn program_data(&self, program_id: ProgramId) -> String {
format!("{}/programs/{}", self.prefix.as_ref(), program_id) format!("{}/programs/{}", self.prefix.as_ref(), program_id)
} }
fn program_running(&self, program_id: ProgramId) -> String {
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id)
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -59,7 +63,7 @@ pub struct Options {
} }
async fn event_loop_task( async fn event_loop_task(
mut interface: MqttInterface, interface: MqttInterface,
mut event_loop: rumqttc::EventLoop, mut event_loop: rumqttc::EventLoop,
) -> eyre::Result<()> { ) -> eyre::Result<()> {
use rumqttc::{ConnectionError, Event}; use rumqttc::{ConnectionError, Event};
@ -77,9 +81,7 @@ async fn event_loop_task(
// HACK: this really should just be await // HACK: this really should just be await
// but that can sometimes deadlock if the publish channel is full // but that can sometimes deadlock if the publish channel is full
let mut interface = interface.clone(); let mut interface = interface.clone();
let fut = async move { let fut = async move { interface.publish_connected(true).await };
interface.publish_connected(true).await
};
tokio::spawn(fut); tokio::spawn(fut);
} }
//.await?; //.await?;
@ -216,6 +218,23 @@ impl MqttInterface {
.await?; .await?;
Ok(()) Ok(())
} }
pub async fn publish_program_running(
&mut self,
program_id: ProgramId,
running: bool,
) -> eyre::Result<()> {
let payload = running.to_string();
self.client
.publish(
self.topics.program_running(program_id),
QoS::AtLeastOnce,
true,
payload,
)
.await?;
Ok(())
}
} }
pub struct MqttInterfaceTask { pub struct MqttInterfaceTask {

58
src/update_listener.rs

@ -1,5 +1,6 @@
use crate::{ use crate::{
mqtt_interface::MqttInterface, mqtt_interface::MqttInterface,
program_runner::{ProgramEvent, ProgramEventRecv},
section_runner::{SectionEvent, SectionEventRecv}, section_runner::{SectionEvent, SectionEventRecv},
}; };
use tokio::{ use tokio::{
@ -11,6 +12,8 @@ use tracing::{trace, trace_span, warn};
use tracing_futures::Instrument; use tracing_futures::Instrument;
struct UpdateListenerTask { struct UpdateListenerTask {
section_events: SectionEventRecv,
program_events: ProgramEventRecv,
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
running: bool, running: bool,
} }
@ -48,37 +51,62 @@ impl UpdateListenerTask {
Ok(()) Ok(())
} }
async fn run_impl(&mut self, mut section_events: SectionEventRecv) -> eyre::Result<()> { async fn handle_program_event(
&mut self,
event: Result<ProgramEvent, broadcast::RecvError>,
) -> eyre::Result<()> {
let event = match event {
Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => {
trace!("section events channel closed");
self.running = false;
return Ok(());
}
Err(broadcast::RecvError::Lagged(n)) => {
warn!("section events lagged by {}", n);
return Ok(());
}
};
let (program_id, running) = match event {
ProgramEvent::RunStart(prog) => (prog.id, true),
ProgramEvent::RunFinish(prog) | ProgramEvent::RunCancel(prog) => (prog.id, false),
};
self.mqtt_interface
.publish_program_running(program_id, running)
.await?;
Ok(())
}
async fn run_impl(&mut self) -> eyre::Result<()> {
while self.running { while self.running {
select! { select! {
section_event = section_events.recv() => { section_event = self.section_events.recv() => {
self.handle_section_event(section_event).await? self.handle_section_event(section_event).await?
} }
program_event = self.program_events.recv() => {
self.handle_program_event(program_event).await?
}
}; };
} }
Ok(()) Ok(())
} }
async fn run_or_quit( async fn run_or_quit(mut self, mut quit_rx: oneshot::Receiver<()>) -> eyre::Result<()> {
mut self,
mut quit_rx: oneshot::Receiver<()>,
section_events: SectionEventRecv,
) -> eyre::Result<()> {
select! { select! {
_ = &mut quit_rx => { _ = &mut quit_rx => {
self.running = false; self.running = false;
Ok(()) Ok(())
} }
res = self.run_impl(section_events) => { res = self.run_impl() => {
res res
} }
} }
} }
async fn run(self, quit_rx: oneshot::Receiver<()>, section_events: SectionEventRecv) { async fn run(self, quit_rx: oneshot::Receiver<()>) {
let span = trace_span!("update_listener task"); let span = trace_span!("update_listener task");
self.run_or_quit(quit_rx, section_events) self.run_or_quit(quit_rx)
.instrument(span) .instrument(span)
.await .await
.expect("error in UpdateListenerTask"); .expect("error in UpdateListenerTask");
@ -91,13 +119,19 @@ pub struct UpdateListener {
} }
impl UpdateListener { impl UpdateListener {
pub fn start(section_events: SectionEventRecv, mqtt_interface: MqttInterface) -> Self { pub fn start(
section_events: SectionEventRecv,
program_events: ProgramEventRecv,
mqtt_interface: MqttInterface,
) -> Self {
let task = UpdateListenerTask { let task = UpdateListenerTask {
section_events,
program_events,
mqtt_interface, mqtt_interface,
running: true, running: true,
}; };
let (quit_tx, quit_rx) = oneshot::channel(); let (quit_tx, quit_rx) = oneshot::channel();
let join_handle = tokio::spawn(task.run(quit_rx, section_events)); let join_handle = tokio::spawn(task.run(quit_rx));
Self { Self {
quit_tx, quit_tx,
join_handle, join_handle,

Loading…
Cancel
Save