DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

77b0f862c8fd398a04ec608db010160eca2f7d22 — Jonni Liljamo 4 days ago 91b9951
feat(master): implement webtomaster as ipc_server

* also split workertomaster to worker_server
7 files changed, 174 insertions(+), 16 deletions(-)

M Cargo.lock
M emerwen-master/Cargo.toml
A emerwen-master/src/ipc_server/auth.rs
A emerwen-master/src/ipc_server/mod.rs
M emerwen-master/src/main.rs
R emerwen-master/src/{server/auth.rs => worker_server/auth.rs}
R emerwen-master/src/{server/mod.rs => worker_server/mod.rs}
M Cargo.lock => Cargo.lock +1 -0
@@ 400,6 400,7 @@ dependencies = [
 "redb",
 "serde",
 "tokio",
 "tokio-stream",
 "tonic",
 "tonic-async-interceptor",
 "tracing",

M emerwen-master/Cargo.toml => emerwen-master/Cargo.toml +1 -0
@@ 16,6 16,7 @@ clap = { version = "4", features = ["derive"] }
redb = "2"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tokio-stream = { version = "0.1", features = ["net"] }
tonic = "0.12"
tonic-async-interceptor = "0.12"
tracing = "0.1"

A emerwen-master/src/ipc_server/auth.rs => emerwen-master/src/ipc_server/auth.rs +56 -0
@@ 0,0 1,56 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::{future::Future, pin::Pin};

use tonic::{Request, Status};
use tonic_async_interceptor::AsyncInterceptor;

#[derive(Clone)]
pub struct AuthInterceptor {
    auth_token: String,
}

impl AuthInterceptor {
    pub fn new(auth_token: String) -> Self {
        Self { auth_token }
    }

    fn authenticate(
        &self,
        request: Request<()>,
    ) -> impl Future<Output = Result<Request<()>, Status>> + Send + 'static {
        let auth_token = self.auth_token.clone();
        async move {
            let token = match request.metadata().get("authorization") {
                Some(value) => match value.to_str() {
                    Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "),
                    Err(_) => {
                        return Err(Status::invalid_argument(
                            "couldn't read bearer auth to string",
                        ))
                    }
                },
                None => return Err(Status::unauthenticated("request had no authorization")),
            };

            if token == auth_token {
                Ok(request)
            } else {
                Err(Status::unauthenticated("bad token"))
            }
        }
    }
}

impl AsyncInterceptor for AuthInterceptor {
    type Future = Pin<Box<dyn Future<Output = Result<Request<()>, Status>> + Send + 'static>>;

    fn call(&mut self, request: Request<()>) -> Self::Future {
        Box::pin(self.authenticate(request))
    }
}

A emerwen-master/src/ipc_server/mod.rs => emerwen-master/src/ipc_server/mod.rs +86 -0
@@ 0,0 1,86 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::path::Path;

use emerwen_protocol::emerwen::webtomaster::{
    web_to_master_server::{WebToMaster, WebToMasterServer},
    WorkersResponse,
};
use tokio::{net::UnixListener, sync::mpsc};
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{Request, Response, Status};
use tonic_async_interceptor::async_interceptor;
use tracing::{error, info, warn};

use crate::db::DatabaseHandle;

mod auth;
use auth::AuthInterceptor;

pub enum Message {}

pub async fn run(
    socket_path: &Path,
    db_handle: DatabaseHandle,
    mut rx: mpsc::Receiver<Message>,
) -> Result<(), Box<dyn std::error::Error>> {
    let _db_handle_two = db_handle.clone();
    tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {}
        }
    });

    if socket_path.exists() {
        warn!(
            msg = "Socket path is not empty, trying to clear",
            ?socket_path
        );
        if let Err(err) = std::fs::remove_file(socket_path) {
            error!(msg = "Failed to clear socket file in the way", %err);
            return Err(err.into());
        }
        info!(msg = "Successfully cleared socket file in the way");
    }

    let uds = UnixListener::bind(socket_path)?;
    let uds_stream = UnixListenerStream::new(uds);

    tonic::transport::Server::builder()
        // TODO: auth token from elsewhere.
        .layer(async_interceptor(AuthInterceptor::new(
            "avain_perkele".into(),
        )))
        .add_service(WebToMasterServer::new(Server::new(db_handle.clone()).await))
        .serve_with_incoming(uds_stream)
        .await?;

    Ok(())
}

struct Server {
    db_handle: DatabaseHandle,
}

impl Server {
    async fn new(db_handle: DatabaseHandle) -> Self {
        Self { db_handle }
    }
}

#[tonic::async_trait]
impl WebToMaster for Server {
    async fn get_workers(
        &self,
        _request: Request<()>,
    ) -> Result<Response<WorkersResponse>, Status> {
        let workers = self.db_handle.read_workers().await;

        Ok(Response::new(WorkersResponse { workers }))
    }
}

M emerwen-master/src/main.rs => emerwen-master/src/main.rs +20 -6
@@ 16,7 16,8 @@ use tracing_subscriber::{prelude::*, EnvFilter};

mod db;
use db::DatabaseHandle;
mod server;
mod ipc_server;
mod worker_server;

#[derive(Parser)]
#[command(version)]


@@ 62,15 63,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

    let addr = "127.0.0.1:8000";

    info!(message = "Starting emerwen master...", %addr);
    info!(msg = "Starting emerwen master...", %addr);
    debug!("Hello, debug!");

    let db_handle = DatabaseHandle::new(std::path::Path::new(&args.database_file))?;
    let (server_tx, server_rx) = mpsc::channel::<server::ServerMessage>(8);

    let db_handle_server = db_handle.clone();
    let db_handle_worker_server = db_handle.clone();
    let (worker_server_tx, worker_server_rx) = mpsc::channel::<worker_server::Message>(8);
    tokio::spawn(async move {
        let _ = server::run(addr.into(), db_handle_server, server_rx).await;
        let _ = worker_server::run(addr.into(), db_handle_worker_server, worker_server_rx).await;
    });

    let db_handle_ipc_server = db_handle.clone();
    let (_ipc_server_tx, ipc_server_rx) = mpsc::channel::<ipc_server::Message>(8);
    tokio::spawn(async move {
        let _ = ipc_server::run(
            std::path::Path::new(&args.ipc_socket_location),
            db_handle_ipc_server,
            ipc_server_rx,
        )
        .await;
    });

    let _ = db_handle


@@ 105,7 117,9 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
        })
        .await;

    server_tx.send(server::ServerMessage::ReloadWorkers).await?;
    worker_server_tx
        .send(worker_server::Message::ReloadWorkers)
        .await?;

    loop {
        let _ = tokio::time::sleep(std::time::Duration::from_millis(5000)).await;

R emerwen-master/src/server/auth.rs => emerwen-master/src/worker_server/auth.rs +0 -0
R emerwen-master/src/server/mod.rs => emerwen-master/src/worker_server/mod.rs +10 -10
@@ 15,7 15,7 @@ use emerwen_protocol::emerwen::{
    },
};
use tokio::sync::{mpsc, RwLock};
use tonic::{transport::Server, Request, Response, Status};
use tonic::{Request, Response, Status};
use tonic_async_interceptor::async_interceptor;
use tracing::info;



@@ 29,14 29,14 @@ struct WorkerContext {
    worker_id: u32,
}

pub enum ServerMessage {
pub enum Message {
    ReloadWorkers,
}

pub async fn run(
    addr: String,
    db_handle: DatabaseHandle,
    mut server_rx: mpsc::Receiver<ServerMessage>,
    mut rx: mpsc::Receiver<Message>,
) -> Result<(), Box<dyn std::error::Error>> {
    let workers = Arc::new(RwLock::new(db_handle.read_workers().await));
    let workers_two = workers.clone();


@@ 44,9 44,9 @@ pub async fn run(
    let db_handle_two = db_handle.clone();
    tokio::spawn(async move {
        let workers = workers.clone();
        while let Some(msg) = server_rx.recv().await {
        while let Some(msg) = rx.recv().await {
            match msg {
                ServerMessage::ReloadWorkers => {
                Message::ReloadWorkers => {
                    let new_workers = db_handle_two.read_workers().await;
                    let mut write_guard = workers.write().await;
                    *write_guard = new_workers;


@@ 55,10 55,10 @@ pub async fn run(
        }
    });

    Server::builder()
    tonic::transport::Server::builder()
        .layer(async_interceptor(AuthInterceptor::new(db_handle.clone())))
        .add_service(WorkerToMasterServer::new(
            MasterServer::new(db_handle.clone(), workers_two).await,
            Server::new(db_handle.clone(), workers_two).await,
        ))
        .serve(addr.parse()?)
        .await?;


@@ 66,12 66,12 @@ pub async fn run(
    Ok(())
}

struct MasterServer {
struct Server {
    _db_handle: DatabaseHandle,
    workers: Arc<RwLock<Vec<Worker>>>,
}

impl MasterServer {
impl Server {
    async fn new(db_handle: DatabaseHandle, workers: Arc<RwLock<Vec<Worker>>>) -> Self {
        Self {
            _db_handle: db_handle,


@@ 81,7 81,7 @@ impl MasterServer {
}

#[tonic::async_trait]
impl WorkerToMaster for MasterServer {
impl WorkerToMaster for Server {
    async fn get_targets(&self, request: Request<()>) -> Result<Response<TargetsResponse>, Status> {
        let worker_id = match request.extensions().get::<WorkerContext>() {
            Some(context) => context.worker_id,