Merge pull request 'Refactor SectionRunner runner_task' (#6) from refactor-section-runner into master
	
		
			
	
		
	
	
		
	
		
			All checks were successful
		
		
	
	
		
			
				
	
				continuous-integration/drone/push Build is passing
				
			
		
		
	
	
				
					
				
			
		
			All checks were successful
		
		
	
	continuous-integration/drone/push Build is passing
				
			Reviewed-on: #6
This commit is contained in:
		
						commit
						8d719032e1
					
				| @ -79,15 +79,6 @@ impl SecRun { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn start(&mut self, interface: &dyn SectionInterface) { | ||||
|         use RunState::*; | ||||
|         debug!(section_id = self.section.id, "starting running section"); | ||||
|         interface.set_section_state(self.section.interface_id, true); | ||||
|         self.state = Running { | ||||
|             start_time: Instant::now(), | ||||
|         }; | ||||
|     } | ||||
| 
 | ||||
|     fn is_running(&self) -> bool { | ||||
|         matches!(self.state, RunState::Running{..}) | ||||
|     } | ||||
| @ -96,43 +87,79 @@ impl SecRun { | ||||
|     fn is_paused(&self) -> bool { | ||||
|         matches!(self.state, RunState::Paused{..}) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|     fn finish(&mut self, interface: &dyn SectionInterface) { | ||||
|         if self.is_running() { | ||||
|             debug!(section_id = self.section.id, "finished running section"); | ||||
|             interface.set_section_state(self.section.interface_id, false); | ||||
|             self.state = RunState::Finished; | ||||
| struct RunnerTask { | ||||
|     interface: Arc<dyn SectionInterface + Sync>, | ||||
|     msg_recv: mpsc::Receiver<RunnerMsg>, | ||||
|     running: bool, | ||||
|     delay_future: OptionFuture<tokio::time::Delay>, | ||||
|     paused: bool, | ||||
| } | ||||
| 
 | ||||
| impl RunnerTask { | ||||
|     fn new( | ||||
|         interface: Arc<dyn SectionInterface + Sync>, | ||||
|         msg_recv: mpsc::Receiver<RunnerMsg>, | ||||
|     ) -> Self { | ||||
|         Self { | ||||
|             interface, | ||||
|             msg_recv, | ||||
|             running: true, | ||||
|             delay_future: None.into(), | ||||
|             paused: false, | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn start_run(&mut self, run: &mut SecRun) { | ||||
|         use RunState::*; | ||||
|         debug!(section_id = run.section.id, "starting running section"); | ||||
|         self.interface | ||||
|             .set_section_state(run.section.interface_id, true); | ||||
|         run.state = Running { | ||||
|             start_time: Instant::now(), | ||||
|         }; | ||||
|     } | ||||
| 
 | ||||
|     fn finish_run(&mut self, run: &mut SecRun) { | ||||
|         if run.is_running() { | ||||
|             debug!(section_id = run.section.id, "finished running section"); | ||||
|             self.interface | ||||
|                 .set_section_state(run.section.interface_id, false); | ||||
|             run.state = RunState::Finished; | ||||
|         } else { | ||||
|             warn!( | ||||
|                 section_id = self.section.id, | ||||
|                 state = debug(&self.state), | ||||
|                 section_id = run.section.id, | ||||
|                 state = debug(&run.state), | ||||
|                 "cannot finish run which is not running" | ||||
|             ); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn cancel(&mut self, interface: &dyn SectionInterface) { | ||||
|         if self.is_running() { | ||||
|             debug!(section_id = self.section.id, "cancelling running section"); | ||||
|             interface.set_section_state(self.section.interface_id, false); | ||||
|     fn cancel_run(&mut self, run: &mut SecRun) { | ||||
|         if run.is_running() { | ||||
|             debug!(section_id = run.section.id, "cancelling running section"); | ||||
|             self.interface | ||||
|                 .set_section_state(run.section.interface_id, false); | ||||
|         } | ||||
|         self.state = RunState::Cancelled; | ||||
|         run.state = RunState::Cancelled; | ||||
|     } | ||||
| 
 | ||||
|     fn pause(&mut self, interface: &dyn SectionInterface) { | ||||
|     fn pause_run(&mut self, run: &mut SecRun) { | ||||
|         use RunState::*; | ||||
|         match self.state { | ||||
|         match run.state { | ||||
|             Running { start_time } => { | ||||
|                 debug!(section_id = self.section.id, "pausing running section"); | ||||
|                 interface.set_section_state(self.section.interface_id, false); | ||||
|                 self.state = Paused { | ||||
|                 debug!(section_id = run.section.id, "pausing running section"); | ||||
|                 self.interface | ||||
|                     .set_section_state(run.section.interface_id, false); | ||||
|                 run.state = Paused { | ||||
|                     start_time, | ||||
|                     pause_time: Instant::now(), | ||||
|                 }; | ||||
|             } | ||||
|             Waiting => { | ||||
|                 debug!(section_id = self.section.id, "pausing waiting section"); | ||||
|                 self.state = Paused { | ||||
|                 debug!(section_id = run.section.id, "pausing waiting section"); | ||||
|                 run.state = Paused { | ||||
|                     start_time: Instant::now(), | ||||
|                     pause_time: Instant::now(), | ||||
|                 }; | ||||
| @ -141,127 +168,130 @@ impl SecRun { | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn unpause(&mut self, interface: &dyn SectionInterface) { | ||||
|     fn unpause_run(&mut self, run: &mut SecRun) { | ||||
|         use RunState::*; | ||||
|         match self.state { | ||||
|         match run.state { | ||||
|             Paused { | ||||
|                 start_time, | ||||
|                 pause_time, | ||||
|             } => { | ||||
|                 debug!(section_id = self.section.id, "unpausing section"); | ||||
|                 interface.set_section_state(self.section.interface_id, true); | ||||
|                 self.state = Running { | ||||
|                 debug!(section_id = run.section.id, "unpausing section"); | ||||
|                 self.interface | ||||
|                     .set_section_state(run.section.interface_id, true); | ||||
|                 run.state = Running { | ||||
|                     start_time: Instant::now(), | ||||
|                 }; | ||||
|                 let ran_for = pause_time - start_time; | ||||
|                 self.duration -= ran_for; | ||||
|                 run.duration -= ran_for; | ||||
|             } | ||||
|             Waiting | Finished | Cancelled | Running { .. } => { | ||||
|                 warn!( | ||||
|                     section_id = self.section.id, | ||||
|                     state = debug(&self.state), | ||||
|                     section_id = run.section.id, | ||||
|                     state = debug(&run.state), | ||||
|                     "can only unpause paused section" | ||||
|                 ); | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| async fn runner_task( | ||||
|     interface: Arc<dyn SectionInterface + Sync>, | ||||
|     mut msg_recv: mpsc::Receiver<RunnerMsg>, | ||||
| ) { | ||||
|     use RunState::*; | ||||
| 
 | ||||
|     let span = trace_span!("runner_task"); | ||||
|     let _enter = span.enter(); | ||||
| 
 | ||||
|     let mut running = true; | ||||
|     let mut run_queue: VecDeque<SecRun> = VecDeque::new(); | ||||
|     let mut delay_future: OptionFuture<_> = None.into(); | ||||
|     let mut paused = false; | ||||
|     while running { | ||||
|         if let Some(current_run) = run_queue.front_mut() { | ||||
|             let done = match current_run.state { | ||||
|                 Waiting => { | ||||
|                     if paused { | ||||
|                         current_run.pause(&*interface); | ||||
|                     } else { | ||||
|                         current_run.start(&*interface); | ||||
|                         delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     } | ||||
|     fn process_queue(&mut self, run_queue: &mut VecDeque<SecRun>) { | ||||
|         use RunState::*; | ||||
|         loop { | ||||
|             let current_run = match run_queue.front_mut() { | ||||
|                 Some(current_run) => current_run, | ||||
|                 None => break, | ||||
|             }; | ||||
|             let run_finished = match (¤t_run.state, self.paused) { | ||||
|                 (Waiting, false) => { | ||||
|                     self.start_run(current_run); | ||||
|                     self.delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     false | ||||
|                 } | ||||
|                 Running { start_time } => { | ||||
|                     if paused { | ||||
|                         current_run.pause(&*interface); | ||||
|                         delay_future = None.into(); | ||||
|                         false | ||||
|                     } else if start_time.elapsed() >= current_run.duration { | ||||
|                         current_run.finish(&*interface); | ||||
|                         delay_future = None.into(); | ||||
|                         true | ||||
|                     } else { | ||||
|                         false | ||||
|                 (Running { start_time }, false) => { | ||||
|                     let time_to_finish = start_time.elapsed() >= current_run.duration; | ||||
|                     if time_to_finish { | ||||
|                         self.finish_run(current_run); | ||||
|                         self.delay_future = None.into(); | ||||
|                     } | ||||
|                     time_to_finish | ||||
|                 } | ||||
|                 Paused { .. } => { | ||||
|                     if !paused { | ||||
|                         current_run.unpause(&*interface); | ||||
|                         delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     } | ||||
|                 (Paused { .. }, false) => { | ||||
|                     self.unpause_run(current_run); | ||||
|                     self.delay_future = Some(delay_for(current_run.duration)).into(); | ||||
|                     false | ||||
|                 } | ||||
|                 Cancelled | Finished => true, | ||||
|                 (Waiting, true) => { | ||||
|                     self.pause_run(current_run); | ||||
|                     false | ||||
|                 } | ||||
|                 (Running { .. }, true) => { | ||||
|                     self.pause_run(current_run); | ||||
|                     self.delay_future = None.into(); | ||||
|                     false | ||||
|                 } | ||||
|                 (Paused { .. }, true) => false, | ||||
|                 (Cancelled, _) | (Finished, _) => true, | ||||
|             }; | ||||
| 
 | ||||
|             if done { | ||||
|             if run_finished { | ||||
|                 run_queue.pop_front(); | ||||
|                 continue; | ||||
|             } else { | ||||
|                 break; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|         let mut handle_msg = |msg: Option<RunnerMsg>| { | ||||
|             let msg = msg.expect("SectionRunner channel closed"); | ||||
|             use RunnerMsg::*; | ||||
|             trace!(msg = debug(&msg), "runner_task recv"); | ||||
|             match msg { | ||||
|                 Quit => running = false, | ||||
|                 QueueRun(handle, section, duration) => { | ||||
|                     run_queue.push_back(SecRun::new(handle, section, duration)); | ||||
|                 } | ||||
|                 CancelRun(handle) => { | ||||
|                     for run in &mut run_queue { | ||||
|                         if run.handle != handle { | ||||
|                             continue; | ||||
|                         } | ||||
|                         trace!(handle = handle.0, "cancelling run by handle"); | ||||
|                         run.cancel(&*interface); | ||||
|     fn handle_msg(&mut self, msg: Option<RunnerMsg>, run_queue: &mut VecDeque<SecRun>) { | ||||
|         let msg = msg.expect("SectionRunner channel closed"); | ||||
|         use RunnerMsg::*; | ||||
|         trace!(msg = debug(&msg), "runner_task recv"); | ||||
|         match msg { | ||||
|             Quit => self.running = false, | ||||
|             QueueRun(handle, section, duration) => { | ||||
|                 run_queue.push_back(SecRun::new(handle, section, duration)); | ||||
|             } | ||||
|             CancelRun(handle) => { | ||||
|                 for run in run_queue { | ||||
|                     if run.handle != handle { | ||||
|                         continue; | ||||
|                     } | ||||
|                 } | ||||
|                 CancelAll => { | ||||
|                     let mut old_runs = VecDeque::new(); | ||||
|                     swap(&mut old_runs, &mut run_queue); | ||||
|                     trace!(count = old_runs.len(), "cancelling all runs"); | ||||
|                     for mut run in old_runs { | ||||
|                         run.cancel(&*interface); | ||||
|                     } | ||||
|                 } | ||||
|                 Pause => { | ||||
|                     paused = true; | ||||
|                 } | ||||
|                 Unpause => { | ||||
|                     paused = false; | ||||
|                     trace!(handle = handle.0, "cancelling run by handle"); | ||||
|                     self.cancel_run(run); | ||||
|                 } | ||||
|             } | ||||
|         }; | ||||
|         let delay_done = || { | ||||
|             trace!("delay done"); | ||||
|         }; | ||||
|         tokio::select! { | ||||
|             msg = msg_recv.recv() => handle_msg(msg), | ||||
|             _ = &mut delay_future, if delay_future.is_some() => delay_done() | ||||
|         }; | ||||
|             CancelAll => { | ||||
|                 let mut old_runs = VecDeque::new(); | ||||
|                 swap(&mut old_runs, run_queue); | ||||
|                 trace!(count = old_runs.len(), "cancelling all runs"); | ||||
|                 for mut run in old_runs { | ||||
|                     self.cancel_run(&mut run); | ||||
|                 } | ||||
|             } | ||||
|             Pause => { | ||||
|                 self.paused = true; | ||||
|             } | ||||
|             Unpause => { | ||||
|                 self.paused = false; | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     async fn start(mut self) { | ||||
|         let span = trace_span!("runner_task"); | ||||
|         let _enter = span.enter(); | ||||
| 
 | ||||
|         let mut run_queue: VecDeque<SecRun> = VecDeque::new(); | ||||
| 
 | ||||
|         while self.running { | ||||
|             self.process_queue(&mut run_queue); | ||||
|             let delay_done = || { | ||||
|                 trace!("delay done"); | ||||
|             }; | ||||
|             tokio::select! { | ||||
|                 msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), | ||||
|                 _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() | ||||
|             }; | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -287,7 +317,7 @@ pub struct SectionRunner { | ||||
| impl SectionRunner { | ||||
|     pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { | ||||
|         let (msg_send, msg_recv) = mpsc::channel(8); | ||||
|         spawn(runner_task(interface, msg_recv)); | ||||
|         spawn(RunnerTask::new(interface, msg_recv).start()); | ||||
|         Self { | ||||
|             inner: Arc::new(SectionRunnerInner::new()), | ||||
|             msg_send, | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user