Browse Source

Add ability to run program from MQTT

master
Alex Mikhalev 4 years ago
parent
commit
4f030e7bcd
  1. 1
      src/main.rs
  2. 7
      src/mqtt/request/mod.rs
  3. 32
      src/mqtt/request/programs.rs

1
src/main.rs

@ -68,6 +68,7 @@ async fn main() -> Result<()> {
let request_context = mqtt::RequestContext { let request_context = mqtt::RequestContext {
sections: sections.clone(), sections: sections.clone(),
section_runner: section_runner.clone(), section_runner: section_runner.clone(),
program_runner: program_runner.clone(),
}; };
let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context); let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context);

7
src/mqtt/request/mod.rs

@ -1,4 +1,4 @@
use crate::{model::Sections, section_runner::SectionRunner}; use crate::{model::Sections, program_runner::ProgramRunner, section_runner::SectionRunner};
use futures_util::ready; use futures_util::ready;
use futures_util::FutureExt; use futures_util::FutureExt;
@ -6,11 +6,13 @@ use num_derive::FromPrimitive;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{fmt, future::Future, pin::Pin, task::Poll}; use std::{fmt, future::Future, pin::Pin, task::Poll};
mod programs;
mod sections; mod sections;
pub struct RequestContext { pub struct RequestContext {
pub sections: Sections, pub sections: Sections,
pub section_runner: SectionRunner, pub section_runner: SectionRunner,
pub program_runner: ProgramRunner,
} }
type BoxFuture<Output> = Pin<Box<dyn Future<Output = Output>>>; type BoxFuture<Output> = Pin<Box<dyn Future<Output = Output>>>;
@ -30,6 +32,7 @@ pub enum ErrorCode {
// NotUnique = 110, // NotUnique = 110,
NoSuchSection = 120, NoSuchSection = 120,
NoSuchSectionRun = 121, NoSuchSectionRun = 121,
NoSuchProgram = 122,
Internal = 200, Internal = 200,
NotImplemented = 201, NotImplemented = 201,
Timeout = 300, Timeout = 300,
@ -258,6 +261,7 @@ pub enum Request {
CancelSection(sections::CancelSectionRequest), CancelSection(sections::CancelSectionRequest),
CancelSectionRunId(sections::CancelSectionRunIdRequest), CancelSectionRunId(sections::CancelSectionRunIdRequest),
PauseSectionRunner(sections::PauseSectionRunnerRequest), PauseSectionRunner(sections::PauseSectionRunnerRequest),
RunProgram(programs::RunProgramRequest),
} }
impl IRequest for Request { impl IRequest for Request {
@ -269,6 +273,7 @@ impl IRequest for Request {
Request::CancelSection(req) => req.exec_erased(ctx), Request::CancelSection(req) => req.exec_erased(ctx),
Request::CancelSectionRunId(req) => req.exec_erased(ctx), Request::CancelSectionRunId(req) => req.exec_erased(ctx),
Request::PauseSectionRunner(req) => req.exec_erased(ctx), Request::PauseSectionRunner(req) => req.exec_erased(ctx),
Request::RunProgram(req) => req.exec_erased(ctx),
} }
} }
} }

32
src/mqtt/request/programs.rs

@ -0,0 +1,32 @@
use super::*;
use crate::{model::ProgramId, program_runner::ProgramRunnerError};
use eyre::WrapErr;
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RunProgramRequest {
program_id: ProgramId,
}
impl IRequest for RunProgramRequest {
type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut program_runner = ctx.program_runner.clone();
let program_id = self.program_id;
Box::pin(async move {
match program_runner.run_program_id(program_id).await {
Ok(program) => Ok(ResponseMessage::new(format!(
"running program '{}'",
program.name
))),
Err(e @ ProgramRunnerError::InvalidProgramId(_)) => Err(RequestError::with_name(
ErrorCode::NoSuchProgram,
e,
"program",
)),
Err(e) => Err(e).wrap_err("could not run program")?,
}
})
}
}
Loading…
Cancel
Save