diff --git a/src/main.rs b/src/main.rs index 608fcf2..5cf125c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ mod db; mod migrations; mod model; mod option_future; +mod program_runner; mod section_interface; mod section_runner; #[cfg(test)] diff --git a/src/program_runner.rs b/src/program_runner.rs new file mode 100644 index 0000000..8747247 --- /dev/null +++ b/src/program_runner.rs @@ -0,0 +1,459 @@ +use crate::model::{ProgramId, ProgramRef, Programs, Sections}; +use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; +use std::collections::VecDeque; +use thiserror::Error; +use tokio::{ + spawn, + sync::{broadcast, mpsc, oneshot}, +}; +use tracing::{debug, error, trace, trace_span, warn}; + +#[derive(Debug)] +enum RunnerMsg { + Quit, + UpdateSections(Sections), + UpdatePrograms(Programs), + RunProgramId(ProgramId), + RunProgram(ProgramRef), + CancelProgram(ProgramId), + Subscribe(oneshot::Sender), +} + +#[derive(Clone, Debug)] +pub enum ProgramEvent { + RunStart(ProgramRef), + RunFinish(ProgramRef), +} + +pub type ProgramEventRecv = broadcast::Receiver; +type ProgramEventSend = broadcast::Sender; + +const EVENT_CAPACITY: usize = 8; + +#[derive(Clone, Debug, PartialEq)] +enum RunState { + Waiting, + Running, + Finished, +} + +#[derive(Debug)] +struct ProgRun { + program: ProgramRef, + state: RunState, + sec_run_handles: Vec, +} + +impl ProgRun { + fn new(program: ProgramRef) -> Self { + Self { + program, + state: RunState::Waiting, + sec_run_handles: Vec::new(), + } + } +} + +type RunQueue = VecDeque; + +struct RunnerTask { + section_runner: SectionRunner, + msg_recv: mpsc::Receiver, + running: bool, + sections: Sections, + programs: Programs, + event_send: Option, +} + +impl RunnerTask { + fn new(section_runner: SectionRunner, msg_recv: mpsc::Receiver) -> Self { + Self { + section_runner, + msg_recv, + running: true, + sections: Sections::new(), + programs: Programs::new(), + event_send: None, + } + } + + fn send_event(&mut self, event: ProgramEvent) { + if let Some(event_send) = &mut self.event_send { + match event_send.send(event) { + Ok(_) => {} + Err(_closed) => { + self.event_send = None; + } + } + } + } + + fn subscribe_event(&mut self) -> ProgramEventRecv { + match &mut self.event_send { + Some(event_send) => event_send.subscribe(), + None => { + let (event_send, event_recv) = broadcast::channel(EVENT_CAPACITY); + self.event_send = Some(event_send); + event_recv + } + } + } + + async fn start_program_run(&mut self, run: &mut ProgRun) { + if run.state != RunState::Waiting { + warn!( + program_id = run.program.id, + "cannot run program which is already running" + ); + return; + } + run.sec_run_handles.reserve(run.program.sequence.len()); + for item in &run.program.sequence { + let section = match self.sections.get(&item.section_id) { + Some(sec) => sec.clone(), + None => { + warn!( + program_id = run.program.id, + section_id = item.section_id, + "trying to run program with nonexistant section" + ); + continue; + } + }; + let handle = match self.section_runner.queue_run(section, item.duration).await { + Ok(handle) => handle, + Err(_closed) => { + error!("section runner channel closed"); + self.running = false; + return; + } + }; + run.sec_run_handles.push(handle); + } + run.state = RunState::Running; + debug!(program_id = run.program.id, "started running program"); + self.send_event(ProgramEvent::RunStart(run.program.clone())); + } + + async fn process_queue(&mut self, run_queue: &mut RunQueue) { + while let Some(current_run) = run_queue.front_mut() { + let run_finished = match current_run.state { + RunState::Waiting => { + self.start_program_run(current_run).await; + false + } + RunState::Running => false, + RunState::Finished => true, + }; + if run_finished { + run_queue.pop_front(); + } else { + break; + } + } + } + + fn handle_msg(&mut self, msg: Option, run_queue: &mut RunQueue) { + let msg = msg.expect("SectionRunner channel closed"); + use RunnerMsg::*; + trace!(msg = debug(&msg), "runner_task recv"); + match msg { + Quit => self.running = false, + Subscribe(res_send) => { + let event_recv = self.subscribe_event(); + // Ignore error if channel closed + let _ = res_send.send(event_recv); + } + UpdateSections(new_sections) => { + self.sections = new_sections; + } + UpdatePrograms(new_programs) => { + self.programs = new_programs; + } + RunProgramId(program_id) => { + let program = match self.programs.get(&program_id) { + Some(program) => program.clone(), + None => { + warn!(program_id, "trying to run non-existant program"); + return; + } + }; + run_queue.push_back(ProgRun::new(program)); + } + RunProgram(program) => { + run_queue.push_back(ProgRun::new(program)); + } + RunnerMsg::CancelProgram(_) => todo!(), + } + } + + fn handle_finished_run( + &mut self, + finished_run: SectionRunHandle, + run_queue: &mut RunQueue, + ) -> Option<()> { + let current_run = run_queue.front_mut()?; + let last_run_handle = current_run.sec_run_handles.last()?; + if finished_run == *last_run_handle { + current_run.state = RunState::Finished; + debug!( + program_id = current_run.program.id, + "finished running program" + ); + self.send_event(ProgramEvent::RunFinish(current_run.program.clone())); + } + Some(()) + } + + fn handle_sec_event( + &mut self, + sec_event: Result, + run_queue: &mut RunQueue, + ) { + let sec_event = match sec_event { + Ok(ev) => ev, + Err(broadcast::RecvError::Lagged(missed)) => { + warn!( + missed, + "missed some section events, increase event channel size" + ); + return; + } + Err(broadcast::RecvError::Closed) => { + error!("section events channel closed"); + self.running = false; + return; + } + }; + #[allow(clippy::single_match)] + match sec_event { + SectionEvent::RunFinish(finished_run) => { + self.handle_finished_run(finished_run, run_queue); + } + _ => {} + } + } + + async fn start(mut self) { + let span = trace_span!("runner_task"); + let _enter = span.enter(); + + let mut sec_events = self + .section_runner + .subscribe() + .await + .expect("could not subscribe to SectionRunner events"); + + let mut run_queue: RunQueue = VecDeque::new(); + + while self.running { + self.process_queue(&mut run_queue).await; + tokio::select! { + msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), + sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue), + // _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() + }; + } + } +} + +#[derive(Debug, Clone, Error)] +#[error("the SectionRunner channel is closed")] +pub struct ChannelClosed; + +pub type Result = std::result::Result; + +impl From> for ChannelClosed { + fn from(_: mpsc::error::SendError) -> Self { + Self + } +} + +impl From for ChannelClosed { + fn from(_: oneshot::error::RecvError) -> Self { + Self + } +} + +#[derive(Clone, Debug)] +pub struct ProgramRunner { + msg_send: mpsc::Sender, +} + +#[allow(dead_code)] +impl ProgramRunner { + pub fn new(section_runner: SectionRunner) -> Self { + let (msg_send, msg_recv) = mpsc::channel(8); + spawn(RunnerTask::new(section_runner, msg_recv).start()); + Self { msg_send } + } + + pub async fn quit(&mut self) -> Result<()> { + self.msg_send.send(RunnerMsg::Quit).await?; + Ok(()) + } + + pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { + self.msg_send + .send(RunnerMsg::UpdateSections(new_sections)) + .await + .map_err(From::from) + } + + pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { + self.msg_send + .send(RunnerMsg::UpdatePrograms(new_programs)) + .await + .map_err(From::from) + } + + pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> { + self.msg_send + .send(RunnerMsg::RunProgramId(program_id)) + .await + .map_err(From::from) + } + + pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { + self.msg_send + .send(RunnerMsg::RunProgram(program)) + .await + .map_err(From::from) + } + + pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { + self.msg_send + .send(RunnerMsg::CancelProgram(program_id)) + .await + .map_err(From::from) + } + + pub async fn subscribe(&mut self) -> Result { + let (res_send, res_recv) = oneshot::channel(); + self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; + let event_recv = res_recv.await?; + Ok(event_recv) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::section_interface::{MockSectionInterface, SectionInterface}; + use crate::{ + model::{Program, ProgramItem, Section}, + trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, + }; + use im::ordmap; + use std::{sync::Arc, time::Duration}; + use tracing_subscriber::prelude::*; + + #[tokio::test] + async fn test_quit() { + let quit_msg = EventListener::new( + Filters::new() + .target("sprinklers_rs::program_runner") + .message("runner_task recv") + .field_value("msg", "Quit"), + ); + let task_span = SpanListener::new( + SpanFilters::new() + .target("sprinklers_rs::program_runner") + .name("runner_task"), + ); + let subscriber = tracing_subscriber::registry() + .with(quit_msg.clone()) + .with(task_span.clone()); + let _sub = tracing::subscriber::set_default(subscriber); + + let interface = MockSectionInterface::new(6); + let sec_runner = SectionRunner::new(Arc::new(interface)); + let mut runner = ProgramRunner::new(sec_runner); + tokio::task::yield_now().await; + runner.quit().await.unwrap(); + tokio::task::yield_now().await; + + assert_eq!(quit_msg.get_count(), 1); + assert_eq!(task_span.get_exit_count(), 1); + } + + fn make_sections_and_runner() -> (Sections, SectionRunner, Arc) { + let interface = Arc::new(MockSectionInterface::new(2)); + let sections: Sections = ordmap![ + 1 => Section { + id: 1, + name: "Section 1".into(), + interface_id: 0, + }.into(), + 2 => Section { + id: 2, + name: "Section 2".into(), + interface_id: 1, + }.into() + ]; + let sec_runner = SectionRunner::new(interface.clone()); + (sections, sec_runner, interface) + } + + #[tokio::test] + async fn test_run_program() { + tracing_subscriber::fmt().init(); + let (sections, mut sec_runner, interface) = make_sections_and_runner(); + let mut sec_events = sec_runner.subscribe().await.unwrap(); + let mut runner = ProgramRunner::new(sec_runner); + let mut prog_events = runner.subscribe().await.unwrap(); + + let program: ProgramRef = Program { + id: 1, + name: "Program 1".into(), + sequence: vec![ + ProgramItem { + section_id: 1, + duration: Duration::from_secs(10), + }, + ProgramItem { + section_id: 2, + duration: Duration::from_secs(10), + }, + ], + } + .into(); + + runner.update_sections(sections.clone()).await.unwrap(); + + runner.run_program(program).await.unwrap(); + tokio::task::yield_now().await; + assert!(matches!( + prog_events.try_recv().unwrap(), + ProgramEvent::RunStart(prog) + if prog.id == 1 + )); + assert!(matches!( + sec_events.try_recv().unwrap(), + SectionEvent::RunStart(_) + )); + assert_eq!(interface.get_section_state(0), true); + + tokio::time::pause(); + assert!(matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunFinish(_) + )); + assert!(matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunStart(_) + )); + assert_eq!(interface.get_section_state(0), false); + assert_eq!(interface.get_section_state(1), true); + assert!(matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunFinish(_) + )); + assert!(matches!( + prog_events.recv().await.unwrap(), + ProgramEvent::RunFinish(_) + )); + + runner.quit().await.unwrap(); + } +}