DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

abcdf0d3b170901445527942b0a06d74cf4e46fc — Jonni Liljamo 11 days ago f7aea62
feat(master): hello i succeeded in async

* start of a database with redb
* actually check auth tokens
* communication tomfoolery between places
M .gitignore => .gitignore +1 -0
@@ 1,3 1,4 @@
/.direnv/
/target/
/tmp/
/.pre-commit-config.yaml

M Cargo.lock => Cargo.lock +39 -0
@@ 201,6 201,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"

[[package]]
name = "bincode"
version = "1.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
dependencies = [
 "serde",
]

[[package]]
name = "bitflags"
version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 385,10 394,14 @@ checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0"
name = "emerwen-master"
version = "0.1.0"
dependencies = [
 "bincode",
 "clap",
 "emerwen-protocol",
 "redb",
 "serde",
 "tokio",
 "tonic",
 "tonic-async-interceptor",
 "tracing",
 "tracing-subscriber",
]


@@ 398,6 411,7 @@ name = "emerwen-protocol"
version = "0.1.0"
dependencies = [
 "prost",
 "serde",
 "tonic",
 "tonic-build",
]


@@ 1408,6 1422,15 @@ dependencies = [
]

[[package]]
name = "redb"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84b1de48a7cf7ba193e81e078d17ee2b786236eed1d3f7c60f8a09545efc4925"
dependencies = [
 "libc",
]

[[package]]
name = "redox_syscall"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 1976,6 1999,22 @@ dependencies = [
]

[[package]]
name = "tonic-async-interceptor"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "86e7b555f9b026568d4713307878c5bf7540cc2f6acd8118b332fbf75d07bb7b"
dependencies = [
 "bytes",
 "http",
 "http-body",
 "http-body-util",
 "pin-project",
 "tonic",
 "tower-layer",
 "tower-service",
]

[[package]]
name = "tonic-build"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"

M README.md => README.md +0 -2
@@ 14,8 14,6 @@ tldr: gRPC was easier lmao.

## What's next (short-term TODO)

- Actually checking auth tokens.
- Master database (redb?)
- Master web UI (leptos + leptos_oidc) for configuring workers.

## Future (non-critical TODO)

M emerwen-master/Cargo.toml => emerwen-master/Cargo.toml +4 -0
@@ 11,8 11,12 @@ repository.workspace = true
[dependencies]
emerwen-protocol = { path = "../emerwen-protocol" }

bincode = "1"
clap = { version = "4", features = ["derive"] }
redb = "2"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1", features = ["full"] }
tonic = "0.12"
tonic-async-interceptor = "0.12"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }

A emerwen-master/src/db.rs => emerwen-master/src/db.rs +121 -0
@@ 0,0 1,121 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::path::Path;

use redb::{Database, ReadableTable, TableDefinition};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;

use crate::worker::Worker;

const WORKER_TABLE: TableDefinition<u32, Worker> = TableDefinition::new("workers");

pub enum DBMessage {
    ReadWorkers {
        respond_to: oneshot::Sender<Vec<Worker>>,
    },
    WriteWorker {
        respond_to: oneshot::Sender<bool>,
        worker: Worker,
    },
}

pub struct DatabaseInstance {
    receiver: mpsc::Receiver<DBMessage>,
    database: Database,
}

impl DatabaseInstance {
    fn new(receiver: mpsc::Receiver<DBMessage>, database: Database) -> DatabaseInstance {
        DatabaseInstance { receiver, database }
    }

    async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        while let Some(msg) = self.receiver.recv().await {
            match msg {
                DBMessage::ReadWorkers { respond_to } => {
                    // FIXME: handle errors, offload to a generic function
                    let read_txn = self.database.begin_read().unwrap();
                    let table = match read_txn.open_table(WORKER_TABLE) {
                        Ok(table) => Some(table),
                        Err(redb::TableError::TableDoesNotExist(table)) => {
                            warn!(message="Table does not exist, assuming empty", %table);
                            None
                        }
                        Err(_) => {
                            // FIXME: handle
                            None
                        }
                    };

                    let mut workers = vec![];
                    if let Some(table) = table {
                        for a in table.iter().unwrap() {
                            workers.push(a.unwrap().1.value());
                        }
                    }

                    let _ = respond_to.send(workers);
                }
                DBMessage::WriteWorker { respond_to, worker } => {
                    let write_txn = self.database.begin_write().unwrap();
                    {
                        let mut table = write_txn.open_table(WORKER_TABLE).unwrap();
                        table.insert(worker.id, worker)?;
                    }
                    write_txn.commit()?;

                    let _ = respond_to.send(true);
                }
            }
        }

        Ok(())
    }
}

#[derive(Clone)]
pub struct DatabaseHandle {
    sender: mpsc::Sender<DBMessage>,
}

impl DatabaseHandle {
    pub fn new(database_file: &Path) -> Result<DatabaseHandle, Box<dyn std::error::Error>> {
        let database = match database_file.exists() {
            true => Database::open(database_file)?,
            false => Database::create(database_file)?,
        };

        let (sender, receiver) = mpsc::channel(64);
        let mut actor = DatabaseInstance::new(receiver, database);
        tokio::spawn(async move {
            let _ = actor.run().await;
        });

        Ok(DatabaseHandle { sender })
    }

    pub async fn read_workers(&self) -> Vec<Worker> {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::ReadWorkers { respond_to: tx };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }

    pub async fn write_worker(&self, worker: Worker) -> bool {
        let (tx, rx) = oneshot::channel();
        let msg = DBMessage::WriteWorker {
            respond_to: tx,
            worker,
        };

        let _ = self.sender.send(msg).await;
        rx.await.expect("actor task has been killed") // FIXME: handle
    }
}

M emerwen-master/src/main.rs => emerwen-master/src/main.rs +57 -111
@@ 7,20 7,29 @@

use clap::Parser;
use emerwen_protocol::{
    emerwen_protocol_server::{EmerwenProtocol, EmerwenProtocolServer},
    target::{MethodGet, MethodPing},
    SetTargetStateRequest, Target, TargetsResponse,
    Target,
};
use tonic::{transport::Server, Request, Response, Status};
use tokio::sync::mpsc;
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};

mod db;
use db::DatabaseHandle;
mod server;
mod worker;
use worker::Worker;

#[derive(Parser)]
#[command(version)]
struct Args {
    /// Enable debug logging
    #[arg(long)]
    debug: bool,

    /// Location of the database file
    #[arg(long, default_value = "./tmp/db.redb")]
    database_file: String,
}

#[tokio::main]


@@ 50,114 59,51 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
    info!(message = "Starting emerwen master...", %addr);
    debug!("Hello, debug!");

    let workers = vec![Worker {
        id: 0,
        auth_token: "avain_perkele".into(),
        targets: vec![
            Target {
                id: 0,
                addr: "127.0.0.1".into(),
                interval: 2000,
                method: Some(MethodPing {}.into()),
            },
            Target {
                id: 1,
                addr: "https://liljamo.com/".into(),
                interval: 2000,
                method: Some(
                    MethodGet {
                        ok_codes: vec![200],
                    }
                    .into(),
                ),
            },
            Target {
                id: 2,
                addr: "10.1.2.30".into(),
                interval: 2000,
                method: Some(MethodPing {}.into()),
            },
        ],
    }];

    let server = MasterServer::new(workers);

    Server::builder()
        .add_service(EmerwenProtocolServer::with_interceptor(
            server,
            move |req: Request<()>| {
                match req.metadata().get("authorization") {
                    Some(_token) => {
                        // TODO: check
                        Ok(req)
                    }
                    None => Err(Status::unauthenticated("invalid auth")),
                }
            },
        ))
        .serve(addr.parse()?)
        .await?;

    Ok(())
}

struct MasterServer {
    workers: Vec<Worker>,
}

impl MasterServer {
    fn new(workers: Vec<Worker>) -> MasterServer {
        MasterServer { workers }
    let db_handle = DatabaseHandle::new(std::path::Path::new(&args.database_file))?;
    let (server_tx, server_rx) = mpsc::channel::<server::ServerMessage>(8);

    let db_handle_server = db_handle.clone();
    tokio::spawn(async move {
        let _ = server::run(addr.into(), db_handle_server, server_rx).await;
    });

    let _ = db_handle
        .write_worker(Worker {
            id: 0,
            auth_token: "avain_perkele".into(),
            targets: vec![
                Target {
                    id: 0,
                    addr: "127.0.0.1".into(),
                    interval: 2000,
                    method: Some(MethodPing {}.into()),
                },
                Target {
                    id: 1,
                    addr: "https://liljamo.com/".into(),
                    interval: 2000,
                    method: Some(
                        MethodGet {
                            ok_codes: vec![200],
                        }
                        .into(),
                    ),
                },
                Target {
                    id: 2,
                    addr: "10.1.2.30".into(),
                    interval: 2000,
                    method: Some(MethodPing {}.into()),
                },
            ],
        })
        .await;

    server_tx.send(server::ServerMessage::ReloadWorkers).await?;

    loop {
        let _ = tokio::time::sleep(std::time::Duration::from_millis(5000)).await;
    }
}

#[tonic::async_trait]
impl EmerwenProtocol for MasterServer {
    async fn get_targets(&self, request: Request<()>) -> Result<Response<TargetsResponse>, Status> {
        let token = match request.metadata().get("authorization") {
            Some(value) => match value.to_str() {
                Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "),
                Err(_) => {
                    return Err(Status::invalid_argument(
                        "couldn't read bearer auth to string",
                    ))
                }
            },
            None => {
                return Err(Status::unauthenticated(
                    "request had no authorization, when it should",
                ))
            }
        };

        let targets = match self
            .workers
            .iter()
            .find(|worker| *worker.auth_token == *token)
        {
            Some(worker) => worker.targets.clone(),
            None => {
                return Err(Status::invalid_argument(
                    "no worker exists with the auth token",
                ))
            }
        };

        Ok(Response::new(TargetsResponse { targets }))
    }

    async fn set_target_state(
        &self,
        request: Request<SetTargetStateRequest>,
    ) -> Result<Response<()>, Status> {
        info!("{:?}", request.into_inner());

        Ok(Response::new(()))
    }
}

struct Worker {
    pub id: u32,
    pub auth_token: String,
    pub targets: Vec<Target>,
    //Ok(())
}

A emerwen-master/src/server/auth.rs => emerwen-master/src/server/auth.rs +67 -0
@@ 0,0 1,67 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::{future::Future, pin::Pin};

use tonic::{Request, Status};
use tonic_async_interceptor::AsyncInterceptor;

use super::WorkerContext;
use crate::db::DatabaseHandle;

#[derive(Clone)]
pub struct AuthInterceptor {
    db_handle: DatabaseHandle,
}

impl AuthInterceptor {
    pub fn new(db_handle: DatabaseHandle) -> Self {
        Self { db_handle }
    }

    fn authenticate(
        &self,
        mut request: Request<()>,
    ) -> impl Future<Output = Result<Request<()>, Status>> + Send + 'static {
        let db_handle = self.db_handle.clone();
        async move {
            let token = match request.metadata().get("authorization") {
                Some(value) => match value.to_str() {
                    Ok(bearer_token) => bearer_token.trim_start_matches("Bearer "),
                    Err(_) => {
                        return Err(Status::invalid_argument(
                            "couldn't read bearer auth to string",
                        ))
                    }
                },
                None => return Err(Status::unauthenticated("request had no authorization")),
            };

            let workers = db_handle.read_workers().await;
            let worker_id = match workers
                .iter()
                .filter(|w| w.auth_token == token)
                .collect::<Vec<_>>()
                .first()
            {
                Some(worker) => worker.id,
                None => return Err(Status::unauthenticated("bad token")),
            };

            request.extensions_mut().insert(WorkerContext { worker_id });
            Ok(request)
        }
    }
}

impl AsyncInterceptor for AuthInterceptor {
    type Future = Pin<Box<dyn Future<Output = Result<Request<()>, Status>> + Send + 'static>>;

    fn call(&mut self, request: Request<()>) -> Self::Future {
        Box::pin(self.authenticate(request))
    }
}

A emerwen-master/src/server/mod.rs => emerwen-master/src/server/mod.rs +111 -0
@@ 0,0 1,111 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::sync::Arc;

use emerwen_protocol::{
    emerwen_protocol_server::{EmerwenProtocol, EmerwenProtocolServer},
    SetTargetStateRequest, TargetsResponse,
};
use tokio::sync::{mpsc, RwLock};
use tonic::{transport::Server, Request, Response, Status};
use tonic_async_interceptor::async_interceptor;
use tracing::info;

use crate::{db::DatabaseHandle, worker::Worker};

mod auth;
use auth::AuthInterceptor;

#[derive(Clone)]
struct WorkerContext {
    worker_id: u32,
}

pub enum ServerMessage {
    ReloadWorkers,
}

pub async fn run(
    addr: String,
    db_handle: DatabaseHandle,
    mut server_rx: mpsc::Receiver<ServerMessage>,
) -> Result<(), Box<dyn std::error::Error>> {
    let workers = Arc::new(RwLock::new(db_handle.read_workers().await));
    let workers_two = workers.clone();

    let db_handle_two = db_handle.clone();
    tokio::spawn(async move {
        let workers = workers.clone();
        while let Some(msg) = server_rx.recv().await {
            match msg {
                ServerMessage::ReloadWorkers => {
                    let new_workers = db_handle_two.read_workers().await;
                    let mut write_guard = workers.write().await;
                    *write_guard = new_workers;
                }
            }
        }
    });

    Server::builder()
        .layer(async_interceptor(AuthInterceptor::new(db_handle.clone())))
        .add_service(EmerwenProtocolServer::new(
            MasterServer::new(db_handle.clone(), workers_two).await,
        ))
        .serve(addr.parse()?)
        .await?;

    Ok(())
}

struct MasterServer {
    _db_handle: DatabaseHandle,
    workers: Arc<RwLock<Vec<Worker>>>,
}

impl MasterServer {
    async fn new(db_handle: DatabaseHandle, workers: Arc<RwLock<Vec<Worker>>>) -> Self {
        Self {
            _db_handle: db_handle,
            workers,
        }
    }
}

#[tonic::async_trait]
impl EmerwenProtocol for MasterServer {
    async fn get_targets(&self, request: Request<()>) -> Result<Response<TargetsResponse>, Status> {
        let worker_id = match request.extensions().get::<WorkerContext>() {
            Some(context) => context.worker_id,
            None => return Err(Status::internal("no worker id in request")),
        };

        let targets = match self
            .workers
            .read()
            .await
            .iter()
            .find(|worker| worker.id == worker_id)
        {
            Some(worker) => worker.targets.clone(),
            None => return Err(Status::internal("no worker exists with request worker id")),
        };

        Ok(Response::new(TargetsResponse { targets }))
    }

    async fn set_target_state(
        &self,
        request: Request<SetTargetStateRequest>,
    ) -> Result<Response<()>, Status> {
        // TODO: Implement. Follow the same pattern as above.
        info!("{:?}", request.into_inner());

        Ok(Response::new(()))
    }
}

A emerwen-master/src/worker.rs => emerwen-master/src/worker.rs +45 -0
@@ 0,0 1,45 @@
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use emerwen_protocol::Target;
use redb::{TypeName, Value};
use serde::{Deserialize, Serialize};

#[derive(Debug, Deserialize, Serialize)]
pub struct Worker {
    pub id: u32,
    pub auth_token: String,
    pub targets: Vec<Target>,
}

impl Value for Worker {
    type SelfType<'a> = Worker;
    type AsBytes<'a> = Vec<u8>;

    fn fixed_width() -> Option<usize> {
        None
    }

    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        bincode::deserialize(data).unwrap()
    }

    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        bincode::serialize(value).unwrap()
    }

    fn type_name() -> TypeName {
        TypeName::new("emerwen_master_worker")
    }
}

M emerwen-protocol/Cargo.toml => emerwen-protocol/Cargo.toml +2 -1
@@ 9,8 9,9 @@ publish.workspace = true
repository.workspace = true

[dependencies]
tonic = "0.12"
prost = "0.13"
serde = "1"
tonic = "0.12"

[build-dependencies]
tonic-build = "0.12"

M emerwen-protocol/build.rs => emerwen-protocol/build.rs +15 -1
@@ 1,4 1,18 @@
fn main() -> Result<(), Box<dyn std::error::Error>> {
    tonic_build::compile_protos("proto/emerwen.proto")?;
    tonic_build::configure()
        .type_attribute("Target", "#[derive(serde::Deserialize, serde::Serialize)]")
        .type_attribute(
            "Target.method",
            "#[derive(serde::Deserialize, serde::Serialize)]",
        )
        .type_attribute(
            "Target.MethodPing",
            "#[derive(serde::Deserialize, serde::Serialize)]",
        )
        .type_attribute(
            "Target.MethodGET",
            "#[derive(serde::Deserialize, serde::Serialize)]",
        )
        .compile_protos(&["proto/emerwen.proto"], &["proto"])?;
    Ok(())
}