From 9122bd8755acb97357b979749389fdb18527b5bb Mon Sep 17 00:00:00 2001 From: Alex Mikhalev Date: Sun, 27 Sep 2020 21:11:09 -0600 Subject: [PATCH] Fix trace spans --- src/program_runner.rs | 35 ++++++++++++++++++++++++----------- src/section_runner.rs | 16 ++++++++++------ src/update_listener.rs | 27 +++++++++++++++------------ 3 files changed, 49 insertions(+), 29 deletions(-) diff --git a/src/program_runner.rs b/src/program_runner.rs index 8df84c8..266b8a9 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -10,6 +10,7 @@ use tokio::{ time::{delay_queue, DelayQueue}, }; use tracing::{debug, error, trace, trace_span, warn}; +use tracing_futures::Instrument; #[derive(Debug)] enum RunnerMsg { @@ -292,7 +293,7 @@ impl RunnerTask { Ok(()) } - async fn start_impl(&mut self) -> eyre::Result<()> { + async fn run_impl(&mut self) -> eyre::Result<()> { let mut sec_events = self .section_runner .subscribe() @@ -321,11 +322,11 @@ impl RunnerTask { Ok(()) } - async fn start(mut self) { - let span = trace_span!("program_runner::runner_task"); - let _enter = span.enter(); + async fn run(mut self) { + let span = trace_span!("program_runner task"); - self.start_impl() + self.run_impl() + .instrument(span) .await .expect("error in ProgramRunner task"); } @@ -358,7 +359,7 @@ pub struct ProgramRunner { impl ProgramRunner { pub fn new(section_runner: SectionRunner) -> Self { let (msg_send, msg_recv) = mpsc::channel(8); - spawn(RunnerTask::new(section_runner, msg_recv).start()); + spawn(RunnerTask::new(section_runner, msg_recv).run()); Self { msg_send } } @@ -438,7 +439,7 @@ mod test { let task_span = SpanListener::new( SpanFilters::new() .target("sprinklers_rs::program_runner") - .name("program_runner::runner_task"), + .name("program_runner task"), ); let subscriber = tracing_subscriber::registry() .with(quit_msg.clone()) @@ -529,11 +530,20 @@ mod test { assert_eq!(interface.get_section_state(0), true); tokio::time::pause(); - assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_, _)); - assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunStart(_, _)); + assert_matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunFinish(_, _) + ); + assert_matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunStart(_, _) + ); assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(1), true); - assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_, _)); + assert_matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunFinish(_, _) + ); assert_matches!( prog_events.recv().await.unwrap(), ProgramEvent::RunFinish(_) @@ -724,7 +734,10 @@ mod test { ProgramEvent::RunCancel(prog) if prog.id == 1 ); - assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunCancel(_, _)); + assert_matches!( + sec_events.recv().await.unwrap(), + SectionEvent::RunCancel(_, _) + ); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); diff --git a/src/section_runner.rs b/src/section_runner.rs index c5157fe..bf2d1fb 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -16,6 +16,7 @@ use tokio::{ time::{delay_for, Instant}, }; use tracing::{debug, trace, trace_span, warn}; +use tracing_futures::Instrument; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct SectionRunHandle(i32); @@ -368,10 +369,7 @@ impl RunnerTask { } } - async fn start(mut self) { - let span = trace_span!("section_runner::runner_task"); - let _enter = span.enter(); - + async fn run_impl(mut self) { let mut state = SecRunnerState::default(); while self.running { @@ -389,6 +387,12 @@ impl RunnerTask { let _ = quit_tx.send(()); } } + + async fn run(self) { + let span = trace_span!("section_runner task"); + + self.run_impl().instrument(span).await; + } } #[derive(Debug, Clone, Error)] @@ -419,7 +423,7 @@ pub struct SectionRunner { impl SectionRunner { pub fn new(interface: Arc) -> Self { let (msg_send, msg_recv) = mpsc::channel(8); - spawn(RunnerTask::new(interface, msg_recv).start()); + spawn(RunnerTask::new(interface, msg_recv).run()); Self { inner: Arc::new(SectionRunnerInner::new()), msg_send, @@ -497,7 +501,7 @@ mod test { let task_span = SpanListener::new( SpanFilters::new() .target("sprinklers_rs::section_runner") - .name("section_runner::runner_task"), + .name("section_runner task"), ); let subscriber = tracing_subscriber::registry() .with(quit_msg.clone()) diff --git a/src/update_listener.rs b/src/update_listener.rs index 1f39bf8..baa88e9 100644 --- a/src/update_listener.rs +++ b/src/update_listener.rs @@ -8,6 +8,7 @@ use tokio::{ task::JoinHandle, }; use tracing::{trace, trace_span}; +use tracing_futures::Instrument; struct UpdateListenerTask { mqtt_interface: MqttInterface, @@ -43,10 +44,7 @@ impl UpdateListenerTask { Ok(()) } - async fn run_impl( - &mut self, - mut section_events: SectionEventRecv, - ) -> eyre::Result<()> { + async fn run_impl(&mut self, mut section_events: SectionEventRecv) -> eyre::Result<()> { while self.running { select! { section_event = section_events.recv() => { @@ -57,15 +55,12 @@ impl UpdateListenerTask { Ok(()) } - async fn run( + async fn run_or_quit( mut self, mut quit_rx: oneshot::Receiver<()>, section_events: SectionEventRecv, - ) { - let span = trace_span!("UpdateListenerTask::run"); - let _enter = span.enter(); - - let result = select! { + ) -> eyre::Result<()> { + select! { _ = &mut quit_rx => { self.running = false; Ok(()) @@ -73,8 +68,16 @@ impl UpdateListenerTask { res = self.run_impl(section_events) => { res } - }; - result.expect("error in UpdateListenerTask"); + } + } + + async fn run(self, quit_rx: oneshot::Receiver<()>, section_events: SectionEventRecv) { + let span = trace_span!("update_listener task"); + + self.run_or_quit(quit_rx, section_events) + .instrument(span) + .await + .expect("error in UpdateListenerTask"); } }