|
|
@ -131,8 +131,14 @@ impl RunnerTask { |
|
|
|
run.sec_run_handles.push(handle); |
|
|
|
run.sec_run_handles.push(handle); |
|
|
|
} |
|
|
|
} |
|
|
|
run.state = RunState::Running; |
|
|
|
run.state = RunState::Running; |
|
|
|
debug!(program_id = run.program.id, "started running program"); |
|
|
|
|
|
|
|
self.send_event(ProgramEvent::RunStart(run.program.clone())); |
|
|
|
self.send_event(ProgramEvent::RunStart(run.program.clone())); |
|
|
|
|
|
|
|
if run.sec_run_handles.is_empty() { |
|
|
|
|
|
|
|
warn!(program_id = run.program.id, "program has no valid sections"); |
|
|
|
|
|
|
|
run.state = RunState::Finished; |
|
|
|
|
|
|
|
self.send_event(ProgramEvent::RunFinish(run.program.clone())); |
|
|
|
|
|
|
|
} else { |
|
|
|
|
|
|
|
debug!(program_id = run.program.id, "started running program"); |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
async fn process_queue(&mut self, run_queue: &mut RunQueue) { |
|
|
|
async fn process_queue(&mut self, run_queue: &mut RunQueue) { |
|
|
@ -154,7 +160,7 @@ impl RunnerTask { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) { |
|
|
|
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) { |
|
|
|
let msg = msg.expect("SectionRunner channel closed"); |
|
|
|
let msg = msg.expect("ProgramRunner channel closed"); |
|
|
|
use RunnerMsg::*; |
|
|
|
use RunnerMsg::*; |
|
|
|
trace!(msg = debug(&msg), "runner_task recv"); |
|
|
|
trace!(msg = debug(&msg), "runner_task recv"); |
|
|
|
match msg { |
|
|
|
match msg { |
|
|
@ -258,7 +264,7 @@ impl RunnerTask { |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
#[derive(Debug, Clone, Error)] |
|
|
|
#[derive(Debug, Clone, Error)] |
|
|
|
#[error("the SectionRunner channel is closed")] |
|
|
|
#[error("the ProgramRunner channel is closed")] |
|
|
|
pub struct ChannelClosed; |
|
|
|
pub struct ChannelClosed; |
|
|
|
|
|
|
|
|
|
|
|
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>; |
|
|
|
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>; |
|
|
@ -347,6 +353,7 @@ mod test { |
|
|
|
use im::ordmap; |
|
|
|
use im::ordmap; |
|
|
|
use std::{sync::Arc, time::Duration}; |
|
|
|
use std::{sync::Arc, time::Duration}; |
|
|
|
use tracing_subscriber::prelude::*; |
|
|
|
use tracing_subscriber::prelude::*; |
|
|
|
|
|
|
|
use tokio::task::yield_now; |
|
|
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
#[tokio::test] |
|
|
|
async fn test_quit() { |
|
|
|
async fn test_quit() { |
|
|
@ -367,11 +374,12 @@ mod test { |
|
|
|
let _sub = tracing::subscriber::set_default(subscriber); |
|
|
|
let _sub = tracing::subscriber::set_default(subscriber); |
|
|
|
|
|
|
|
|
|
|
|
let interface = MockSectionInterface::new(6); |
|
|
|
let interface = MockSectionInterface::new(6); |
|
|
|
let sec_runner = SectionRunner::new(Arc::new(interface)); |
|
|
|
let mut sec_runner = SectionRunner::new(Arc::new(interface)); |
|
|
|
let mut runner = ProgramRunner::new(sec_runner); |
|
|
|
let mut runner = ProgramRunner::new(sec_runner.clone()); |
|
|
|
tokio::task::yield_now().await; |
|
|
|
yield_now().await; |
|
|
|
runner.quit().await.unwrap(); |
|
|
|
runner.quit().await.unwrap(); |
|
|
|
tokio::task::yield_now().await; |
|
|
|
sec_runner.quit().await.unwrap(); |
|
|
|
|
|
|
|
yield_now().await; |
|
|
|
|
|
|
|
|
|
|
|
assert_eq!(quit_msg.get_count(), 1); |
|
|
|
assert_eq!(quit_msg.get_count(), 1); |
|
|
|
assert_eq!(task_span.get_exit_count(), 1); |
|
|
|
assert_eq!(task_span.get_exit_count(), 1); |
|
|
@ -397,10 +405,9 @@ mod test { |
|
|
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
#[tokio::test] |
|
|
|
async fn test_run_program() { |
|
|
|
async fn test_run_program() { |
|
|
|
tracing_subscriber::fmt().init(); |
|
|
|
|
|
|
|
let (sections, mut sec_runner, interface) = make_sections_and_runner(); |
|
|
|
let (sections, mut sec_runner, interface) = make_sections_and_runner(); |
|
|
|
let mut sec_events = sec_runner.subscribe().await.unwrap(); |
|
|
|
let mut sec_events = sec_runner.subscribe().await.unwrap(); |
|
|
|
let mut runner = ProgramRunner::new(sec_runner); |
|
|
|
let mut runner = ProgramRunner::new(sec_runner.clone()); |
|
|
|
let mut prog_events = runner.subscribe().await.unwrap(); |
|
|
|
let mut prog_events = runner.subscribe().await.unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
let program: ProgramRef = Program { |
|
|
|
let program: ProgramRef = Program { |
|
|
@ -422,7 +429,7 @@ mod test { |
|
|
|
runner.update_sections(sections.clone()).await.unwrap(); |
|
|
|
runner.update_sections(sections.clone()).await.unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
runner.run_program(program).await.unwrap(); |
|
|
|
runner.run_program(program).await.unwrap(); |
|
|
|
tokio::task::yield_now().await; |
|
|
|
yield_now().await; |
|
|
|
assert!(matches!( |
|
|
|
assert!(matches!( |
|
|
|
prog_events.try_recv().unwrap(), |
|
|
|
prog_events.try_recv().unwrap(), |
|
|
|
ProgramEvent::RunStart(prog) |
|
|
|
ProgramEvent::RunStart(prog) |
|
|
@ -455,5 +462,73 @@ mod test { |
|
|
|
)); |
|
|
|
)); |
|
|
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
|
|
|
sec_runner.quit().await.unwrap(); |
|
|
|
|
|
|
|
yield_now().await; |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#[tokio::test] |
|
|
|
|
|
|
|
async fn test_run_nonexistant_section() { |
|
|
|
|
|
|
|
let (sections, mut sec_runner, _) = make_sections_and_runner(); |
|
|
|
|
|
|
|
let mut runner = ProgramRunner::new(sec_runner.clone()); |
|
|
|
|
|
|
|
let mut prog_events = runner.subscribe().await.unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
let program1: ProgramRef = Program { |
|
|
|
|
|
|
|
id: 1, |
|
|
|
|
|
|
|
name: "Program 1".into(), |
|
|
|
|
|
|
|
sequence: vec![ |
|
|
|
|
|
|
|
ProgramItem { |
|
|
|
|
|
|
|
section_id: 3, |
|
|
|
|
|
|
|
duration: Duration::from_secs(10), |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
], |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
.into(); |
|
|
|
|
|
|
|
let program2: ProgramRef = Program { |
|
|
|
|
|
|
|
id: 2, |
|
|
|
|
|
|
|
name: "Program 2".into(), |
|
|
|
|
|
|
|
sequence: vec![ |
|
|
|
|
|
|
|
ProgramItem { |
|
|
|
|
|
|
|
section_id: 1, |
|
|
|
|
|
|
|
duration: Duration::from_secs(10), |
|
|
|
|
|
|
|
}, |
|
|
|
|
|
|
|
], |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
.into(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
runner.update_sections(sections.clone()).await.unwrap(); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
runner.run_program(program1).await.unwrap(); |
|
|
|
|
|
|
|
yield_now().await; |
|
|
|
|
|
|
|
// Should immediately start and finish running program
|
|
|
|
|
|
|
|
// due to nonexistant section
|
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
|
|
|
|
prog_events.try_recv().unwrap(), |
|
|
|
|
|
|
|
ProgramEvent::RunStart(prog) |
|
|
|
|
|
|
|
if prog.id == 1 |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
|
|
|
|
prog_events.try_recv().unwrap(), |
|
|
|
|
|
|
|
ProgramEvent::RunFinish(prog) |
|
|
|
|
|
|
|
if prog.id == 1 |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
runner.run_program(program2).await.unwrap(); |
|
|
|
|
|
|
|
yield_now().await; |
|
|
|
|
|
|
|
// Should run right away since last program should be done
|
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
|
|
|
|
prog_events.try_recv().unwrap(), |
|
|
|
|
|
|
|
ProgramEvent::RunStart(prog) |
|
|
|
|
|
|
|
if prog.id == 2 |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
tokio::time::pause(); |
|
|
|
|
|
|
|
assert!(matches!( |
|
|
|
|
|
|
|
prog_events.recv().await.unwrap(), |
|
|
|
|
|
|
|
ProgramEvent::RunFinish(prog) |
|
|
|
|
|
|
|
if prog.id == 2 |
|
|
|
|
|
|
|
)); |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
runner.quit().await.unwrap(); |
|
|
|
|
|
|
|
sec_runner.quit().await.unwrap(); |
|
|
|
|
|
|
|
yield_now().await; |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|