diff --git a/sprinklers_actors/Cargo.toml b/sprinklers_actors/Cargo.toml index b8b850d..bf30165 100644 --- a/sprinklers_actors/Cargo.toml +++ b/sprinklers_actors/Cargo.toml @@ -14,6 +14,7 @@ tracing = "0.1.19" chrono = { version = "0.4.15" } serde = { version = "1.0.116", features = ["derive"] } im = "15.0.0" +eyre = "0.6.0" [dependencies.tokio] version = "0.2.22" diff --git a/sprinklers_actors/src/lib.rs b/sprinklers_actors/src/lib.rs index 59edff5..8dcc832 100644 --- a/sprinklers_actors/src/lib.rs +++ b/sprinklers_actors/src/lib.rs @@ -1,8 +1,10 @@ pub mod program_runner; pub mod section_runner; +pub mod state_manager; #[cfg(test)] mod trace_listeners; pub use program_runner::ProgramRunner; pub use section_runner::SectionRunner; +pub use state_manager::StateManager; diff --git a/sprinklers_actors/src/state_manager.rs b/sprinklers_actors/src/state_manager.rs new file mode 100644 index 0000000..805f10e --- /dev/null +++ b/sprinklers_actors/src/state_manager.rs @@ -0,0 +1,50 @@ +use eyre::Result; +use sprinklers_core::model::{ProgramId, ProgramRef, ProgramUpdateData, Programs}; +use tokio::sync::{mpsc, oneshot, watch}; + +#[derive(Debug)] +pub enum Request { + UpdateProgram { + id: ProgramId, + update: ProgramUpdateData, + resp_tx: oneshot::Sender>, + }, +} + +#[derive(Clone)] +pub struct StateManager { + request_tx: mpsc::Sender, + programs_watch: watch::Receiver, +} + +impl StateManager { + pub fn new( + request_tx: mpsc::Sender, + programs_watch: watch::Receiver, + ) -> Self { + Self { + request_tx, + programs_watch, + } + } + + pub async fn update_program( + &mut self, + id: ProgramId, + update: ProgramUpdateData, + ) -> Result { + let (resp_tx, resp_rx) = oneshot::channel(); + self.request_tx + .send(Request::UpdateProgram { + id, + update, + resp_tx, + }) + .await?; + resp_rx.await? + } + + pub fn get_programs(&self) -> watch::Receiver { + self.programs_watch.clone() + } +}