From ed4da76b91ef7e795ca8003205e765cadd21d207 Mon Sep 17 00:00:00 2001 From: Felipe Diniello Date: Tue, 1 Aug 2023 20:06:03 +0200 Subject: [PATCH] refactor --- kairo-common/src/mqtt.rs | 66 ++++++++++++++++------------ simulation-tools/src/nav_dev/main.rs | 8 ++-- 2 files changed, 44 insertions(+), 30 deletions(-) diff --git a/kairo-common/src/mqtt.rs b/kairo-common/src/mqtt.rs index fe6f7ea..2fe8d70 100644 --- a/kairo-common/src/mqtt.rs +++ b/kairo-common/src/mqtt.rs @@ -62,39 +62,51 @@ pub mod for_sync { use paho_mqtt as mqtt; use std::{process, time::Duration}; - pub fn get_mqtt_cli() -> mqtt::Client { - let host = dotenv::var("MQTT_BROKER").unwrap_or_else(|_| { + pub struct MqttClient { + cli: mqtt::Client, + } + + impl MqttClient { + pub fn new(client_id: Option<&str>) -> MqttClient { + let host = dotenv::var("MQTT_BROKER").unwrap_or_else(|_| { println! {"MQTT_BROKER not found in .evn file, using default: tcp://localhost:1883"}; "tcp://localhost:1883".to_string() }); - let mut cli = mqtt::Client::new(host).unwrap_or_else(|e| { - println!("Error creating the client: {:?}", e); - process::exit(1); - }); + let mut cli = if let Some(client_id) = client_id { + mqtt::Client::new((host, String::from(client_id))).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }) + } else { + mqtt::Client::new(host).unwrap_or_else(|e| { + println!("Error creating the client: {:?}", e); + process::exit(1); + }) + }; - // Use 5sec timeouts for sync calls. - cli.set_timeout(Duration::from_secs(5)); + // Use 5sec timeouts for sync calls. + cli.set_timeout(Duration::from_secs(5)); - // Connect and wait for it to complete or fail - if let Err(e) = cli.connect(None) { - println!("Unable to connect: {:?}", e); - process::exit(1); + // Connect and wait for it to complete or fail + if let Err(e) = cli.connect(None) { + println!("Unable to connect: {:?}", e); + process::exit(1); + } + + MqttClient { cli } + } + pub fn publish(&self, topic: &str, payload: Option<&str>) -> Result<(), paho_mqtt::Error> { + let msg = if let Some(payload) = payload { + mqtt::MessageBuilder::new() + .topic(topic) + .qos(0) + .payload(payload) + .finalize() + } else { + mqtt::MessageBuilder::new().topic(topic).qos(0).finalize() + }; + self.cli.publish(msg) } - cli - } - - pub fn mqtt_pub( - client: &mqtt::Client, - topic: &str, - payload: &str, - ) -> Result<(), paho_mqtt::Error> { - let msg = mqtt::MessageBuilder::new() - .topic(topic) - .payload(payload) - .qos(0) - .finalize(); - - client.publish(msg) } } diff --git a/simulation-tools/src/nav_dev/main.rs b/simulation-tools/src/nav_dev/main.rs index ae26a79..f3d4977 100644 --- a/simulation-tools/src/nav_dev/main.rs +++ b/simulation-tools/src/nav_dev/main.rs @@ -3,7 +3,7 @@ use std::{thread, time}; mod error_report; -use kairo_common::mqtt::for_sync::{get_mqtt_cli, mqtt_pub}; +use kairo_common::mqtt::for_sync::MqttClient; use kairo_common::{Antenna, BeaconMeasure, DeviceReport, Point}; #[derive(Clone)] @@ -29,7 +29,7 @@ async fn main() { // }); // } - let client = get_mqtt_cli(); + let client = MqttClient::new(None); let mut position = Point::new(config.radius, 0.0); @@ -57,7 +57,9 @@ async fn main() { .push(BeaconMeasure::new(&config.id, &ant.id, rssi + noise)); } let payload = serde_json::to_string(&report).unwrap_or_else(|_| "".to_string()); - mqtt_pub(&client, topic.as_str(), payload.as_str()).expect("Pub error"); + client + .publish(topic.as_str(), Some(payload.as_str())) + .expect("Pub error"); // if config.real { // let _r = KnownPosition::new(position).write_for("real").await;