From abcdf0d3b170901445527942b0a06d74cf4e46fc Mon Sep 17 00:00:00 2001 From: Jonni Liljamo Date: Mon, 11 Nov 2024 23:45:46 +0200 Subject: [PATCH] feat(master): hello i succeeded in async * start of a database with redb * actually check auth tokens * communication tomfoolery between places --- .gitignore | 1 + Cargo.lock | 39 +++++++ README.md | 2 - emerwen-master/Cargo.toml | 4 + emerwen-master/src/db.rs | 121 +++++++++++++++++++++ emerwen-master/src/main.rs | 168 ++++++++++-------------------- emerwen-master/src/server/auth.rs | 67 ++++++++++++ emerwen-master/src/server/mod.rs | 111 ++++++++++++++++++++ emerwen-master/src/worker.rs | 45 ++++++++ emerwen-protocol/Cargo.toml | 3 +- emerwen-protocol/build.rs | 16 ++- 11 files changed, 462 insertions(+), 115 deletions(-) create mode 100644 emerwen-master/src/db.rs create mode 100644 emerwen-master/src/server/auth.rs create mode 100644 emerwen-master/src/server/mod.rs create mode 100644 emerwen-master/src/worker.rs diff --git a/.gitignore b/.gitignore index 8fef9f4..79bd589 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ /.direnv/ /target/ +/tmp/ /.pre-commit-config.yaml diff --git a/Cargo.lock b/Cargo.lock index e64da4c..21d1a1e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -200,6 +200,15 @@ version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + [[package]] name = "bitflags" version = "2.6.0" @@ -385,10 +394,14 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" name = "emerwen-master" version = "0.1.0" dependencies = [ + "bincode", "clap", "emerwen-protocol", + "redb", + "serde", "tokio", "tonic", + "tonic-async-interceptor", "tracing", "tracing-subscriber", ] @@ -398,6 +411,7 @@ name = "emerwen-protocol" version = "0.1.0" dependencies = [ "prost", + "serde", "tonic", "tonic-build", ] @@ -1407,6 +1421,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redb" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84b1de48a7cf7ba193e81e078d17ee2b786236eed1d3f7c60f8a09545efc4925" +dependencies = [ + "libc", +] + [[package]] name = "redox_syscall" version = "0.5.7" @@ -1975,6 +1998,22 @@ dependencies = [ "tracing", ] +[[package]] +name = "tonic-async-interceptor" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86e7b555f9b026568d4713307878c5bf7540cc2f6acd8118b332fbf75d07bb7b" +dependencies = [ + "bytes", + "http", + "http-body", + "http-body-util", + "pin-project", + "tonic", + "tower-layer", + "tower-service", +] + [[package]] name = "tonic-build" version = "0.12.3" diff --git a/README.md b/README.md index c1c10d3..4ccee0c 100644 --- a/README.md +++ b/README.md @@ -14,8 +14,6 @@ tldr: gRPC was easier lmao. ## What's next (short-term TODO) -- Actually checking auth tokens. -- Master database (redb?) - Master web UI (leptos + leptos_oidc) for configuring workers. ## Future (non-critical TODO) diff --git a/emerwen-master/Cargo.toml b/emerwen-master/Cargo.toml index d8168b9..2c58831 100644 --- a/emerwen-master/Cargo.toml +++ b/emerwen-master/Cargo.toml @@ -11,8 +11,12 @@ repository.workspace = true [dependencies] emerwen-protocol = { path = "../emerwen-protocol" } +bincode = "1" clap = { version = "4", features = ["derive"] } +redb = "2" +serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["full"] } tonic = "0.12" +tonic-async-interceptor = "0.12" tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/emerwen-master/src/db.rs b/emerwen-master/src/db.rs new file mode 100644 index 0000000..ad19b7e --- /dev/null +++ b/emerwen-master/src/db.rs @@ -0,0 +1,121 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use std::path::Path; + +use redb::{Database, ReadableTable, TableDefinition}; +use tokio::sync::{mpsc, oneshot}; +use tracing::warn; + +use crate::worker::Worker; + +const WORKER_TABLE: TableDefinition = TableDefinition::new("workers"); + +pub enum DBMessage { + ReadWorkers { + respond_to: oneshot::Sender>, + }, + WriteWorker { + respond_to: oneshot::Sender, + worker: Worker, + }, +} + +pub struct DatabaseInstance { + receiver: mpsc::Receiver, + database: Database, +} + +impl DatabaseInstance { + fn new(receiver: mpsc::Receiver, database: Database) -> DatabaseInstance { + DatabaseInstance { receiver, database } + } + + async fn run(&mut self) -> Result<(), Box> { + while let Some(msg) = self.receiver.recv().await { + match msg { + DBMessage::ReadWorkers { respond_to } => { + // FIXME: handle errors, offload to a generic function + let read_txn = self.database.begin_read().unwrap(); + let table = match read_txn.open_table(WORKER_TABLE) { + Ok(table) => Some(table), + Err(redb::TableError::TableDoesNotExist(table)) => { + warn!(message="Table does not exist, assuming empty", %table); + None + } + Err(_) => { + // FIXME: handle + None + } + }; + + let mut workers = vec![]; + if let Some(table) = table { + for a in table.iter().unwrap() { + workers.push(a.unwrap().1.value()); + } + } + + let _ = respond_to.send(workers); + } + DBMessage::WriteWorker { respond_to, worker } => { + let write_txn = self.database.begin_write().unwrap(); + { + let mut table = write_txn.open_table(WORKER_TABLE).unwrap(); + table.insert(worker.id, worker)?; + } + write_txn.commit()?; + + let _ = respond_to.send(true); + } + } + } + + Ok(()) + } +} + +#[derive(Clone)] +pub struct DatabaseHandle { + sender: mpsc::Sender, +} + +impl DatabaseHandle { + pub fn new(database_file: &Path) -> Result> { + let database = match database_file.exists() { + true => Database::open(database_file)?, + false => Database::create(database_file)?, + }; + + let (sender, receiver) = mpsc::channel(64); + let mut actor = DatabaseInstance::new(receiver, database); + tokio::spawn(async move { + let _ = actor.run().await; + }); + + Ok(DatabaseHandle { sender }) + } + + pub async fn read_workers(&self) -> Vec { + let (tx, rx) = oneshot::channel(); + let msg = DBMessage::ReadWorkers { respond_to: tx }; + + let _ = self.sender.send(msg).await; + rx.await.expect("actor task has been killed") // FIXME: handle + } + + pub async fn write_worker(&self, worker: Worker) -> bool { + let (tx, rx) = oneshot::channel(); + let msg = DBMessage::WriteWorker { + respond_to: tx, + worker, + }; + + let _ = self.sender.send(msg).await; + rx.await.expect("actor task has been killed") // FIXME: handle + } +} diff --git a/emerwen-master/src/main.rs b/emerwen-master/src/main.rs index 3d687de..7faea56 100644 --- a/emerwen-master/src/main.rs +++ b/emerwen-master/src/main.rs @@ -7,20 +7,29 @@ use clap::Parser; use emerwen_protocol::{ - emerwen_protocol_server::{EmerwenProtocol, EmerwenProtocolServer}, target::{MethodGet, MethodPing}, - SetTargetStateRequest, Target, TargetsResponse, + Target, }; -use tonic::{transport::Server, Request, Response, Status}; +use tokio::sync::mpsc; use tracing::{debug, info, level_filters::LevelFilter}; use tracing_subscriber::{prelude::*, EnvFilter}; +mod db; +use db::DatabaseHandle; +mod server; +mod worker; +use worker::Worker; + #[derive(Parser)] #[command(version)] struct Args { /// Enable debug logging #[arg(long)] debug: bool, + + /// Location of the database file + #[arg(long, default_value = "./tmp/db.redb")] + database_file: String, } #[tokio::main] @@ -50,114 +59,51 @@ async fn main() -> Result<(), Box> { info!(message = "Starting emerwen master...", %addr); debug!("Hello, debug!"); - let workers = vec![Worker { - id: 0, - auth_token: "avain_perkele".into(), - targets: vec![ - Target { - id: 0, - addr: "127.0.0.1".into(), - interval: 2000, - method: Some(MethodPing {}.into()), - }, - Target { - id: 1, - addr: "https://liljamo.com/".into(), - interval: 2000, - method: Some( - MethodGet { - ok_codes: vec![200], - } - .into(), - ), - }, - Target { - id: 2, - addr: "10.1.2.30".into(), - interval: 2000, - method: Some(MethodPing {}.into()), - }, - ], - }]; - - let server = MasterServer::new(workers); - - Server::builder() - .add_service(EmerwenProtocolServer::with_interceptor( - server, - move |req: Request<()>| { - match req.metadata().get("authorization") { - Some(_token) => { - // TODO: check - Ok(req) - } - None => Err(Status::unauthenticated("invalid auth")), - } - }, - )) - .serve(addr.parse()?) - .await?; - - Ok(()) -} - -struct MasterServer { - workers: Vec, -} - -impl MasterServer { - fn new(workers: Vec) -> MasterServer { - MasterServer { workers } + let db_handle = DatabaseHandle::new(std::path::Path::new(&args.database_file))?; + let (server_tx, server_rx) = mpsc::channel::(8); + + let db_handle_server = db_handle.clone(); + tokio::spawn(async move { + let _ = server::run(addr.into(), db_handle_server, server_rx).await; + }); + + let _ = db_handle + .write_worker(Worker { + id: 0, + auth_token: "avain_perkele".into(), + targets: vec![ + Target { + id: 0, + addr: "127.0.0.1".into(), + interval: 2000, + method: Some(MethodPing {}.into()), + }, + Target { + id: 1, + addr: "https://liljamo.com/".into(), + interval: 2000, + method: Some( + MethodGet { + ok_codes: vec![200], + } + .into(), + ), + }, + Target { + id: 2, + addr: "10.1.2.30".into(), + interval: 2000, + method: Some(MethodPing {}.into()), + }, + ], + }) + .await; + + server_tx.send(server::ServerMessage::ReloadWorkers).await?; + + loop { + let _ = tokio::time::sleep(std::time::Duration::from_millis(5000)).await; } -} - -#[tonic::async_trait] -impl EmerwenProtocol for MasterServer { - async fn get_targets(&self, request: Request<()>) -> Result, Status> { - let token = match request.metadata().get("authorization") { - Some(value) => match value.to_str() { - Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "), - Err(_) => { - return Err(Status::invalid_argument( - "couldn't read bearer auth to string", - )) - } - }, - None => { - return Err(Status::unauthenticated( - "request had no authorization, when it should", - )) - } - }; - - let targets = match self - .workers - .iter() - .find(|worker| *worker.auth_token == *token) - { - Some(worker) => worker.targets.clone(), - None => { - return Err(Status::invalid_argument( - "no worker exists with the auth token", - )) - } - }; - - Ok(Response::new(TargetsResponse { targets })) - } - - async fn set_target_state( - &self, - request: Request, - ) -> Result, Status> { - info!("{:?}", request.into_inner()); - - Ok(Response::new(())) - } -} -struct Worker { - pub id: u32, - pub auth_token: String, - pub targets: Vec, + //Ok(()) } diff --git a/emerwen-master/src/server/auth.rs b/emerwen-master/src/server/auth.rs new file mode 100644 index 0000000..eb5acd0 --- /dev/null +++ b/emerwen-master/src/server/auth.rs @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use std::{future::Future, pin::Pin}; + +use tonic::{Request, Status}; +use tonic_async_interceptor::AsyncInterceptor; + +use super::WorkerContext; +use crate::db::DatabaseHandle; + +#[derive(Clone)] +pub struct AuthInterceptor { + db_handle: DatabaseHandle, +} + +impl AuthInterceptor { + pub fn new(db_handle: DatabaseHandle) -> Self { + Self { db_handle } + } + + fn authenticate( + &self, + mut request: Request<()>, + ) -> impl Future, Status>> + Send + 'static { + let db_handle = self.db_handle.clone(); + async move { + let token = match request.metadata().get("authorization") { + Some(value) => match value.to_str() { + Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "), + Err(_) => { + return Err(Status::invalid_argument( + "couldn't read bearer auth to string", + )) + } + }, + None => return Err(Status::unauthenticated("request had no authorization")), + }; + + let workers = db_handle.read_workers().await; + let worker_id = match workers + .iter() + .filter(|w| w.auth_token == token) + .collect::>() + .first() + { + Some(worker) => worker.id, + None => return Err(Status::unauthenticated("bad token")), + }; + + request.extensions_mut().insert(WorkerContext { worker_id }); + Ok(request) + } + } +} + +impl AsyncInterceptor for AuthInterceptor { + type Future = Pin, Status>> + Send + 'static>>; + + fn call(&mut self, request: Request<()>) -> Self::Future { + Box::pin(self.authenticate(request)) + } +} diff --git a/emerwen-master/src/server/mod.rs b/emerwen-master/src/server/mod.rs new file mode 100644 index 0000000..41a2122 --- /dev/null +++ b/emerwen-master/src/server/mod.rs @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use std::sync::Arc; + +use emerwen_protocol::{ + emerwen_protocol_server::{EmerwenProtocol, EmerwenProtocolServer}, + SetTargetStateRequest, TargetsResponse, +}; +use tokio::sync::{mpsc, RwLock}; +use tonic::{transport::Server, Request, Response, Status}; +use tonic_async_interceptor::async_interceptor; +use tracing::info; + +use crate::{db::DatabaseHandle, worker::Worker}; + +mod auth; +use auth::AuthInterceptor; + +#[derive(Clone)] +struct WorkerContext { + worker_id: u32, +} + +pub enum ServerMessage { + ReloadWorkers, +} + +pub async fn run( + addr: String, + db_handle: DatabaseHandle, + mut server_rx: mpsc::Receiver, +) -> Result<(), Box> { + let workers = Arc::new(RwLock::new(db_handle.read_workers().await)); + let workers_two = workers.clone(); + + let db_handle_two = db_handle.clone(); + tokio::spawn(async move { + let workers = workers.clone(); + while let Some(msg) = server_rx.recv().await { + match msg { + ServerMessage::ReloadWorkers => { + let new_workers = db_handle_two.read_workers().await; + let mut write_guard = workers.write().await; + *write_guard = new_workers; + } + } + } + }); + + Server::builder() + .layer(async_interceptor(AuthInterceptor::new(db_handle.clone()))) + .add_service(EmerwenProtocolServer::new( + MasterServer::new(db_handle.clone(), workers_two).await, + )) + .serve(addr.parse()?) + .await?; + + Ok(()) +} + +struct MasterServer { + _db_handle: DatabaseHandle, + workers: Arc>>, +} + +impl MasterServer { + async fn new(db_handle: DatabaseHandle, workers: Arc>>) -> Self { + Self { + _db_handle: db_handle, + workers, + } + } +} + +#[tonic::async_trait] +impl EmerwenProtocol for MasterServer { + async fn get_targets(&self, request: Request<()>) -> Result, Status> { + let worker_id = match request.extensions().get::() { + Some(context) => context.worker_id, + None => return Err(Status::internal("no worker id in request")), + }; + + let targets = match self + .workers + .read() + .await + .iter() + .find(|worker| worker.id == worker_id) + { + Some(worker) => worker.targets.clone(), + None => return Err(Status::internal("no worker exists with request worker id")), + }; + + Ok(Response::new(TargetsResponse { targets })) + } + + async fn set_target_state( + &self, + request: Request, + ) -> Result, Status> { + // TODO: Implement. Follow the same pattern as above. + info!("{:?}", request.into_inner()); + + Ok(Response::new(())) + } +} diff --git a/emerwen-master/src/worker.rs b/emerwen-master/src/worker.rs new file mode 100644 index 0000000..937489f --- /dev/null +++ b/emerwen-master/src/worker.rs @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use emerwen_protocol::Target; +use redb::{TypeName, Value}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct Worker { + pub id: u32, + pub auth_token: String, + pub targets: Vec, +} + +impl Value for Worker { + type SelfType<'a> = Worker; + type AsBytes<'a> = Vec; + + fn fixed_width() -> Option { + None + } + + fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a> + where + Self: 'a, + { + bincode::deserialize(data).unwrap() + } + + fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a> + where + Self: 'a, + Self: 'b, + { + bincode::serialize(value).unwrap() + } + + fn type_name() -> TypeName { + TypeName::new("emerwen_master_worker") + } +} diff --git a/emerwen-protocol/Cargo.toml b/emerwen-protocol/Cargo.toml index 8a4fe76..6d5ca11 100644 --- a/emerwen-protocol/Cargo.toml +++ b/emerwen-protocol/Cargo.toml @@ -9,8 +9,9 @@ publish.workspace = true repository.workspace = true [dependencies] -tonic = "0.12" prost = "0.13" +serde = "1" +tonic = "0.12" [build-dependencies] tonic-build = "0.12" diff --git a/emerwen-protocol/build.rs b/emerwen-protocol/build.rs index bcb9483..9dc3ad9 100644 --- a/emerwen-protocol/build.rs +++ b/emerwen-protocol/build.rs @@ -1,4 +1,18 @@ fn main() -> Result<(), Box> { - tonic_build::compile_protos("proto/emerwen.proto")?; + tonic_build::configure() + .type_attribute("Target", "#[derive(serde::Deserialize, serde::Serialize)]") + .type_attribute( + "Target.method", + "#[derive(serde::Deserialize, serde::Serialize)]", + ) + .type_attribute( + "Target.MethodPing", + "#[derive(serde::Deserialize, serde::Serialize)]", + ) + .type_attribute( + "Target.MethodGET", + "#[derive(serde::Deserialize, serde::Serialize)]", + ) + .compile_protos(&["proto/emerwen.proto"], &["proto"])?; Ok(()) } -- 2.44.1