Browse Source

Add ProgramRunner

still needs some more tests
master
Alex Mikhalev 4 years ago
parent
commit
6343ed1353
  1. 1
      src/main.rs
  2. 459
      src/program_runner.rs

1
src/main.rs

@ -8,6 +8,7 @@ mod db; @@ -8,6 +8,7 @@ mod db;
mod migrations;
mod model;
mod option_future;
mod program_runner;
mod section_interface;
mod section_runner;
#[cfg(test)]

459
src/program_runner.rs

@ -0,0 +1,459 @@ @@ -0,0 +1,459 @@
use crate::model::{ProgramId, ProgramRef, Programs, Sections};
use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner};
use std::collections::VecDeque;
use thiserror::Error;
use tokio::{
spawn,
sync::{broadcast, mpsc, oneshot},
};
use tracing::{debug, error, trace, trace_span, warn};
#[derive(Debug)]
enum RunnerMsg {
Quit,
UpdateSections(Sections),
UpdatePrograms(Programs),
RunProgramId(ProgramId),
RunProgram(ProgramRef),
CancelProgram(ProgramId),
Subscribe(oneshot::Sender<ProgramEventRecv>),
}
#[derive(Clone, Debug)]
pub enum ProgramEvent {
RunStart(ProgramRef),
RunFinish(ProgramRef),
}
pub type ProgramEventRecv = broadcast::Receiver<ProgramEvent>;
type ProgramEventSend = broadcast::Sender<ProgramEvent>;
const EVENT_CAPACITY: usize = 8;
#[derive(Clone, Debug, PartialEq)]
enum RunState {
Waiting,
Running,
Finished,
}
#[derive(Debug)]
struct ProgRun {
program: ProgramRef,
state: RunState,
sec_run_handles: Vec<SectionRunHandle>,
}
impl ProgRun {
fn new(program: ProgramRef) -> Self {
Self {
program,
state: RunState::Waiting,
sec_run_handles: Vec::new(),
}
}
}
type RunQueue = VecDeque<ProgRun>;
struct RunnerTask {
section_runner: SectionRunner,
msg_recv: mpsc::Receiver<RunnerMsg>,
running: bool,
sections: Sections,
programs: Programs,
event_send: Option<ProgramEventSend>,
}
impl RunnerTask {
fn new(section_runner: SectionRunner, msg_recv: mpsc::Receiver<RunnerMsg>) -> Self {
Self {
section_runner,
msg_recv,
running: true,
sections: Sections::new(),
programs: Programs::new(),
event_send: None,
}
}
fn send_event(&mut self, event: ProgramEvent) {
if let Some(event_send) = &mut self.event_send {
match event_send.send(event) {
Ok(_) => {}
Err(_closed) => {
self.event_send = None;
}
}
}
}
fn subscribe_event(&mut self) -> ProgramEventRecv {
match &mut self.event_send {
Some(event_send) => event_send.subscribe(),
None => {
let (event_send, event_recv) = broadcast::channel(EVENT_CAPACITY);
self.event_send = Some(event_send);
event_recv
}
}
}
async fn start_program_run(&mut self, run: &mut ProgRun) {
if run.state != RunState::Waiting {
warn!(
program_id = run.program.id,
"cannot run program which is already running"
);
return;
}
run.sec_run_handles.reserve(run.program.sequence.len());
for item in &run.program.sequence {
let section = match self.sections.get(&item.section_id) {
Some(sec) => sec.clone(),
None => {
warn!(
program_id = run.program.id,
section_id = item.section_id,
"trying to run program with nonexistant section"
);
continue;
}
};
let handle = match self.section_runner.queue_run(section, item.duration).await {
Ok(handle) => handle,
Err(_closed) => {
error!("section runner channel closed");
self.running = false;
return;
}
};
run.sec_run_handles.push(handle);
}
run.state = RunState::Running;
debug!(program_id = run.program.id, "started running program");
self.send_event(ProgramEvent::RunStart(run.program.clone()));
}
async fn process_queue(&mut self, run_queue: &mut RunQueue) {
while let Some(current_run) = run_queue.front_mut() {
let run_finished = match current_run.state {
RunState::Waiting => {
self.start_program_run(current_run).await;
false
}
RunState::Running => false,
RunState::Finished => true,
};
if run_finished {
run_queue.pop_front();
} else {
break;
}
}
}
fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) {
let msg = msg.expect("SectionRunner channel closed");
use RunnerMsg::*;
trace!(msg = debug(&msg), "runner_task recv");
match msg {
Quit => self.running = false,
Subscribe(res_send) => {
let event_recv = self.subscribe_event();
// Ignore error if channel closed
let _ = res_send.send(event_recv);
}
UpdateSections(new_sections) => {
self.sections = new_sections;
}
UpdatePrograms(new_programs) => {
self.programs = new_programs;
}
RunProgramId(program_id) => {
let program = match self.programs.get(&program_id) {
Some(program) => program.clone(),
None => {
warn!(program_id, "trying to run non-existant program");
return;
}
};
run_queue.push_back(ProgRun::new(program));
}
RunProgram(program) => {
run_queue.push_back(ProgRun::new(program));
}
RunnerMsg::CancelProgram(_) => todo!(),
}
}
fn handle_finished_run(
&mut self,
finished_run: SectionRunHandle,
run_queue: &mut RunQueue,
) -> Option<()> {
let current_run = run_queue.front_mut()?;
let last_run_handle = current_run.sec_run_handles.last()?;
if finished_run == *last_run_handle {
current_run.state = RunState::Finished;
debug!(
program_id = current_run.program.id,
"finished running program"
);
self.send_event(ProgramEvent::RunFinish(current_run.program.clone()));
}
Some(())
}
fn handle_sec_event(
&mut self,
sec_event: Result<SectionEvent, broadcast::RecvError>,
run_queue: &mut RunQueue,
) {
let sec_event = match sec_event {
Ok(ev) => ev,
Err(broadcast::RecvError::Lagged(missed)) => {
warn!(
missed,
"missed some section events, increase event channel size"
);
return;
}
Err(broadcast::RecvError::Closed) => {
error!("section events channel closed");
self.running = false;
return;
}
};
#[allow(clippy::single_match)]
match sec_event {
SectionEvent::RunFinish(finished_run) => {
self.handle_finished_run(finished_run, run_queue);
}
_ => {}
}
}
async fn start(mut self) {
let span = trace_span!("runner_task");
let _enter = span.enter();
let mut sec_events = self
.section_runner
.subscribe()
.await
.expect("could not subscribe to SectionRunner events");
let mut run_queue: RunQueue = VecDeque::new();
while self.running {
self.process_queue(&mut run_queue).await;
tokio::select! {
msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue),
sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue),
// _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
};
}
}
}
#[derive(Debug, Clone, Error)]
#[error("the SectionRunner channel is closed")]
pub struct ChannelClosed;
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>;
impl<T> From<mpsc::error::SendError<T>> for ChannelClosed {
fn from(_: mpsc::error::SendError<T>) -> Self {
Self
}
}
impl From<oneshot::error::RecvError> for ChannelClosed {
fn from(_: oneshot::error::RecvError) -> Self {
Self
}
}
#[derive(Clone, Debug)]
pub struct ProgramRunner {
msg_send: mpsc::Sender<RunnerMsg>,
}
#[allow(dead_code)]
impl ProgramRunner {
pub fn new(section_runner: SectionRunner) -> Self {
let (msg_send, msg_recv) = mpsc::channel(8);
spawn(RunnerTask::new(section_runner, msg_recv).start());
Self { msg_send }
}
pub async fn quit(&mut self) -> Result<()> {
self.msg_send.send(RunnerMsg::Quit).await?;
Ok(())
}
pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> {
self.msg_send
.send(RunnerMsg::UpdateSections(new_sections))
.await
.map_err(From::from)
}
pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> {
self.msg_send
.send(RunnerMsg::UpdatePrograms(new_programs))
.await
.map_err(From::from)
}
pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> {
self.msg_send
.send(RunnerMsg::RunProgramId(program_id))
.await
.map_err(From::from)
}
pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> {
self.msg_send
.send(RunnerMsg::RunProgram(program))
.await
.map_err(From::from)
}
pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> {
self.msg_send
.send(RunnerMsg::CancelProgram(program_id))
.await
.map_err(From::from)
}
pub async fn subscribe(&mut self) -> Result<ProgramEventRecv> {
let (res_send, res_recv) = oneshot::channel();
self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?;
let event_recv = res_recv.await?;
Ok(event_recv)
}
}
#[cfg(test)]
mod test {
use super::*;
use crate::section_interface::{MockSectionInterface, SectionInterface};
use crate::{
model::{Program, ProgramItem, Section},
trace_listeners::{EventListener, Filters, SpanFilters, SpanListener},
};
use im::ordmap;
use std::{sync::Arc, time::Duration};
use tracing_subscriber::prelude::*;
#[tokio::test]
async fn test_quit() {
let quit_msg = EventListener::new(
Filters::new()
.target("sprinklers_rs::program_runner")
.message("runner_task recv")
.field_value("msg", "Quit"),
);
let task_span = SpanListener::new(
SpanFilters::new()
.target("sprinklers_rs::program_runner")
.name("runner_task"),
);
let subscriber = tracing_subscriber::registry()
.with(quit_msg.clone())
.with(task_span.clone());
let _sub = tracing::subscriber::set_default(subscriber);
let interface = MockSectionInterface::new(6);
let sec_runner = SectionRunner::new(Arc::new(interface));
let mut runner = ProgramRunner::new(sec_runner);
tokio::task::yield_now().await;
runner.quit().await.unwrap();
tokio::task::yield_now().await;
assert_eq!(quit_msg.get_count(), 1);
assert_eq!(task_span.get_exit_count(), 1);
}
fn make_sections_and_runner() -> (Sections, SectionRunner, Arc<MockSectionInterface>) {
let interface = Arc::new(MockSectionInterface::new(2));
let sections: Sections = ordmap![
1 => Section {
id: 1,
name: "Section 1".into(),
interface_id: 0,
}.into(),
2 => Section {
id: 2,
name: "Section 2".into(),
interface_id: 1,
}.into()
];
let sec_runner = SectionRunner::new(interface.clone());
(sections, sec_runner, interface)
}
#[tokio::test]
async fn test_run_program() {
tracing_subscriber::fmt().init();
let (sections, mut sec_runner, interface) = make_sections_and_runner();
let mut sec_events = sec_runner.subscribe().await.unwrap();
let mut runner = ProgramRunner::new(sec_runner);
let mut prog_events = runner.subscribe().await.unwrap();
let program: ProgramRef = Program {
id: 1,
name: "Program 1".into(),
sequence: vec![
ProgramItem {
section_id: 1,
duration: Duration::from_secs(10),
},
ProgramItem {
section_id: 2,
duration: Duration::from_secs(10),
},
],
}
.into();
runner.update_sections(sections.clone()).await.unwrap();
runner.run_program(program).await.unwrap();
tokio::task::yield_now().await;
assert!(matches!(
prog_events.try_recv().unwrap(),
ProgramEvent::RunStart(prog)
if prog.id == 1
));
assert!(matches!(
sec_events.try_recv().unwrap(),
SectionEvent::RunStart(_)
));
assert_eq!(interface.get_section_state(0), true);
tokio::time::pause();
assert!(matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunFinish(_)
));
assert!(matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunStart(_)
));
assert_eq!(interface.get_section_state(0), false);
assert_eq!(interface.get_section_state(1), true);
assert!(matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunFinish(_)
));
assert!(matches!(
prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(_)
));
runner.quit().await.unwrap();
}
}
Loading…
Cancel
Save