From 77b0f862c8fd398a04ec608db010160eca2f7d22 Mon Sep 17 00:00:00 2001 From: Jonni Liljamo Date: Mon, 18 Nov 2024 15:45:00 +0200 Subject: [PATCH] feat(master): implement webtomaster as ipc_server * also split workertomaster to worker_server --- Cargo.lock | 1 + emerwen-master/Cargo.toml | 1 + emerwen-master/src/ipc_server/auth.rs | 56 ++++++++++++ emerwen-master/src/ipc_server/mod.rs | 86 +++++++++++++++++++ emerwen-master/src/main.rs | 26 ++++-- .../src/{server => worker_server}/auth.rs | 0 .../src/{server => worker_server}/mod.rs | 20 ++--- 7 files changed, 174 insertions(+), 16 deletions(-) create mode 100644 emerwen-master/src/ipc_server/auth.rs create mode 100644 emerwen-master/src/ipc_server/mod.rs rename emerwen-master/src/{server => worker_server}/auth.rs (100%) rename emerwen-master/src/{server => worker_server}/mod.rs (86%) diff --git a/Cargo.lock b/Cargo.lock index 21d1a1e..f5da97d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -400,6 +400,7 @@ dependencies = [ "redb", "serde", "tokio", + "tokio-stream", "tonic", "tonic-async-interceptor", "tracing", diff --git a/emerwen-master/Cargo.toml b/emerwen-master/Cargo.toml index 2c58831..f5b94b9 100644 --- a/emerwen-master/Cargo.toml +++ b/emerwen-master/Cargo.toml @@ -16,6 +16,7 @@ clap = { version = "4", features = ["derive"] } redb = "2" serde = { version = "1", features = ["derive"] } tokio = { version = "1", features = ["full"] } +tokio-stream = { version = "0.1", features = ["net"] } tonic = "0.12" tonic-async-interceptor = "0.12" tracing = "0.1" diff --git a/emerwen-master/src/ipc_server/auth.rs b/emerwen-master/src/ipc_server/auth.rs new file mode 100644 index 0000000..f7ac637 --- /dev/null +++ b/emerwen-master/src/ipc_server/auth.rs @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use std::{future::Future, pin::Pin}; + +use tonic::{Request, Status}; +use tonic_async_interceptor::AsyncInterceptor; + +#[derive(Clone)] +pub struct AuthInterceptor { + auth_token: String, +} + +impl AuthInterceptor { + pub fn new(auth_token: String) -> Self { + Self { auth_token } + } + + fn authenticate( + &self, + request: Request<()>, + ) -> impl Future, Status>> + Send + 'static { + let auth_token = self.auth_token.clone(); + async move { + let token = match request.metadata().get("authorization") { + Some(value) => match value.to_str() { + Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "), + Err(_) => { + return Err(Status::invalid_argument( + "couldn't read bearer auth to string", + )) + } + }, + None => return Err(Status::unauthenticated("request had no authorization")), + }; + + if token == auth_token { + Ok(request) + } else { + Err(Status::unauthenticated("bad token")) + } + } + } +} + +impl AsyncInterceptor for AuthInterceptor { + type Future = Pin, Status>> + Send + 'static>>; + + fn call(&mut self, request: Request<()>) -> Self::Future { + Box::pin(self.authenticate(request)) + } +} diff --git a/emerwen-master/src/ipc_server/mod.rs b/emerwen-master/src/ipc_server/mod.rs new file mode 100644 index 0000000..6c90d1c --- /dev/null +++ b/emerwen-master/src/ipc_server/mod.rs @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2024 Jonni Liljamo + * + * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for + * more information. + */ + +use std::path::Path; + +use emerwen_protocol::emerwen::webtomaster::{ + web_to_master_server::{WebToMaster, WebToMasterServer}, + WorkersResponse, +}; +use tokio::{net::UnixListener, sync::mpsc}; +use tokio_stream::wrappers::UnixListenerStream; +use tonic::{Request, Response, Status}; +use tonic_async_interceptor::async_interceptor; +use tracing::{error, info, warn}; + +use crate::db::DatabaseHandle; + +mod auth; +use auth::AuthInterceptor; + +pub enum Message {} + +pub async fn run( + socket_path: &Path, + db_handle: DatabaseHandle, + mut rx: mpsc::Receiver, +) -> Result<(), Box> { + let _db_handle_two = db_handle.clone(); + tokio::spawn(async move { + while let Some(msg) = rx.recv().await { + match msg {} + } + }); + + if socket_path.exists() { + warn!( + msg = "Socket path is not empty, trying to clear", + ?socket_path + ); + if let Err(err) = std::fs::remove_file(socket_path) { + error!(msg = "Failed to clear socket file in the way", %err); + return Err(err.into()); + } + info!(msg = "Successfully cleared socket file in the way"); + } + + let uds = UnixListener::bind(socket_path)?; + let uds_stream = UnixListenerStream::new(uds); + + tonic::transport::Server::builder() + // TODO: auth token from elsewhere. + .layer(async_interceptor(AuthInterceptor::new( + "avain_perkele".into(), + ))) + .add_service(WebToMasterServer::new(Server::new(db_handle.clone()).await)) + .serve_with_incoming(uds_stream) + .await?; + + Ok(()) +} + +struct Server { + db_handle: DatabaseHandle, +} + +impl Server { + async fn new(db_handle: DatabaseHandle) -> Self { + Self { db_handle } + } +} + +#[tonic::async_trait] +impl WebToMaster for Server { + async fn get_workers( + &self, + _request: Request<()>, + ) -> Result, Status> { + let workers = self.db_handle.read_workers().await; + + Ok(Response::new(WorkersResponse { workers })) + } +} diff --git a/emerwen-master/src/main.rs b/emerwen-master/src/main.rs index c912a5d..883a61e 100644 --- a/emerwen-master/src/main.rs +++ b/emerwen-master/src/main.rs @@ -16,7 +16,8 @@ use tracing_subscriber::{prelude::*, EnvFilter}; mod db; use db::DatabaseHandle; -mod server; +mod ipc_server; +mod worker_server; #[derive(Parser)] #[command(version)] @@ -62,15 +63,26 @@ async fn main() -> Result<(), Box> { let addr = "127.0.0.1:8000"; - info!(message = "Starting emerwen master...", %addr); + info!(msg = "Starting emerwen master...", %addr); debug!("Hello, debug!"); let db_handle = DatabaseHandle::new(std::path::Path::new(&args.database_file))?; - let (server_tx, server_rx) = mpsc::channel::(8); - let db_handle_server = db_handle.clone(); + let db_handle_worker_server = db_handle.clone(); + let (worker_server_tx, worker_server_rx) = mpsc::channel::(8); tokio::spawn(async move { - let _ = server::run(addr.into(), db_handle_server, server_rx).await; + let _ = worker_server::run(addr.into(), db_handle_worker_server, worker_server_rx).await; + }); + + let db_handle_ipc_server = db_handle.clone(); + let (_ipc_server_tx, ipc_server_rx) = mpsc::channel::(8); + tokio::spawn(async move { + let _ = ipc_server::run( + std::path::Path::new(&args.ipc_socket_location), + db_handle_ipc_server, + ipc_server_rx, + ) + .await; }); let _ = db_handle @@ -105,7 +117,9 @@ async fn main() -> Result<(), Box> { }) .await; - server_tx.send(server::ServerMessage::ReloadWorkers).await?; + worker_server_tx + .send(worker_server::Message::ReloadWorkers) + .await?; loop { let _ = tokio::time::sleep(std::time::Duration::from_millis(5000)).await; diff --git a/emerwen-master/src/server/auth.rs b/emerwen-master/src/worker_server/auth.rs similarity index 100% rename from emerwen-master/src/server/auth.rs rename to emerwen-master/src/worker_server/auth.rs diff --git a/emerwen-master/src/server/mod.rs b/emerwen-master/src/worker_server/mod.rs similarity index 86% rename from emerwen-master/src/server/mod.rs rename to emerwen-master/src/worker_server/mod.rs index 7da502a..3817fea 100644 --- a/emerwen-master/src/server/mod.rs +++ b/emerwen-master/src/worker_server/mod.rs @@ -15,7 +15,7 @@ use emerwen_protocol::emerwen::{ }, }; use tokio::sync::{mpsc, RwLock}; -use tonic::{transport::Server, Request, Response, Status}; +use tonic::{Request, Response, Status}; use tonic_async_interceptor::async_interceptor; use tracing::info; @@ -29,14 +29,14 @@ struct WorkerContext { worker_id: u32, } -pub enum ServerMessage { +pub enum Message { ReloadWorkers, } pub async fn run( addr: String, db_handle: DatabaseHandle, - mut server_rx: mpsc::Receiver, + mut rx: mpsc::Receiver, ) -> Result<(), Box> { let workers = Arc::new(RwLock::new(db_handle.read_workers().await)); let workers_two = workers.clone(); @@ -44,9 +44,9 @@ pub async fn run( let db_handle_two = db_handle.clone(); tokio::spawn(async move { let workers = workers.clone(); - while let Some(msg) = server_rx.recv().await { + while let Some(msg) = rx.recv().await { match msg { - ServerMessage::ReloadWorkers => { + Message::ReloadWorkers => { let new_workers = db_handle_two.read_workers().await; let mut write_guard = workers.write().await; *write_guard = new_workers; @@ -55,10 +55,10 @@ pub async fn run( } }); - Server::builder() + tonic::transport::Server::builder() .layer(async_interceptor(AuthInterceptor::new(db_handle.clone()))) .add_service(WorkerToMasterServer::new( - MasterServer::new(db_handle.clone(), workers_two).await, + Server::new(db_handle.clone(), workers_two).await, )) .serve(addr.parse()?) .await?; @@ -66,12 +66,12 @@ pub async fn run( Ok(()) } -struct MasterServer { +struct Server { _db_handle: DatabaseHandle, workers: Arc>>, } -impl MasterServer { +impl Server { async fn new(db_handle: DatabaseHandle, workers: Arc>>) -> Self { Self { _db_handle: db_handle, @@ -81,7 +81,7 @@ impl MasterServer { } #[tonic::async_trait] -impl WorkerToMaster for MasterServer { +impl WorkerToMaster for Server { async fn get_targets(&self, request: Request<()>) -> Result, Status> { let worker_id = match request.extensions().get::() { Some(context) => context.worker_id, -- 2.44.1