use std::sync::{Arc, Mutex, RwLock}; use std::time::Duration; use actix_web::{App, error, get, HttpServer, Responder, Result, web}; use actix_web::error::ErrorInternalServerError; use actix_web::web::Data; use futures::{executor::block_on, stream::StreamExt}; use paho_mqtt as mqtt; use tokio::spawn; use status::*; mod status; #[get("/api/spaceapi/v13")] async fn api_spaceapi_v13(info: web::Data>>) -> Result { let info = info.read().unwrap(); let info = SharedInfo { power_usage: info.power_usage, open: info.open }; let status = build_status_v13(info).map_err(|e| ErrorInternalServerError(e))?; Ok(web::Json(status)) } #[get("/api/spaceapi/v14")] async fn api_spaceapi_v14(info: web::Data>>) -> Result { let info = info.read().unwrap(); let info = SharedInfo { power_usage: info.power_usage, open: info.open }; let status = build_status_v13(info).map_err(|e| ErrorInternalServerError(e))?; Ok(web::Json(status)) } struct SharedInfo { power_usage: u64, open: bool, } const TOPICS: &[&str] = &["/status/flukso/powerinW"]; const QOS: &[i32] = &[1]; #[tokio::main] // or async fn main() -> std::io::Result<()> { let shared_info: Arc> = Arc::new(RwLock::new(SharedInfo { power_usage: 0, open: false })); let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri("mqtt.ctdo.de") .client_id("ctdo-status") .finalize(); let mut cli = mqtt::AsyncClient::new(create_opts).expect("Error creating the client"); spawn(async move { // Get message stream before connecting. let mut strm = cli.get_stream(25); let conn_opts = mqtt::ConnectOptionsBuilder::new() .keep_alive_interval(Duration::from_secs(30)) .mqtt_version(mqtt::MQTT_VERSION_3_1_1) .clean_session(false) .finalize(); println!("Connecting to the MQTT server..."); cli.connect(conn_opts).await?; println!("Subscribing to topics: {:?}", TOPICS); cli.subscribe_many(TOPICS, QOS).await?; while let Some(msg_opt) = strm.next().await { if let Some(msg) = msg_opt { match msg.topic() { "/status/flukso/powerinW" => { let power_usage = msg.payload_str().parse::() .map_err(|e| { println!("invalid power value: {}", e); 0 })?; let mut lock = shared_info.write().unwrap(); lock.power_usage = power_usage; } _ => println!("{}", msg) } } else { println!("Lost connection. Attempting reconnect."); while let Err(err) = cli.reconnect().await { println!("Error reconnecting..."); tokio::time::sleep(Duration::from_millis(1000)).await; } } } Ok::<(), mqtt::Error>(()) }); HttpServer::new(|| { App::new() .app_data() .service(api_spaceapi_v13) .service(api_spaceapi_v14) }) .bind(("127.0.0.1", 8080))? .run() .await }