From 9120f0cbd39302be17dbd596e56dd43f7d2ae6f9 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Fri, 18 Sep 2020 17:27:47 -0600 Subject: [PATCH] Add support for scheduled programs --- Cargo.toml | 3 +- src/main.rs | 3 + src/program_runner.rs | 195 ++++++++++++++++++++++++++++++------------ 3 files changed, 143 insertions(+), 58 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index a83c55b..cad8b47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,12 +11,13 @@ rusqlite = "0.23.1" color-eyre = "0.5.1" eyre = "0.6.0" thiserror = "1.0.20" -tokio = { version = "0.2.22", features = ["rt-core", "time", "sync", "macros", "test-util"] } +tokio = { version = "0.2.22", features = ["rt-core", "time", "stream", "sync", "macros", "test-util"] } tracing = { version = "0.1.19", features = ["log"] } tracing-futures = "0.2.4" pin-project = "0.4.23" im = "15.0.0" chrono = { version = "0.4.15" } +assert_matches = "1.3.0" [dependencies.tracing-subscriber] version = "0.2.11" diff --git a/src/main.rs b/src/main.rs index c0a6700..4566e83 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,6 @@ +#![warn(clippy::all)] +#![warn(clippy::print_stdout)] + use color_eyre::eyre::Result; use rusqlite::Connection as DbConnection; use rusqlite::NO_PARAMS; diff --git a/src/program_runner.rs b/src/program_runner.rs index b671cfc..fc376e6 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -5,7 +5,9 @@ use std::collections::VecDeque; use thiserror::Error; use tokio::{ spawn, + stream::StreamExt, sync::{broadcast, mpsc, oneshot}, + time::{delay_queue, DelayQueue}, }; use tracing::{debug, error, trace, trace_span, warn}; @@ -66,6 +68,7 @@ struct RunnerTask { sections: Sections, programs: Programs, event_send: Option, + scheduled_run_queue: DelayQueue, } impl RunnerTask { @@ -77,6 +80,7 @@ impl RunnerTask { sections: Sections::new(), programs: Programs::new(), event_send: None, + scheduled_run_queue: DelayQueue::new(), } } @@ -181,6 +185,24 @@ impl RunnerTask { Ok(()) } + fn update_programs(&mut self, new_programs: Programs) { + self.programs = new_programs; + self.scheduled_run_queue.clear(); + for (_, prog) in &self.programs { + if !prog.enabled { + continue; + } + let ref_time = chrono::Local::now(); + let next_run = match prog.schedule.next_run_after(&ref_time) { + Some(next_run) => next_run, + None => continue, + }; + let delay = (next_run - ref_time).to_std().unwrap(); + trace!("will run program in {:?}", delay); + self.scheduled_run_queue.insert(prog.clone(), delay); + } + } + fn handle_msg(&mut self, msg: Option, run_queue: &mut RunQueue) { let msg = msg.expect("ProgramRunner channel closed"); use RunnerMsg::*; @@ -196,7 +218,7 @@ impl RunnerTask { self.sections = new_sections; } UpdatePrograms(new_programs) => { - self.programs = new_programs; + self.update_programs(new_programs); } RunProgramId(program_id) => { let program = match self.programs.get(&program_id) { @@ -255,6 +277,16 @@ impl RunnerTask { Ok(()) } + async fn handle_scheduled_run( + &mut self, + item: Result, tokio::time::Error>, + run_queue: &mut RunQueue, + ) -> eyre::Result<()> { + let item = item.wrap_err("tokio time error")?; + run_queue.push_back(ProgRun::new(item.into_inner())); + Ok(()) + } + async fn start_impl(&mut self) -> eyre::Result<()> { let mut sec_events = self .section_runner @@ -271,7 +303,9 @@ impl RunnerTask { tokio::select! { msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, - // _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() + Some(scheduled_run) = self.scheduled_run_queue.next() => { + self.handle_scheduled_run(scheduled_run, &mut run_queue).await?; + }, }; } @@ -373,9 +407,10 @@ mod test { use crate::section_interface::{MockSectionInterface, SectionInterface}; use crate::{ model::{Program, ProgramItem, Section}, - schedule::Schedule, + schedule::{every_day, DateTimeBound, Schedule}, trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, }; + use assert_matches::assert_matches; use im::ordmap; use std::{sync::Arc, time::Duration}; use tokio::task::yield_now; @@ -430,12 +465,21 @@ mod test { } fn make_program(num: ProgramId, sequence: Vec) -> ProgramRef { + make_program_with_schedule(num, sequence, false, Schedule::default()) + } + + fn make_program_with_schedule( + num: ProgramId, + sequence: Vec, + enabled: bool, + schedule: Schedule, + ) -> ProgramRef { Program { id: num, name: format!("Program {}", num), sequence, - enabled: false, - schedule: Schedule::default(), + enabled, + schedule, } .into() } @@ -465,36 +509,24 @@ mod test { runner.run_program(program).await.unwrap(); yield_now().await; - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 - )); - assert!(matches!( - sec_events.try_recv().unwrap(), - SectionEvent::RunStart(_) - )); + ); + assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_)); assert_eq!(interface.get_section_state(0), true); tokio::time::pause(); - assert!(matches!( - sec_events.recv().await.unwrap(), - SectionEvent::RunFinish(_) - )); - assert!(matches!( - sec_events.recv().await.unwrap(), - SectionEvent::RunStart(_) - )); + assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_)); + assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunStart(_)); assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(1), true); - assert!(matches!( - sec_events.recv().await.unwrap(), - SectionEvent::RunFinish(_) - )); - assert!(matches!( + assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_)); + assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(_) - )); + ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); @@ -527,31 +559,31 @@ mod test { yield_now().await; // Should immediately start and finish running program // due to nonexistant section - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 - )); - assert!(matches!( + ); + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 - )); + ); runner.run_program(program2).await.unwrap(); yield_now().await; // Should run right away since last program should be done - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 2 - )); + ); tokio::time::pause(); - assert!(matches!( + assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 2 - )); + ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); @@ -610,38 +642,35 @@ mod test { // First try a non-existant program id runner.run_program_id(3).await.unwrap(); yield_now().await; - assert!(matches!( - prog_events.try_recv(), - Err(broadcast::TryRecvError::Empty) - )); + assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); runner.run_program_id(1).await.unwrap(); yield_now().await; - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 - )); + ); tokio::time::pause(); - assert!(matches!( + assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 - )); + ); runner.run_program_id(1).await.unwrap(); yield_now().await; - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 - )); + ); - assert!(matches!( + assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(prog) if prog.id == 1 - )); + ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); @@ -673,27 +702,79 @@ mod test { runner.run_program(program.clone()).await.unwrap(); yield_now().await; - assert!(matches!( + assert_matches!( prog_events.try_recv().unwrap(), ProgramEvent::RunStart(prog) if prog.id == 1 - )); - assert!(matches!( - sec_events.try_recv().unwrap(), - SectionEvent::RunStart(_) - )); + ); + assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_)); runner.cancel_program(program.id).await.unwrap(); yield_now().await; - assert!(matches!( + assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunCancel(prog) if prog.id == 1 - )); - assert!(matches!( - sec_events.recv().await.unwrap(), - SectionEvent::RunCancel(_) - )); + ); + assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunCancel(_)); + + runner.quit().await.unwrap(); + sec_runner.quit().await.unwrap(); + yield_now().await; + } + + #[tokio::test] + async fn test_scheduled_run() { + tracing_subscriber::fmt().init(); + let (sections, mut sec_runner, _) = make_sections_and_runner(); + let mut runner = ProgramRunner::new(sec_runner.clone()); + let mut prog_events = runner.subscribe().await.unwrap(); + + let make_programs = |num: ProgramId, enabled: bool| { + let now = chrono::Local::now(); + let sched_time = now.time() + chrono::Duration::seconds(1); + let schedule = Schedule::new( + vec![sched_time], + every_day(), + DateTimeBound::None, + DateTimeBound::None, + ); + let program1 = make_program_with_schedule( + num, + vec![ProgramItem { + section_id: 1, + duration: Duration::from_secs(10), + }], + enabled, + schedule, + ); + let programs = ordmap![ 1 => program1 ]; + programs + }; + + runner.update_sections(sections.clone()).await.unwrap(); + runner + .update_programs(make_programs(1, false)) + .await + .unwrap(); + + tokio::time::pause(); + tokio::time::delay_for(Duration::from_secs(2)).await; + // Should not run (is disabled) + assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); + + runner + .update_programs(make_programs(2, true)) + .await + .unwrap(); + + // Should run + tokio::time::delay_for(Duration::from_secs(2)).await; + assert_matches!( + prog_events.try_recv(), + Ok(ProgramEvent::RunStart(prog)) + if prog.id == 2 + ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();