From da3662c3df9752ea047644375c8e70baa78a30f7 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 4 Oct 2020 16:18:10 -0600 Subject: [PATCH] Fix program runner bug Queued program would never start running Also add test for taht --- sprinklers_actors/src/program_runner.rs | 74 ++++++++++++++++++++++--- 1 file changed, 65 insertions(+), 9 deletions(-) diff --git a/sprinklers_actors/src/program_runner.rs b/sprinklers_actors/src/program_runner.rs index 448c7fa..66c78fe 100644 --- a/sprinklers_actors/src/program_runner.rs +++ b/sprinklers_actors/src/program_runner.rs @@ -194,7 +194,7 @@ impl StreamHandler> for ProgramRunner fn handle( &mut self, item: Result, - _ctx: &mut Self::Context, + ctx: &mut Self::Context, ) { let sec_event = match item { Ok(e) => e, @@ -206,7 +206,7 @@ impl StreamHandler> for ProgramRunner #[allow(clippy::single_match)] match sec_event { SectionEvent::RunFinish(finished_run, _) => { - self.handle_finished_run(finished_run); + self.handle_finished_run(finished_run, ctx); } _ => {} } @@ -378,7 +378,11 @@ impl ProgramRunnerActor { } } - fn handle_finished_run(&mut self, finished_run: SectionRunHandle) -> Option<()> { + fn handle_finished_run( + &mut self, + finished_run: SectionRunHandle, + ctx: &mut ::Context, + ) -> Option<()> { let current_run = self.run_queue.front_mut()?; let last_run_handle = current_run.sec_run_handles.last()?; if finished_run == *last_run_handle { @@ -389,6 +393,7 @@ impl ProgramRunnerActor { ); self.inner .send_event(ProgramEvent::RunFinish(current_run.program.clone())); + ctx.notify(Process); } Some(()) } @@ -482,12 +487,8 @@ impl ProgramRunner { Ok(event_recv) } - pub fn listen_programs( - &mut self, - programs_watch: watch::Receiver, - ) { - self.addr - .do_send(ListenPrograms(programs_watch)) + pub fn listen_programs(&mut self, programs_watch: watch::Receiver) { + self.addr.do_send(ListenPrograms(programs_watch)) } } @@ -754,6 +755,61 @@ mod test { sec_runner.quit().await.unwrap(); } + #[actix_rt::test] + async fn test_queue_program() { + let (sections, mut sec_runner, _) = make_sections_and_runner(); + let mut runner = ProgramRunner::new(sec_runner.clone()); + let mut prog_events = runner.subscribe().await.unwrap(); + + let program1 = make_program( + 1, + vec![ProgramItem { + section_id: 2, + duration: Duration::from_secs(10), + }], + ); + let program2 = make_program( + 2, + vec![ProgramItem { + section_id: 2, + duration: Duration::from_secs(10), + }], + ); + let programs = ordmap![ 1 => program1, 2 => program2 ]; + + runner.update_sections(sections.clone()).await.unwrap(); + runner.update_programs(programs).await.unwrap(); + + runner.run_program_id(1).await.unwrap(); + runner.run_program_id(2).await.unwrap(); + tokio::time::pause(); + tokio::time::delay_for(Duration::from_secs(21)).await; + + assert_matches!( + prog_events.try_recv(), + Ok(ProgramEvent::RunStart(prog)) + if prog.id == 1 + ); + assert_matches!( + prog_events.try_recv(), + Ok(ProgramEvent::RunFinish(prog)) + if prog.id == 1 + ); + assert_matches!( + prog_events.try_recv(), + Ok(ProgramEvent::RunStart(prog)) + if prog.id == 2 + ); + assert_matches!( + prog_events.try_recv(), + Ok(ProgramEvent::RunFinish(prog)) + if prog.id == 2 + ); + + runner.quit().await.unwrap(); + sec_runner.quit().await.unwrap(); + } + #[actix_rt::test] async fn test_cancel_program() { let (sections, mut sec_runner, _) = make_sections_and_runner();