diff --git a/Cargo.toml b/Cargo.toml index 19d47ff..7412dcd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,7 @@ serde = { version = "1.0.116", features = ["derive"] } serde_json = "1.0.57" actix = "0.10.0" actix-rt = "1.1.1" +futures-util = { version = "0.3.5", default-features = false, features = ["std", "async-await"] } [dependencies.rumqttc] git = "https://github.com/bytebeamio/rumqtt.git" diff --git a/src/program_runner.rs b/src/program_runner.rs index cd8e067..03f0f15 100644 --- a/src/program_runner.rs +++ b/src/program_runner.rs @@ -1,27 +1,18 @@ use crate::model::{ProgramId, ProgramRef, Programs, Sections}; -use crate::section_runner::{SectionEvent, SectionRunHandle, SectionRunner}; -use eyre::WrapErr; +use crate::section_runner::{ + Error as SectionRunnerError, SectionEvent, SectionEventRecv, SectionRunHandle, SectionRunner, +}; +use actix::{ + Actor, ActorContext, ActorFuture, ActorStream, Addr, AsyncContext, Handler, Message, + MessageResult, SpawnHandle, StreamHandler, WrapFuture, +}; use std::collections::VecDeque; use thiserror::Error; use tokio::{ - spawn, - stream::StreamExt, sync::{broadcast, mpsc, oneshot}, time::{delay_queue, DelayQueue}, }; -use tracing::{debug, error, trace, trace_span, warn}; -use tracing_futures::Instrument; - -#[derive(Debug)] -enum RunnerMsg { - Quit(oneshot::Sender<()>), - UpdateSections(Sections), - UpdatePrograms(Programs), - RunProgramId(ProgramId), - RunProgram(ProgramRef), - CancelProgram(ProgramId), - Subscribe(oneshot::Sender), -} +use tracing::{debug, error, trace, warn}; #[derive(Clone, Debug)] pub enum ProgramEvent { @@ -62,31 +53,15 @@ impl ProgRun { type RunQueue = VecDeque; -struct RunnerTask { +struct ProgramRunnerInner { section_runner: SectionRunner, - msg_recv: mpsc::Receiver, - running: bool, sections: Sections, programs: Programs, event_send: Option, - scheduled_run_queue: DelayQueue, - quit_tx: Option>, + schedule_run_fut: Option, } -impl RunnerTask { - fn new(section_runner: SectionRunner, msg_recv: mpsc::Receiver) -> Self { - Self { - section_runner, - msg_recv, - running: true, - sections: Sections::new(), - programs: Programs::new(), - event_send: None, - scheduled_run_queue: DelayQueue::new(), - quit_tx: None, - } - } - +impl ProgramRunnerInner { fn send_event(&mut self, event: ProgramEvent) { if let Some(event_send) = &mut self.event_send { match event_send.send(event) { @@ -109,88 +84,57 @@ impl RunnerTask { } } - async fn start_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { + fn start_program_run(&mut self, run: &mut ProgRun) { if run.state != RunState::Waiting { warn!( program_id = run.program.id, "cannot run program which is already running" ); - return Ok(()); + return; } - run.sec_run_handles.reserve(run.program.sequence.len()); - for item in &run.program.sequence { - let section = match self.sections.get(&item.section_id) { - Some(sec) => sec.clone(), + let sequence: Vec<_> = run + .program + .sequence + .iter() + .filter_map(|item| match self.sections.get(&item.section_id) { + Some(sec) => Some((sec.clone(), item.duration)), None => { warn!( program_id = run.program.id, section_id = item.section_id, "trying to run program with nonexistant section" ); - continue; + None } - }; - let handle = self - .section_runner - .queue_run(section, item.duration) - .await - .wrap_err("failed to queue section run")?; - run.sec_run_handles.push(handle); - } - run.state = RunState::Running; - self.send_event(ProgramEvent::RunStart(run.program.clone())); - if run.sec_run_handles.is_empty() { + }) + .collect(); + if sequence.is_empty() { warn!(program_id = run.program.id, "program has no valid sections"); run.state = RunState::Finished; + self.send_event(ProgramEvent::RunStart(run.program.clone())); self.send_event(ProgramEvent::RunFinish(run.program.clone())); - } else { - debug!(program_id = run.program.id, "started running program"); + return; } - Ok(()) + run.sec_run_handles.reserve(sequence.len()); + for (section, duration) in sequence { + let handle = self.section_runner.do_queue_run(section, duration); + run.sec_run_handles.push(handle); + } + run.state = RunState::Running; + self.send_event(ProgramEvent::RunStart(run.program.clone())); + debug!(program_id = run.program.id, "started running program"); } - async fn cancel_program_run(&mut self, run: &mut ProgRun) -> eyre::Result<()> { + fn cancel_program_run(&mut self, run: &mut ProgRun) { for handle in run.sec_run_handles.drain(..) { - self.section_runner - .cancel_run(handle) - .await - .wrap_err("failed to cancel section run")?; + self.section_runner.do_cancel_run(handle); } debug!(program_id = run.program.id, "program run is cancelled"); self.send_event(ProgramEvent::RunCancel(run.program.clone())); - Ok(()) } - async fn process_queue(&mut self, run_queue: &mut RunQueue) -> eyre::Result<()> { - while let Some(current_run) = run_queue.front_mut() { - let run_finished = match current_run.state { - RunState::Waiting => { - self.start_program_run(current_run) - .await - .wrap_err("failed to start program run")?; - false - } - RunState::Running => false, - RunState::Finished => true, - RunState::Cancelled => { - self.cancel_program_run(current_run) - .await - .wrap_err("failed to cancel program run")?; - true - } - }; - if run_finished { - run_queue.pop_front(); - } else { - break; - } - } - Ok(()) - } - - fn update_programs(&mut self, new_programs: Programs) { - self.programs = new_programs; - self.scheduled_run_queue.clear(); + fn update_schedules(&mut self, ctx: &mut actix::Context) { + let mut scheduled_run_queue = DelayQueue::with_capacity(self.programs.len()); for (_, prog) in &self.programs { if !prog.enabled { continue; @@ -202,133 +146,245 @@ impl RunnerTask { }; let delay = (next_run - ref_time).to_std().unwrap(); trace!("will run program in {:?}", delay); - self.scheduled_run_queue.insert(prog.clone(), delay); + scheduled_run_queue.insert(prog.clone(), delay); + } + let fut = actix::fut::wrap_stream(scheduled_run_queue) + .map(|item, act: &mut ProgramRunnerActor, ctx| act.handle_scheduled_run(item, ctx)) + .finish(); + let handle = ctx.spawn(fut); + if let Some(old_handle) = self.schedule_run_fut.replace(handle) { + ctx.cancel_future(old_handle); } } +} - fn handle_msg(&mut self, msg: Option, run_queue: &mut RunQueue) { - let msg = msg.expect("ProgramRunner channel closed"); - use RunnerMsg::*; - trace!(msg = debug(&msg), "runner_task recv"); - match msg { - Quit(quit_tx) => { - self.running = false; - self.quit_tx = Some(quit_tx); - } - Subscribe(res_send) => { - let event_recv = self.subscribe_event(); - // Ignore error if channel closed - let _ = res_send.send(event_recv); - } - UpdateSections(new_sections) => { - self.sections = new_sections; - } - UpdatePrograms(new_programs) => { - self.update_programs(new_programs); - } - RunProgramId(program_id) => { - let program = match self.programs.get(&program_id) { - Some(program) => program.clone(), - None => { - warn!(program_id, "trying to run non-existant program"); - return; - } - }; - run_queue.push_back(ProgRun::new(program)); - } - RunProgram(program) => { - run_queue.push_back(ProgRun::new(program)); - } - RunnerMsg::CancelProgram(program_id) => { - for run in run_queue { - if run.program.id == program_id { - run.state = RunState::Cancelled; +struct ProgramRunnerActor { + inner: ProgramRunnerInner, + run_queue: RunQueue, +} + +impl Actor for ProgramRunnerActor { + type Context = actix::Context; + + fn started(&mut self, ctx: &mut Self::Context) { + trace!("subscribing to SectionRunner events"); + let subscribe_fut = self.inner.section_runner.subscribe().into_actor(self).map( + |section_events: Result, + _act: &mut ProgramRunnerActor, + ctx: &mut Self::Context| { + match section_events { + Ok(section_events) => { + ctx.add_stream(section_events.into_stream()); } + Err(err) => warn!("failed to subscribe to SectionRunner events: {}", err), } - } - } + }, + ); + ctx.wait(subscribe_fut); + trace!("program_runner starting"); } - fn handle_finished_run( - &mut self, - finished_run: SectionRunHandle, - run_queue: &mut RunQueue, - ) -> Option<()> { - let current_run = run_queue.front_mut()?; - let last_run_handle = current_run.sec_run_handles.last()?; - if finished_run == *last_run_handle { - current_run.state = RunState::Finished; - debug!( - program_id = current_run.program.id, - "finished running program" - ); - self.send_event(ProgramEvent::RunFinish(current_run.program.clone())); - } - Some(()) + fn stopped(&mut self, _ctx: &mut Self::Context) { + trace!("program_runner stopped"); } +} - fn handle_sec_event( +impl StreamHandler> for ProgramRunnerActor { + fn handle( &mut self, - sec_event: Result, - run_queue: &mut RunQueue, - ) -> eyre::Result<()> { - let sec_event = sec_event.wrap_err("failed to receive section event")?; + item: Result, + _ctx: &mut Self::Context, + ) { + let sec_event = match item { + Ok(e) => e, + Err(err) => { + warn!("failed to receive section event: {}", err); + return; + } + }; #[allow(clippy::single_match)] match sec_event { SectionEvent::RunFinish(finished_run, _) => { - self.handle_finished_run(finished_run, run_queue); + self.handle_finished_run(finished_run); } _ => {} } - Ok(()) } +} - async fn handle_scheduled_run( - &mut self, - item: Result, tokio::time::Error>, - run_queue: &mut RunQueue, - ) -> eyre::Result<()> { - let item = item.wrap_err("tokio time error")?; - run_queue.push_back(ProgRun::new(item.into_inner())); - Ok(()) +#[derive(Message)] +#[rtype(result = "()")] +struct Quit; + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result { + ctx.stop(); } +} - async fn run_impl(&mut self) -> eyre::Result<()> { - let mut sec_events = self - .section_runner - .subscribe() - .await - .wrap_err("could not subscribe to SectionRunner events")?; - - let mut run_queue: RunQueue = VecDeque::new(); - - while self.running { - self.process_queue(&mut run_queue) - .await - .wrap_err("error during queue processing")?; - tokio::select! { - msg = self.msg_recv.recv() => self.handle_msg(msg, &mut run_queue), - sec_event = sec_events.recv() => self.handle_sec_event(sec_event, &mut run_queue)?, - Some(scheduled_run) = self.scheduled_run_queue.next() => { - self.handle_scheduled_run(scheduled_run, &mut run_queue).await?; - }, - }; +#[derive(Message)] +#[rtype(result = "ProgramEventRecv")] +struct Subscribe; + +impl Handler for ProgramRunnerActor { + type Result = MessageResult; + fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result { + let event_recv = self.inner.subscribe_event(); + MessageResult(event_recv) + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct UpdateSections(Sections); + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, msg: UpdateSections, _ctx: &mut Self::Context) -> Self::Result { + trace!("updating sections"); + let UpdateSections(new_sections) = msg; + self.inner.sections = new_sections; + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct UpdatePrograms(Programs); + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, msg: UpdatePrograms, ctx: &mut Self::Context) -> Self::Result { + trace!("updating programs"); + let UpdatePrograms(new_programs) = msg; + self.inner.programs = new_programs; + self.inner.update_schedules(ctx); + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct RunProgramId(ProgramId); + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, msg: RunProgramId, ctx: &mut Self::Context) -> Self::Result { + let RunProgramId(program_id) = msg; + let program = match self.inner.programs.get(&program_id) { + Some(program) => program.clone(), + None => { + warn!(program_id, "trying to run non-existant program"); + return; + } + }; + self.run_queue.push_back(ProgRun::new(program)); + ctx.notify(Process); + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct RunProgram(ProgramRef); + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, msg: RunProgram, ctx: &mut Self::Context) -> Self::Result { + let RunProgram(program) = msg; + self.run_queue.push_back(ProgRun::new(program)); + ctx.notify(Process); + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct CancelProgram(ProgramId); + +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, msg: CancelProgram, ctx: &mut Self::Context) -> Self::Result { + let CancelProgram(program_id) = msg; + for run in self.run_queue.iter_mut() { + if run.program.id == program_id { + run.state = RunState::Cancelled; + } } + ctx.notify(Process); + } +} + +#[derive(Message)] +#[rtype(result = "()")] +struct Process; - if let Some(quit_tx) = self.quit_tx.take() { - let _ = quit_tx.send(()); +impl Handler for ProgramRunnerActor { + type Result = (); + fn handle(&mut self, _msg: Process, _ctx: &mut Self::Context) -> Self::Result { + while let Some(current_run) = self.run_queue.front_mut() { + let run_finished = match current_run.state { + RunState::Waiting => { + self.inner.start_program_run(current_run); + false + } + RunState::Running => false, + RunState::Finished => true, + RunState::Cancelled => { + self.inner.cancel_program_run(current_run); + true + } + }; + if run_finished { + self.run_queue.pop_front(); + } else { + break; + } } + } +} - Ok(()) +impl ProgramRunnerActor { + fn new(section_runner: SectionRunner) -> Self { + Self { + inner: ProgramRunnerInner { + section_runner, + sections: Sections::new(), + programs: Programs::new(), + event_send: None, + schedule_run_fut: None, + }, + run_queue: RunQueue::new(), + } } - async fn run(mut self) { - let span = trace_span!("program_runner task"); + fn handle_finished_run(&mut self, finished_run: SectionRunHandle) -> Option<()> { + let current_run = self.run_queue.front_mut()?; + let last_run_handle = current_run.sec_run_handles.last()?; + if finished_run == *last_run_handle { + current_run.state = RunState::Finished; + debug!( + program_id = current_run.program.id, + "finished running program" + ); + self.inner + .send_event(ProgramEvent::RunFinish(current_run.program.clone())); + } + Some(()) + } - self.run_impl() - .instrument(span) - .await - .expect("error in ProgramRunner task"); + fn handle_scheduled_run( + &mut self, + item: Result, tokio::time::Error>, + ctx: &mut ::Context, + ) { + let program = match item { + Ok(expired) => expired.into_inner(), + Err(err) => { + error!("tokio time error: {}", err); + return; + } + }; + trace!(program_id = program.id, "schedule expired"); + self.run_queue.push_back(ProgRun::new(program)); + ctx.notify(Process); } } @@ -350,65 +406,67 @@ impl From for ChannelClosed { } } -#[derive(Clone, Debug)] +impl From for ChannelClosed { + fn from(_: actix::MailboxError) -> Self { + // TODO: + Self + } +} + +#[derive(Clone)] pub struct ProgramRunner { - msg_send: mpsc::Sender, + addr: Addr, } #[allow(dead_code)] impl ProgramRunner { pub fn new(section_runner: SectionRunner) -> Self { - let (msg_send, msg_recv) = mpsc::channel(8); - spawn(RunnerTask::new(section_runner, msg_recv).run()); - Self { msg_send } + let addr = ProgramRunnerActor::new(section_runner).start(); + Self { addr } } pub async fn quit(&mut self) -> Result<()> { - let (quit_tx, quit_rx) = oneshot::channel(); - self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?; - quit_rx.await?; + self.addr.send(Quit).await?; Ok(()) } pub async fn update_sections(&mut self, new_sections: Sections) -> Result<()> { - self.msg_send - .send(RunnerMsg::UpdateSections(new_sections)) + self.addr + .send(UpdateSections(new_sections)) .await .map_err(From::from) } pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> { - self.msg_send - .send(RunnerMsg::UpdatePrograms(new_programs)) + self.addr + .send(UpdatePrograms(new_programs)) .await .map_err(From::from) } pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> { - self.msg_send - .send(RunnerMsg::RunProgramId(program_id)) + self.addr + .send(RunProgramId(program_id)) .await .map_err(From::from) } pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> { - self.msg_send - .send(RunnerMsg::RunProgram(program)) + self.addr + .send(RunProgram(program)) .await .map_err(From::from) } pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> { - self.msg_send - .send(RunnerMsg::CancelProgram(program_id)) + self.addr + .send(CancelProgram(program_id)) .await .map_err(From::from) } pub async fn subscribe(&mut self) -> Result { - let (res_send, res_recv) = oneshot::channel(); - self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?; - let event_recv = res_recv.await?; + let event_recv = self.addr.send(Subscribe).await?; Ok(event_recv) } } @@ -420,7 +478,7 @@ mod test { use crate::{ model::{Program, ProgramItem, Section}, schedule::{every_day, DateTimeBound, Schedule}, - trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, + trace_listeners::{EventListener, Filters}, }; use assert_matches::assert_matches; use im::ordmap; @@ -433,17 +491,9 @@ mod test { let quit_msg = EventListener::new( Filters::new() .target("sprinklers_rs::program_runner") - .message("runner_task recv") - .field_value("msg", "Quit"), + .message("program_runner stopped"), ); - let task_span = SpanListener::new( - SpanFilters::new() - .target("sprinklers_rs::program_runner") - .name("program_runner task"), - ); - let subscriber = tracing_subscriber::registry() - .with(quit_msg.clone()) - .with(task_span.clone()); + let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let _sub = tracing::subscriber::set_default(subscriber); let interface = MockSectionInterface::new(6); @@ -455,7 +505,6 @@ mod test { yield_now().await; assert_eq!(quit_msg.get_count(), 1); - assert!(task_span.get_exit_count() > 1); } fn make_sections_and_runner() -> (Sections, SectionRunner, Arc) { @@ -530,24 +579,12 @@ mod test { assert_eq!(interface.get_section_state(0), true); tokio::time::pause(); - assert_matches!( - sec_events.recv().await, - Ok(SectionEvent::RunFinish(_, _)) - ); - assert_matches!( - sec_events.recv().await, - Ok(SectionEvent::RunStart(_, _)) - ); + assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); + assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunStart(_, _))); assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(1), true); - assert_matches!( - sec_events.recv().await, - Ok(SectionEvent::RunFinish(_, _)) - ); - assert_matches!( - prog_events.recv().await, - Ok(ProgramEvent::RunFinish(_)) - ); + assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunFinish(_, _))); + assert_matches!(prog_events.recv().await, Ok(ProgramEvent::RunFinish(_))); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); @@ -581,12 +618,12 @@ mod test { // Should immediately start and finish running program // due to nonexistant section assert_matches!( - prog_events.try_recv(), + prog_events.recv().await, Ok(ProgramEvent::RunStart(prog)) if prog.id == 1 ); assert_matches!( - prog_events.try_recv(), + prog_events.recv().await, Ok(ProgramEvent::RunFinish(prog)) if prog.id == 1 ); @@ -734,10 +771,7 @@ mod test { Ok(ProgramEvent::RunCancel(prog)) if prog.id == 1 ); - assert_matches!( - sec_events.recv().await, - Ok(SectionEvent::RunCancel(_, _)) - ); + assert_matches!(sec_events.recv().await, Ok(SectionEvent::RunCancel(_, _))); runner.quit().await.unwrap(); sec_runner.quit().await.unwrap(); @@ -745,7 +779,6 @@ mod test { #[actix_rt::test] async fn test_scheduled_run() { - tracing_subscriber::fmt().init(); let (sections, mut sec_runner, _) = make_sections_and_runner(); let mut runner = ProgramRunner::new(sec_runner.clone()); let mut prog_events = runner.subscribe().await.unwrap(); diff --git a/src/section_runner.rs b/src/section_runner.rs index 8e34f55..17b34d0 100644 --- a/src/section_runner.rs +++ b/src/section_runner.rs @@ -3,7 +3,16 @@ use crate::section_interface::SectionInterface; use actix::{ Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle, }; -use std::{mem::swap, sync::Arc, time::Duration}; +use futures_util::TryFutureExt; +use std::{ + future::Future, + mem::swap, + sync::{ + atomic::{AtomicI32, Ordering}, + Arc, + }, + time::Duration, +}; use thiserror::Error; use tokio::{ sync::{broadcast, watch}, @@ -266,7 +275,6 @@ impl SectionRunnerInner { struct SectionRunnerActor { state: SecRunnerState, - next_run_id: i32, inner: SectionRunnerInner, } @@ -295,26 +303,22 @@ impl Handler for SectionRunnerActor { } #[derive(Message, Debug, Clone)] -#[rtype(result = "SectionRunHandle")] -struct QueueRun(SectionRef, Duration); +#[rtype(result = "()")] +struct QueueRun(SectionRunHandle, SectionRef, Duration); impl Handler for SectionRunnerActor { - type Result = MessageResult; + type Result = (); fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result { - let QueueRun(section, duration) = msg; - - let run_id = self.next_run_id; - self.next_run_id += 1; - let handle = SectionRunHandle(run_id); + let QueueRun(handle, section, duration) = msg; - let run: Arc = SecRun::new(handle.clone(), section, duration).into(); + let run: Arc = SecRun::new(handle, section, duration).into(); self.state.run_queue.push_back(run); self.inner.did_change = true; ctx.notify(Process); - MessageResult(handle) + () } } @@ -418,7 +422,6 @@ impl SectionRunnerActor { delay_future: None, did_change: false, }, - next_run_id: 1, } } @@ -486,6 +489,7 @@ pub type Result = std::result::Result; pub struct SectionRunner { state_recv: SecRunnerStateRecv, addr: Addr, + next_run_id: Arc, } #[allow(dead_code)] @@ -493,46 +497,71 @@ impl SectionRunner { pub fn new(interface: Arc) -> Self { let (state_send, state_recv) = watch::channel(SecRunnerState::default()); let addr = SectionRunnerActor::new(interface, state_send).start(); - Self { state_recv, addr } + Self { + state_recv, + addr, + next_run_id: Arc::new(AtomicI32::new(1)), + } } - pub async fn quit(&mut self) -> Result<()> { - self.addr.send(Quit).await?; - Ok(()) + pub fn quit(&mut self) -> impl Future> { + self.addr.send(Quit).map_err(From::from) } - pub async fn queue_run( + fn queue_run_inner( &mut self, section: SectionRef, duration: Duration, - ) -> Result { - let handle = self.addr.send(QueueRun(section, duration)).await?; - Ok(handle) + ) -> (QueueRun, SectionRunHandle) { + let run_id = self.next_run_id.fetch_add(1, Ordering::SeqCst); + let handle = SectionRunHandle(run_id); + (QueueRun(handle.clone(), section, duration), handle) + } + + pub fn do_queue_run( + &mut self, + section: SectionRef, + duration: Duration, + ) -> SectionRunHandle { + let (queue_run, handle) = self.queue_run_inner(section, duration); + self.addr.do_send(queue_run); + handle + } + + pub fn queue_run( + &mut self, + section: SectionRef, + duration: Duration, + ) -> impl Future> { + let (queue_run, handle) = self.queue_run_inner(section, duration); + self.addr + .send(queue_run) + .map_err(From::from) + .map_ok(move |_| handle) + } + + pub fn do_cancel_run(&mut self, handle: SectionRunHandle) { + self.addr.do_send(CancelRun(handle)) } - pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> { - self.addr.send(CancelRun(handle)).await?; - Ok(()) + pub fn cancel_run(&mut self, handle: SectionRunHandle) -> impl Future> { + self.addr.send(CancelRun(handle)).map_err(From::from) } - pub async fn cancel_all(&mut self) -> Result<()> { - self.addr.send(CancelAll).await?; - Ok(()) + pub fn cancel_all(&mut self) -> impl Future> { + self.addr.send(CancelAll).map_err(From::from) } - pub async fn pause(&mut self) -> Result<()> { - self.addr.send(SetPaused(true)).await?; - Ok(()) + pub fn pause(&mut self) -> impl Future> { + self.addr.send(SetPaused(true)).map_err(From::from) } - pub async fn unpause(&mut self) -> Result<()> { - self.addr.send(SetPaused(false)).await?; - Ok(()) + pub fn unpause(&mut self) -> impl Future> { + self.addr.send(SetPaused(false)).map_err(From::from) } - pub async fn subscribe(&mut self) -> Result { - let event_recv = self.addr.send(Subscribe).await?; - Ok(event_recv) + pub fn subscribe(&mut self) -> impl Future> { + self.addr.send(Subscribe).map_err(From::from) } pub fn state_receiver(&self) -> SecRunnerStateRecv { @@ -559,8 +588,7 @@ mod test { .target("sprinklers_rs::section_runner") .message("section_runner stopped"), ); - let subscriber = tracing_subscriber::registry() - .with(quit_msg.clone()); + let subscriber = tracing_subscriber::registry().with(quit_msg.clone()); let _sub = tracing::subscriber::set_default(subscriber); let interface = MockSectionInterface::new(6);