Use actix for program_runner
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			This commit is contained in:
		
							parent
							
								
									9643923428
								
							
						
					
					
						commit
						fc8821171a
					
				| @ -22,6 +22,7 @@ serde = { version = "1.0.116", features = ["derive"] } | ||||
| serde_json = "1.0.57" | ||||
| actix = "0.10.0" | ||||
| actix-rt = "1.1.1" | ||||
| futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await"] } | ||||
| 
 | ||||
| [dependencies.rumqttc] | ||||
| git = "https://github.com/bytebeamio/rumqtt.git" | ||||
|  | ||||
| @ -1,27 +1,18 @@ | ||||
| use crate::model::{ProgramId, ProgramRef, Programs, Sections}; | ||||
| use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; | ||||
| use eyre::WrapErr; | ||||
| use crate::section_runner::{ | ||||
|     Error as SectionRunnerError, SectionEvent, SectionEventRecv, SectionRunHandle, SectionRunner, | ||||
| }; | ||||
| use actix::{ | ||||
|     Actor, ActorContext, ActorFuture, ActorStream, Addr, AsyncContext, Handler, Message, | ||||
|     MessageResult, SpawnHandle, StreamHandler, WrapFuture, | ||||
| }; | ||||
| use std::collections::VecDeque; | ||||
| use thiserror::Error; | ||||
| use tokio::{ | ||||
|     spawn, | ||||
|     stream::StreamExt, | ||||
|     sync::{broadcast, mpsc, oneshot}, | ||||
|     time::{delay_queue, DelayQueue}, | ||||
| }; | ||||
| use tracing::{debug, error, trace, trace_span, warn}; | ||||
| use tracing_futures::Instrument; | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
| enum RunnerMsg { | ||||
|     Quit(oneshot::Sender<()>), | ||||
|     UpdateSections(Sections), | ||||
|     UpdatePrograms(Programs), | ||||
|     RunProgramId(ProgramId), | ||||
|     RunProgram(ProgramRef), | ||||
|     CancelProgram(ProgramId), | ||||
|     Subscribe(oneshot::Sender<ProgramEventRecv>), | ||||
| } | ||||
| use tracing::{debug, error, trace, warn}; | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| pub enum ProgramEvent { | ||||
| @ -62,31 +53,15 @@ impl ProgRun { | ||||
| 
 | ||||
| type RunQueue = VecDeque<ProgRun>; | ||||
| 
 | ||||
| struct RunnerTask { | ||||
| struct ProgramRunnerInner { | ||||
|     section_runner: SectionRunner, | ||||
|     msg_recv: mpsc::Receiver<RunnerMsg>, | ||||
|     running: bool, | ||||
|     sections: Sections, | ||||
|     programs: Programs, | ||||
|     event_send: Option<ProgramEventSend>, | ||||
|     scheduled_run_queue: DelayQueue<ProgramRef>, | ||||
|     quit_tx: Option<oneshot::Sender<()>>, | ||||
|     schedule_run_fut: Option<SpawnHandle>, | ||||
| } | ||||
| 
 | ||||
| impl RunnerTask { | ||||
|     fn new(section_runner: SectionRunner, msg_recv: mpsc::Receiver<RunnerMsg>) -> Self { | ||||
|         Self { | ||||
|             section_runner, | ||||
|             msg_recv, | ||||
|             running: true, | ||||
|             sections: Sections::new(), | ||||
|             programs: Programs::new(), | ||||
|             event_send: None, | ||||
|             scheduled_run_queue: DelayQueue::new(), | ||||
|             quit_tx: None, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| impl ProgramRunnerInner { | ||||
|     fn send_event(&mut self, event: ProgramEvent) { | ||||
|         if let Some(event_send) = &mut self.event_send { | ||||
|             match event_send.send(event) { | ||||
| @ -109,88 +84,57 @@ impl RunnerTask { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     async fn start_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { | ||||
|     fn start_program_run(&mut self, run: &mut ProgRun) { | ||||
|         if run.state != RunState::Waiting { | ||||
|             warn!( | ||||
|                 program_id = run.program.id, | ||||
|                 "cannot run program which is already running" | ||||
|             ); | ||||
|             return Ok(()); | ||||
|             return; | ||||
|         } | ||||
|         run.sec_run_handles.reserve(run.program.sequence.len()); | ||||
|         for item in &run.program.sequence { | ||||
|             let section = match self.sections.get(&item.section_id) { | ||||
|                 Some(sec) => sec.clone(), | ||||
|         let sequence: Vec<_> = run | ||||
|             .program | ||||
|             .sequence | ||||
|             .iter() | ||||
|             .filter_map(|item| match self.sections.get(&item.section_id) { | ||||
|                 Some(sec) => Some((sec.clone(), item.duration)), | ||||
|                 None => { | ||||
|                     warn!( | ||||
|                         program_id = run.program.id, | ||||
|                         section_id = item.section_id, | ||||
|                         "trying to run program with nonexistant section" | ||||
|                     ); | ||||
|                     continue; | ||||
|                     None | ||||
|                 } | ||||
|             }; | ||||
|             let handle = self | ||||
|                 .section_runner | ||||
|                 .queue_run(section, item.duration) | ||||
|                 .await | ||||
|                 .wrap_err("failed to queue section run")?; | ||||
|             }) | ||||
|             .collect(); | ||||
|         if sequence.is_empty() { | ||||
|             warn!(program_id = run.program.id, "program has no valid sections"); | ||||
|             run.state = RunState::Finished; | ||||
|             self.send_event(ProgramEvent::RunStart(run.program.clone())); | ||||
|             self.send_event(ProgramEvent::RunFinish(run.program.clone())); | ||||
|             return; | ||||
|         } | ||||
|         run.sec_run_handles.reserve(sequence.len()); | ||||
|         for (section, duration) in sequence { | ||||
|             let handle = self.section_runner.do_queue_run(section, duration); | ||||
|             run.sec_run_handles.push(handle); | ||||
|         } | ||||
|         run.state = RunState::Running; | ||||
|         self.send_event(ProgramEvent::RunStart(run.program.clone())); | ||||
|         if run.sec_run_handles.is_empty() { | ||||
|             warn!(program_id = run.program.id, "program has no valid sections"); | ||||
|             run.state = RunState::Finished; | ||||
|             self.send_event(ProgramEvent::RunFinish(run.program.clone())); | ||||
|         } else { | ||||
|             debug!(program_id = run.program.id, "started running program"); | ||||
|         } | ||||
|         Ok(()) | ||||
|         debug!(program_id = run.program.id, "started running program"); | ||||
|     } | ||||
| 
 | ||||
|     async fn cancel_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { | ||||
|     fn cancel_program_run(&mut self, run: &mut ProgRun) { | ||||
|         for handle in run.sec_run_handles.drain(..) { | ||||
|             self.section_runner | ||||
|                 .cancel_run(handle) | ||||
|                 .await | ||||
|                 .wrap_err("failed to cancel section run")?; | ||||
|             self.section_runner.do_cancel_run(handle); | ||||
|         } | ||||
|         debug!(program_id = run.program.id, "program run is cancelled"); | ||||
|         self.send_event(ProgramEvent::RunCancel(run.program.clone())); | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn process_queue(&mut self, run_queue: &mut RunQueue) -> eyre::Result<()> { | ||||
|         while let Some(current_run) = run_queue.front_mut() { | ||||
|             let run_finished = match current_run.state { | ||||
|                 RunState::Waiting => { | ||||
|                     self.start_program_run(current_run) | ||||
|                         .await | ||||
|                         .wrap_err("failed to start program run")?; | ||||
|                     false | ||||
|                 } | ||||
|                 RunState::Running => false, | ||||
|                 RunState::Finished => true, | ||||
|                 RunState::Cancelled => { | ||||
|                     self.cancel_program_run(current_run) | ||||
|                         .await | ||||
|                         .wrap_err("failed to cancel program run")?; | ||||
|                     true | ||||
|                 } | ||||
|             }; | ||||
|             if run_finished { | ||||
|                 run_queue.pop_front(); | ||||
|             } else { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn update_programs(&mut self, new_programs: Programs) { | ||||
|         self.programs = new_programs; | ||||
|         self.scheduled_run_queue.clear(); | ||||
|     fn update_schedules(&mut self, ctx: &mut actix::Context<ProgramRunnerActor>) { | ||||
|         let mut scheduled_run_queue = DelayQueue::with_capacity(self.programs.len()); | ||||
|         for (_, prog) in &self.programs { | ||||
|             if !prog.enabled { | ||||
|                 continue; | ||||
| @ -202,59 +146,217 @@ impl RunnerTask { | ||||
|             }; | ||||
|             let delay = (next_run - ref_time).to_std().unwrap(); | ||||
|             trace!("will run program in {:?}", delay); | ||||
|             self.scheduled_run_queue.insert(prog.clone(), delay); | ||||
|             scheduled_run_queue.insert(prog.clone(), delay); | ||||
|         } | ||||
|         let fut = actix::fut::wrap_stream(scheduled_run_queue) | ||||
|             .map(|item, act: &mut ProgramRunnerActor, ctx| act.handle_scheduled_run(item, ctx)) | ||||
|             .finish(); | ||||
|         let handle = ctx.spawn(fut); | ||||
|         if let Some(old_handle) = self.schedule_run_fut.replace(handle) { | ||||
|             ctx.cancel_future(old_handle); | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|     fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut RunQueue) { | ||||
|         let msg = msg.expect("ProgramRunner channel closed"); | ||||
|         use RunnerMsg::*; | ||||
|         trace!(msg = debug(&msg), "runner_task recv"); | ||||
|         match msg { | ||||
|             Quit(quit_tx) => { | ||||
|                 self.running = false; | ||||
|                 self.quit_tx = Some(quit_tx); | ||||
|             } | ||||
|             Subscribe(res_send) => { | ||||
|                 let event_recv = self.subscribe_event(); | ||||
|                 // Ignore error if channel closed
 | ||||
|                 let _ = res_send.send(event_recv); | ||||
|             } | ||||
|             UpdateSections(new_sections) => { | ||||
|                 self.sections = new_sections; | ||||
|             } | ||||
|             UpdatePrograms(new_programs) => { | ||||
|                 self.update_programs(new_programs); | ||||
|             } | ||||
|             RunProgramId(program_id) => { | ||||
|                 let program = match self.programs.get(&program_id) { | ||||
|                     Some(program) => program.clone(), | ||||
|                     None => { | ||||
|                         warn!(program_id, "trying to run non-existant program"); | ||||
|                         return; | ||||
|                     } | ||||
|                 }; | ||||
|                 run_queue.push_back(ProgRun::new(program)); | ||||
|             } | ||||
|             RunProgram(program) => { | ||||
|                 run_queue.push_back(ProgRun::new(program)); | ||||
|             } | ||||
|             RunnerMsg::CancelProgram(program_id) => { | ||||
|                 for run in run_queue { | ||||
|                     if run.program.id == program_id { | ||||
|                         run.state = RunState::Cancelled; | ||||
| struct ProgramRunnerActor { | ||||
|     inner: ProgramRunnerInner, | ||||
|     run_queue: RunQueue, | ||||
| } | ||||
| 
 | ||||
| impl Actor for ProgramRunnerActor { | ||||
|     type Context = actix::Context<Self>; | ||||
| 
 | ||||
|     fn started(&mut self, ctx: &mut Self::Context) { | ||||
|         trace!("subscribing to SectionRunner events"); | ||||
|         let subscribe_fut = self.inner.section_runner.subscribe().into_actor(self).map( | ||||
|             |section_events: Result<SectionEventRecv, SectionRunnerError>, | ||||
|              _act: &mut ProgramRunnerActor, | ||||
|              ctx: &mut Self::Context| { | ||||
|                 match section_events { | ||||
|                     Ok(section_events) => { | ||||
|                         ctx.add_stream(section_events.into_stream()); | ||||
|                     } | ||||
|                     Err(err) => warn!("failed to subscribe to SectionRunner events: {}", err), | ||||
|                 } | ||||
|             }, | ||||
|         ); | ||||
|         ctx.wait(subscribe_fut); | ||||
|         trace!("program_runner starting"); | ||||
|     } | ||||
| 
 | ||||
|     fn stopped(&mut self, _ctx: &mut Self::Context) { | ||||
|         trace!("program_runner stopped"); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl StreamHandler<Result<SectionEvent, broadcast::RecvError>> for ProgramRunnerActor { | ||||
|     fn handle( | ||||
|         &mut self, | ||||
|         item: Result<SectionEvent, broadcast::RecvError>, | ||||
|         _ctx: &mut Self::Context, | ||||
|     ) { | ||||
|         let sec_event = match item { | ||||
|             Ok(e) => e, | ||||
|             Err(err) => { | ||||
|                 warn!("failed to receive section event: {}", err); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
|         #[allow(clippy::single_match)] | ||||
|         match sec_event { | ||||
|             SectionEvent::RunFinish(finished_run, _) => { | ||||
|                 self.handle_finished_run(finished_run); | ||||
|             } | ||||
|             _ => {} | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct Quit; | ||||
| 
 | ||||
| impl Handler<Quit> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result { | ||||
|         ctx.stop(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "ProgramEventRecv")] | ||||
| struct Subscribe; | ||||
| 
 | ||||
| impl Handler<Subscribe> for ProgramRunnerActor { | ||||
|     type Result = MessageResult<Subscribe>; | ||||
|     fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result { | ||||
|         let event_recv = self.inner.subscribe_event(); | ||||
|         MessageResult(event_recv) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct UpdateSections(Sections); | ||||
| 
 | ||||
| impl Handler<UpdateSections> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, msg: UpdateSections, _ctx: &mut Self::Context) -> Self::Result { | ||||
|         trace!("updating sections"); | ||||
|         let UpdateSections(new_sections) = msg; | ||||
|         self.inner.sections = new_sections; | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct UpdatePrograms(Programs); | ||||
| 
 | ||||
| impl Handler<UpdatePrograms> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, msg: UpdatePrograms, ctx: &mut Self::Context) -> Self::Result { | ||||
|         trace!("updating programs"); | ||||
|         let UpdatePrograms(new_programs) = msg; | ||||
|         self.inner.programs = new_programs; | ||||
|         self.inner.update_schedules(ctx); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct RunProgramId(ProgramId); | ||||
| 
 | ||||
| impl Handler<RunProgramId> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, msg: RunProgramId, ctx: &mut Self::Context) -> Self::Result { | ||||
|         let RunProgramId(program_id) = msg; | ||||
|         let program = match self.inner.programs.get(&program_id) { | ||||
|             Some(program) => program.clone(), | ||||
|             None => { | ||||
|                 warn!(program_id, "trying to run non-existant program"); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
|         self.run_queue.push_back(ProgRun::new(program)); | ||||
|         ctx.notify(Process); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct RunProgram(ProgramRef); | ||||
| 
 | ||||
| impl Handler<RunProgram> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, msg: RunProgram, ctx: &mut Self::Context) -> Self::Result { | ||||
|         let RunProgram(program) = msg; | ||||
|         self.run_queue.push_back(ProgRun::new(program)); | ||||
|         ctx.notify(Process); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct CancelProgram(ProgramId); | ||||
| 
 | ||||
| impl Handler<CancelProgram> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, msg: CancelProgram, ctx: &mut Self::Context) -> Self::Result { | ||||
|         let CancelProgram(program_id) = msg; | ||||
|         for run in self.run_queue.iter_mut() { | ||||
|             if run.program.id == program_id { | ||||
|                 run.state = RunState::Cancelled; | ||||
|             } | ||||
|         } | ||||
|         ctx.notify(Process); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Message)] | ||||
| #[rtype(result = "()")] | ||||
| struct Process; | ||||
| 
 | ||||
| impl Handler<Process> for ProgramRunnerActor { | ||||
|     type Result = (); | ||||
|     fn handle(&mut self, _msg: Process, _ctx: &mut Self::Context) -> Self::Result { | ||||
|         while let Some(current_run) = self.run_queue.front_mut() { | ||||
|             let run_finished = match current_run.state { | ||||
|                 RunState::Waiting => { | ||||
|                     self.inner.start_program_run(current_run); | ||||
|                     false | ||||
|                 } | ||||
|                 RunState::Running => false, | ||||
|                 RunState::Finished => true, | ||||
|                 RunState::Cancelled => { | ||||
|                     self.inner.cancel_program_run(current_run); | ||||
|                     true | ||||
|                 } | ||||
|             }; | ||||
|             if run_finished { | ||||
|                 self.run_queue.pop_front(); | ||||
|             } else { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|     fn handle_finished_run( | ||||
|         &mut self, | ||||
|         finished_run: SectionRunHandle, | ||||
|         run_queue: &mut RunQueue, | ||||
|     ) -> Option<()> { | ||||
|         let current_run = run_queue.front_mut()?; | ||||
| impl ProgramRunnerActor { | ||||
|     fn new(section_runner: SectionRunner) -> Self { | ||||
|         Self { | ||||
|             inner: ProgramRunnerInner { | ||||
|                 section_runner, | ||||
|                 sections: Sections::new(), | ||||
|                 programs: Programs::new(), | ||||
|                 event_send: None, | ||||
|                 schedule_run_fut: None, | ||||
|             }, | ||||
|             run_queue: RunQueue::new(), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn handle_finished_run(&mut self, finished_run: SectionRunHandle) -> Option<()> { | ||||
|         let current_run = self.run_queue.front_mut()?; | ||||
|         let last_run_handle = current_run.sec_run_handles.last()?; | ||||
|         if finished_run == *last_run_handle { | ||||
|             current_run.state = RunState::Finished; | ||||
| @ -262,73 +364,27 @@ impl RunnerTask { | ||||
|                 program_id = current_run.program.id, | ||||
|                 "finished running program" | ||||
|             ); | ||||
|             self.send_event(ProgramEvent::RunFinish(current_run.program.clone())); | ||||
|             self.inner | ||||
|                 .send_event(ProgramEvent::RunFinish(current_run.program.clone())); | ||||
|         } | ||||
|         Some(()) | ||||
|     } | ||||
| 
 | ||||
|     fn handle_sec_event( | ||||
|         &mut self, | ||||
|         sec_event: Result<SectionEvent, broadcast::RecvError>, | ||||
|         run_queue: &mut RunQueue, | ||||
|     ) -> eyre::Result<()> { | ||||
|         let sec_event = sec_event.wrap_err("failed to receive section event")?; | ||||
|         #[allow(clippy::single_match)] | ||||
|         match sec_event { | ||||
|             SectionEvent::RunFinish(finished_run, _) => { | ||||
|                 self.handle_finished_run(finished_run, run_queue); | ||||
|             } | ||||
|             _ => {} | ||||
|         } | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn handle_scheduled_run( | ||||
|     fn handle_scheduled_run( | ||||
|         &mut self, | ||||
|         item: Result<delay_queue::Expired<ProgramRef>, tokio::time::Error>, | ||||
|         run_queue: &mut RunQueue, | ||||
|     ) -> eyre::Result<()> { | ||||
|         let item = item.wrap_err("tokio time error")?; | ||||
|         run_queue.push_back(ProgRun::new(item.into_inner())); | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn run_impl(&mut self) -> eyre::Result<()> { | ||||
|         let mut sec_events = self | ||||
|             .section_runner | ||||
|             .subscribe() | ||||
|             .await | ||||
|             .wrap_err("could not subscribe to SectionRunner events")?; | ||||
| 
 | ||||
|         let mut run_queue: RunQueue = VecDeque::new(); | ||||
| 
 | ||||
|         while self.running { | ||||
|             self.process_queue(&mut run_queue) | ||||
|                 .await | ||||
|                 .wrap_err("error during queue processing")?; | ||||
|             tokio::select! { | ||||
|                 msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), | ||||
|                 sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, | ||||
|                 Some(scheduled_run) = self.scheduled_run_queue.next() => { | ||||
|                     self.handle_scheduled_run(scheduled_run, &mut run_queue).await?; | ||||
|                 }, | ||||
|             }; | ||||
|         } | ||||
| 
 | ||||
|         if let Some(quit_tx) = self.quit_tx.take() { | ||||
|             let _ = quit_tx.send(()); | ||||
|         } | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn run(mut self) { | ||||
|         let span = trace_span!("program_runner task"); | ||||
| 
 | ||||
|         self.run_impl() | ||||
|             .instrument(span) | ||||
|             .await | ||||
|             .expect("error in ProgramRunner task"); | ||||
|         ctx: &mut <ProgramRunnerActor as Actor>::Context, | ||||
|     ) { | ||||
|         let program = match item { | ||||
|             Ok(expired) => expired.into_inner(), | ||||
|             Err(err) => { | ||||
|                 error!("tokio time error: {}", err); | ||||
|                 return; | ||||
|             } | ||||
|         }; | ||||
|         trace!(program_id = program.id, "schedule expired"); | ||||
|         self.run_queue.push_back(ProgRun::new(program)); | ||||
|         ctx.notify(Process); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -350,65 +406,67 @@ impl From<oneshot::error::RecvError> for ChannelClosed { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| impl From<actix::MailboxError> for ChannelClosed { | ||||
|     fn from(_: actix::MailboxError) -> Self { | ||||
|         // TODO:
 | ||||
|         Self | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone)] | ||||
| pub struct ProgramRunner { | ||||
|     msg_send: mpsc::Sender<RunnerMsg>, | ||||
|     addr: Addr<ProgramRunnerActor>, | ||||
| } | ||||
| 
 | ||||
| #[allow(dead_code)] | ||||
| impl ProgramRunner { | ||||
|     pub fn new(section_runner: SectionRunner) -> Self { | ||||
|         let (msg_send, msg_recv) = mpsc::channel(8); | ||||
|         spawn(RunnerTask::new(section_runner, msg_recv).run()); | ||||
|         Self { msg_send } | ||||
|         let addr = ProgramRunnerActor::new(section_runner).start(); | ||||
|         Self { addr } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn quit(&mut self) -> Result<()> { | ||||
|         let (quit_tx, quit_rx) = oneshot::channel(); | ||||
|         self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?; | ||||
|         quit_rx.await?; | ||||
|         self.addr.send(Quit).await?; | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { | ||||
|         self.msg_send | ||||
|             .send(RunnerMsg::UpdateSections(new_sections)) | ||||
|         self.addr | ||||
|             .send(UpdateSections(new_sections)) | ||||
|             .await | ||||
|             .map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { | ||||
|         self.msg_send | ||||
|             .send(RunnerMsg::UpdatePrograms(new_programs)) | ||||
|         self.addr | ||||
|             .send(UpdatePrograms(new_programs)) | ||||
|             .await | ||||
|             .map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> { | ||||
|         self.msg_send | ||||
|             .send(RunnerMsg::RunProgramId(program_id)) | ||||
|         self.addr | ||||
|             .send(RunProgramId(program_id)) | ||||
|             .await | ||||
|             .map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { | ||||
|         self.msg_send | ||||
|             .send(RunnerMsg::RunProgram(program)) | ||||
|         self.addr | ||||
|             .send(RunProgram(program)) | ||||
|             .await | ||||
|             .map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { | ||||
|         self.msg_send | ||||
|             .send(RunnerMsg::CancelProgram(program_id)) | ||||
|         self.addr | ||||
|             .send(CancelProgram(program_id)) | ||||
|             .await | ||||
|             .map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn subscribe(&mut self) -> Result<ProgramEventRecv> { | ||||
|         let (res_send, res_recv) = oneshot::channel(); | ||||
|         self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; | ||||
|         let event_recv = res_recv.await?; | ||||
|         let event_recv = self.addr.send(Subscribe).await?; | ||||
|         Ok(event_recv) | ||||
|     } | ||||
| } | ||||
| @ -420,7 +478,7 @@ mod test { | ||||
|     use crate::{ | ||||
|         model::{Program, ProgramItem, Section}, | ||||
|         schedule::{every_day, DateTimeBound, Schedule}, | ||||
|         trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, | ||||
|         trace_listeners::{EventListener, Filters}, | ||||
|     }; | ||||
|     use assert_matches::assert_matches; | ||||
|     use im::ordmap; | ||||
| @ -433,17 +491,9 @@ mod test { | ||||
|         let quit_msg = EventListener::new( | ||||
|             Filters::new() | ||||
|                 .target("sprinklers_rs::program_runner") | ||||
|                 .message("runner_task recv") | ||||
|                 .field_value("msg", "Quit"), | ||||
|                 .message("program_runner stopped"), | ||||
|         ); | ||||
|         let task_span = SpanListener::new( | ||||
|             SpanFilters::new() | ||||
|                 .target("sprinklers_rs::program_runner") | ||||
|                 .name("program_runner task"), | ||||
|         ); | ||||
|         let subscriber = tracing_subscriber::registry() | ||||
|             .with(quit_msg.clone()) | ||||
|             .with(task_span.clone()); | ||||
|         let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); | ||||
|         let _sub = tracing::subscriber::set_default(subscriber); | ||||
| 
 | ||||
|         let interface = MockSectionInterface::new(6); | ||||
| @ -455,7 +505,6 @@ mod test { | ||||
|         yield_now().await; | ||||
| 
 | ||||
|         assert_eq!(quit_msg.get_count(), 1); | ||||
|         assert!(task_span.get_exit_count() > 1); | ||||
|     } | ||||
| 
 | ||||
|     fn make_sections_and_runner() -> (Sections, SectionRunner, Arc<MockSectionInterface>) { | ||||
| @ -530,24 +579,12 @@ mod test { | ||||
|         assert_eq!(interface.get_section_state(0), true); | ||||
| 
 | ||||
|         tokio::time::pause(); | ||||
|         assert_matches!( | ||||
|             sec_events.recv().await, | ||||
|             Ok(SectionEvent::RunFinish(_, _)) | ||||
|         ); | ||||
|         assert_matches!( | ||||
|             sec_events.recv().await, | ||||
|             Ok(SectionEvent::RunStart(_, _)) | ||||
|         ); | ||||
|         assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); | ||||
|         assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunStart(_, _))); | ||||
|         assert_eq!(interface.get_section_state(0), false); | ||||
|         assert_eq!(interface.get_section_state(1), true); | ||||
|         assert_matches!( | ||||
|             sec_events.recv().await, | ||||
|             Ok(SectionEvent::RunFinish(_, _)) | ||||
|         ); | ||||
|         assert_matches!( | ||||
|             prog_events.recv().await, | ||||
|             Ok(ProgramEvent::RunFinish(_)) | ||||
|         ); | ||||
|         assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); | ||||
|         assert_matches!(prog_events.recv().await, Ok(ProgramEvent::RunFinish(_))); | ||||
| 
 | ||||
|         runner.quit().await.unwrap(); | ||||
|         sec_runner.quit().await.unwrap(); | ||||
| @ -581,12 +618,12 @@ mod test { | ||||
|         // Should immediately start and finish running program
 | ||||
|         // due to nonexistant section
 | ||||
|         assert_matches!( | ||||
|             prog_events.try_recv(), | ||||
|             prog_events.recv().await, | ||||
|             Ok(ProgramEvent::RunStart(prog)) | ||||
|             if prog.id == 1 | ||||
|         ); | ||||
|         assert_matches!( | ||||
|             prog_events.try_recv(), | ||||
|             prog_events.recv().await, | ||||
|             Ok(ProgramEvent::RunFinish(prog)) | ||||
|             if prog.id == 1 | ||||
|         ); | ||||
| @ -734,10 +771,7 @@ mod test { | ||||
|             Ok(ProgramEvent::RunCancel(prog)) | ||||
|             if prog.id == 1 | ||||
|         ); | ||||
|         assert_matches!( | ||||
|             sec_events.recv().await, | ||||
|             Ok(SectionEvent::RunCancel(_, _)) | ||||
|         ); | ||||
|         assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunCancel(_, _))); | ||||
| 
 | ||||
|         runner.quit().await.unwrap(); | ||||
|         sec_runner.quit().await.unwrap(); | ||||
| @ -745,7 +779,6 @@ mod test { | ||||
| 
 | ||||
|     #[actix_rt::test] | ||||
|     async fn test_scheduled_run() { | ||||
|         tracing_subscriber::fmt().init(); | ||||
|         let (sections, mut sec_runner, _) = make_sections_and_runner(); | ||||
|         let mut runner = ProgramRunner::new(sec_runner.clone()); | ||||
|         let mut prog_events = runner.subscribe().await.unwrap(); | ||||
|  | ||||
| @ -3,7 +3,16 @@ use crate::section_interface::SectionInterface; | ||||
| use actix::{ | ||||
|     Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle, | ||||
| }; | ||||
| use std::{mem::swap, sync::Arc, time::Duration}; | ||||
| use futures_util::TryFutureExt; | ||||
| use std::{ | ||||
|     future::Future, | ||||
|     mem::swap, | ||||
|     sync::{ | ||||
|         atomic::{AtomicI32, Ordering}, | ||||
|         Arc, | ||||
|     }, | ||||
|     time::Duration, | ||||
| }; | ||||
| use thiserror::Error; | ||||
| use tokio::{ | ||||
|     sync::{broadcast, watch}, | ||||
| @ -266,7 +275,6 @@ impl SectionRunnerInner { | ||||
| 
 | ||||
| struct SectionRunnerActor { | ||||
|     state: SecRunnerState, | ||||
|     next_run_id: i32, | ||||
|     inner: SectionRunnerInner, | ||||
| } | ||||
| 
 | ||||
| @ -295,26 +303,22 @@ impl Handler<Quit> for SectionRunnerActor { | ||||
| } | ||||
| 
 | ||||
| #[derive(Message, Debug, Clone)] | ||||
| #[rtype(result = "SectionRunHandle")] | ||||
| struct QueueRun(SectionRef, Duration); | ||||
| #[rtype(result = "()")] | ||||
| struct QueueRun(SectionRunHandle, SectionRef, Duration); | ||||
| 
 | ||||
| impl Handler<QueueRun> for SectionRunnerActor { | ||||
|     type Result = MessageResult<QueueRun>; | ||||
|     type Result = (); | ||||
| 
 | ||||
|     fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result { | ||||
|         let QueueRun(section, duration) = msg; | ||||
|         let QueueRun(handle, section, duration) = msg; | ||||
| 
 | ||||
|         let run_id = self.next_run_id; | ||||
|         self.next_run_id += 1; | ||||
|         let handle = SectionRunHandle(run_id); | ||||
| 
 | ||||
|         let run: Arc<SecRun> = SecRun::new(handle.clone(), section, duration).into(); | ||||
|         let run: Arc<SecRun> = SecRun::new(handle, section, duration).into(); | ||||
|         self.state.run_queue.push_back(run); | ||||
|         self.inner.did_change = true; | ||||
| 
 | ||||
|         ctx.notify(Process); | ||||
| 
 | ||||
|         MessageResult(handle) | ||||
|         () | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -418,7 +422,6 @@ impl SectionRunnerActor { | ||||
|                 delay_future: None, | ||||
|                 did_change: false, | ||||
|             }, | ||||
|             next_run_id: 1, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
| @ -486,6 +489,7 @@ pub type Result<T, E = Error> = std::result::Result<T, E>; | ||||
| pub struct SectionRunner { | ||||
|     state_recv: SecRunnerStateRecv, | ||||
|     addr: Addr<SectionRunnerActor>, | ||||
|     next_run_id: Arc<AtomicI32>, | ||||
| } | ||||
| 
 | ||||
| #[allow(dead_code)] | ||||
| @ -493,46 +497,71 @@ impl SectionRunner { | ||||
|     pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { | ||||
|         let (state_send, state_recv) = watch::channel(SecRunnerState::default()); | ||||
|         let addr = SectionRunnerActor::new(interface, state_send).start(); | ||||
|         Self { state_recv, addr } | ||||
|         Self { | ||||
|             state_recv, | ||||
|             addr, | ||||
|             next_run_id: Arc::new(AtomicI32::new(1)), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn quit(&mut self) -> Result<()> { | ||||
|         self.addr.send(Quit).await?; | ||||
|         Ok(()) | ||||
|     pub fn quit(&mut self) -> impl Future<Output = Result<()>> { | ||||
|         self.addr.send(Quit).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn queue_run( | ||||
|     fn queue_run_inner( | ||||
|         &mut self, | ||||
|         section: SectionRef, | ||||
|         duration: Duration, | ||||
|     ) -> Result<SectionRunHandle> { | ||||
|         let handle = self.addr.send(QueueRun(section, duration)).await?; | ||||
|         Ok(handle) | ||||
|     ) -> (QueueRun, SectionRunHandle) { | ||||
|         let run_id = self.next_run_id.fetch_add(1, Ordering::SeqCst); | ||||
|         let handle = SectionRunHandle(run_id); | ||||
|         (QueueRun(handle.clone(), section, duration), handle) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> { | ||||
|         self.addr.send(CancelRun(handle)).await?; | ||||
|         Ok(()) | ||||
|     pub fn do_queue_run( | ||||
|         &mut self, | ||||
|         section: SectionRef, | ||||
|         duration: Duration, | ||||
|     ) -> SectionRunHandle { | ||||
|         let (queue_run, handle) = self.queue_run_inner(section, duration); | ||||
|         self.addr.do_send(queue_run); | ||||
|         handle | ||||
|     } | ||||
| 
 | ||||
|     pub async fn cancel_all(&mut self) -> Result<()> { | ||||
|         self.addr.send(CancelAll).await?; | ||||
|         Ok(()) | ||||
|     pub fn queue_run( | ||||
|         &mut self, | ||||
|         section: SectionRef, | ||||
|         duration: Duration, | ||||
|     ) -> impl Future<Output = Result<SectionRunHandle>> { | ||||
|         let (queue_run, handle) = self.queue_run_inner(section, duration); | ||||
|         self.addr | ||||
|             .send(queue_run) | ||||
|             .map_err(From::from) | ||||
|             .map_ok(move |_| handle) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn pause(&mut self) -> Result<()> { | ||||
|         self.addr.send(SetPaused(true)).await?; | ||||
|         Ok(()) | ||||
|     pub fn do_cancel_run(&mut self, handle: SectionRunHandle) { | ||||
|         self.addr.do_send(CancelRun(handle)) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn unpause(&mut self) -> Result<()> { | ||||
|         self.addr.send(SetPaused(false)).await?; | ||||
|         Ok(()) | ||||
|     pub fn cancel_run(&mut self, handle: SectionRunHandle) -> impl Future<Output = Result<()>> { | ||||
|         self.addr.send(CancelRun(handle)).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn subscribe(&mut self) -> Result<SectionEventRecv> { | ||||
|         let event_recv = self.addr.send(Subscribe).await?; | ||||
|         Ok(event_recv) | ||||
|     pub fn cancel_all(&mut self) -> impl Future<Output = Result<()>> { | ||||
|         self.addr.send(CancelAll).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub fn pause(&mut self) -> impl Future<Output = Result<()>> { | ||||
|         self.addr.send(SetPaused(true)).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub fn unpause(&mut self) -> impl Future<Output = Result<()>> { | ||||
|         self.addr.send(SetPaused(false)).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub fn subscribe(&mut self) -> impl Future<Output = Result<SectionEventRecv>> { | ||||
|         self.addr.send(Subscribe).map_err(From::from) | ||||
|     } | ||||
| 
 | ||||
|     pub fn state_receiver(&self) -> SecRunnerStateRecv { | ||||
| @ -559,8 +588,7 @@ mod test { | ||||
|                 .target("sprinklers_rs::section_runner") | ||||
|                 .message("section_runner stopped"), | ||||
|         ); | ||||
|         let subscriber = tracing_subscriber::registry() | ||||
|             .with(quit_msg.clone()); | ||||
|         let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); | ||||
|         let _sub = tracing::subscriber::set_default(subscriber); | ||||
| 
 | ||||
|         let interface = MockSectionInterface::new(6); | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user