use crate::model::{ProgramId, ProgramRef, Programs, Sections}; use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; use eyre::WrapErr; use std::collections::VecDeque; use thiserror::Error; use tokio::{ spawn, stream::StreamExt, sync::{broadcast, mpsc, oneshot}, time::{delay_queue, DelayQueue}, }; use tracing::{debug, error, trace, trace_span, warn}; #[derive(Debug)] enum RunnerMsg { Quit(oneshot::Sender<()>), UpdateSections(Sections), UpdatePrograms(Programs), RunProgramId(ProgramId), RunProgram(ProgramRef), CancelProgram(ProgramId), Subscribe(oneshot::Sender), } #[derive(Clone, Debug)] pub enum ProgramEvent { RunStart(ProgramRef), RunFinish(ProgramRef), RunCancel(ProgramRef), } pub type ProgramEventRecv = broadcast::Receiver; type ProgramEventSend = broadcast::Sender; const EVENT_CAPACITY: usize = 8; #[derive(Clone, Debug, PartialEq)] enum RunState { Waiting, Running, Finished, Cancelled, } #[derive(Debug)] struct ProgRun { program: ProgramRef, state: RunState, sec_run_handles: Vec, } impl ProgRun { fn new(program: ProgramRef) -> Self { Self { program, state: RunState::Waiting, sec_run_handles: Vec::new(), } } } type RunQueue = VecDeque; struct RunnerTask { section_runner: SectionRunner, msg_recv: mpsc::Receiver, running: bool, sections: Sections, programs: Programs, event_send: Option, scheduled_run_queue: DelayQueue, quit_tx: Option>, } impl RunnerTask { fn new(section_runner: SectionRunner, msg_recv: mpsc::Receiver) -> Self { Self { section_runner, msg_recv, running: true, sections: Sections::new(), programs: Programs::new(), event_send: None, scheduled_run_queue: DelayQueue::new(), quit_tx: None, } } fn send_event(&mut self, event: ProgramEvent) { if let Some(event_send) = &mut self.event_send { match event_send.send(event) { Ok(_) => {} Err(_closed) => { self.event_send = None; } } } } fn subscribe_event(&mut self) -> ProgramEventRecv { match &mut self.event_send { Some(event_send) => event_send.subscribe(), None => { let (event_send, event_recv) = broadcast::channel(EVENT_CAPACITY); self.event_send = Some(event_send); event_recv } } } async fn start_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { if run.state != RunState::Waiting { warn!( program_id = run.program.id, "cannot run program which is already running" ); return Ok(()); } run.sec_run_handles.reserve(run.program.sequence.len()); for item in &run.program.sequence { let section = match self.sections.get(&item.section_id) { Some(sec) => sec.clone(), None => { warn!( program_id = run.program.id, section_id = item.section_id, "trying to run program with nonexistant section" ); continue; } }; let handle = self .section_runner .queue_run(section, item.duration) .await .wrap_err("failed to queue section run")?; run.sec_run_handles.push(handle); } run.state = RunState::Running; self.send_event(ProgramEvent::RunStart(run.program.clone())); if run.sec_run_handles.is_empty() { warn!(program_id = run.program.id, "program has no valid sections"); run.state = RunState::Finished; self.send_event(ProgramEvent::RunFinish(run.program.clone())); } else { debug!(program_id = run.program.id, "started running program"); } Ok(()) } async fn cancel_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { for handle in run.sec_run_handles.drain(..) { self.section_runner .cancel_run(handle) .await .wrap_err("failed to cancel section run")?; } debug!(program_id = run.program.id, "program run is cancelled"); self.send_event(ProgramEvent::RunCancel(run.program.clone())); Ok(()) } async fn process_queue(&mut self, run_queue: &mut RunQueue) -> eyre::Result<()> { while let Some(current_run) = run_queue.front_mut() { let run_finished = match current_run.state { RunState::Waiting => { self.start_program_run(current_run) .await .wrap_err("failed to start program run")?; false } RunState::Running => false, RunState::Finished => true, RunState::Cancelled => { self.cancel_program_run(current_run) .await .wrap_err("failed to cancel program run")?; true } }; if run_finished { run_queue.pop_front(); } else { break; } } Ok(()) } fn update_programs(&mut self, new_programs: Programs) { self.programs = new_programs; self.scheduled_run_queue.clear(); for (_, prog) in &self.programs { if !prog.enabled { continue; } let ref_time = chrono::Local::now(); let next_run = match prog.schedule.next_run_after(&ref_time) { Some(next_run) => next_run, None => continue, }; let delay = (next_run - ref_time).to_std().unwrap(); trace!("will run program in {:?}", delay); self.scheduled_run_queue.insert(prog.clone(), delay); } } fn handle_msg(&mut self, msg: Option, run_queue: &mut RunQueue) { let msg = msg.expect("ProgramRunner channel closed"); use RunnerMsg::*; trace!(msg = debug(&msg), "runner_task recv"); match msg { Quit(quit_tx) => { self.running = false; self.quit_tx = Some(quit_tx); } Subscribe(res_send) => { let event_recv = self.subscribe_event(); // Ignore error if channel closed let _ = res_send.send(event_recv); } UpdateSections(new_sections) => { self.sections = new_sections; } UpdatePrograms(new_programs) => { self.update_programs(new_programs); } RunProgramId(program_id) => { let program = match self.programs.get(&program_id) { Some(program) => program.clone(), None => { warn!(program_id, "trying to run non-existant program"); return; } }; run_queue.push_back(ProgRun::new(program)); } RunProgram(program) => { run_queue.push_back(ProgRun::new(program)); } RunnerMsg::CancelProgram(program_id) => { for run in run_queue { if run.program.id == program_id { run.state = RunState::Cancelled; } } } } } fn handle_finished_run( &mut self, finished_run: SectionRunHandle, run_queue: &mut RunQueue, ) -> Option<()> { let current_run = run_queue.front_mut()?; let last_run_handle = current_run.sec_run_handles.last()?; if finished_run == *last_run_handle { current_run.state = RunState::Finished; debug!( program_id = current_run.program.id, "finished running program" ); self.send_event(ProgramEvent::RunFinish(current_run.program.clone())); } Some(()) } fn handle_sec_event( &mut self, sec_event: Result, run_queue: &mut RunQueue, ) -> eyre::Result<()> { let sec_event = sec_event.wrap_err("failed to receive section event")?; #[allow(clippy::single_match)] match sec_event { SectionEvent::RunFinish(finished_run) => { self.handle_finished_run(finished_run, run_queue); } _ => {} } Ok(()) } async fn handle_scheduled_run( &mut self, item: Result, tokio::time::Error>, run_queue: &mut RunQueue, ) -> eyre::Result<()> { let item = item.wrap_err("tokio time error")?; run_queue.push_back(ProgRun::new(item.into_inner())); Ok(()) } async fn start_impl(&mut self) -> eyre::Result<()> { let mut sec_events = self .section_runner .subscribe() .await .wrap_err("could not subscribe to SectionRunner events")?; let mut run_queue: RunQueue = VecDeque::new(); while self.running { self.process_queue(&mut run_queue) .await .wrap_err("error during queue processing")?; tokio::select! { msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, Some(scheduled_run) = self.scheduled_run_queue.next() => { self.handle_scheduled_run(scheduled_run, &mut run_queue).await?; }, }; } if let Some(quit_tx) = self.quit_tx.take() { let _ = quit_tx.send(()); } Ok(()) } async fn start(mut self) { let span = trace_span!("runner_task"); let _enter = span.enter(); self.start_impl() .await .expect("error in ProgramRunner task"); } } #[derive(Debug, Clone, Error)] #[error("the ProgramRunner channel is closed")] pub struct ChannelClosed; pub type Result = std::result::Result; impl From> for ChannelClosed { fn from(_: mpsc::error::SendError) -> Self { Self } } impl From for ChannelClosed { fn from(_: oneshot::error::RecvError) -> Self { Self } } #[derive(Clone, Debug)] pub struct ProgramRunner { msg_send: mpsc::Sender, } #[allow(dead_code)] impl ProgramRunner { pub fn new(section_runner: SectionRunner) -> Self { let (msg_send, msg_recv) = mpsc::channel(8); spawn(RunnerTask::new(section_runner, msg_recv).start()); Self { msg_send } } pub async fn quit(&mut self) -> Result<()> { let (quit_tx, quit_rx) = oneshot::channel(); self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?; quit_rx.await?; Ok(()) } pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { self.msg_send .send(RunnerMsg::UpdateSections(new_sections)) .await .map_err(From::from) } pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { self.msg_send .send(RunnerMsg::UpdatePrograms(new_programs)) .await .map_err(From::from) } pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> { self.msg_send .send(RunnerMsg::RunProgramId(program_id)) .await .map_err(From::from) } pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { self.msg_send .send(RunnerMsg::RunProgram(program)) .await .map_err(From::from) } pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { self.msg_send .send(RunnerMsg::CancelProgram(program_id)) .await .map_err(From::from) } pub async fn subscribe(&mut self) -> Result { let (res_send, res_recv) = oneshot::channel(); self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; let event_recv = res_recv.await?; Ok(event_recv) } } #[cfg(test)] mod test { use super::*; use crate::section_interface::{MockSectionInterface, SectionInterface}; use crate::{ model::{Program, ProgramItem, Section}, schedule::{every_day, DateTimeBound, Schedule}, trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, }; use assert_matches::assert_matches; use im::ordmap; use std::{sync::Arc, time::Duration}; use tokio::task::yield_now; use tracing_subscriber::prelude::*; #[tokio::test] async fn test_quit() { let quit_msg = EventListener::new( Filters::new() .target("sprinklers_rs::program_runner") .message("runner_task recv") .field_value("msg", "Quit"), ); let task_span = SpanListener::new( SpanFilters::new() .target("sprinklers_rs::program_runner") .name("runner_task"), ); let subscriber = tracing_subscriber::registry() .with(quit_msg.clone()) .with(task_span.clone()); let _sub = tracing::subscriber::set_default(subscriber); let interface = MockSectionInterface::new(6); let mut sec_runner = SectionRunner::new(Arc::new(interface)); let mut runner = ProgramRunner::new(sec_runner.clone()); yield_now().await; runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); yield_now().await; assert_eq!(quit_msg.get_count(), 1); assert_eq!(task_span.get_exit_count(), 1); } fn make_sections_and_runner() -> (Sections, SectionRunner, Arc) { let interface = Arc::new(MockSectionInterface::new(2)); let sections: Sections = ordmap![ 1 => Section { id: 1, name: "Section 1".into(), interface_id: 0, }.into(), 2 => Section { id: 2, name: "Section 2".into(), interface_id: 1, }.into() ]; let sec_runner = SectionRunner::new(interface.clone()); (sections, sec_runner, interface) } fn make_program(num: ProgramId, sequence: Vec) -> ProgramRef { make_program_with_schedule(num, sequence, false, Schedule::default()) } fn make_program_with_schedule( num: ProgramId, sequence: Vec, enabled: bool, schedule: Schedule, ) -> ProgramRef { Program { id: num, name: format!("Program {}", num), sequence, enabled, schedule, } .into() } #[tokio::test] async fn test_run_program() { let (sections, mut sec_runner, interface) = make_sections_and_runner(); let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program( 1, vec![ ProgramItem { section_id: 1, duration: Duration::from_secs(10), }, ProgramItem { section_id: 2, duration: Duration::from_secs(10), }, ], ); runner.update_sections(sections.clone()).await.unwrap(); runner.run_program(program).await.unwrap(); yield_now().await; assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 ); assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_)); assert_eq!(interface.get_section_state(0), true); tokio::time::pause(); assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_)); assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunStart(_)); assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(1), true); assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_)); assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(_) ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); yield_now().await; } #[tokio::test] async fn test_run_nonexistant_section() { let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program1 = make_program( 1, vec![ProgramItem { section_id: 3, duration: Duration::from_secs(10), }], ); let program2 = make_program( 2, vec![ProgramItem { section_id: 1, duration: Duration::from_secs(10), }], ); runner.update_sections(sections.clone()).await.unwrap(); runner.run_program(program1).await.unwrap(); yield_now().await; // Should immediately start and finish running program // due to nonexistant section assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 ); assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 ); runner.run_program(program2).await.unwrap(); yield_now().await; // Should run right away since last program should be done assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 2 ); tokio::time::pause(); assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 2 ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); } #[tokio::test] async fn test_close_event_chan() { let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program(1, vec![]); runner.update_sections(sections.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap(); prog_events.recv().await.unwrap(); prog_events.recv().await.unwrap(); // Make sure it doesn't panic when the events channel is dropped drop(prog_events); yield_now().await; runner.run_program(program).await.unwrap(); yield_now().await; runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); } #[tokio::test] async fn test_run_program_id() { let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program1 = make_program( 1, vec![ProgramItem { section_id: 2, duration: Duration::from_secs(10), }], ); let program2 = make_program( 2, vec![ProgramItem { section_id: 2, duration: Duration::from_secs(10), }], ); let programs = ordmap![ 1 => program1, 2 => program2 ]; runner.update_sections(sections.clone()).await.unwrap(); runner.update_programs(programs).await.unwrap(); // First try a non-existant program id runner.run_program_id(3).await.unwrap(); yield_now().await; assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); runner.run_program_id(1).await.unwrap(); yield_now().await; assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 ); tokio::time::pause(); assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 ); runner.run_program_id(1).await.unwrap(); yield_now().await; assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 ); assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); } #[tokio::test] async fn test_cancel_program() { let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let program = make_program( 1, vec![ ProgramItem { section_id: 1, duration: Duration::from_secs(10), }, ProgramItem { section_id: 2, duration: Duration::from_secs(10), }, ], ); runner.update_sections(sections.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap(); yield_now().await; assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 ); assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_)); runner.cancel_program(program.id).await.unwrap(); yield_now().await; assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunCancel(prog) if prog.id == 1 ); assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunCancel(_)); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); } #[tokio::test] async fn test_scheduled_run() { tracing_subscriber::fmt().init(); let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); let make_programs = |num: ProgramId, enabled: bool| { let now = chrono::Local::now(); let sched_time = now.time() + chrono::Duration::seconds(1); let schedule = Schedule::new( vec![sched_time], every_day(), DateTimeBound::None, DateTimeBound::None, ); let program1 = make_program_with_schedule( num, vec![ProgramItem { section_id: 1, duration: Duration::from_secs(10), }], enabled, schedule, ); let programs = ordmap![ 1 => program1 ]; programs }; runner.update_sections(sections.clone()).await.unwrap(); runner .update_programs(make_programs(1, false)) .await .unwrap(); tokio::time::pause(); tokio::time::delay_for(Duration::from_secs(2)).await; // Should not run (is disabled) assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); runner .update_programs(make_programs(2, true)) .await .unwrap(); // Should run tokio::time::delay_for(Duration::from_secs(2)).await; assert_matches!( prog_events.try_recv(), Ok(ProgramEvent::RunStart(prog)) if prog.id == 2 ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); } }