diff --git a/sprinklers_actors/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs index 376543f..b2f48cf 100644 --- a/sprinklers_actors/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -367,6 +367,17 @@ impl Handler for ProgramRunnerActor { } } +#[derive(Message)] +#[rtype(result = "()")] +struct UpdateSchedules; + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, _msg: UpdateSchedules, ctx: &mut Self::Context) -> Self::Result { + self.inner.update_schedules(ctx); + } +} + impl ProgramRunnerActor { fn new(section_runner: SectionRunner) -> Self { Self { @@ -416,6 +427,7 @@ impl ProgramRunnerActor { trace!(program_id = program.id, "schedule expired"); self.run_queue.push_back(ProgRun::new(program)); ctx.notify(Process); + ctx.notify(UpdateSchedules); } } @@ -866,7 +878,7 @@ mod test { let make_programs = |num: ProgramId, enabled: bool| { let now = chrono::Local::now(); - let sched_time = now.time() + chrono::Duration::seconds(1); + let sched_time = now.time() + chrono::Duration::microseconds(1000); let schedule = Schedule::new( vec![sched_time], every_day(), @@ -877,7 +889,7 @@ mod test { num, vec![ProgramItem { section_id: 1, - duration: Duration::from_secs(10), + duration: Duration::from_micros(100), }], enabled, schedule, @@ -892,8 +904,10 @@ mod test { .await .unwrap(); - tokio::time::pause(); - tokio::time::delay_for(Duration::from_secs(2)).await; + // TODO: would use tokio::time::pause here but that doesn't effect chrono now + // which is used for schedules + // tokio::time::pause(); + tokio::time::delay_for(Duration::from_micros(1100)).await; // Should not run (is disabled) assert_matches!(prog_events.try_recv(), Err(broadcast::TryRecvError::Empty)); @@ -903,10 +917,10 @@ mod test { .unwrap(); // Should run - tokio::time::delay_for(Duration::from_secs(2)).await; - assert_matches!(prog_events.try_recv(), Ok(ProgramEvent::NextRun(_, _))); + tokio::time::delay_for(Duration::from_micros(1100)).await; + assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _))); assert_matches!( - prog_events.try_recv(), + prog_events.recv().await, Ok(ProgramEvent::RunStart(prog)) if prog.id == 2 ); @@ -922,8 +936,8 @@ mod test { let mut prog_events = runner.subscribe().await.unwrap(); let now = chrono::Local::now(); - let sched_time1 = now.time() + chrono::Duration::seconds(1); - let sched_time2 = now.time() + chrono::Duration::seconds(20); + let sched_time1 = now.time() + chrono::Duration::microseconds(1000); + let sched_time2 = now.time() + chrono::Duration::microseconds(5000); let schedule = Schedule::new( vec![sched_time1, sched_time2], every_day(), @@ -934,7 +948,7 @@ mod test { 1, vec![ProgramItem { section_id: 1, - duration: Duration::from_secs(10), + duration: Duration::from_micros(10), }], true, schedule, @@ -944,35 +958,43 @@ mod test { runner.update_sections(sections.clone()).await.unwrap(); runner.update_programs(programs).await.unwrap(); - tokio::time::pause(); - // Should run - tokio::time::delay_for(Duration::from_secs(2)).await; - assert_matches!(prog_events.try_recv(), Ok(ProgramEvent::NextRun(_, _))); - assert_matches!( - prog_events.try_recv(), - Ok(ProgramEvent::RunStart(prog)) - if prog.id == 1 - ); - tokio::time::delay_for(Duration::from_secs(10)).await; - assert_matches!( - prog_events.try_recv(), - Ok(ProgramEvent::RunFinish(prog)) - if prog.id == 1 - ); + let fut = async move { + // TODO: would use tokio::time::pause here but that doesn't effect chrono now + // which is used for schedules + // tokio::time::pause(); + // Should run + tokio::time::delay_for(Duration::from_micros(1100)).await; + assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _))); + assert_matches!( + prog_events.recv().await, + Ok(ProgramEvent::RunStart(prog)) + if prog.id == 1 + ); + assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _))); + assert_matches!( + prog_events.recv().await, + Ok(ProgramEvent::RunFinish(prog)) + if prog.id == 1 + ); - // Should run again - tokio::time::delay_for(Duration::from_secs(10)).await; - assert_matches!( - prog_events.try_recv(), - Ok(ProgramEvent::RunStart(prog)) - if prog.id == 1 - ); - tokio::time::delay_for(Duration::from_secs(10)).await; - assert_matches!( - prog_events.try_recv(), - Ok(ProgramEvent::RunFinish(prog)) - if prog.id == 1 - ); + // Should run again + assert_matches!( + prog_events.recv().await, + Ok(ProgramEvent::RunStart(prog)) + if prog.id == 1 + ); + tokio::task::yield_now().await; + assert_matches!(prog_events.recv().await, Ok(ProgramEvent::NextRun(_, _))); + assert_matches!( + prog_events.recv().await, + Ok(ProgramEvent::RunFinish(prog)) + if prog.id == 1 + ); + }; + // TODO: this still sometimes fails + tokio::time::timeout(Duration::from_micros(10000), fut) + .await + .unwrap(); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();