Browse Source

Fix program runner bug

Queued program would never start running
Also add test for taht
master
Alex Mikhalev 4 years ago
parent
commit
da3662c3df
  1. 74
      sprinklers_actors/src/program_runner.rs

74
sprinklers_actors/src/program_runner.rs

@ -194,7 +194,7 @@ impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for ProgramRunner
fn handle( fn handle(
&mut self, &mut self,
item: Result<SectionEvent, broadcast::RecvError>, item: Result<SectionEvent, broadcast::RecvError>,
_ctx: &mut Self::Context, ctx: &mut Self::Context,
) { ) {
let sec_event = match item { let sec_event = match item {
Ok(e) => e, Ok(e) => e,
@ -206,7 +206,7 @@ impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for ProgramRunner
#[allow(clippy::single_match)] #[allow(clippy::single_match)]
match sec_event { match sec_event {
SectionEvent::RunFinish(finished_run, _) => { SectionEvent::RunFinish(finished_run, _) => {
self.handle_finished_run(finished_run); self.handle_finished_run(finished_run, ctx);
} }
_ => {} _ => {}
} }
@ -378,7 +378,11 @@ impl ProgramRunnerActor {
} }
} }
fn handle_finished_run(&mut self, finished_run: SectionRunHandle) -> Option<()> { fn handle_finished_run(
&mut self,
finished_run: SectionRunHandle,
ctx: &mut <ProgramRunnerActor as Actor>::Context,
) -> Option<()> {
let current_run = self.run_queue.front_mut()?; let current_run = self.run_queue.front_mut()?;
let last_run_handle = current_run.sec_run_handles.last()?; let last_run_handle = current_run.sec_run_handles.last()?;
if finished_run == *last_run_handle { if finished_run == *last_run_handle {
@ -389,6 +393,7 @@ impl ProgramRunnerActor {
); );
self.inner self.inner
.send_event(ProgramEvent::RunFinish(current_run.program.clone())); .send_event(ProgramEvent::RunFinish(current_run.program.clone()));
ctx.notify(Process);
} }
Some(()) Some(())
} }
@ -482,12 +487,8 @@ impl ProgramRunner {
Ok(event_recv) Ok(event_recv)
} }
pub fn listen_programs( pub fn listen_programs(&mut self, programs_watch: watch::Receiver<Programs>) {
&mut self, self.addr.do_send(ListenPrograms(programs_watch))
programs_watch: watch::Receiver<Programs>,
) {
self.addr
.do_send(ListenPrograms(programs_watch))
} }
} }
@ -754,6 +755,61 @@ mod test {
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
} }
#[actix_rt::test]
async fn test_queue_program() {
let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone());
let mut prog_events = runner.subscribe().await.unwrap();
let program1 = make_program(
1,
vec![ProgramItem {
section_id: 2,
duration: Duration::from_secs(10),
}],
);
let program2 = make_program(
2,
vec![ProgramItem {
section_id: 2,
duration: Duration::from_secs(10),
}],
);
let programs = ordmap![ 1 => program1, 2 => program2 ];
runner.update_sections(sections.clone()).await.unwrap();
runner.update_programs(programs).await.unwrap();
runner.run_program_id(1).await.unwrap();
runner.run_program_id(2).await.unwrap();
tokio::time::pause();
tokio::time::delay_for(Duration::from_secs(21)).await;
assert_matches!(
prog_events.try_recv(),
Ok(ProgramEvent::RunStart(prog))
if prog.id == 1
);
assert_matches!(
prog_events.try_recv(),
Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1
);
assert_matches!(
prog_events.try_recv(),
Ok(ProgramEvent::RunStart(prog))
if prog.id == 2
);
assert_matches!(
prog_events.try_recv(),
Ok(ProgramEvent::RunFinish(prog))
if prog.id == 2
);
runner.quit().await.unwrap();
sec_runner.quit().await.unwrap();
}
#[actix_rt::test] #[actix_rt::test]
async fn test_cancel_program() { async fn test_cancel_program() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();

Loading…
Cancel
Save