Browse Source

Implement updating program from MQTT

master
Alex Mikhalev 4 years ago
parent
commit
1cc4caae60
  1. 20
      sprinklers_actors/src/state_manager.rs
  2. 18
      sprinklers_database/src/program.rs
  3. 2
      sprinklers_mqtt/Cargo.toml
  4. 2
      sprinklers_mqtt/src/actor.rs
  5. 14
      sprinklers_mqtt/src/request/mod.rs
  6. 50
      sprinklers_mqtt/src/request/programs.rs
  7. 11
      sprinklers_mqtt/src/request/sections.rs
  8. 66
      sprinklers_mqtt/src/update_listener.rs
  9. 1
      sprinklers_rs/Cargo.toml
  10. 21
      sprinklers_rs/src/main.rs
  11. 103
      sprinklers_rs/src/state_manager.rs

20
sprinklers_actors/src/state_manager.rs

@ -1,5 +1,5 @@
use eyre::Result;
use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData, Programs}; use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData, Programs};
use thiserror::Error;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
#[derive(Debug)] #[derive(Debug)]
@ -17,6 +17,20 @@ pub struct StateManager {
programs_watch: watch::Receiver<Programs>, programs_watch: watch::Receiver<Programs>,
} }
#[derive(Debug, Error)]
pub enum StateError {
#[error("no such program: {0}")]
NoSuchProgram(ProgramId),
#[error("internal error: {0}")]
Other(
#[from]
#[source]
eyre::Report,
),
}
pub type Result<T, E = StateError> = std::result::Result<T, E>;
impl StateManager { impl StateManager {
pub fn new( pub fn new(
request_tx: mpsc::Sender<Request>, request_tx: mpsc::Sender<Request>,
@ -40,8 +54,8 @@ impl StateManager {
update, update,
resp_tx, resp_tx,
}) })
.await?; .await.map_err(eyre::Report::from)?;
resp_rx.await? resp_rx.await.map_err(eyre::Report::from)?
} }
pub fn get_programs(&self) -> watch::Receiver<Programs> { pub fn get_programs(&self) -> watch::Receiver<Programs> {

18
sprinklers_database/src/program.rs

@ -9,6 +9,7 @@ use sprinklers_core::{
use eyre::Result; use eyre::Result;
use rusqlite::{params, Row, ToSql, Transaction, NO_PARAMS}; use rusqlite::{params, Row, ToSql, Transaction, NO_PARAMS};
use thiserror::Error;
type SqlProgramSequence = SqlJson<ProgramSequence>; type SqlProgramSequence = SqlJson<ProgramSequence>;
type SqlSchedule = SqlJson<Schedule>; type SqlSchedule = SqlJson<Schedule>;
@ -99,6 +100,10 @@ fn sequence_as_sql<'a>(
.map(move |(seq_num, item)| item_as_sql(item, program_id, seq_num)) .map(move |(seq_num, item)| item_as_sql(item, program_id, seq_num))
} }
#[derive(Clone, Debug, Error)]
#[error("no such program id: {0}")]
pub struct NoSuchProgram(pub ProgramId);
pub fn query_programs(conn: &DbConn) -> Result<Programs> { pub fn query_programs(conn: &DbConn) -> Result<Programs> {
let query_sql = "\ let query_sql = "\
SELECT p.id, p.name, p.enabled, p.schedule, ps.sequence SELECT p.id, p.name, p.enabled, p.schedule, ps.sequence
@ -121,7 +126,12 @@ FROM programs AS p
INNER JOIN program_sequences AS ps ON ps.program_id = p.id INNER JOIN program_sequences AS ps ON ps.program_id = p.id
WHERE p.id = ?1;"; WHERE p.id = ?1;";
let mut statement = conn.prepare_cached(query_sql)?; let mut statement = conn.prepare_cached(query_sql)?;
Ok(statement.query_row(params![id], from_sql)?) statement
.query_row(params![id], from_sql)
.map_err(|err| match err {
rusqlite::Error::QueryReturnedNoRows => NoSuchProgram(id).into(),
e => e.into(),
})
} }
pub fn update_program( pub fn update_program(
@ -137,8 +147,12 @@ UPDATE programs
enabled = ifnull(?3, enabled), enabled = ifnull(?3, enabled),
schedule = ifnull(?4, schedule) schedule = ifnull(?4, schedule)
WHERE id = ?1;"; WHERE id = ?1;";
conn.prepare_cached(update_sql)? let updated = conn
.prepare_cached(update_sql)?
.execute(&update_as_sql(id, prog))?; .execute(&update_as_sql(id, prog))?;
if updated == 0 {
return Err(NoSuchProgram(id).into());
}
if let Some(sequence) = &prog.sequence { if let Some(sequence) = &prog.sequence {
let clear_seq_sql = "\ let clear_seq_sql = "\
DELETE DELETE

2
sprinklers_mqtt/Cargo.toml

@ -14,7 +14,7 @@ actix = { version = "0.10.0", default-features = false }
eyre = "0.6.0" eyre = "0.6.0"
rumqttc = "0.1.0" rumqttc = "0.1.0"
tracing = "0.1.19" tracing = "0.1.19"
serde = { version = "1.0.116", features = ["derive"] } serde = { version = "1.0.116", features = ["derive", "rc"] }
serde_json = "1.0.57" serde_json = "1.0.57"
chrono = "0.4.15" chrono = "0.4.15"
num-traits = "0.2.12" num-traits = "0.2.12"

2
sprinklers_mqtt/src/actor.rs

@ -36,7 +36,7 @@ impl MqttActor {
}; };
let rid = request_value.rid; let rid = request_value.rid;
let request_fut = let request_fut =
serde_json::from_value::<request::Request>(request_value.rest).map(|mut request| { serde_json::from_value::<request::Request>(request_value.rest).map(|request| {
debug!(rid, "about to execute request: {:?}", request); debug!(rid, "about to execute request: {:?}", request);
request.execute(&mut self.request_context) request.execute(&mut self.request_context)
}); });

14
sprinklers_mqtt/src/request/mod.rs

@ -1,4 +1,4 @@
use sprinklers_actors::{program_runner::ProgramRunner, section_runner::SectionRunner}; use sprinklers_actors::{ProgramRunner, SectionRunner, StateManager};
use sprinklers_core::model::Sections; use sprinklers_core::model::Sections;
use futures_util::{ready, FutureExt}; use futures_util::{ready, FutureExt};
@ -13,6 +13,7 @@ pub struct RequestContext {
pub sections: Sections, pub sections: Sections,
pub section_runner: SectionRunner, pub section_runner: SectionRunner,
pub program_runner: ProgramRunner, pub program_runner: ProgramRunner,
pub state_manager: StateManager,
} }
type BoxFuture<Output> = Pin<Box<dyn Future<Output = Output>>>; type BoxFuture<Output> = Pin<Box<dyn Future<Output = Output>>>;
@ -190,11 +191,12 @@ type RequestFuture<Ok = ResponseValue> = BoxFuture<RequestResult<Ok>>;
trait IRequest { trait IRequest {
type Response: Serialize; type Response: Serialize;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response>; fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response>;
fn exec_erased(&mut self, ctx: &mut RequestContext) -> RequestFuture fn exec_erased(self, ctx: &mut RequestContext) -> RequestFuture
where where
Self::Response: 'static, Self::Response: 'static,
Self: Sized,
{ {
// TODO: figure out how to get rid of this nested box // TODO: figure out how to get rid of this nested box
Box::pin(ErasedRequestFuture(self.exec(ctx))) Box::pin(ErasedRequestFuture(self.exec(ctx)))
@ -263,12 +265,13 @@ pub enum Request {
PauseSectionRunner(sections::PauseSectionRunnerRequest), PauseSectionRunner(sections::PauseSectionRunnerRequest),
RunProgram(programs::RunProgramRequest), RunProgram(programs::RunProgramRequest),
CancelProgram(programs::CancelProgramRequest), CancelProgram(programs::CancelProgramRequest),
UpdateProgram(programs::UpdateProgramRequest),
} }
impl IRequest for Request { impl IRequest for Request {
type Response = ResponseValue; type Response = ResponseValue;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture { fn exec(self, ctx: &mut RequestContext) -> RequestFuture {
match self { match self {
Request::RunSection(req) => req.exec_erased(ctx), Request::RunSection(req) => req.exec_erased(ctx),
Request::CancelSection(req) => req.exec_erased(ctx), Request::CancelSection(req) => req.exec_erased(ctx),
@ -276,12 +279,13 @@ impl IRequest for Request {
Request::PauseSectionRunner(req) => req.exec_erased(ctx), Request::PauseSectionRunner(req) => req.exec_erased(ctx),
Request::RunProgram(req) => req.exec_erased(ctx), Request::RunProgram(req) => req.exec_erased(ctx),
Request::CancelProgram(req) => req.exec_erased(ctx), Request::CancelProgram(req) => req.exec_erased(ctx),
Request::UpdateProgram(req) => req.exec_erased(ctx),
} }
} }
} }
impl Request { impl Request {
pub fn execute(&mut self, ctx: &mut RequestContext) -> impl Future<Output = Response> { pub fn execute(self, ctx: &mut RequestContext) -> impl Future<Output = Response> {
self.exec(ctx).map(Response::from) self.exec(ctx).map(Response::from)
} }
} }

50
sprinklers_mqtt/src/request/programs.rs

@ -1,6 +1,6 @@
use super::*; use super::*;
use sprinklers_actors::program_runner::ProgramRunnerError; use sprinklers_actors::{program_runner::Error, state_manager::StateError};
use sprinklers_core::model::ProgramId; use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData};
use eyre::WrapErr; use eyre::WrapErr;
@ -13,7 +13,7 @@ pub struct RunProgramRequest {
impl IRequest for RunProgramRequest { impl IRequest for RunProgramRequest {
type Response = ResponseMessage; type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut program_runner = ctx.program_runner.clone(); let mut program_runner = ctx.program_runner.clone();
let program_id = self.program_id; let program_id = self.program_id;
Box::pin(async move { Box::pin(async move {
@ -22,7 +22,7 @@ impl IRequest for RunProgramRequest {
"running program '{}'", "running program '{}'",
program.name program.name
))), ))),
Err(e @ ProgramRunnerError::InvalidProgramId(_)) => Err(RequestError::with_name( Err(e @ Error::InvalidProgramId(_)) => Err(RequestError::with_name(
ErrorCode::NoSuchProgram, ErrorCode::NoSuchProgram,
e, e,
"program", "program",
@ -42,7 +42,7 @@ pub struct CancelProgramRequest {
impl IRequest for CancelProgramRequest { impl IRequest for CancelProgramRequest {
type Response = ResponseMessage; type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut program_runner = ctx.program_runner.clone(); let mut program_runner = ctx.program_runner.clone();
let program_id = self.program_id; let program_id = self.program_id;
Box::pin(async move { Box::pin(async move {
@ -64,3 +64,43 @@ impl IRequest for CancelProgramRequest {
}) })
} }
} }
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateProgramRequest {
program_id: ProgramId,
data: ProgramUpdateData,
}
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateProgramResponse {
message: String,
data: ProgramRef,
}
impl IRequest for UpdateProgramRequest {
type Response = UpdateProgramResponse;
fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut state_manager = ctx.state_manager.clone();
Box::pin(async move {
let new_program = state_manager
.update_program(self.program_id, self.data)
.await
.map_err(|err| match err {
e @ StateError::NoSuchProgram(_) => RequestError::with_name_and_cause(
ErrorCode::NoSuchProgram,
"could not update program",
"program",
e,
),
e => RequestError::from(eyre::Report::from(e)),
})?;
Ok(UpdateProgramResponse {
message: format!("updated program '{}'", new_program.name),
data: new_program,
})
})
}
}

11
sprinklers_mqtt/src/request/sections.rs

@ -36,7 +36,7 @@ pub struct RunSectionResponse {
impl IRequest for RunSectionRequest { impl IRequest for RunSectionRequest {
type Response = RunSectionResponse; type Response = RunSectionResponse;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone(); let mut section_runner = ctx.section_runner.clone();
let section = self.section_id.get_section(&ctx.sections); let section = self.section_id.get_section(&ctx.sections);
let duration = self.duration; let duration = self.duration;
@ -69,7 +69,7 @@ pub struct CancelSectionResponse {
impl IRequest for CancelSectionRequest { impl IRequest for CancelSectionRequest {
type Response = CancelSectionResponse; type Response = CancelSectionResponse;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone(); let mut section_runner = ctx.section_runner.clone();
let section = self.section_id.get_section(&ctx.sections); let section = self.section_id.get_section(&ctx.sections);
Box::pin(async move { Box::pin(async move {
@ -104,12 +104,11 @@ pub struct CancelSectionRunIdResponse {
impl IRequest for CancelSectionRunIdRequest { impl IRequest for CancelSectionRunIdRequest {
type Response = ResponseMessage; type Response = ResponseMessage;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone(); let mut section_runner = ctx.section_runner.clone();
let run_id = self.run_id.clone();
Box::pin(async move { Box::pin(async move {
let cancelled = section_runner let cancelled = section_runner
.cancel_run(run_id) .cancel_run(self.run_id)
.await .await
.wrap_err("could not cancel section run")?; .wrap_err("could not cancel section run")?;
if cancelled { if cancelled {
@ -140,7 +139,7 @@ pub struct PauseSectionRunnerResponse {
impl IRequest for PauseSectionRunnerRequest { impl IRequest for PauseSectionRunnerRequest {
type Response = PauseSectionRunnerResponse; type Response = PauseSectionRunnerResponse;
fn exec(&mut self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> { fn exec(self, ctx: &mut RequestContext) -> RequestFuture<Self::Response> {
let mut section_runner = ctx.section_runner.clone(); let mut section_runner = ctx.section_runner.clone();
let paused = self.paused; let paused = self.paused;
Box::pin(async move { Box::pin(async move {

66
sprinklers_mqtt/src/update_listener.rs

@ -5,11 +5,22 @@ use sprinklers_actors::{
}; };
use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler}; use actix::{fut::wrap_future, Actor, ActorContext, Addr, AsyncContext, Handler, StreamHandler};
use tokio::sync::broadcast; use sprinklers_core::model::Programs;
use tokio::sync::{broadcast, watch};
use tracing::{trace, warn}; use tracing::{trace, warn};
struct UpdateListenerActor { struct UpdateListenerActor {
mqtt_interface: MqttInterface, mqtt_interface: MqttInterface,
has_published_program_states: bool,
}
impl UpdateListenerActor {
fn new(mqtt_interface: MqttInterface) -> Self {
Self {
mqtt_interface,
has_published_program_states: false,
}
}
} }
impl Actor for UpdateListenerActor { impl Actor for UpdateListenerActor {
@ -101,6 +112,35 @@ impl StreamHandler<SecRunnerState> for UpdateListenerActor {
} }
} }
impl StreamHandler<Programs> for UpdateListenerActor {
fn handle(&mut self, programs: Programs, ctx: &mut Self::Context) {
let mut mqtt_interface = self.mqtt_interface.clone();
let has_published_program_states = self.has_published_program_states;
self.has_published_program_states = true;
let fut = async move {
if let Err(err) = mqtt_interface.publish_programs(&programs).await {
warn!("could not publish programs: {:?}", err);
}
// Some what of a hack
// Initialize program running states to false the first time we
// receive programs
if !has_published_program_states {
for program_id in programs.keys() {
if let Err(err) = mqtt_interface
.publish_program_running(*program_id, false)
.await
{
warn!("could not publish program running: {:?}", err);
}
}
}
};
ctx.spawn(wrap_future(fut));
}
}
#[derive(actix::Message)] #[derive(actix::Message)]
#[rtype(result = "()")] #[rtype(result = "()")]
struct Quit; struct Quit;
@ -143,13 +183,19 @@ impl Listenable<UpdateListenerActor> for SectionEventRecv {
} }
} }
impl Listenable<UpdateListenerActor> for ProgramEventRecv { impl Listenable<UpdateListenerActor> for SecRunnerStateRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) { fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self); ctx.add_stream(self);
} }
} }
impl Listenable<UpdateListenerActor> for SecRunnerStateRecv { impl Listenable<UpdateListenerActor> for watch::Receiver<Programs> {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self);
}
}
impl Listenable<UpdateListenerActor> for ProgramEventRecv {
fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) { fn listen(self, ctx: &mut <UpdateListenerActor as Actor>::Context) {
ctx.add_stream(self); ctx.add_stream(self);
} }
@ -161,7 +207,7 @@ pub struct UpdateListener {
impl UpdateListener { impl UpdateListener {
pub fn start(mqtt_interface: MqttInterface) -> Self { pub fn start(mqtt_interface: MqttInterface) -> Self {
let addr = UpdateListenerActor { mqtt_interface }.start(); let addr = UpdateListenerActor::new(mqtt_interface).start();
Self { addr } Self { addr }
} }
@ -176,14 +222,18 @@ impl UpdateListener {
self.listen(section_events); self.listen(section_events);
} }
pub fn listen_program_events(&mut self, program_events: ProgramEventRecv) {
self.listen(program_events);
}
pub fn listen_section_runner(&mut self, sec_runner_state_recv: SecRunnerStateRecv) { pub fn listen_section_runner(&mut self, sec_runner_state_recv: SecRunnerStateRecv) {
self.listen(sec_runner_state_recv); self.listen(sec_runner_state_recv);
} }
pub fn listen_programs(&mut self, programs: watch::Receiver<Programs>) {
self.listen(programs);
}
pub fn listen_program_events(&mut self, program_events: ProgramEventRecv) {
self.listen(program_events);
}
pub async fn quit(self) -> eyre::Result<()> { pub async fn quit(self) -> eyre::Result<()> {
Ok(self.addr.send(Quit).await?) Ok(self.addr.send(Quit).await?)
} }

1
sprinklers_rs/Cargo.toml

@ -18,6 +18,7 @@ tokio = "0.2.22"
tracing = { version = "0.1.19", features = ["log"] } tracing = { version = "0.1.19", features = ["log"] }
actix = { version = "0.10.0", default-features = false } actix = { version = "0.10.0", default-features = false }
actix-rt = "1.1.1" actix-rt = "1.1.1"
chrono = "0.4.19"
[dependencies.tracing-subscriber] [dependencies.tracing-subscriber]
version = "0.2.11" version = "0.2.11"

21
sprinklers_rs/src/main.rs

@ -2,6 +2,7 @@
#![warn(clippy::print_stdout)] #![warn(clippy::print_stdout)]
// mod option_future; // mod option_future;
mod state_manager;
use sprinklers_actors as actors; use sprinklers_actors as actors;
use sprinklers_core::section_interface::MockSectionInterface; use sprinklers_core::section_interface::MockSectionInterface;
@ -24,9 +25,9 @@ async fn main() -> Result<()> {
info!("Starting sprinklers_rs..."); info!("Starting sprinklers_rs...");
color_eyre::install()?; color_eyre::install()?;
let conn = database::setup_db()?; let db_conn = database::setup_db()?;
let sections = database::query_sections(&conn)?; let sections = database::query_sections(&db_conn)?;
for sec in sections.values() { for sec in sections.values() {
debug!(section = debug(&sec), "read section"); debug!(section = debug(&sec), "read section");
} }
@ -36,11 +37,9 @@ async fn main() -> Result<()> {
let mut section_runner = actors::SectionRunner::new(section_interface); let mut section_runner = actors::SectionRunner::new(section_interface);
let mut program_runner = actors::ProgramRunner::new(section_runner.clone()); let mut program_runner = actors::ProgramRunner::new(section_runner.clone());
let programs = database::query_programs(&conn)?; let state_manager = crate::state_manager::StateManagerThread::start(db_conn);
for prog in programs.values() { program_runner.listen_programs(state_manager.get_programs());
debug!(program = debug(&prog), "read program");
}
let mqtt_options = mqtt::Options { let mqtt_options = mqtt::Options {
broker_host: "localhost".into(), broker_host: "localhost".into(),
@ -53,12 +52,14 @@ async fn main() -> Result<()> {
sections: sections.clone(), sections: sections.clone(),
section_runner: section_runner.clone(), section_runner: section_runner.clone(),
program_runner: program_runner.clone(), program_runner: program_runner.clone(),
state_manager: state_manager.clone(),
}; };
let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context); let mut mqtt_interface = mqtt::MqttInterfaceTask::start(mqtt_options, request_context);
let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone()); let mut update_listener = mqtt::UpdateListener::start(mqtt_interface.clone());
update_listener.listen_section_events(section_runner.subscribe().await?); update_listener.listen_section_events(section_runner.subscribe().await?);
update_listener.listen_section_runner(section_runner.get_state_recv()); update_listener.listen_section_runner(section_runner.get_state_recv());
update_listener.listen_programs(state_manager.get_programs());
update_listener.listen_program_events(program_runner.subscribe().await?); update_listener.listen_program_events(program_runner.subscribe().await?);
program_runner.update_sections(sections.clone()).await?; program_runner.update_sections(sections.clone()).await?;
@ -69,13 +70,6 @@ async fn main() -> Result<()> {
.publish_section_state(*section_id, false) .publish_section_state(*section_id, false)
.await?; .await?;
} }
program_runner.update_programs(programs.clone()).await?;
for program_id in programs.keys() {
mqtt_interface
.publish_program_running(*program_id, false)
.await?;
}
mqtt_interface.publish_programs(&programs).await?;
info!("sprinklers_rs initialized"); info!("sprinklers_rs initialized");
@ -84,6 +78,7 @@ async fn main() -> Result<()> {
update_listener.quit().await?; update_listener.quit().await?;
mqtt_interface.quit().await?; mqtt_interface.quit().await?;
drop(state_manager);
program_runner.quit().await?; program_runner.quit().await?;
section_runner.quit().await?; section_runner.quit().await?;
actix::System::current().stop(); actix::System::current().stop();

103
sprinklers_rs/src/state_manager.rs

@ -0,0 +1,103 @@
use sprinklers_actors::{state_manager, StateManager};
use sprinklers_database::{self as database, DbConn};
use eyre::{eyre, WrapErr};
use sprinklers_core::model::{ProgramRef, Programs};
use tokio::{
runtime,
sync::{mpsc, watch},
};
use tracing::warn;
pub struct StateManagerThread {
db_conn: DbConn,
request_rx: mpsc::Receiver<state_manager::Request>,
programs_tx: watch::Sender<Programs>,
}
struct State {
programs: Programs,
}
impl StateManagerThread {
pub fn start(db_conn: DbConn) -> StateManager {
let (request_tx, request_rx) = mpsc::channel(8);
let (programs_tx, programs_rx) = watch::channel(Programs::default());
let task = StateManagerThread {
db_conn,
request_rx,
programs_tx,
};
let runtime_handle = runtime::Handle::current();
std::thread::Builder::new()
.name("sprinklers_rs::state_manager".into())
.spawn(move || task.run(runtime_handle))
.expect("could not start state_manager thread");
StateManager::new(request_tx, programs_rx)
}
fn broadcast_programs(&mut self, programs: Programs) {
if let Err(err) = self.programs_tx.broadcast(programs) {
warn!("could not broadcast programs: {}", err);
}
}
fn handle_request(
&mut self,
request: state_manager::Request,
state: &mut State,
) -> eyre::Result<()> {
use state_manager::Request;
match request {
Request::UpdateProgram {
id,
update,
resp_tx,
} => {
// HACK: would really like stable try notation
let res = (|| -> state_manager::Result<ProgramRef> {
let mut trans = self
.db_conn
.transaction()
.wrap_err("failed to start transaction")?;
database::update_program(&mut trans, id, &update).map_err(|err| {
if let Some(e) = err.downcast_ref::<database::NoSuchProgram>() {
state_manager::StateError::NoSuchProgram(e.0)
} else {
err.into()
}
})?;
let new_program: ProgramRef = database::query_program_by_id(&trans, id)?.into();
state.programs.insert(new_program.id, new_program.clone());
trans.commit().wrap_err("could not commit transaction")?;
self.broadcast_programs(state.programs.clone());
Ok(new_program)
})();
resp_tx
.send(res)
.map_err(|_| eyre!("could not respond to UpdateProgram"))?;
}
}
Ok(())
}
fn load_state(&mut self) -> eyre::Result<State> {
let programs =
database::query_programs(&self.db_conn).wrap_err("could not query programs")?;
Ok(State { programs })
}
fn run(mut self, runtime_handle: runtime::Handle) {
let mut state = self.load_state().expect("could not load initial state");
self.broadcast_programs(state.programs.clone());
while let Some(request) = runtime_handle.block_on(self.request_rx.recv()) {
if let Err(err) = self.handle_request(request, &mut state) {
warn!("error handling request: {:?}", err);
}
}
}
}
Loading…
Cancel
Save