use std::time::Duration;
use clap::Parser;
use client::ClientMessage;
use emerwen_protocol::message::payload::MessagePayload;
use emerwen_types::{Target, TargetMethod};
use tokio::sync::mpsc;
use tracing::{debug, info, level_filters::LevelFilter};
use tracing_subscriber::{prelude::*, EnvFilter};
mod client;
#[derive(Parser)]
#[command(version)]
struct Args {
/// Enable debug logging
#[arg(long)]
debug: bool,
}
#[tokio::main]
async fn main() {
let args = Args::parse();
tracing_subscriber::registry()
.with(
tracing_subscriber::fmt::layer()
.with_file(args.debug)
.with_line_number(args.debug)
.with_level(true),
)
.with(
EnvFilter::builder()
.with_default_directive(if args.debug {
LevelFilter::DEBUG.into()
} else {
LevelFilter::INFO.into()
})
.from_env_lossy(),
)
.init();
info!("Starting emerwen worker...");
debug!("Hello, debug!");
// Channel for sending payloads to Master via Client
let (payload_tx, payload_rx) = mpsc::channel(64);
// Channel for sending messages to Client via Master
let (worker_tx, worker_rx) = mpsc::channel(64);
let targets = vec![
Target {
id: 0,
addr: "127.0.0.1".to_owned(),
interval: 2000,
method: TargetMethod::Ping,
},
Target {
id: 1,
addr: "10.1.2.3".to_owned(),
interval: 2000,
method: TargetMethod::Ping,
},
Target {
id: 2,
addr: "https://liljamo.com/".to_owned(),
interval: 5000,
method: TargetMethod::GET {
ok_codes: vec![200],
},
},
];
for target in targets {
let payload_tx = payload_tx.clone();
tokio::task::spawn(async move {
info!("Starting monitoring task for target {}", target.id);
let sleep_duration = Duration::from_millis(target.interval.into());
loop {
let payload;
match target.check().await {
Ok((up, _response)) => {
if !up {
info!("Target {} down!", target.id);
payload = MessagePayload::TargetDown {
target_id: target.id,
};
} else {
payload = MessagePayload::TargetUp {
target_id: target.id,
};
}
}
Err(_) => {
// We failed to check the state, status is unknown (grey).
payload = MessagePayload::TargetUnknown {
target_id: target.id,
};
}
}
let _ = payload_tx.send(payload).await;
tokio::time::sleep(sleep_duration).await;
}
});
}
let mut client = client::Client::new("127.0.0.1:8000", worker_tx, payload_rx);
let _ = client.run().await;
loop {
// read worker_rx, i guess
}
/*
//tokio::task::spawn(async move {
let worker_server = WorkerServer::new("127.0.0.1:8000");
let _ = worker_server.run().await;
//});
*/
}
/*
struct WorkerServer {
addr: String,
}
impl WorkerServer {
pub fn new(addr: impl Into<String>) -> WorkerServer {
WorkerServer { addr: addr.into() }
}
pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let listener = TcpListener::bind(&self.addr).await?;
info!("Worker Server listening on {}", self.addr);
loop {
let (mut socket, addr) = listener.accept().await?;
info!("Client '{}' connected", addr);
tokio::task::spawn(async move {
let (socket_r, mut socket_w) = socket.split();
let mut reader = BufReader::new(socket_r);
loop {
let mut header_bytes = [0; HEADER_LENGTH];
if reader.read_exact(&mut header_bytes).await.is_err() {
break;
}
let header = match MessageHeader::decode(header_bytes) {
Ok(header) => header,
Err(e) => {
error!("Error reading header: {}", e.to_string());
continue;
}
};
let mut payload_bytes = vec![0; header.payload_length as usize];
if reader.read_exact(&mut payload_bytes).await.is_err() {
break;
}
let mut payload_reader = std::io::Cursor::new(payload_bytes);
let payload =
match MessagePayload::decode(header.payload_type, &mut payload_reader) {
Ok(payload) => payload,
Err(e) => {
error!("Error reading payload: {}", e.to_string());
continue;
}
};
info!("Received message: {:?}", payload);
// match payload bla bla
// write something back if appliccable
}
info!("Client '{}' disconnected", addr);
/*
let mut line = String::new();
loop {
let bytes_read = reader.read_line(&mut line).await.unwrap();
if bytes_read == 0 {
info!("Client '{}' disconnected", addr);
break;
}
socket_w.write_all(&line.as_bytes()).await.unwrap();
line.clear();
}
*/
});
}
}
}
*/