Browse Source

Fix trace spans

master
Alex Mikhalev 4 years ago
parent
commit
9122bd8755
  1. 35
      src/program_runner.rs
  2. 16
      src/section_runner.rs
  3. 27
      src/update_listener.rs

35
src/program_runner.rs

@ -10,6 +10,7 @@ use tokio::{
time::{delay_queue, DelayQueue}, time::{delay_queue, DelayQueue},
}; };
use tracing::{debug, error, trace, trace_span, warn}; use tracing::{debug, error, trace, trace_span, warn};
use tracing_futures::Instrument;
#[derive(Debug)] #[derive(Debug)]
enum RunnerMsg { enum RunnerMsg {
@ -292,7 +293,7 @@ impl RunnerTask {
Ok(()) Ok(())
} }
async fn start_impl(&mut self) -> eyre::Result<()> { async fn run_impl(&mut self) -> eyre::Result<()> {
let mut sec_events = self let mut sec_events = self
.section_runner .section_runner
.subscribe() .subscribe()
@ -321,11 +322,11 @@ impl RunnerTask {
Ok(()) Ok(())
} }
async fn start(mut self) { async fn run(mut self) {
let span = trace_span!("program_runner::runner_task"); let span = trace_span!("program_runner task");
let _enter = span.enter();
self.start_impl() self.run_impl()
.instrument(span)
.await .await
.expect("error in ProgramRunner task"); .expect("error in ProgramRunner task");
} }
@ -358,7 +359,7 @@ pub struct ProgramRunner {
impl ProgramRunner { impl ProgramRunner {
pub fn new(section_runner: SectionRunner) -> Self { pub fn new(section_runner: SectionRunner) -> Self {
let (msg_send, msg_recv) = mpsc::channel(8); let (msg_send, msg_recv) = mpsc::channel(8);
spawn(RunnerTask::new(section_runner, msg_recv).start()); spawn(RunnerTask::new(section_runner, msg_recv).run());
Self { msg_send } Self { msg_send }
} }
@ -438,7 +439,7 @@ mod test {
let task_span = SpanListener::new( let task_span = SpanListener::new(
SpanFilters::new() SpanFilters::new()
.target("sprinklers_rs::program_runner") .target("sprinklers_rs::program_runner")
.name("program_runner::runner_task"), .name("program_runner task"),
); );
let subscriber = tracing_subscriber::registry() let subscriber = tracing_subscriber::registry()
.with(quit_msg.clone()) .with(quit_msg.clone())
@ -529,11 +530,20 @@ mod test {
assert_eq!(interface.get_section_state(0), true); assert_eq!(interface.get_section_state(0), true);
tokio::time::pause(); tokio::time::pause();
assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_, _)); assert_matches!(
assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunStart(_, _)); sec_events.recv().await.unwrap(),
SectionEvent::RunFinish(_, _)
);
assert_matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunStart(_, _)
);
assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(0), false);
assert_eq!(interface.get_section_state(1), true); assert_eq!(interface.get_section_state(1), true);
assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunFinish(_, _)); assert_matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunFinish(_, _)
);
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await.unwrap(),
ProgramEvent::RunFinish(_) ProgramEvent::RunFinish(_)
@ -724,7 +734,10 @@ mod test {
ProgramEvent::RunCancel(prog) ProgramEvent::RunCancel(prog)
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.recv().await.unwrap(), SectionEvent::RunCancel(_, _)); assert_matches!(
sec_events.recv().await.unwrap(),
SectionEvent::RunCancel(_, _)
);
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();

16
src/section_runner.rs

@ -16,6 +16,7 @@ use tokio::{
time::{delay_for, Instant}, time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span, warn}; use tracing::{debug, trace, trace_span, warn};
use tracing_futures::Instrument;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SectionRunHandle(i32); pub struct SectionRunHandle(i32);
@ -368,10 +369,7 @@ impl RunnerTask {
} }
} }
async fn start(mut self) { async fn run_impl(mut self) {
let span = trace_span!("section_runner::runner_task");
let _enter = span.enter();
let mut state = SecRunnerState::default(); let mut state = SecRunnerState::default();
while self.running { while self.running {
@ -389,6 +387,12 @@ impl RunnerTask {
let _ = quit_tx.send(()); let _ = quit_tx.send(());
} }
} }
async fn run(self) {
let span = trace_span!("section_runner task");
self.run_impl().instrument(span).await;
}
} }
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
@ -419,7 +423,7 @@ pub struct SectionRunner {
impl SectionRunner { impl SectionRunner {
pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self {
let (msg_send, msg_recv) = mpsc::channel(8); let (msg_send, msg_recv) = mpsc::channel(8);
spawn(RunnerTask::new(interface, msg_recv).start()); spawn(RunnerTask::new(interface, msg_recv).run());
Self { Self {
inner: Arc::new(SectionRunnerInner::new()), inner: Arc::new(SectionRunnerInner::new()),
msg_send, msg_send,
@ -497,7 +501,7 @@ mod test {
let task_span = SpanListener::new( let task_span = SpanListener::new(
SpanFilters::new() SpanFilters::new()
.target("sprinklers_rs::section_runner") .target("sprinklers_rs::section_runner")
.name("section_runner::runner_task"), .name("section_runner task"),
); );
let subscriber = tracing_subscriber::registry() let subscriber = tracing_subscriber::registry()
.with(quit_msg.clone()) .with(quit_msg.clone())

27
src/update_listener.rs

@ -8,6 +8,7 @@ use tokio::{
task::JoinHandle, task::JoinHandle,
}; };
use tracing::{trace, trace_span}; use tracing::{trace, trace_span};
use tracing_futures::Instrument;
struct UpdateListenerTask { struct UpdateListenerTask {
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
@ -43,10 +44,7 @@ impl UpdateListenerTask {
Ok(()) Ok(())
} }
async fn run_impl( async fn run_impl(&mut self, mut section_events: SectionEventRecv) -> eyre::Result<()> {
&mut self,
mut section_events: SectionEventRecv,
) -> eyre::Result<()> {
while self.running { while self.running {
select! { select! {
section_event = section_events.recv() => { section_event = section_events.recv() => {
@ -57,15 +55,12 @@ impl UpdateListenerTask {
Ok(()) Ok(())
} }
async fn run( async fn run_or_quit(
mut self, mut self,
mut quit_rx: oneshot::Receiver<()>, mut quit_rx: oneshot::Receiver<()>,
section_events: SectionEventRecv, section_events: SectionEventRecv,
) { ) -> eyre::Result<()> {
let span = trace_span!("UpdateListenerTask::run"); select! {
let _enter = span.enter();
let result = select! {
_ = &mut quit_rx => { _ = &mut quit_rx => {
self.running = false; self.running = false;
Ok(()) Ok(())
@ -73,8 +68,16 @@ impl UpdateListenerTask {
res = self.run_impl(section_events) => { res = self.run_impl(section_events) => {
res res
} }
}; }
result.expect("error in UpdateListenerTask"); }
async fn run(self, quit_rx: oneshot::Receiver<()>, section_events: SectionEventRecv) {
let span = trace_span!("update_listener task");
self.run_or_quit(quit_rx, section_events)
.instrument(span)
.await
.expect("error in UpdateListenerTask");
} }
} }

Loading…
Cancel
Save