This commit is contained in:
parent
4e1e64d1b9
commit
b8a9c24444
@ -9,7 +9,7 @@ use actix::{
|
||||
use std::collections::VecDeque;
|
||||
use thiserror::Error;
|
||||
use tokio::{
|
||||
sync::{broadcast, mpsc, oneshot},
|
||||
sync::broadcast,
|
||||
time::{delay_queue, DelayQueue},
|
||||
};
|
||||
use tracing::{debug, error, trace, warn};
|
||||
@ -263,22 +263,23 @@ impl Handler<UpdatePrograms> for ProgramRunnerActor {
|
||||
}
|
||||
|
||||
#[derive(Message)]
|
||||
#[rtype(result = "()")]
|
||||
#[rtype(result = "Result<ProgramRef>")]
|
||||
struct RunProgramId(ProgramId);
|
||||
|
||||
impl Handler<RunProgramId> for ProgramRunnerActor {
|
||||
type Result = ();
|
||||
type Result = Result<ProgramRef>;
|
||||
fn handle(&mut self, msg: RunProgramId, ctx: &mut Self::Context) -> Self::Result {
|
||||
let RunProgramId(program_id) = msg;
|
||||
let program = match self.inner.programs.get(&program_id) {
|
||||
Some(program) => program.clone(),
|
||||
None => {
|
||||
warn!(program_id, "trying to run non-existant program");
|
||||
return;
|
||||
trace!(program_id, "trying to run non-existant program");
|
||||
return Err(ProgramRunnerError::InvalidProgramId(program_id));
|
||||
}
|
||||
};
|
||||
self.run_queue.push_back(ProgRun::new(program));
|
||||
self.run_queue.push_back(ProgRun::new(program.clone()));
|
||||
ctx.notify(Process);
|
||||
Ok(program)
|
||||
}
|
||||
}
|
||||
|
||||
@ -389,29 +390,18 @@ impl ProgramRunnerActor {
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Error)]
|
||||
#[error("the ProgramRunner channel is closed")]
|
||||
pub struct ChannelClosed;
|
||||
|
||||
pub type Result<T, E = ChannelClosed> = std::result::Result<T, E>;
|
||||
|
||||
impl<T> From<mpsc::error::SendError<T>> for ChannelClosed {
|
||||
fn from(_: mpsc::error::SendError<T>) -> Self {
|
||||
Self
|
||||
}
|
||||
pub enum ProgramRunnerError {
|
||||
#[error("mailbox error: {0}")]
|
||||
Mailbox(
|
||||
#[from]
|
||||
#[source]
|
||||
actix::MailboxError,
|
||||
),
|
||||
#[error("no such program id: {0}")]
|
||||
InvalidProgramId(ProgramId),
|
||||
}
|
||||
|
||||
impl From<oneshot::error::RecvError> for ChannelClosed {
|
||||
fn from(_: oneshot::error::RecvError) -> Self {
|
||||
Self
|
||||
}
|
||||
}
|
||||
|
||||
impl From<actix::MailboxError> for ChannelClosed {
|
||||
fn from(_: actix::MailboxError) -> Self {
|
||||
// TODO:
|
||||
Self
|
||||
}
|
||||
}
|
||||
pub type Result<T, E = ProgramRunnerError> = std::result::Result<T, E>;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ProgramRunner {
|
||||
@ -434,35 +424,35 @@ impl ProgramRunner {
|
||||
self.addr
|
||||
.send(UpdateSections(new_sections))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(ProgramRunnerError::from)
|
||||
}
|
||||
|
||||
pub async fn update_programs(&mut self, new_programs: Programs) -> Result<()> {
|
||||
self.addr
|
||||
.send(UpdatePrograms(new_programs))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(ProgramRunnerError::from)
|
||||
}
|
||||
|
||||
pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<()> {
|
||||
pub async fn run_program_id(&mut self, program_id: ProgramId) -> Result<ProgramRef> {
|
||||
self.addr
|
||||
.send(RunProgramId(program_id))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(ProgramRunnerError::from)?
|
||||
}
|
||||
|
||||
pub async fn run_program(&mut self, program: ProgramRef) -> Result<()> {
|
||||
self.addr
|
||||
.send(RunProgram(program))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(ProgramRunnerError::from)
|
||||
}
|
||||
|
||||
pub async fn cancel_program(&mut self, program_id: ProgramId) -> Result<()> {
|
||||
self.addr
|
||||
.send(CancelProgram(program_id))
|
||||
.await
|
||||
.map_err(From::from)
|
||||
.map_err(ProgramRunnerError::from)
|
||||
}
|
||||
|
||||
pub async fn subscribe(&mut self) -> Result<ProgramEventRecv> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user