Browse Source

Implement section runner state publishing

master
Alex Mikhalev 4 years ago
parent
commit
7541618ec8
  1. 9
      src/main.rs
  2. 18
      src/mqtt_interface.rs
  3. 66
      src/section_runner.rs
  4. 75
      src/section_runner_json.rs
  5. 29
      src/update_listener.rs

9
src/main.rs

@ -9,6 +9,7 @@ mod program_runner;
mod schedule; mod schedule;
mod section_interface; mod section_interface;
mod section_runner; mod section_runner;
mod section_runner_json;
#[cfg(test)] #[cfg(test)]
mod trace_listeners; mod trace_listeners;
mod update_listener; mod update_listener;
@ -67,7 +68,13 @@ async fn main() -> Result<()> {
let update_listener = { let update_listener = {
let section_events = section_runner.subscribe().await?; let section_events = section_runner.subscribe().await?;
let program_events = program_runner.subscribe().await?; let program_events = program_runner.subscribe().await?;
UpdateListener::start(section_events, program_events, mqtt_interface.clone()) let sec_runner_state = section_runner.state_receiver();
UpdateListener::start(
section_events,
program_events,
sec_runner_state,
mqtt_interface.clone(),
)
}; };
program_runner.update_sections(sections.clone()).await?; program_runner.update_sections(sections.clone()).await?;

18
src/mqtt_interface.rs

@ -1,4 +1,8 @@
use crate::model::{Program, ProgramId, Programs, Section, SectionId, Sections}; use crate::{
model::{Program, ProgramId, Programs, Section, SectionId, Sections},
section_runner::SecRunnerState,
section_runner_json::SecRunnerStateJson,
};
use eyre::WrapErr; use eyre::WrapErr;
use rumqttc::{LastWill, MqttOptions, QoS}; use rumqttc::{LastWill, MqttOptions, QoS};
use std::{ use std::{
@ -52,6 +56,10 @@ where
fn program_running(&self, program_id: ProgramId) -> String { fn program_running(&self, program_id: ProgramId) -> String {
format!("{}/programs/{}/running", self.prefix.as_ref(), program_id) format!("{}/programs/{}/running", self.prefix.as_ref(), program_id)
} }
fn section_runner(&self) -> String {
format!("{}/section_runner", self.prefix.as_ref())
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -165,6 +173,7 @@ impl MqttInterface {
.wrap_err("failed to publish section") .wrap_err("failed to publish section")
} }
// Section state can be derived from section runner state...
pub async fn publish_section_state( pub async fn publish_section_state(
&mut self, &mut self,
section_id: SectionId, section_id: SectionId,
@ -201,6 +210,13 @@ impl MqttInterface {
.await .await
.wrap_err("failed to publish program running") .wrap_err("failed to publish program running")
} }
pub async fn publish_section_runner(&mut self, sr_state: &SecRunnerState) -> eyre::Result<()> {
let json: SecRunnerStateJson = sr_state.into();
self.publish_data(self.topics.section_runner(), &json)
.await
.wrap_err("failed to publish section runner")
}
} }
pub struct MqttInterfaceTask { pub struct MqttInterfaceTask {

66
src/section_runner.rs

@ -12,7 +12,7 @@ use std::{
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
spawn, spawn,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot, watch},
time::{delay_for, Instant}, time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span, warn}; use tracing::{debug, trace, trace_span, warn};
@ -21,6 +21,12 @@ use tracing_futures::Instrument;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SectionRunHandle(i32); pub struct SectionRunHandle(i32);
impl SectionRunHandle {
pub fn into_inner(self) -> i32 {
self.0
}
}
#[derive(Debug)] #[derive(Debug)]
struct SectionRunnerInner { struct SectionRunnerInner {
next_run_id: AtomicI32, next_run_id: AtomicI32,
@ -77,11 +83,11 @@ pub enum SecRunState {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SecRun { pub struct SecRun {
handle: SectionRunHandle, pub(crate) handle: SectionRunHandle,
section: SectionRef, pub(crate) section: SectionRef,
duration: Duration, pub(crate) duration: Duration,
total_duration: Duration, pub(crate) total_duration: Duration,
state: SecRunState, pub(crate) state: SecRunState,
} }
impl SecRun { impl SecRun {
@ -109,8 +115,8 @@ pub type SecRunQueue = im::Vector<Arc<SecRun>>;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct SecRunnerState { pub struct SecRunnerState {
run_queue: SecRunQueue, pub(crate) run_queue: SecRunQueue,
paused: bool, pub(crate) paused: bool,
} }
impl Default for SecRunnerState { impl Default for SecRunnerState {
@ -122,19 +128,24 @@ impl Default for SecRunnerState {
} }
} }
pub type SecRunnerStateRecv = watch::Receiver<SecRunnerState>;
struct RunnerTask { struct RunnerTask {
interface: Arc<dyn SectionInterface + Sync>, interface: Arc<dyn SectionInterface + Sync>,
msg_recv: mpsc::Receiver<RunnerMsg>, msg_recv: mpsc::Receiver<RunnerMsg>,
running: bool, running: bool,
delay_future: OptionFuture<tokio::time::Delay>, delay_future: OptionFuture<tokio::time::Delay>,
event_send: Option<SectionEventSend>, event_send: Option<SectionEventSend>,
state_send: watch::Sender<SecRunnerState>,
quit_tx: Option<oneshot::Sender<()>>, quit_tx: Option<oneshot::Sender<()>>,
did_change: bool,
} }
impl RunnerTask { impl RunnerTask {
fn new( fn new(
interface: Arc<dyn SectionInterface + Sync>, interface: Arc<dyn SectionInterface + Sync>,
msg_recv: mpsc::Receiver<RunnerMsg>, msg_recv: mpsc::Receiver<RunnerMsg>,
state_send: watch::Sender<SecRunnerState>,
) -> Self { ) -> Self {
Self { Self {
interface, interface,
@ -142,7 +153,9 @@ impl RunnerTask {
running: true, running: true,
delay_future: None.into(), delay_future: None.into(),
event_send: None, event_send: None,
state_send,
quit_tx: None, quit_tx: None,
did_change: false,
} }
} }
@ -181,6 +194,7 @@ impl RunnerTask {
run.handle.clone(), run.handle.clone(),
run.section.clone(), run.section.clone(),
)); ));
self.did_change = true;
} }
fn finish_run(&mut self, run: &mut Arc<SecRun>) { fn finish_run(&mut self, run: &mut Arc<SecRun>) {
@ -194,6 +208,7 @@ impl RunnerTask {
run.handle.clone(), run.handle.clone(),
run.section.clone(), run.section.clone(),
)); ));
self.did_change = true;
} else { } else {
warn!( warn!(
section_id = run.section.id, section_id = run.section.id,
@ -215,6 +230,7 @@ impl RunnerTask {
run.handle.clone(), run.handle.clone(),
run.section.clone(), run.section.clone(),
)); ));
self.did_change = true;
} }
fn pause_run(&mut self, run: &mut Arc<SecRun>) { fn pause_run(&mut self, run: &mut Arc<SecRun>) {
@ -246,6 +262,7 @@ impl RunnerTask {
run.handle.clone(), run.handle.clone(),
run.section.clone(), run.section.clone(),
)); ));
self.did_change = true;
} }
fn unpause_run(&mut self, run: &mut Arc<SecRun>) { fn unpause_run(&mut self, run: &mut Arc<SecRun>) {
@ -277,6 +294,7 @@ impl RunnerTask {
); );
} }
} }
self.did_change = true;
} }
fn process_queue(&mut self, state: &mut SecRunnerState) { fn process_queue(&mut self, state: &mut SecRunnerState) {
@ -335,6 +353,7 @@ impl RunnerTask {
state state
.run_queue .run_queue
.push_back(Arc::new(SecRun::new(handle, section, duration))); .push_back(Arc::new(SecRun::new(handle, section, duration)));
self.did_change = true;
} }
CancelRun(handle) => { CancelRun(handle) => {
for run in state.run_queue.iter_mut() { for run in state.run_queue.iter_mut() {
@ -356,10 +375,12 @@ impl RunnerTask {
Pause => { Pause => {
state.paused = true; state.paused = true;
self.send_event(SectionEvent::RunnerPause); self.send_event(SectionEvent::RunnerPause);
self.did_change = true;
} }
Unpause => { Unpause => {
state.paused = false; state.paused = false;
self.send_event(SectionEvent::RunnerUnpause); self.send_event(SectionEvent::RunnerUnpause);
self.did_change = true;
} }
Subscribe(res_send) => { Subscribe(res_send) => {
let event_recv = self.subscribe_event(); let event_recv = self.subscribe_event();
@ -373,12 +394,28 @@ impl RunnerTask {
let mut state = SecRunnerState::default(); let mut state = SecRunnerState::default();
while self.running { while self.running {
// Process all pending messages
// This is so if there are many pending messages, the state
// is only broadcast once
while let Ok(msg) = self.msg_recv.try_recv() {
self.handle_msg(Some(msg), &mut state);
}
self.process_queue(&mut state); self.process_queue(&mut state);
// If a change was made to state, broadcast it
if self.did_change {
let _ = self.state_send.broadcast(state.clone());
self.did_change = false;
}
let delay_done = || { let delay_done = || {
trace!("delay done"); trace!("delay done");
}; };
tokio::select! { tokio::select! {
msg = self.msg_recv.recv() => self.handle_msg(msg, &mut state), msg = self.msg_recv.recv() => {
self.handle_msg(msg, &mut state)
},
_ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
}; };
} }
@ -417,16 +454,19 @@ impl From<oneshot::error::RecvError> for ChannelClosed {
pub struct SectionRunner { pub struct SectionRunner {
inner: Arc<SectionRunnerInner>, inner: Arc<SectionRunnerInner>,
msg_send: mpsc::Sender<RunnerMsg>, msg_send: mpsc::Sender<RunnerMsg>,
state_recv: SecRunnerStateRecv,
} }
#[allow(dead_code)] #[allow(dead_code)]
impl SectionRunner { impl SectionRunner {
pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self {
let (msg_send, msg_recv) = mpsc::channel(8); let (msg_send, msg_recv) = mpsc::channel(32);
spawn(RunnerTask::new(interface, msg_recv).run()); let (state_send, state_recv) = watch::channel(SecRunnerState::default());
spawn(RunnerTask::new(interface, msg_recv, state_send).run());
Self { Self {
inner: Arc::new(SectionRunnerInner::new()), inner: Arc::new(SectionRunnerInner::new()),
msg_send, msg_send,
state_recv,
} }
} }
@ -476,6 +516,10 @@ impl SectionRunner {
let event_recv = res_recv.await?; let event_recv = res_recv.await?;
Ok(event_recv) Ok(event_recv)
} }
pub fn state_receiver(&self) -> SecRunnerStateRecv {
self.state_recv.clone()
}
} }
#[cfg(test)] #[cfg(test)]

75
src/section_runner_json.rs

@ -0,0 +1,75 @@
use crate::{
model::SectionId,
section_runner::{SecRun, SecRunState, SecRunnerState},
};
use chrono::{DateTime, Utc};
use serde::Serialize;
use std::time::SystemTime;
use tokio::time::Instant;
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SecRunJson {
id: i32,
section: SectionId,
total_duration: f64,
duration: f64,
start_time: Option<String>,
pause_time: Option<String>,
unpause_time: Option<String>,
}
impl SecRunJson {
fn from_run(run: &SecRun) -> Option<Self> {
let (now, system_now) = (Instant::now(), SystemTime::now());
let instant_to_string = |instant: Instant| -> String {
DateTime::<Utc>::from(system_now - now.duration_since(instant)).to_rfc3339()
};
let (start_time, pause_time) = match run.state {
SecRunState::Finished | SecRunState::Cancelled => {
return None;
}
SecRunState::Waiting => (None, None),
SecRunState::Running { start_time } => (Some(instant_to_string(start_time)), None),
SecRunState::Paused {
start_time,
pause_time,
} => (
Some(instant_to_string(start_time)),
Some(instant_to_string(pause_time)),
),
};
Some(Self {
id: run.handle.clone().into_inner(),
section: run.section.id,
total_duration: run.total_duration.as_secs_f64(),
duration: run.duration.as_secs_f64(),
start_time,
pause_time,
unpause_time: None, // TODO: this is kinda useless, should probably be removed
})
}
}
#[derive(Clone, Debug, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct SecRunnerStateJson {
queue: Vec<SecRunJson>,
current: Option<SecRunJson>,
paused: bool,
}
impl From<&SecRunnerState> for SecRunnerStateJson {
fn from(state: &SecRunnerState) -> Self {
let mut run_queue = state.run_queue.iter();
let current = run_queue.next().and_then(|run| SecRunJson::from_run(run));
let queue = run_queue
.filter_map(|run| SecRunJson::from_run(run))
.collect();
Self {
queue,
current,
paused: state.paused,
}
}
}

29
src/update_listener.rs

@ -1,19 +1,20 @@
use crate::{ use crate::{
mqtt_interface::MqttInterface, mqtt_interface::MqttInterface,
program_runner::{ProgramEvent, ProgramEventRecv}, program_runner::{ProgramEvent, ProgramEventRecv},
section_runner::{SectionEvent, SectionEventRecv}, section_runner::{SecRunnerState, SecRunnerStateRecv, SectionEvent, SectionEventRecv},
}; };
use tokio::{ use tokio::{
select, select,
sync::{broadcast, oneshot}, sync::{broadcast, oneshot},
task::JoinHandle, task::JoinHandle,
}; };
use tracing::{trace, trace_span, warn}; use tracing::{trace_span, warn};
use tracing_futures::Instrument; use tracing_futures::Instrument;
struct UpdateListenerTask { struct UpdateListenerTask {
section_events: SectionEventRecv, section_events: SectionEventRecv,
program_events: ProgramEventRecv, program_events: ProgramEventRecv,
sec_runner_state: SecRunnerStateRecv,
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
running: bool, running: bool,
} }
@ -26,7 +27,7 @@ impl UpdateListenerTask {
let event = match event { let event = match event {
Ok(ev) => ev, Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => { Err(broadcast::RecvError::Closed) => {
trace!("section events channel closed"); warn!("section events channel closed");
self.running = false; self.running = false;
return Ok(()); return Ok(());
} }
@ -58,12 +59,12 @@ impl UpdateListenerTask {
let event = match event { let event = match event {
Ok(ev) => ev, Ok(ev) => ev,
Err(broadcast::RecvError::Closed) => { Err(broadcast::RecvError::Closed) => {
trace!("section events channel closed"); warn!("program events channel closed");
self.running = false; self.running = false;
return Ok(()); return Ok(());
} }
Err(broadcast::RecvError::Lagged(n)) => { Err(broadcast::RecvError::Lagged(n)) => {
warn!("section events lagged by {}", n); warn!("program events lagged by {}", n);
return Ok(()); return Ok(());
} }
}; };
@ -77,6 +78,19 @@ impl UpdateListenerTask {
Ok(()) Ok(())
} }
async fn handle_sec_runner_state(&mut self, state: Option<SecRunnerState>) -> eyre::Result<()> {
let state = match state {
Some(state) => state,
None => {
warn!("section runner events channel closed");
self.running = false;
return Ok(());
}
};
self.mqtt_interface.publish_section_runner(&state).await?;
Ok(())
}
async fn run_impl(&mut self) -> eyre::Result<()> { async fn run_impl(&mut self) -> eyre::Result<()> {
while self.running { while self.running {
select! { select! {
@ -86,6 +100,9 @@ impl UpdateListenerTask {
program_event = self.program_events.recv() => { program_event = self.program_events.recv() => {
self.handle_program_event(program_event).await? self.handle_program_event(program_event).await?
} }
sec_runner_state = self.sec_runner_state.recv() => {
self.handle_sec_runner_state(sec_runner_state).await?
}
}; };
} }
Ok(()) Ok(())
@ -122,11 +139,13 @@ impl UpdateListener {
pub fn start( pub fn start(
section_events: SectionEventRecv, section_events: SectionEventRecv,
program_events: ProgramEventRecv, program_events: ProgramEventRecv,
sec_runner_state: SecRunnerStateRecv,
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
) -> Self { ) -> Self {
let task = UpdateListenerTask { let task = UpdateListenerTask {
section_events, section_events,
program_events, program_events,
sec_runner_state,
mqtt_interface, mqtt_interface,
running: true, running: true,
}; };

Loading…
Cancel
Save