/*
* Copyright (C) 2024 Jonni Liljamo <jonni@liljamo.com>
*
* This file is licensed under GPL-3.0-or-later, see NOTICE and LICENSE for
* more information.
*/
use std::path::Path;
use emerwen_protocol::emerwen::shared::Worker;
use redb::{Database, ReadableTable, TableDefinition, TypeName, Value};
use tokio::sync::{mpsc, oneshot};
use tracing::warn;
/// redb table named `"workers"`: keys are `u32` (the database actor inserts
/// each worker under its `id`) and values are [`Worker`]s encoded through
/// [`WorkerType`].
const WORKER_TABLE: TableDefinition<u32, WorkerType> = TableDefinition::new("workers");
/// Requests sent over the mpsc channel to the database actor
/// ([`DatabaseInstance`]). Each variant carries a oneshot sender on which the
/// actor delivers its reply.
pub enum DBMessage {
/// Read every worker stored in the workers table.
ReadWorkers {
/// Channel on which the list of stored workers is sent back.
respond_to: oneshot::Sender<Vec<Worker>>,
},
/// Insert `worker` into the workers table under its `id`.
WriteWorker {
/// Channel on which the result of the write is reported.
respond_to: oneshot::Sender<bool>,
/// The worker to store.
worker: Worker,
},
}
pub struct DatabaseInstance {
receiver: mpsc::Receiver<DBMessage>,
database: Database,
}
impl DatabaseInstance {
fn new(receiver: mpsc::Receiver<DBMessage>, database: Database) -> DatabaseInstance {
DatabaseInstance { receiver, database }
}
async fn run(&mut self) -> Result<(), Box<dyn std::error::Error>> {
while let Some(msg) = self.receiver.recv().await {
match msg {
DBMessage::ReadWorkers { respond_to } => {
// FIXME: handle errors, offload to a generic function
let read_txn = self.database.begin_read().unwrap();
let table = match read_txn.open_table(WORKER_TABLE) {
Ok(table) => Some(table),
Err(redb::TableError::TableDoesNotExist(table)) => {
warn!(message="Table does not exist, assuming empty", %table);
None
}
Err(_) => {
// FIXME: handle
None
}
};
let mut workers = vec![];
if let Some(table) = table {
for a in table.iter().unwrap() {
workers.push(a.unwrap().1.value());
}
}
let _ = respond_to.send(workers);
}
DBMessage::WriteWorker { respond_to, worker } => {
let write_txn = self.database.begin_write().unwrap();
{
let mut table = write_txn.open_table(WORKER_TABLE).unwrap();
table.insert(worker.id, worker)?;
}
write_txn.commit()?;
let _ = respond_to.send(true);
}
}
}
Ok(())
}
}
#[derive(Clone)]
pub struct DatabaseHandle {
sender: mpsc::Sender<DBMessage>,
}
impl DatabaseHandle {
pub fn new(database_file: &Path) -> Result<DatabaseHandle, Box<dyn std::error::Error>> {
let database = match database_file.exists() {
true => Database::open(database_file)?,
false => Database::create(database_file)?,
};
let (sender, receiver) = mpsc::channel(64);
let mut actor = DatabaseInstance::new(receiver, database);
tokio::spawn(async move {
let _ = actor.run().await;
});
Ok(DatabaseHandle { sender })
}
pub async fn read_workers(&self) -> Vec<Worker> {
let (tx, rx) = oneshot::channel();
let msg = DBMessage::ReadWorkers { respond_to: tx };
let _ = self.sender.send(msg).await;
rx.await.expect("actor task has been killed") // FIXME: handle
}
pub async fn write_worker(&self, worker: Worker) -> bool {
let (tx, rx) = oneshot::channel();
let msg = DBMessage::WriteWorker {
respond_to: tx,
worker,
};
let _ = self.sender.send(msg).await;
rx.await.expect("actor task has been killed") // FIXME: handle
}
}
/// Unit struct used only at the type level: it carries the redb [`Value`]
/// implementation that stores [`Worker`]s as bincode-encoded bytes.
#[derive(Debug)]
struct WorkerType;

impl Value for WorkerType {
    type SelfType<'a> = Worker;
    type AsBytes<'a> = Vec<u8>;

    /// Name under which redb records this value type.
    fn type_name() -> TypeName {
        TypeName::new("emerwen_worker")
    }

    /// Encoded workers have no fixed size.
    fn fixed_width() -> Option<usize> {
        None
    }

    /// Encodes `value` with bincode.
    ///
    /// # Panics
    ///
    /// Panics if bincode fails to serialize the worker.
    fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
    where
        Self: 'a,
        Self: 'b,
    {
        let encoded = bincode::serialize(value);
        encoded.unwrap()
    }

    /// Decodes a worker from its bincode representation.
    ///
    /// # Panics
    ///
    /// Panics if `data` cannot be deserialized into a [`Worker`].
    fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
    where
        Self: 'a,
    {
        let decoded: Result<Worker, _> = bincode::deserialize(data);
        decoded.unwrap()
    }
}