Browse Source

Add support for scheduled programs

master
Alex Mikhalev 4 years ago
parent
commit
9120f0cbd3
  1. 3
      Cargo.toml
  2. 3
      src/main.rs
  3. 195
      src/program_runner.rs

3
Cargo.toml

@ -11,12 +11,13 @@ rusqlite = "0.23.1"
color-eyre = "0.5.1" color-eyre = "0.5.1"
eyre = "0.6.0" eyre = "0.6.0"
thiserror = "1.0.20" thiserror = "1.0.20"
tokio = { version = "0.2.22", features = ["rt-core", "time", "sync", "macros", "test-util"] } tokio = { version = "0.2.22", features = ["rt-core", "time", "stream", "sync", "macros", "test-util"] }
tracing = { version = "0.1.19", features = ["log"] } tracing = { version = "0.1.19", features = ["log"] }
tracing-futures = "0.2.4" tracing-futures = "0.2.4"
pin-project = "0.4.23" pin-project = "0.4.23"
im = "15.0.0" im = "15.0.0"
chrono = { version = "0.4.15" } chrono = { version = "0.4.15" }
assert_matches = "1.3.0"
[dependencies.tracing-subscriber] [dependencies.tracing-subscriber]
version = "0.2.11" version = "0.2.11"

3
src/main.rs

@ -1,3 +1,6 @@
#![warn(clippy::all)]
#![warn(clippy::print_stdout)]
use color_eyre::eyre::Result; use color_eyre::eyre::Result;
use rusqlite::Connection as DbConnection; use rusqlite::Connection as DbConnection;
use rusqlite::NO_PARAMS; use rusqlite::NO_PARAMS;

195
src/program_runner.rs

@ -5,7 +5,9 @@ use std::collections::VecDeque;
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
spawn, spawn,
stream::StreamExt,
sync::{broadcast, mpsc, oneshot}, sync::{broadcast, mpsc, oneshot},
time::{delay_queue, DelayQueue},
}; };
use tracing::{debug, error, trace, trace_span, warn}; use tracing::{debug, error, trace, trace_span, warn};
@ -66,6 +68,7 @@ struct RunnerTask {
sections: Sections, sections: Sections,
programs: Programs, programs: Programs,
event_send: Option<ProgramEventSend>, event_send: Option<ProgramEventSend>,
scheduled_run_queue: DelayQueue<ProgramRef>,
} }
impl RunnerTask { impl RunnerTask {
@ -77,6 +80,7 @@ impl RunnerTask {
sections: Sections::new(), sections: Sections::new(),
programs: Programs::new(), programs: Programs::new(),
event_send: None, event_send: None,
scheduled_run_queue: DelayQueue::new(),
} }
} }
@ -181,6 +185,24 @@ impl RunnerTask {
Ok(()) Ok(())
} }
fn update_programs(&mut self, new_programs: Programs) {
self.programs = new_programs;
self.scheduled_run_queue.clear();
for (_, prog) in &self.programs {
if !prog.enabled {
continue;
}
let ref_time = chrono::Local::now();
let next_run = match prog.schedule.next_run_after(&ref_time) {
Some(next_run) => next_run,
None => continue,
};
let delay = (next_run - ref_time).to_std().unwrap();
trace!("will run program in {:?}", delay);
self.scheduled_run_queue.insert(prog.clone(), delay);
}
}
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) { fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) {
let msg = msg.expect("ProgramRunner channel closed"); let msg = msg.expect("ProgramRunner channel closed");
use RunnerMsg::*; use RunnerMsg::*;
@ -196,7 +218,7 @@ impl RunnerTask {
self.sections = new_sections; self.sections = new_sections;
} }
UpdatePrograms(new_programs) => { UpdatePrograms(new_programs) => {
self.programs = new_programs; self.update_programs(new_programs);
} }
RunProgramId(program_id) => { RunProgramId(program_id) => {
let program = match self.programs.get(&program_id) { let program = match self.programs.get(&program_id) {
@ -255,6 +277,16 @@ impl RunnerTask {
Ok(()) Ok(())
} }
async fn handle_scheduled_run(
&mut self,
item: Result<delay_queue::Expired<ProgramRef>, tokio::time::Error>,
run_queue: &mut RunQueue,
) -> eyre::Result<()> {
let item = item.wrap_err("tokio time error")?;
run_queue.push_back(ProgRun::new(item.into_inner()));
Ok(())
}
async fn start_impl(&mut self) -> eyre::Result<()> { async fn start_impl(&mut self) -> eyre::Result<()> {
let mut sec_events = self let mut sec_events = self
.section_runner .section_runner
@ -271,7 +303,9 @@ impl RunnerTask {
tokio::select! { tokio::select! {
msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue),
sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?,
// _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() Some(scheduled_run) = self.scheduled_run_queue.next() => {
self.handle_scheduled_run(scheduled_run, &mut run_queue).await?;
},
}; };
} }
@ -373,9 +407,10 @@ mod test {
use crate::section_interface::{MockSectionInterface, SectionInterface}; use crate::section_interface::{MockSectionInterface, SectionInterface};
use crate::{ use crate::{
model::{Program, ProgramItem, Section}, model::{Program, ProgramItem, Section},
schedule::Schedule, schedule::{every_day, DateTimeBound, Schedule},
trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, trace_listeners::{EventListener, Filters, SpanFilters, SpanListener},
}; };
use assert_matches::assert_matches;
use im::ordmap; use im::ordmap;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::task::yield_now; use tokio::task::yield_now;
@ -430,12 +465,21 @@ mod test {
} }
fn make_program(num: ProgramId, sequence: Vec<ProgramItem>) -> ProgramRef { fn make_program(num: ProgramId, sequence: Vec<ProgramItem>) -> ProgramRef {
make_program_with_schedule(num, sequence, false, Schedule::default())
}
fn make_program_with_schedule(
num: ProgramId,
sequence: Vec<ProgramItem>,
enabled: bool,
schedule: Schedule,
) -> ProgramRef {
Program { Program {
id: num, id: num,
name: format!("Program {}", num), name: format!("Program {}", num),
sequence, sequence,
enabled: false, enabled,
schedule: Schedule::default(), schedule,
} }
.into() .into()
} }
@ -465,36 +509,24 @@ mod test {
runner.run_program(program).await.unwrap(); runner.run_program(program).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 1 if prog.id == 1
)); );
assert!(matches!( assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_));
sec_events.try_recv().unwrap(),
SectionEvent::RunStart(_)
));
assert_eq!(interface.get_section_state(0), true); assert_eq!(interface.get_section_state(0), true);
tokio::time::pause(); tokio::time::pause();
assert!(matches!( assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_));
sec_events.recv().await.unwrap(), assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunStart(_));
SectionEvent::RunFinish(_)
));
assert!(matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunStart(_)
));
assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(0), false);
assert_eq!(interface.get_section_state(1), true); assert_eq!(interface.get_section_state(1), true);
assert!(matches!( assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_));
sec_events.recv().await.unwrap(), assert_matches!(
SectionEvent::RunFinish(_)
));
assert!(matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(_) ProgramEvent::RunFinish(_)
)); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
@ -527,31 +559,31 @@ mod test {
yield_now().await; yield_now().await;
// Should immediately start and finish running program // Should immediately start and finish running program
// due to nonexistant section // due to nonexistant section
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 1 if prog.id == 1
)); );
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunFinish(prog) ProgramEvent::RunFinish(prog)
if prog.id == 1 if prog.id == 1
)); );
runner.run_program(program2).await.unwrap(); runner.run_program(program2).await.unwrap();
yield_now().await; yield_now().await;
// Should run right away since last program should be done // Should run right away since last program should be done
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 2 if prog.id == 2
)); );
tokio::time::pause(); tokio::time::pause();
assert!(matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(prog) ProgramEvent::RunFinish(prog)
if prog.id == 2 if prog.id == 2
)); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
@ -610,38 +642,35 @@ mod test {
// First try a non-existant program id // First try a non-existant program id
runner.run_program_id(3).await.unwrap(); runner.run_program_id(3).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty));
prog_events.try_recv(),
Err(broadcast::TryRecvError::Empty)
));
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 1 if prog.id == 1
)); );
tokio::time::pause(); tokio::time::pause();
assert!(matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(prog) ProgramEvent::RunFinish(prog)
if prog.id == 1 if prog.id == 1
)); );
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 1 if prog.id == 1
)); );
assert!(matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(prog) ProgramEvent::RunFinish(prog)
if prog.id == 1 if prog.id == 1
)); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
@ -673,27 +702,79 @@ mod test {
runner.run_program(program.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog) ProgramEvent::RunStart(prog)
if prog.id == 1 if prog.id == 1
)); );
assert!(matches!( assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_));
sec_events.try_recv().unwrap(),
SectionEvent::RunStart(_)
));
runner.cancel_program(program.id).await.unwrap(); runner.cancel_program(program.id).await.unwrap();
yield_now().await; yield_now().await;
assert!(matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunCancel(prog) ProgramEvent::RunCancel(prog)
if prog.id == 1 if prog.id == 1
)); );
assert!(matches!( assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunCancel(_));
sec_events.recv().await.unwrap(),
SectionEvent::RunCancel(_) runner.quit().await.unwrap();
)); sec_runner.quit().await.unwrap();
yield_now().await;
}
#[tokio::test]
async fn test_scheduled_run() {
tracing_subscriber::fmt().init();
let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap();
let make_programs = |num: ProgramId, enabled: bool| {
let now = chrono::Local::now();
let sched_time = now.time() + chrono::Duration::seconds(1);
let schedule = Schedule::new(
vec![sched_time],
every_day(),
DateTimeBound::None,
DateTimeBound::None,
);
let program1 = make_program_with_schedule(
num,
vec![ProgramItem {
section_id: 1,
duration: Duration::from_secs(10),
}],
enabled,
schedule,
);
let programs = ordmap![ 1 => program1 ];
programs
};
runner.update_sections(sections.clone()).await.unwrap();
runner
.update_programs(make_programs(1, false))
.await
.unwrap();
tokio::time::pause();
tokio::time::delay_for(Duration::from_secs(2)).await;
// Should not run (is disabled)
assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty));
runner
.update_programs(make_programs(2, true))
.await
.unwrap();
// Should run
tokio::time::delay_for(Duration::from_secs(2)).await;
assert_matches!(
prog_events.try_recv(),
Ok(ProgramEvent::RunStart(prog))
if prog.id == 2
);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();

Loading…
Cancel
Save