DEVELOPMENT ENVIRONMENT

~liljamo/emerwen

emerwen/emerwen-master/src/worker_server/mod.rs -rw-r--r-- 3.0 KiB
1b8eda0a Jonni Liljamo feat: move emerwen-proto crate out of here 5 days ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
/*
 * Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
 *
 * This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
 * more information.
 */

use std::sync::Arc;

use emerwen_proto::emerwen::{
    shared::Worker,
    workertomaster::{
        worker_to_master_server::{WorkerToMaster, WorkerToMasterServer},
        SetTargetStateRequest, TargetsResponse,
    },
};
use tokio::sync::{mpsc, RwLock};
use tonic::{Request, Response, Status};
use tonic_async_interceptor::async_interceptor;
use tracing::info;

use crate::db::DatabaseHandle;

mod auth;
use auth::AuthInterceptor;

/// Identifies the worker that a request belongs to.
///
/// `get_targets` reads this value from the request extensions.
/// NOTE(review): presumably inserted by `AuthInterceptor` once the request has
/// been authenticated — confirm in the `auth` module.
#[derive(Clone)]
struct WorkerContext {
    /// Id compared against `Worker::id` when looking up the calling worker.
    worker_id: u32,
}

/// Control messages handled by the background task spawned in [`run`].
#[derive(Debug)]
pub enum Message {
    /// Re-read the worker list from the database and replace the cached copy.
    ReloadWorkers,
}

/// Runs the worker-facing gRPC server until it stops or fails.
///
/// The worker list is read once from the database at startup and shared with
/// the gRPC service. A background task listens on `rx` and re-reads the list
/// from the database whenever [`Message::ReloadWorkers`] arrives.
///
/// # Errors
///
/// Returns an error if `addr` cannot be parsed as a socket address, or if the
/// tonic transport server fails.
pub async fn run(
    addr: String,
    db_handle: DatabaseHandle,
    mut rx: mpsc::Receiver<Message>,
) -> Result<(), Box<dyn std::error::Error>> {
    // Validate the listen address first so that a bad address fails before
    // the database is read or the reload task is spawned.
    let addr: std::net::SocketAddr = addr.parse()?;

    let workers = Arc::new(RwLock::new(db_handle.read_workers().await));

    let reload_workers = Arc::clone(&workers);
    let reload_db_handle = db_handle.clone();
    tokio::spawn(async move {
        // `recv` yields `None` once the channel is closed, which ends the task.
        while let Some(msg) = rx.recv().await {
            match msg {
                Message::ReloadWorkers => {
                    // Read from the database before taking the write lock so
                    // the lock is not held during the database read.
                    let new_workers = reload_db_handle.read_workers().await;
                    *reload_workers.write().await = new_workers;
                }
            }
        }
    });

    tonic::transport::Server::builder()
        .layer(async_interceptor(AuthInterceptor::new(db_handle.clone())))
        .add_service(WorkerToMasterServer::new(
            Server::new(db_handle, workers).await,
        ))
        .serve(addr)
        .await?;

    Ok(())
}

/// State behind the `WorkerToMaster` service implementation.
struct Server {
    /// Database handle; currently unused by the service methods (hence the
    /// leading underscore).
    _db_handle: DatabaseHandle,
    /// Shared worker list; `run` replaces its contents on
    /// `Message::ReloadWorkers`.
    workers: Arc<RwLock<Vec<Worker>>>,
}

impl Server {
    /// Builds a `Server` from a database handle and the shared worker list.
    ///
    /// NOTE(review): the body contains no `.await`; it is kept `async` because
    /// the call site in `run` awaits the result.
    async fn new(db_handle: DatabaseHandle, workers: Arc<RwLock<Vec<Worker>>>) -> Self {
        Self {
            _db_handle: db_handle,
            workers,
        }
    }
}

#[tonic::async_trait]
impl WorkerToMaster for Server {
    /// Returns the targets of the worker that made the request.
    ///
    /// The worker is identified by the [`WorkerContext`] stored in the request
    /// extensions. Responds with an internal status if that context is missing
    /// or if no worker in the shared list has a matching id.
    async fn get_targets(&self, request: Request<()>) -> Result<Response<TargetsResponse>, Status> {
        let worker_id = request
            .extensions()
            .get::<WorkerContext>()
            .map(|context| context.worker_id)
            .ok_or_else(|| Status::internal("no worker id in request"))?;

        // The read guard is a temporary and is released at the end of this
        // statement; only the cloned targets outlive it.
        let targets = self
            .workers
            .read()
            .await
            .iter()
            .find(|candidate| candidate.id == worker_id)
            .map(|candidate| candidate.targets.clone())
            .ok_or_else(|| Status::internal("no worker exists with request worker id"))?;

        Ok(Response::new(TargetsResponse { targets }))
    }

    /// Placeholder: logs the request payload and replies with an empty response.
    async fn set_target_state(
        &self,
        request: Request<SetTargetStateRequest>,
    ) -> Result<Response<()>, Status> {
        // TODO: Implement. Follow the same pattern as above.
        let payload = request.into_inner();
        info!("{:?}", payload);

        Ok(Response::new(()))
    }
}