Browse Source

Use actix for section runner

master
Alex Mikhalev 4 years ago
parent
commit
9643923428
  1. 2
      Cargo.toml
  2. 2
      src/main.rs
  3. 80
      src/program_runner.rs
  4. 438
      src/section_runner.rs

2
Cargo.toml

@ -20,6 +20,8 @@ chrono = { version = "0.4.15" }
assert_matches = "1.3.0" assert_matches = "1.3.0"
serde = { version = "1.0.116", features = ["derive"] } serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57" serde_json = "1.0.57"
actix = "0.10.0"
actix-rt = "1.1.1"
[dependencies.rumqttc] [dependencies.rumqttc]
git = "https://github.com/bytebeamio/rumqtt.git" git = "https://github.com/bytebeamio/rumqtt.git"

2
src/main.rs

@ -20,7 +20,7 @@ use tracing::{debug, info};
use tracing_subscriber::EnvFilter; use tracing_subscriber::EnvFilter;
use update_listener::UpdateListener; use update_listener::UpdateListener;
#[tokio::main] #[actix_rt::main]
async fn main() -> Result<()> { async fn main() -> Result<()> {
tracing_subscriber::fmt() tracing_subscriber::fmt()
.with_ansi(true) .with_ansi(true)

80
src/program_runner.rs

@ -428,7 +428,7 @@ mod test {
use tokio::task::yield_now; use tokio::task::yield_now;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
#[tokio::test] #[actix_rt::test]
async fn test_quit() { async fn test_quit() {
let quit_msg = EventListener::new( let quit_msg = EventListener::new(
Filters::new() Filters::new()
@ -496,7 +496,7 @@ mod test {
.into() .into()
} }
#[tokio::test] #[actix_rt::test]
async fn test_run_program() { async fn test_run_program() {
let (sections, mut sec_runner, interface) = make_sections_and_runner(); let (sections, mut sec_runner, interface) = make_sections_and_runner();
let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut sec_events = sec_runner.subscribe().await.unwrap();
@ -522,31 +522,31 @@ mod test {
runner.run_program(program).await.unwrap(); runner.run_program(program).await.unwrap();
yield_now().await; yield_now().await;
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.recv().await,
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_, _)); assert_matches!(sec_events.try_recv(), Ok(SectionEvent::RunStart(_, _)));
assert_eq!(interface.get_section_state(0), true); assert_eq!(interface.get_section_state(0), true);
tokio::time::pause(); tokio::time::pause();
assert_matches!( assert_matches!(
sec_events.recv().await.unwrap(), sec_events.recv().await,
SectionEvent::RunFinish(_, _) Ok(SectionEvent::RunFinish(_, _))
); );
assert_matches!( assert_matches!(
sec_events.recv().await.unwrap(), sec_events.recv().await,
SectionEvent::RunStart(_, _) Ok(SectionEvent::RunStart(_, _))
); );
assert_eq!(interface.get_section_state(0), false); assert_eq!(interface.get_section_state(0), false);
assert_eq!(interface.get_section_state(1), true); assert_eq!(interface.get_section_state(1), true);
assert_matches!( assert_matches!(
sec_events.recv().await.unwrap(), sec_events.recv().await,
SectionEvent::RunFinish(_, _) Ok(SectionEvent::RunFinish(_, _))
); );
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await,
ProgramEvent::RunFinish(_) Ok(ProgramEvent::RunFinish(_))
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
@ -554,7 +554,7 @@ mod test {
yield_now().await; yield_now().await;
} }
#[tokio::test] #[actix_rt::test]
async fn test_run_nonexistant_section() { async fn test_run_nonexistant_section() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(sec_runner.clone());
@ -581,13 +581,13 @@ mod test {
// Should immediately start and finish running program // Should immediately start and finish running program
// due to nonexistant section // due to nonexistant section
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv(),
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.try_recv(),
ProgramEvent::RunFinish(prog) Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1 if prog.id == 1
); );
@ -595,14 +595,14 @@ mod test {
yield_now().await; yield_now().await;
// Should run right away since last program should be done // Should run right away since last program should be done
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.recv().await,
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 2 if prog.id == 2
); );
tokio::time::pause(); tokio::time::pause();
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await,
ProgramEvent::RunFinish(prog) Ok(ProgramEvent::RunFinish(prog))
if prog.id == 2 if prog.id == 2
); );
@ -610,7 +610,7 @@ mod test {
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_close_event_chan() { async fn test_close_event_chan() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(sec_runner.clone());
@ -633,7 +633,7 @@ mod test {
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_run_program_id() { async fn test_run_program_id() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut runner = ProgramRunner::new(sec_runner.clone()); let mut runner = ProgramRunner::new(sec_runner.clone());
@ -666,28 +666,28 @@ mod test {
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();
yield_now().await; yield_now().await;
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.recv().await,
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
tokio::time::pause(); tokio::time::pause();
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await,
ProgramEvent::RunFinish(prog) Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1 if prog.id == 1
); );
runner.run_program_id(1).await.unwrap(); runner.run_program_id(1).await.unwrap();
yield_now().await; yield_now().await;
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.recv().await,
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await,
ProgramEvent::RunFinish(prog) Ok(ProgramEvent::RunFinish(prog))
if prog.id == 1 if prog.id == 1
); );
@ -695,7 +695,7 @@ mod test {
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_cancel_program() { async fn test_cancel_program() {
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();
let mut sec_events = sec_runner.subscribe().await.unwrap(); let mut sec_events = sec_runner.subscribe().await.unwrap();
@ -721,8 +721,8 @@ mod test {
runner.run_program(program.clone()).await.unwrap(); runner.run_program(program.clone()).await.unwrap();
yield_now().await; yield_now().await;
assert_matches!( assert_matches!(
prog_events.try_recv().unwrap(), prog_events.recv().await,
ProgramEvent::RunStart(prog) Ok(ProgramEvent::RunStart(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_, _)); assert_matches!(sec_events.try_recv().unwrap(), SectionEvent::RunStart(_, _));
@ -730,20 +730,20 @@ mod test {
runner.cancel_program(program.id).await.unwrap(); runner.cancel_program(program.id).await.unwrap();
yield_now().await; yield_now().await;
assert_matches!( assert_matches!(
prog_events.recv().await.unwrap(), prog_events.recv().await,
ProgramEvent::RunCancel(prog) Ok(ProgramEvent::RunCancel(prog))
if prog.id == 1 if prog.id == 1
); );
assert_matches!( assert_matches!(
sec_events.recv().await.unwrap(), sec_events.recv().await,
SectionEvent::RunCancel(_, _) Ok(SectionEvent::RunCancel(_, _))
); );
runner.quit().await.unwrap(); runner.quit().await.unwrap();
sec_runner.quit().await.unwrap(); sec_runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_scheduled_run() { async fn test_scheduled_run() {
tracing_subscriber::fmt().init(); tracing_subscriber::fmt().init();
let (sections, mut sec_runner, _) = make_sections_and_runner(); let (sections, mut sec_runner, _) = make_sections_and_runner();

438
src/section_runner.rs

@ -1,22 +1,15 @@
use crate::model::SectionRef; use crate::model::SectionRef;
use crate::option_future::OptionFuture;
use crate::section_interface::SectionInterface; use crate::section_interface::SectionInterface;
use std::{ use actix::{
mem::swap, Actor, ActorContext, Addr, AsyncContext, Handler, Message, MessageResult, SpawnHandle,
sync::{
atomic::{AtomicI32, Ordering},
Arc,
},
time::Duration,
}; };
use std::{mem::swap, sync::Arc, time::Duration};
use thiserror::Error; use thiserror::Error;
use tokio::{ use tokio::{
spawn, sync::{broadcast, watch},
sync::{broadcast, mpsc, oneshot, watch}, time::Instant,
time::{delay_for, Instant},
}; };
use tracing::{debug, trace, trace_span, warn}; use tracing::{debug, trace, warn};
use tracing_futures::Instrument;
#[derive(Debug, Clone, PartialEq, Eq, Hash)] #[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SectionRunHandle(i32); pub struct SectionRunHandle(i32);
@ -27,30 +20,6 @@ impl SectionRunHandle {
} }
} }
#[derive(Debug)]
struct SectionRunnerInner {
next_run_id: AtomicI32,
}
impl SectionRunnerInner {
fn new() -> Self {
Self {
next_run_id: AtomicI32::new(1),
}
}
}
#[derive(Debug)]
enum RunnerMsg {
Quit(oneshot::Sender<()>),
QueueRun(SectionRunHandle, SectionRef, Duration),
CancelRun(SectionRunHandle),
CancelAll,
Pause,
Unpause,
Subscribe(oneshot::Sender<SectionEventRecv>),
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub enum SectionEvent { pub enum SectionEvent {
RunStart(SectionRunHandle, SectionRef), RunStart(SectionRunHandle, SectionRef),
@ -130,35 +99,15 @@ impl Default for SecRunnerState {
pub type SecRunnerStateRecv = watch::Receiver<SecRunnerState>; pub type SecRunnerStateRecv = watch::Receiver<SecRunnerState>;
struct RunnerTask { struct SectionRunnerInner {
interface: Arc<dyn SectionInterface + Sync>, interface: Arc<dyn SectionInterface + Sync>,
msg_recv: mpsc::Receiver<RunnerMsg>,
running: bool,
delay_future: OptionFuture<tokio::time::Delay>,
event_send: Option<SectionEventSend>, event_send: Option<SectionEventSend>,
state_send: watch::Sender<SecRunnerState>, state_send: watch::Sender<SecRunnerState>,
quit_tx: Option<oneshot::Sender<()>>, delay_future: Option<SpawnHandle>,
did_change: bool, did_change: bool,
} }
impl RunnerTask { impl SectionRunnerInner {
fn new(
interface: Arc<dyn SectionInterface + Sync>,
msg_recv: mpsc::Receiver<RunnerMsg>,
state_send: watch::Sender<SecRunnerState>,
) -> Self {
Self {
interface,
msg_recv,
running: true,
delay_future: None.into(),
event_send: None,
state_send,
quit_tx: None,
did_change: false,
}
}
fn send_event(&mut self, event: SectionEvent) { fn send_event(&mut self, event: SectionEvent) {
if let Some(event_send) = &mut self.event_send { if let Some(event_send) = &mut self.event_send {
match event_send.send(event) { match event_send.send(event) {
@ -297,35 +246,212 @@ impl RunnerTask {
self.did_change = true; self.did_change = true;
} }
fn process_queue(&mut self, state: &mut SecRunnerState) { fn process_after_delay(
&mut self,
after: Duration,
ctx: &mut <SectionRunnerActor as Actor>::Context,
) {
let delay_future = ctx.notify_later(Process, after);
if let Some(old_future) = self.delay_future.replace(delay_future) {
ctx.cancel_future(old_future);
}
}
fn cancel_process(&mut self, ctx: &mut <SectionRunnerActor as Actor>::Context) {
if let Some(old_future) = self.delay_future.take() {
ctx.cancel_future(old_future);
}
}
}
struct SectionRunnerActor {
state: SecRunnerState,
next_run_id: i32,
inner: SectionRunnerInner,
}
impl Actor for SectionRunnerActor {
type Context = actix::Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context) {
trace!("section_runner starting");
}
fn stopped(&mut self, _ctx: &mut Self::Context) {
trace!("section_runner stopped");
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
struct Quit;
impl Handler<Quit> for SectionRunnerActor {
type Result = ();
fn handle(&mut self, _msg: Quit, ctx: &mut Self::Context) -> Self::Result {
ctx.stop();
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "SectionRunHandle")]
struct QueueRun(SectionRef, Duration);
impl Handler<QueueRun> for SectionRunnerActor {
type Result = MessageResult<QueueRun>;
fn handle(&mut self, msg: QueueRun, ctx: &mut Self::Context) -> Self::Result {
let QueueRun(section, duration) = msg;
let run_id = self.next_run_id;
self.next_run_id += 1;
let handle = SectionRunHandle(run_id);
let run: Arc<SecRun> = SecRun::new(handle.clone(), section, duration).into();
self.state.run_queue.push_back(run);
self.inner.did_change = true;
ctx.notify(Process);
MessageResult(handle)
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
struct CancelRun(SectionRunHandle);
impl Handler<CancelRun> for SectionRunnerActor {
type Result = ();
fn handle(&mut self, msg: CancelRun, ctx: &mut Self::Context) -> Self::Result {
let CancelRun(handle) = msg;
for run in self.state.run_queue.iter_mut() {
if run.handle != handle {
continue;
}
trace!(handle = handle.0, "cancelling run by handle");
self.inner.cancel_run(run);
}
ctx.notify(Process);
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
struct CancelAll;
impl Handler<CancelAll> for SectionRunnerActor {
type Result = ();
fn handle(&mut self, _msg: CancelAll, ctx: &mut Self::Context) -> Self::Result {
let mut old_runs = SecRunQueue::new();
swap(&mut old_runs, &mut self.state.run_queue);
trace!(count = old_runs.len(), "cancelling all runs");
for mut run in old_runs {
self.inner.cancel_run(&mut run);
}
ctx.notify(Process);
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
struct SetPaused(bool);
impl Handler<SetPaused> for SectionRunnerActor {
type Result = ();
fn handle(&mut self, msg: SetPaused, ctx: &mut Self::Context) -> Self::Result {
let SetPaused(pause) = msg;
if pause != self.state.paused {
if pause {
self.state.paused = true;
self.inner.send_event(SectionEvent::RunnerPause);
} else {
self.state.paused = false;
self.inner.send_event(SectionEvent::RunnerUnpause);
}
self.inner.did_change = true;
ctx.notify(Process);
}
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "SectionEventRecv")]
struct Subscribe;
impl Handler<Subscribe> for SectionRunnerActor {
type Result = MessageResult<Subscribe>;
fn handle(&mut self, _msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
let event_recv = self.inner.subscribe_event();
MessageResult(event_recv)
}
}
#[derive(Message, Debug, Clone)]
#[rtype(result = "()")]
struct Process;
impl Handler<Process> for SectionRunnerActor {
type Result = ();
fn handle(&mut self, _msg: Process, ctx: &mut Self::Context) -> Self::Result {
self.process(ctx)
}
}
impl SectionRunnerActor {
fn new(
interface: Arc<dyn SectionInterface + Sync>,
state_send: watch::Sender<SecRunnerState>,
) -> Self {
Self {
state: SecRunnerState::default(),
inner: SectionRunnerInner {
interface,
event_send: None,
state_send,
delay_future: None,
did_change: false,
},
next_run_id: 1,
}
}
fn process_queue(&mut self, ctx: &mut actix::Context<Self>) {
use SecRunState::*; use SecRunState::*;
let state = &mut self.state;
while let Some(current_run) = state.run_queue.front_mut() { while let Some(current_run) = state.run_queue.front_mut() {
let run_finished = match (&current_run.state, state.paused) { let run_finished = match (&current_run.state, state.paused) {
(Waiting, false) => { (Waiting, false) => {
self.start_run(current_run); self.inner.start_run(current_run);
self.delay_future = Some(delay_for(current_run.duration)).into(); self.inner.process_after_delay(current_run.duration, ctx);
false false
} }
(Running { start_time }, false) => { (Running { start_time }, false) => {
let time_to_finish = start_time.elapsed() >= current_run.duration; let time_to_finish = start_time.elapsed() >= current_run.duration;
if time_to_finish { if time_to_finish {
self.finish_run(current_run); self.inner.finish_run(current_run);
self.delay_future = None.into(); self.inner.cancel_process(ctx);
} }
time_to_finish time_to_finish
} }
(Paused { .. }, false) => { (Paused { .. }, false) => {
self.unpause_run(current_run); self.inner.unpause_run(current_run);
self.delay_future = Some(delay_for(current_run.duration)).into(); self.inner.process_after_delay(current_run.duration, ctx);
false false
} }
(Waiting, true) => { (Waiting, true) => {
self.pause_run(current_run); self.inner.pause_run(current_run);
false false
} }
(Running { .. }, true) => { (Running { .. }, true) => {
self.pause_run(current_run); self.inner.pause_run(current_run);
self.delay_future = None.into(); self.inner.cancel_process(ctx);
false false
} }
(Paused { .. }, true) => false, (Paused { .. }, true) => false,
@ -340,140 +466,38 @@ impl RunnerTask {
} }
} }
fn handle_msg(&mut self, msg: Option<RunnerMsg>, state: &mut SecRunnerState) { fn process(&mut self, ctx: &mut actix::Context<Self>) {
let msg = msg.expect("SectionRunner channel closed"); self.process_queue(ctx);
use RunnerMsg::*;
trace!(msg = debug(&msg), "runner_task recv");
match msg {
Quit(quit_tx) => {
self.quit_tx = Some(quit_tx);
self.running = false;
}
QueueRun(handle, section, duration) => {
state
.run_queue
.push_back(Arc::new(SecRun::new(handle, section, duration)));
self.did_change = true;
}
CancelRun(handle) => {
for run in state.run_queue.iter_mut() {
if run.handle != handle {
continue;
}
trace!(handle = handle.0, "cancelling run by handle");
self.cancel_run(run);
}
}
CancelAll => {
let mut old_runs = SecRunQueue::new();
swap(&mut old_runs, &mut state.run_queue);
trace!(count = old_runs.len(), "cancelling all runs");
for mut run in old_runs {
self.cancel_run(&mut run);
}
}
Pause => {
state.paused = true;
self.send_event(SectionEvent::RunnerPause);
self.did_change = true;
}
Unpause => {
state.paused = false;
self.send_event(SectionEvent::RunnerUnpause);
self.did_change = true;
}
Subscribe(res_send) => {
let event_recv = self.subscribe_event();
// Ignore error if channel closed
let _ = res_send.send(event_recv);
}
}
}
async fn run_impl(mut self) {
let mut state = SecRunnerState::default();
while self.running {
// Process all pending messages
// This is so if there are many pending messages, the state
// is only broadcast once
while let Ok(msg) = self.msg_recv.try_recv() {
self.handle_msg(Some(msg), &mut state);
}
self.process_queue(&mut state);
// If a change was made to state, broadcast it
if self.did_change {
let _ = self.state_send.broadcast(state.clone());
self.did_change = false;
}
let delay_done = || {
trace!("delay done");
};
tokio::select! {
msg = self.msg_recv.recv() => {
self.handle_msg(msg, &mut state)
},
_ = &mut self.delay_future, if self.delay_future.is_some() => delay_done()
};
}
if let Some(quit_tx) = self.quit_tx.take() { if self.inner.did_change {
let _ = quit_tx.send(()); let _ = self.inner.state_send.broadcast(self.state.clone());
self.inner.did_change = false;
} }
} }
async fn run(self) {
let span = trace_span!("section_runner task");
self.run_impl().instrument(span).await;
}
} }
#[derive(Debug, Clone, Error)] #[derive(Debug, Clone, Error)]
#[error("the SectionRunner channel is closed")] #[error("error communicating with SectionRunner: {0}")]
pub struct ChannelClosed; pub struct Error(#[from] actix::MailboxError);
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>;
impl<T> From<mpsc::error::SendError<T>> for ChannelClosed {
fn from(_: mpsc::error::SendError<T>) -> Self {
Self
}
}
impl From<oneshot::error::RecvError> for ChannelClosed { pub type Result<T, E = Error> = std::result::Result<T, E>;
fn from(_: oneshot::error::RecvError) -> Self {
Self
}
}
#[derive(Clone, Debug)] #[derive(Clone)]
pub struct SectionRunner { pub struct SectionRunner {
inner: Arc<SectionRunnerInner>,
msg_send: mpsc::Sender<RunnerMsg>,
state_recv: SecRunnerStateRecv, state_recv: SecRunnerStateRecv,
addr: Addr<SectionRunnerActor>,
} }
#[allow(dead_code)] #[allow(dead_code)]
impl SectionRunner { impl SectionRunner {
pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self { pub fn new(interface: Arc<dyn SectionInterface + Sync>) -> Self {
let (msg_send, msg_recv) = mpsc::channel(32);
let (state_send, state_recv) = watch::channel(SecRunnerState::default()); let (state_send, state_recv) = watch::channel(SecRunnerState::default());
spawn(RunnerTask::new(interface, msg_recv, state_send).run()); let addr = SectionRunnerActor::new(interface, state_send).start();
Self { Self { state_recv, addr }
inner: Arc::new(SectionRunnerInner::new()),
msg_send,
state_recv,
}
} }
pub async fn quit(&mut self) -> Result<()> { pub async fn quit(&mut self) -> Result<()> {
let (quit_tx, quit_rx) = oneshot::channel(); self.addr.send(Quit).await?;
self.msg_send.send(RunnerMsg::Quit(quit_tx)).await?;
quit_rx.await?;
Ok(()) Ok(())
} }
@ -482,38 +506,32 @@ impl SectionRunner {
section: SectionRef, section: SectionRef,
duration: Duration, duration: Duration,
) -> Result<SectionRunHandle> { ) -> Result<SectionRunHandle> {
let run_id = self.inner.next_run_id.fetch_add(1, Ordering::Relaxed); let handle = self.addr.send(QueueRun(section, duration)).await?;
let handle = SectionRunHandle(run_id);
self.msg_send
.send(RunnerMsg::QueueRun(handle.clone(), section, duration))
.await?;
Ok(handle) Ok(handle)
} }
pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> { pub async fn cancel_run(&mut self, handle: SectionRunHandle) -> Result<()> {
self.msg_send.send(RunnerMsg::CancelRun(handle)).await?; self.addr.send(CancelRun(handle)).await?;
Ok(()) Ok(())
} }
pub async fn cancel_all(&mut self) -> Result<()> { pub async fn cancel_all(&mut self) -> Result<()> {
self.msg_send.send(RunnerMsg::CancelAll).await?; self.addr.send(CancelAll).await?;
Ok(()) Ok(())
} }
pub async fn pause(&mut self) -> Result<()> { pub async fn pause(&mut self) -> Result<()> {
self.msg_send.send(RunnerMsg::Pause).await?; self.addr.send(SetPaused(true)).await?;
Ok(()) Ok(())
} }
pub async fn unpause(&mut self) -> Result<()> { pub async fn unpause(&mut self) -> Result<()> {
self.msg_send.send(RunnerMsg::Unpause).await?; self.addr.send(SetPaused(false)).await?;
Ok(()) Ok(())
} }
pub async fn subscribe(&mut self) -> Result<SectionEventRecv> { pub async fn subscribe(&mut self) -> Result<SectionEventRecv> {
let (res_send, res_recv) = oneshot::channel(); let event_recv = self.addr.send(Subscribe).await?;
self.msg_send.send(RunnerMsg::Subscribe(res_send)).await?;
let event_recv = res_recv.await?;
Ok(event_recv) Ok(event_recv)
} }
@ -528,28 +546,21 @@ mod test {
use crate::section_interface::MockSectionInterface; use crate::section_interface::MockSectionInterface;
use crate::{ use crate::{
model::{Section, Sections}, model::{Section, Sections},
trace_listeners::{EventListener, Filters, SpanFilters, SpanListener}, trace_listeners::{EventListener, Filters},
}; };
use assert_matches::assert_matches; use assert_matches::assert_matches;
use im::ordmap; use im::ordmap;
use tracing_subscriber::prelude::*; use tracing_subscriber::prelude::*;
#[tokio::test] #[actix_rt::test]
async fn test_quit() { async fn test_quit() {
let quit_msg = EventListener::new( let quit_msg = EventListener::new(
Filters::new() Filters::new()
.target("sprinklers_rs::section_runner") .target("sprinklers_rs::section_runner")
.message("runner_task recv") .message("section_runner stopped"),
.field_value("msg", "Quit"),
);
let task_span = SpanListener::new(
SpanFilters::new()
.target("sprinklers_rs::section_runner")
.name("section_runner task"),
); );
let subscriber = tracing_subscriber::registry() let subscriber = tracing_subscriber::registry()
.with(quit_msg.clone()) .with(quit_msg.clone());
.with(task_span.clone());
let _sub = tracing::subscriber::set_default(subscriber); let _sub = tracing::subscriber::set_default(subscriber);
let interface = MockSectionInterface::new(6); let interface = MockSectionInterface::new(6);
@ -558,7 +569,6 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
assert_eq!(quit_msg.get_count(), 1); assert_eq!(quit_msg.get_count(), 1);
assert!(task_span.get_exit_count() > 1);
} }
fn make_sections_and_interface() -> (Sections, Arc<MockSectionInterface>) { fn make_sections_and_interface() -> (Sections, Arc<MockSectionInterface>) {
@ -597,7 +607,7 @@ mod test {
tokio::time::resume(); tokio::time::resume();
} }
#[tokio::test] #[actix_rt::test]
async fn test_queue() { async fn test_queue() {
let (sections, interface) = make_sections_and_interface(); let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
@ -644,7 +654,7 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_cancel_run() { async fn test_cancel_run() {
let (sections, interface) = make_sections_and_interface(); let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
@ -681,7 +691,7 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_cancel_all() { async fn test_cancel_all() {
let (sections, interface) = make_sections_and_interface(); let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
@ -715,7 +725,7 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_pause() { async fn test_pause() {
let (sections, interface) = make_sections_and_interface(); let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());
@ -774,7 +784,7 @@ mod test {
runner.quit().await.unwrap(); runner.quit().await.unwrap();
} }
#[tokio::test] #[actix_rt::test]
async fn test_event() { async fn test_event() {
let (sections, interface) = make_sections_and_interface(); let (sections, interface) = make_sections_and_interface();
let mut runner = SectionRunner::new(interface.clone()); let mut runner = SectionRunner::new(interface.clone());

Loading…
Cancel
Save