Browse Source

Fix running a program on schedule after it ran already

master
Alex Mikhalev 4 years ago
parent
commit
c571eae5ce
  1. 98
      sprinklers_actors/src/program_runner.rs

98
sprinklers_actors/src/program_runner.rs

@ -367,6 +367,17 @@ impl Handler<Process> for ProgramRunnerActor {
} }
} }
#[derive(Message)]
#[rtype(result = "()")]
struct UpdateSchedules;
impl Handler<UpdateSchedules> for ProgramRunnerActor {
type Result = ();
fn handle(&mut self, _msg: UpdateSchedules, ctx: &mut Self::Context) -> Self::Result {
self.inner.update_schedules(ctx);
}
}
impl ProgramRunnerActor { impl ProgramRunnerActor {
fn new(section_runner: SectionRunner) -> Self { fn new(section_runner: SectionRunner) -> Self {
Self { Self {
@ -416,6 +427,7 @@ impl ProgramRunnerActor {
trace!(program_id = program.id, "schedule expired"); trace!(program_id = program.id, "schedule expired");
self.run_queue.push_back(ProgRun::new(program)); self.run_queue.push_back(ProgRun::new(program));
ctx.notify(Process); ctx.notify(Process);
ctx.notify(UpdateSchedules);
} }
} }
@ -866,7 +878,7 @@ mod test {
let make_programs = |num: ProgramId, enabled: bool| { let make_programs = |num: ProgramId, enabled: bool| {
let now = chrono::Local::now(); let now = chrono::Local::now();
let sched_time = now.time() + chrono::Duration::seconds(1); let sched_time = now.time() + chrono::Duration::microseconds(1000);
let schedule = Schedule::new( let schedule = Schedule::new(
vec![sched_time], vec![sched_time],
every_day(), every_day(),
@ -877,7 +889,7 @@ mod test {
num, num,
vec![ProgramItem { vec![ProgramItem {
section_id: 1, section_id: 1,
duration: Duration::from_secs(10), duration: Duration::from_micros(100),
}], }],
enabled, enabled,
schedule, schedule,
@ -892,8 +904,10 @@ mod test {
.await .await
.unwrap(); .unwrap();
tokio::time::pause(); // TODO: would use tokio::time::pause here but that doesn't effect chrono now
tokio::time::delay_for(Duration::from_secs(2)).await; // which is used for schedules
// tokio::time::pause();
tokio::time::delay_for(Duration::from_micros(1100)).await;
// Should not run (is disabled) // Should not run (is disabled)
assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty));
@ -903,10 +917,10 @@ mod test {
.unwrap(); .unwrap();
// Should run // Should run
tokio::time::delay_for(Duration::from_secs(2)).await; tokio::time::delay_for(Duration::from_micros(1100)).await;
assert_matches!(prog_events.try_recv(), Ok(ProgramEvent::NextRun(_, _))); assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _)));
assert_matches!( assert_matches!(
prog_events.try_recv(), prog_events.recv().await,
Ok(ProgramEvent::RunStart(prog)) Ok(ProgramEvent::RunStart(prog))
if prog.id == 2 if prog.id == 2
); );
@ -922,8 +936,8 @@ mod test {
let mut prog_events = runner.subscribe().await.unwrap(); let mut prog_events = runner.subscribe().await.unwrap();
let now = chrono::Local::now(); let now = chrono::Local::now();
let sched_time1 = now.time() + chrono::Duration::seconds(1); let sched_time1 = now.time() + chrono::Duration::microseconds(1000);
let sched_time2 = now.time() + chrono::Duration::seconds(20); let sched_time2 = now.time() + chrono::Duration::microseconds(5000);
let schedule = Schedule::new( let schedule = Schedule::new(
vec![sched_time1, sched_time2], vec![sched_time1, sched_time2],
every_day(), every_day(),
@ -934,7 +948,7 @@ mod test {
1, 1,
vec![ProgramItem { vec![ProgramItem {
section_id: 1, section_id: 1,
duration: Duration::from_secs(10), duration: Duration::from_micros(10),
}], }],
true, true,
schedule, schedule,
@ -944,35 +958,43 @@ mod test {
runner.update_sections(sections.clone()).await.unwrap(); runner.update_sections(sections.clone()).await.unwrap();
runner.update_programs(programs).await.unwrap(); runner.update_programs(programs).await.unwrap();
tokio::time::pause(); let fut = async move {
// Should run // TODO: would use tokio::time::pause here but that doesn't effect chrono now
tokio::time::delay_for(Duration::from_secs(2)).await; // which is used for schedules
assert_matches!(prog_events.try_recv(), Ok(ProgramEvent::NextRun(_, _))); // tokio::time::pause();
assert_matches!( // Should run
prog_events.try_recv(), tokio::time::delay_for(Duration::from_micros(1100)).await;
Ok(ProgramEvent::RunStart(prog)) assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _)));
if prog.id == 1 assert_matches!(
); prog_events.recv().await,
tokio::time::delay_for(Duration::from_secs(10)).await; Ok(ProgramEvent::RunStart(prog))
assert_matches!( if prog.id == 1
prog_events.try_recv(), );
Ok(ProgramEvent::RunFinish(prog)) assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _)));
if prog.id == 1 assert_matches!(
); prog_events.recv().await,
Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1
);
// Should run again // Should run again
tokio::time::delay_for(Duration::from_secs(10)).await; assert_matches!(
assert_matches!( prog_events.recv().await,
prog_events.try_recv(), Ok(ProgramEvent::RunStart(prog))
Ok(ProgramEvent::RunStart(prog)) if prog.id == 1
if prog.id == 1 );
); tokio::task::yield_now().await;
tokio::time::delay_for(Duration::from_secs(10)).await; assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _)));
assert_matches!( assert_matches!(
prog_events.try_recv(), prog_events.recv().await,
Ok(ProgramEvent::RunFinish(prog)) Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1 if prog.id == 1
); );
};
// TODO: this still sometimes fails
tokio::time::timeout(Duration::from_micros(10000), fut)
.await
.unwrap();
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();

Loading…
Cancel
Save