From 68b303e32d1b37c4c7cf1ed4359738a7941e2ad1 Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Sep 2020 15:21:03 -0600 Subject: [PATCH] Synchronous runner quit for determinism --- src/program_runner.rs | 22 ++++++++++++++-------- src/section_runner.rs | 23 ++++++++++++++--------- src/trace_listeners.rs | 2 +- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/src/program_runner.rs b/src/program_runner.rs index fc376e6..ee11fe5 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -13,7 +13,7 @@ use tracing::{debug, error, trace, trace_span, warn}; #[derive(Debug)] enum RunnerMsg { - Quit, + Quit(oneshot::Sender<()>), UpdateSections(Sections), UpdatePrograms(Programs), RunProgramId(ProgramId), @@ -69,6 +69,7 @@ struct RunnerTask { programs: Programs, event_send: Option, scheduled_run_queue: DelayQueue, + quit_tx: Option>, } impl RunnerTask { @@ -81,6 +82,7 @@ impl RunnerTask { programs: Programs::new(), event_send: None, scheduled_run_queue: DelayQueue::new(), + quit_tx: None, } } @@ -208,7 +210,10 @@ impl RunnerTask { use RunnerMsg::*; trace!(msg = debug(&msg), "runner_task recv"); match msg { - Quit => self.running = false, + Quit(quit_tx) => { + self.running = false; + self.quit_tx = Some(quit_tx); + } Subscribe(res_send) => { let event_recv = self.subscribe_event(); // Ignore error if channel closed @@ -309,6 +314,10 @@ impl RunnerTask { }; } + if let Some(quit_tx) = self.quit_tx.take() { + let _ = quit_tx.send(()); + } + Ok(()) } @@ -354,7 +363,9 @@ impl ProgramRunner { } pub async fn quit(&mut self) -> Result<()> { - self.msg_send.send(RunnerMsg::Quit).await?; + let (quit_tx, quit_rx) = oneshot::channel(); + self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?; + quit_rx.await?; Ok(()) } @@ -587,7 +598,6 @@ mod test { runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); - yield_now().await; } #[tokio::test] @@ -611,7 +621,6 @@ mod test { runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); - yield_now().await; } #[tokio::test] @@ -674,7 +683,6 @@ mod test { runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); - yield_now().await; } #[tokio::test] @@ -720,7 +728,6 @@ mod test { runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); - yield_now().await; } #[tokio::test] @@ -778,6 +785,5 @@ mod test { runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); - yield_now().await; } } diff --git a/src/section_runner.rs b/src/section_runner.rs index 5f610f1..c9235b2 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -36,7 +36,7 @@ impl SectionRunnerInner { #[derive(Debug)] enum RunnerMsg { - Quit, + Quit(oneshot::Sender<()>), QueueRun(SectionRunHandle, SectionRef, Duration), CancelRun(SectionRunHandle), CancelAll, @@ -112,6 +112,7 @@ struct RunnerTask { delay_future: OptionFuture, paused: bool, event_send: Option, + quit_tx: Option>, } impl RunnerTask { @@ -126,6 +127,7 @@ impl RunnerTask { delay_future: None.into(), paused: false, event_send: None, + quit_tx: None, } } @@ -290,7 +292,10 @@ impl RunnerTask { use RunnerMsg::*; trace!(msg = debug(&msg), "runner_task recv"); match msg { - Quit => self.running = false, + Quit(quit_tx) => { + self.quit_tx = Some(quit_tx); + self.running = false; + }, QueueRun(handle, section, duration) => { run_queue.push_back(SecRun::new(handle, section, duration)); } @@ -343,6 +348,10 @@ impl RunnerTask { _ = &mut self.delay_future, if self.delay_future.is_some() => delay_done() }; } + + if let Some(quit_tx) = self.quit_tx.take() { + let _ = quit_tx.send(()); + } } } @@ -382,7 +391,9 @@ impl SectionRunner { } pub async fn quit(&mut self) -> Result<()> { - self.msg_send.send(RunnerMsg::Quit).await?; + let (quit_tx, quit_rx) = oneshot::channel(); + self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?; + quit_rx.await?; Ok(()) } @@ -460,7 +471,6 @@ mod test { let mut runner = SectionRunner::new(Arc::new(interface)); tokio::task::yield_now().await; runner.quit().await.unwrap(); - tokio::task::yield_now().await; assert_eq!(quit_msg.get_count(), 1); assert_eq!(task_span.get_exit_count(), 1); @@ -547,7 +557,6 @@ mod test { assert_section_states(&interface, &[false, false]); runner.quit().await.unwrap(); - tokio::task::yield_now().await; } #[tokio::test] @@ -585,7 +594,6 @@ mod test { assert_section_states(&interface, &[false, false]); runner.quit().await.unwrap(); - tokio::task::yield_now().await; } #[tokio::test] @@ -620,7 +628,6 @@ mod test { assert_section_states(&interface, &[false, false]); runner.quit().await.unwrap(); - tokio::task::yield_now().await; } #[tokio::test] @@ -680,7 +687,6 @@ mod test { assert_section_states(&interface, &[false, false]); runner.quit().await.unwrap(); - tokio::task::yield_now().await; } #[tokio::test] @@ -760,6 +766,5 @@ mod test { assert_eq!(event_recv.recv().await, Ok(SectionEvent::RunFinish(run3))); runner.quit().await.unwrap(); - tokio::task::yield_now().await; } } diff --git a/src/trace_listeners.rs b/src/trace_listeners.rs index 7701d66..38e3a08 100644 --- a/src/trace_listeners.rs +++ b/src/trace_listeners.rs @@ -117,7 +117,7 @@ impl<'a> Visit for TraceListenerVisitor<'a> { { if field.name() == filter_field.name { if let Some(filter_field_value) = &filter_field.value { - *right = &value_str == filter_field_value; + *right = value_str.starts_with(filter_field_value); } else { *right = true; }