diff --git a/.env b/.env
index bf705b6..53fcab0 100644
--- a/.env
+++ b/.env
@@ -1,8 +1,8 @@
 DATABASE_URL=postgres://kairo:AJzYhFltZXRiGQ@localhost/kairoXYZ_db
-INFLUX_URL=http://localhost:8086
-INFLUX_DB=db0
-INFLUX_USER=mmRTLS
-INFLUX_PASSWORD=Lkj9s2iAnd7Gxg
+INFLUX_HOST=http://localhost:8086
+INFLUX_BUCKET=db0
+INFLUX_ORG=kario
+INFLUX_TOKEN=82GAOBcdQoPnFNp_aew3DPffg44ihr4-lxs2BMGQ7RJ6nZyqSAFerX-WaHgLC47hTI23LgOauEfyTU_FKT0SpQ==
 
 MQTT_BROKER=tcp://localhost:1883
diff --git a/.gitignore b/.gitignore
index 16411f1..14fd7ba 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,4 @@
 target
 Cargo.lock
 data
+*.code-workspace
diff --git a/docker-compose.yml b/docker-compose.yml
index ea3d13e..b8b9087 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -24,29 +24,11 @@ services:
       - /var/lib/influxdb
     ports:
       - 8086:8086
-    image: "influxdb:1.8.10"
+    image: "influxdb:latest"
     environment:
-      - INFLUXDB_HTTP_AUTH_ENABLED=true
-      - INFLUXDB_ADMIN_USER=admin
-      - INFLUXDB_ADMIN_PASSWORD=xHKvboa0Qlvhtg
-      - INFLUXDB_USER=mmRTLS
-      - INFLUXDB_USER_PASSWORD=Lkj9s2iAnd7Gxg
-      - INFLUXDB_DB=db0
-      # image: "influxdb:latest"
-      # environment:
-      #   - DOCKER_INFLUXDB_INIT_MODE=setup
-      #   - DOCKER_INFLUXDB_INIT_USERNAME=mmRTLS
-      #   - DOCKER_INFLUXDB_INIT_PASSWORD=Lkj9s2iAnd7Gxg
-      #   - DOCKER_INFLUXDB_INIT_ORG=glb
-      #   - DOCKER_INFLUXDB_INIT_BUCKET=db0
-      #   - DOCKER_INFLUXDB_INIT_CLI_CONFIG_NAME=default
-      #   - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=xHKvboa0Qlvhtg
-
-  chronograf:
-    image: "chronograf"
-    container_name: "chronograf"
-    ports:
-      - 8888:8888
-    volumes:
-      - ./data/chronograf:/var/lib/chronograf
-
+      - DOCKER_INFLUXDB_INIT_MODE=setup
+      - DOCKER_INFLUXDB_INIT_USERNAME=user
+      - DOCKER_INFLUXDB_INIT_PASSWORD=Lkj9s2iAnd7Gxg
+      - DOCKER_INFLUXDB_INIT_ORG=kario
+      - DOCKER_INFLUXDB_INIT_BUCKET=db0
+      - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=82GAOBcdQoPnFNp_aew3DPffg44ihr4-lxs2BMGQ7RJ6nZyqSAFerX-WaHgLC47hTI23LgOauEfyTU_FKT0SpQ==
diff --git a/kairo-common/Cargo.toml b/kairo-common/Cargo.toml
index e9127db..020e03a 100644
--- a/kairo-common/Cargo.toml
+++ b/kairo-common/Cargo.toml
@@ -9,9 +9,14 @@ edition = "2021"
 
 [dependencies]
 paho-mqtt = { workspace = true }
-influxdb = { workspace = true }
 tokio = { workspace = true }
 dotenv = { workspace = true }
 chrono = { workspace = true }
 serde = { workspace = true }
-serde_json = { workspace = true }
\ No newline at end of file
+serde_json = { workspace = true }
+
+influxdb2 = "0.4.2"
+influxdb2-structmap = "0.2"
+influxdb2-derive = "0.1.1"
+futures = "0.3.28"
+num-traits = "0.2"
diff --git a/kairo-common/src/helper.rs b/kairo-common/src/helper.rs
index c8718d7..fe6f7ea 100644
--- a/kairo-common/src/helper.rs
+++ b/kairo-common/src/helper.rs
@@ -1,30 +1,4 @@
 pub mod for_async {
-
-    use influxdb::Client;
-    use std::cell::RefCell;
-
-    thread_local! {
-        static INFLUX_CLIENT : RefCell<Client> = RefCell::new( init_influx_cli() );
-    }
-
-    pub fn get_influx_cli() -> influxdb::Client {
-        INFLUX_CLIENT.with(|rc| rc.borrow().clone())
-    }
-
-    fn init_influx_cli() -> influxdb::Client {
-        let host = dotenv::var("INFLUX_URL").unwrap_or_else(|_| {
-            println! {"INFLUX_URL not found in .evn file, using default: http://localhost:8086"};
-            "http://localhost:8086".to_string()
-        });
-
-        let db = dotenv::var("INFLUX_DB").expect("INFLUX_DB not defined in .env file");
-        let user = dotenv::var("INFLUX_USER").expect("INFLUX_USER not defined in .env file");
-        let pass =
-            dotenv::var("INFLUX_PASSWORD").expect("INFLUX_PASSWORD not defined in .env file");
-
-        Client::new(host, db).with_auth(user, pass)
-    }
-
     use mqtt::{AsyncClient, Message};
     use paho_mqtt as mqtt;
     use std::{process, time::Duration};
diff --git a/kairo-common/src/influx.rs b/kairo-common/src/influx.rs
new file mode 100644
index 0000000..5edba74
--- /dev/null
+++ b/kairo-common/src/influx.rs
@@ -0,0 +1,123 @@
+use std::cell::RefCell;
+
+#[derive(Debug, Clone)]
+pub struct Client {
+    client: influxdb2::Client,
+
+    // We should get two buckets, one temp and a permanent, but for now we'll use just one
+    bucket: String,
+}
+
+pub enum Bucket {
+    Tmp,
+    Perm,
+}
+
+thread_local! {
+    static INFLUX_CLIENT : RefCell<Client> = Client::new();
+}
+
+impl Client {
+    fn new() -> RefCell<Client> {
+        let host = dotenv::var("INFLUX_HOST").unwrap_or_else(|_| {
+            println! {"INFLUX_HOST not found in .env file, using default: http://localhost:8086"};
+            "http://localhost:8086".to_string()
+        });
+        let bucket = dotenv::var("INFLUX_BUCKET").expect("INFLUX_BUCKET not defined in .env file");
+        let org = dotenv::var("INFLUX_ORG").expect("INFLUX_ORG not defined in .env file");
+        let token = dotenv::var("INFLUX_TOKEN").expect("INFLUX_TOKEN not defined in .env file");
+
+        RefCell::new(Client {
+            client: influxdb2::Client::new(host, org, token),
+            bucket,
+        })
+    }
+    pub fn get() -> Client {
+        INFLUX_CLIENT.with(|rc| rc.borrow().clone())
+    }
+
+    pub async fn write(
+        &self,
+        _bucket: Bucket,
+        body: impl futures::Stream<Item = impl influxdb2::models::WriteDataPoint>
+            + Send
+            + Sync
+            + 'static,
+    ) -> Result<(), influxdb2::RequestError> {
+        // TODO: use _bucket to choose from internal list
+
+        self.client.write(self.bucket.as_str(), body).await
+    }
+
+    pub async fn query<T>(
+        &self,
+        _bucket: Bucket,
+        q: String,
+    ) -> Result<Vec<T>, influxdb2::RequestError>
+    where
+        T: influxdb2_structmap::FromMap,
+    {
+        // TODO: use _bucket to choose from internal list
+        let from_bucket = format!("from(bucket: \"{}\")", self.bucket);
+        let query = from_bucket + &q;
+        let query = influxdb2::models::Query::new(query);
+
+        self.client.query::<T>(Some(query)).await
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::influx::{Bucket, Client};
+
+    #[tokio::test]
+    async fn test_new_get_cli() {
+        let health = Client::get().client.health().await;
+        assert!(health.is_ok())
+    }
+
+    use influxdb2_derive::{FromDataPoint, WriteDataPoint};
+    #[derive(Default, Debug, PartialEq, FromDataPoint, WriteDataPoint)]
+    #[measurement = "stock_prices"]
+    struct StockPrice {
+        #[influxdb(tag)]
+        ticker: String,
+        #[influxdb(field)]
+        value: f64,
+        #[influxdb(timestamp)]
+        time: i64,
+    }
+
+    #[tokio::test]
+    async fn test_write_then_query() {
+        let time = chrono::Utc::now().timestamp_nanos();
+        let w = StockPrice {
+            ticker: "ASDF".into(),
+            value: 150.5,
+            time: time,
+        };
+
+        let res = Client::get()
+            .write(Bucket::Perm, futures::stream::iter([w]))
+            .await;
+        assert!(res.is_ok());
+
+        let query = format!(
+            "
+            |> range(start: -1s)
+            |> filter(fn: (r) => r[\"_measurement\"] == \"stock_prices\")
+            |> filter(fn: (r) => r[\"ticker\"] == \"ASDF\")
+            |> sort(columns: [\"time\"], desc: true)
+            "
+        );
+
+        let r = Client::get()
+            .query::<StockPrice>(Bucket::Perm, query)
+            .await
+            .unwrap();
+
+        assert!(r.len() > 0);
+        assert_eq!(r[0].ticker, "ASDF");
+        assert_eq!(r[0].value, 150.5);
+    }
+}
diff --git a/kairo-common/src/influxdb_models/beacon_measure.rs b/kairo-common/src/influxdb_models/beacon_measure.rs
deleted file mode 100644
index 5be25f8..0000000
--- a/kairo-common/src/influxdb_models/beacon_measure.rs
+++ /dev/null
@@ -1,80 +0,0 @@
-use chrono::{DateTime, Utc};
-use influxdb::{InfluxDbWriteable, ReadQuery};
-use serde::{Deserialize, Serialize};
-
-use crate::helper::for_async::get_influx_cli;
-use crate::influxdb_models::BEACONMEASURE_TIME_WINDOW;
-use crate::MAC;
-
-#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, InfluxDbWriteable)]
-pub struct BeaconMeasure {
-    #[influxdb(tag)]
-    pub beacon_id: MAC,
-    pub rssi: f64,
-    pub time: DateTime<Utc>,
-}
-
-#[derive(Serialize, Deserialize)]
-struct Tags {
-    beacon_id: MAC,
-}
-
-impl BeaconMeasure {
-    #[allow(non_snake_case)]
-    pub fn new(beacon_id: &MAC, rssi_W: f64) -> BeaconMeasure {
-        BeaconMeasure {
-            beacon_id: *beacon_id,
-            rssi: rssi_W,
-            time: chrono::Utc::now(),
-        }
-    }
-    pub async fn write_for(self, device_id: &str) -> Result<String, influxdb::Error> {
-        let table_name = format!("measure_{}", device_id);
-        get_influx_cli()
-            .query(self.into_query(table_name.as_str()))
-            .await
-    }
-    pub async fn get_for(device_id: &str) -> Result<Vec<BeaconMeasure>, influxdb::Error> {
-        let query = format!( "SELECT mean(rssi) FROM /measure_{}/ WHERE time > now() - {}s AND time < now() GROUP BY beacon_id;", device_id, BEACONMEASURE_TIME_WINDOW);
-
-        let mut database_result = get_influx_cli().json_query(ReadQuery::new(query)).await?;
-
-        #[derive(Deserialize)]
-        struct Value {
-            time: DateTime<Utc>,
-            mean: f64,
-        }
-        let vect = database_result
-            .deserialize_next_tagged::<Tags, Value>()?
-            .series
-            .into_iter()
-            .map(|measure| BeaconMeasure {
-                beacon_id: measure.tags.beacon_id,
-                rssi: measure.values[0].mean,
-                time: measure.values[0].time,
-            })
-            .collect::<Vec<BeaconMeasure>>();
-        Ok(vect)
-    }
-}
-
-#[tokio::test]
-async fn beacon_measure_test() {
-    print!("Testing BeaconMeasure::* read/write methods");
-    let bm1 = BeaconMeasure::new(&MAC::new("AB:CD:EF:12:34:56"), 0.0);
-    let bm = bm1.clone();
-    let _result = bm.write_for("AB:CD:EF:12:34:56").await;
-
-    let bm2 = BeaconMeasure::get_for("AB:CD:EF:12:34:56").await.unwrap();
-    assert_eq!(bm2.len(), 1);
-    assert_eq!(bm1.beacon_id, bm2[0].beacon_id);
-    assert_eq!(bm1.rssi, bm2[0].rssi);
-
-    //wait for the time window to pass
-    let delay = BEACONMEASURE_TIME_WINDOW * 1000 + 500;
-    tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
-    let bm2 = BeaconMeasure::get_for("AB:CD:EF:12:34:56").await.unwrap();
-    assert_eq!(bm2.len(), 0);
-
-    println!(" ... ok");
-}
diff --git a/kairo-common/src/influxdb_models/device_status.rs b/kairo-common/src/influxdb_models/device_status.rs
deleted file mode 100644
index 9b20a4c..0000000
--- a/kairo-common/src/influxdb_models/device_status.rs
+++ /dev/null
@@ -1,95 +0,0 @@
-use influxdb::{ReadQuery, WriteQuery};
-use serde::{Deserialize, Serialize};
-
-use crate::{helper::for_async::get_influx_cli, MAC};
-
-const TABLE_NAME: &str = "device_status";
-
-#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct DeviceStatus {
-    device_id: MAC,
-    pos_x: f64,
-    pos_y: f64,
-    error: f64,
-    speed: f64,
-}
-
-impl DeviceStatus {
-    fn new(device_id: MAC) -> DeviceStatus {
-        DeviceStatus {
-            device_id,
-            pos_x: 0.0,
-            pos_y: 0.0,
-            error: 0.0,
-            speed: 0.0,
-        }
-    }
-
-    pub async fn get(device_id: MAC) -> Result<Box<DeviceStatus>, influxdb::Error> {
-        let query = ReadQuery::new(format!(
-            "SELECT last(*) FROM /{}/ WHERE device_id = '{}';",
-            TABLE_NAME, device_id
-        ));
-        let mut database_result = get_influx_cli().json_query(query).await?;
-
-        #[derive(Debug, Deserialize)]
-        struct Value {
-            last_pos_x: f64,
-            last_pos_y: f64,
-            last_error: f64,
-            last_speed: f64,
-        }
-
-        let vec = database_result.deserialize_next::<Value>()?.series;
-
-        if !vec.is_empty() && !vec[0].values.is_empty() {
-            Ok(Box::new(DeviceStatus {
-                device_id,
-                pos_x: vec[0].values[0].last_pos_x,
-                pos_y: vec[0].values[0].last_pos_y,
-                error: vec[0].values[0].last_error,
-                speed: vec[0].values[0].last_speed,
-            }))
-        } else {
-            Ok(Box::new(DeviceStatus::new(device_id)))
-        }
-    }
-
-    fn as_query(&self) -> influxdb::WriteQuery {
-        WriteQuery::new(influxdb::Timestamp::from(chrono::Utc::now()), TABLE_NAME)
-            .add_tag("device_id", self.device_id)
-            .add_field("pos_x", self.pos_x)
-            .add_field("pos_y", self.pos_y)
-            .add_field("error", self.error)
-            .add_field("speed", self.speed)
-    }
-
-    async fn update(query: influxdb::WriteQuery) -> Result<String, influxdb::Error> {
-        println!("update");
-        get_influx_cli().query(query).await
-    }
-}
-
-impl Drop for DeviceStatus {
-    fn drop(&mut self) {
-        println!("drop");
-        let query = self.as_query();
-
-        tokio::runtime::Handle::current().spawn(async move { Self::update(query).await });
-    }
-}
-
-#[tokio::test]
-async fn test() {
-    use std::time::Duration;
-    // create context to call drop
-    {
-        let mut a = DeviceStatus::get(MAC::new("15:23:45:ab:cd:ef"))
-            .await
-            .unwrap();
-        a.pos_x += 2.0;
-        a.pos_y += 3.0;
-        println!("{:?}", a);
-    } //here and then wait
-    tokio::time::sleep(Duration::from_millis(150)).await;
-}
diff --git a/kairo-common/src/influxdb_models/known_position.rs b/kairo-common/src/influxdb_models/known_position.rs
deleted file mode 100644
index df43554..0000000
--- a/kairo-common/src/influxdb_models/known_position.rs
+++ /dev/null
@@ -1,48 +0,0 @@
-use chrono::{DateTime, Utc};
-use influxdb::{InfluxDbWriteable, ReadQuery};
-use serde::{Deserialize, Serialize};
-
-use crate::helper::for_async::get_influx_cli;
-use crate::Point;
-
-#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, InfluxDbWriteable)]
-pub struct KnownPosition {
-    pub x: f64,
-    pub y: f64,
-    pub time: DateTime<Utc>,
-}
-
-impl KnownPosition {
-    pub fn new(pos: Point) -> KnownPosition {
-        KnownPosition {
-            x: pos.x,
-            y: pos.y,
-            time: chrono::Utc::now(),
-        }
-    }
-    pub async fn write_for(self, device_id: &str) -> Result<String, influxdb::Error> {
-        let table_name = format!("position_{}", device_id);
-        get_influx_cli()
-            .query(self.into_query(table_name.as_str()))
-            .await
-    }
-    pub async fn get_last_for(
-        device_id: &str,
-        time_window: i32,
-    ) -> Result<Option<KnownPosition>, influxdb::Error> {
-        let query = format!(
-            "SELECT mean(x) as x, mean(y) as y FROM /position_{}/ WHERE time > now() - {}s AND time < now();",
-            device_id, time_window
-        );
-
-        let mut database_result = get_influx_cli().json_query(ReadQuery::new(query)).await?;
-
-        let series = &database_result.deserialize_next::<KnownPosition>()?.series;
-        if series.is_empty() {
-            Ok(None)
-        } else {
-            let vec = &series[0].values;
-            Ok(Some(vec[0].clone()))
-        }
-    }
-}
diff --git a/kairo-common/src/influxdb_models/mod.rs b/kairo-common/src/influxdb_models/mod.rs
deleted file mode 100644
index a360f12..0000000
--- a/kairo-common/src/influxdb_models/mod.rs
+++ /dev/null
@@ -1,10 +0,0 @@
-// pub mod multiple_measures;
-mod beacon_measure;
-mod device_status;
-mod known_position;
-
-// Renaming types for ease of use outside the scope of this module
-pub const BEACONMEASURE_TIME_WINDOW: u64 = 4;
-pub type BeaconMeasure = beacon_measure::BeaconMeasure;
-pub type KnownPosition = known_position::KnownPosition;
-pub type DeviceStatus = device_status::DeviceStatus;
diff --git a/kairo-common/src/lib.rs b/kairo-common/src/lib.rs
index 4d24d23..44544a7 100644
--- a/kairo-common/src/lib.rs
+++ b/kairo-common/src/lib.rs
@@ -1,21 +1,23 @@
-#![allow(confusable_idents)]
+#![allow(clippy::upper_case_acronyms)]
 #![allow(mixed_script_confusables)]
+#![allow(non_upper_case_globals)]
+#![allow(confusable_idents)]
 
-use serde::{Deserialize, Serialize};
+pub mod influx;
 
 pub mod helper;
-pub mod influxdb_models;
-mod mac;
 pub mod unit_conversion;
 
-pub type Antenna = antenna::Antenna;
-pub type Point = point::Point;
-pub type MAC = mac::MAC;
-
-mod antenna;
-mod point;
-
-#[derive(Debug, Serialize, Deserialize)]
-pub struct DeviceReport {
-    pub data: Vec<influxdb_models::BeaconMeasure>,
+mod types {
+    pub mod mac;
+    pub mod point;
 }
+pub type Point = types::point::Point;
+pub type MAC = types::mac::MAC;
+
+mod models;
+pub type Antenna = models::antenna::Antenna;
+pub type DeviceReport = models::DeviceReport;
+pub type KnownPosition = models::known_position::KnownPosition;
+pub type DynamicDeviceStatus = models::dynamic_device_status::DynamicDeviceStatus;
+pub type BeaconMeasure = models::beacon_measure::BeaconMeasure;
diff --git a/kairo-common/src/antenna.rs b/kairo-common/src/models/antenna.rs
similarity index 91%
rename from kairo-common/src/antenna.rs
rename to kairo-common/src/models/antenna.rs
index 902c7c0..8e3d43c 100644
--- a/kairo-common/src/antenna.rs
+++ b/kairo-common/src/models/antenna.rs
@@ -1,23 +1,24 @@
-use std::{f64::consts::PI, str::FromStr};
+use std::f64::consts::PI;
 
-use crate::{unit_conversion::UnitsConversion, Point, MAC};
+use crate::{unit_conversion::UnitsConversion, Point};
 
 #[derive(Debug, Clone, Default)]
 pub struct Antenna {
-    pub id: MAC,
+    pub id: String,
     pub tssi: f64,
     pub coord: Point,
+    pub comment: Option<String>,
 }
 
 impl Antenna {
     const C: f64 = 2.99e8;
     const F: f64 = 2.4e9;
-    #[allow(non_upper_case_globals)]
     const λ: f64 = Self::C / Self::F;
 
     pub fn new(id: &str, tssi: f64, coord: Point) -> Antenna {
         Antenna {
-            id: MAC::from_str(id).unwrap(),
+            id: id.into(),
+            comment: None,
            coord,
             tssi,
         }
@@ -29,12 +30,14 @@ impl Antenna {
         let FSPL = (((distance * 4.0 * PI) / Self::λ).powi(2)).to_dB();
         self.tssi - FSPL
     }
 
+    #[allow(non_snake_case)]
     pub fn get_distance_with_dBm(&self, rssi_dBm: f64) -> f64 {
         let loss = self.tssi.dBm_to_W() / rssi_dBm.dBm_to_W();
         let distance = (loss.sqrt() * Self::λ) / (4.0 * PI);
         distance.abs()
     }
 
+    #[allow(non_snake_case)]
     pub fn get_distance_with_W(&self, rssi_W: f64) -> f64 {
         let loss = self.tssi.dBm_to_W() / rssi_W;
diff --git a/kairo-common/src/models/beacon_measure.rs b/kairo-common/src/models/beacon_measure.rs
new file mode 100644
index 0000000..b464f7f
--- /dev/null
+++ b/kairo-common/src/models/beacon_measure.rs
@@ -0,0 +1,68 @@
+use serde::{Deserialize, Serialize};
+
+use influxdb2_derive::{FromDataPoint, WriteDataPoint};
+
+#[derive(
+    Debug, Default, PartialEq, Clone, Serialize, Deserialize, FromDataPoint, WriteDataPoint,
+)]
+#[measurement = "beacon_measures"]
+pub struct BeaconMeasure {
+    #[influxdb(tag)]
+    pub device_id: String,
+    #[influxdb(tag)]
+    pub beacon_id: String,
+    pub rssi: f64,
+    #[influxdb(timestamp)]
+    pub time: i64,
+}
+
+impl BeaconMeasure {
+    #[allow(non_snake_case)]
+    pub fn new(device_id: &str, beacon_id: &str, rssi_W: f64) -> BeaconMeasure {
+        BeaconMeasure {
+            device_id: device_id.into(),
+            beacon_id: beacon_id.to_owned(),
+            rssi: rssi_W,
+            time: chrono::Utc::now().timestamp_nanos(),
+        }
+    }
+}
+
+#[cfg(test)]
+mod test {
+    use crate::influx::{Bucket, Client};
+    use crate::BeaconMeasure;
+
+    #[tokio::test]
+    async fn influx_test() {
+        let device_id = String::from("AB:CD:EF:01:23:45");
+        let beacon_id = String::from("01:23:45:AB:CD:EF");
+        let rssi_w = 0.001;
+
+        let bm = BeaconMeasure::new(&device_id, &beacon_id, rssi_w);
+
+        let res = Client::get()
+            .write(Bucket::Tmp, futures::stream::iter([bm]))
+            .await;
+        assert!(res.is_ok());
+
+        let query = format!(
+            "
+            |> range(start: -1s)
+            |> filter(fn: (r) => r[\"_measurement\"] == \"beacon_measures\")
+            |> filter(fn: (r) => r[\"beacon_id\"] == \"{}\" )
+            ",
+            beacon_id
+        );
+
+        let r = Client::get()
+            .query::<BeaconMeasure>(Bucket::Tmp, query)
+            .await
+            .unwrap();
+
+        assert!(r.len() > 0);
+        assert_eq!(r[0].beacon_id, beacon_id);
+        assert_eq!(r[0].device_id, device_id);
+        assert_eq!(r[0].rssi, rssi_w);
+    }
+}
diff --git a/kairo-common/src/models/dynamic_device_status.rs b/kairo-common/src/models/dynamic_device_status.rs
new file mode 100644
index 0000000..4c2d265
--- /dev/null
+++ b/kairo-common/src/models/dynamic_device_status.rs
@@ -0,0 +1,10 @@
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct DynamicDeviceStatus {
+    id: String,
+    pos_x: f64,
+    pos_y: f64,
+    pos_z: f64,
+    speed_x: f64,
+    speed_y: f64,
+    pub last_seen: chrono::DateTime<chrono::Utc>,
+}
diff --git a/kairo-common/src/models/known_position.rs b/kairo-common/src/models/known_position.rs
new file mode 100644
index 0000000..194b194
--- /dev/null
+++ b/kairo-common/src/models/known_position.rs
@@ -0,0 +1,31 @@
+use serde::{Deserialize, Serialize};
+
+use influxdb2_derive::{FromDataPoint, WriteDataPoint};
+
+use crate::Point;
+
+#[derive(
+    Debug, Default, PartialEq, Clone, Serialize, Deserialize, FromDataPoint, WriteDataPoint,
+)]
+#[measurement = "known_positions"]
+pub struct KnownPosition {
+    #[influxdb(tag)]
+    pub id: String,
+    pub x: f64,
+    pub y: f64,
+    pub z: f64,
+    #[influxdb(timestamp)]
+    pub time: i64,
+}
+
+impl KnownPosition {
+    pub fn new(device_id: &str, pos: Point) -> KnownPosition {
+        KnownPosition {
+            id: device_id.into(),
+            time: chrono::Utc::now().timestamp_nanos(),
+            x: pos.x,
+            y: pos.y,
+            z: 0.0,
+        }
+    }
+}
diff --git a/kairo-common/src/models/mod.rs b/kairo-common/src/models/mod.rs
new file mode 100644
index 0000000..86f6a64
--- /dev/null
+++ b/kairo-common/src/models/mod.rs
@@ -0,0 +1,9 @@
+pub mod antenna;
+pub mod beacon_measure;
+pub mod dynamic_device_status;
+pub mod known_position;
+
+#[derive(Debug, serde::Serialize, serde::Deserialize)]
+pub struct DeviceReport {
+    pub data: Vec<beacon_measure::BeaconMeasure>,
+}
diff --git a/kairo-common/src/mac.rs b/kairo-common/src/types/mac.rs
similarity index 52%
rename from kairo-common/src/mac.rs
rename to kairo-common/src/types/mac.rs
index 90016fe..c74bf8c 100644
--- a/kairo-common/src/mac.rs
+++ b/kairo-common/src/types/mac.rs
@@ -1,11 +1,3 @@
-use std::fmt::{Debug, Display, Formatter};
-use std::str::FromStr;
-
-use influxdb::Type;
-use serde::de::{self, Visitor};
-use serde::{Deserialize, Serialize, Serializer};
-
-#[allow(clippy::upper_case_acronyms)]
 #[derive(Default, Clone, Copy, Hash, PartialEq, Eq)]
 pub struct MAC {
     s: [u8; 17],
@@ -15,13 +7,18 @@ impl MAC {
     pub fn new(s: &str) -> MAC {
         std::str::FromStr::from_str(s).unwrap()
     }
+
     pub fn as_str(&self) -> &str {
         let a = std::str::from_utf8(&self.s);
         a.unwrap()
     }
 }
 
-impl FromStr for MAC {
+////////////////////////////////////////////////////
+// Standard implementations:
+//
+
+impl std::str::FromStr for MAC {
     type Err = std::string::ParseError;
     fn from_str(s: &str) -> Result<Self, Self::Err> {
         let mut m = MAC::default();
@@ -30,25 +27,45 @@
     }
 }
 
-impl Display for MAC {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+impl std::fmt::Display for MAC {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}", String::from_utf8_lossy(&self.s))
     }
 }
 
-impl Debug for MAC {
-    fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
+impl std::fmt::Debug for MAC {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
         write!(f, "{}", String::from_utf8_lossy(&self.s))
     }
 }
 
-impl From<MAC> for Type {
+////////////////////////////////////////////////////
+// Influx implementations:
+//
+
+#[cfg(influxdb)]
+impl From<MAC> for influxdb::Type {
     fn from(val: MAC) -> Self {
-        Type::Text(val.to_string())
+        influxdb::Type::Text(val.to_string())
     }
 }
 
-impl<'de> Deserialize<'de> for MAC {
+impl influxdb2::writable::KeyWritable for MAC {
+    fn encode_key(&self) -> String {
+        format!("{}", self)
+    }
+}
+impl influxdb2::writable::ValueWritable for MAC {
+    fn encode_value(&self) -> String {
+        format!("{}", self)
+    }
+}
+
+////////////////////////////////////////////////////
+// Serde implementations:
+//
+
+impl<'de> serde::Deserialize<'de> for MAC {
     fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
     where
         D: serde::Deserializer<'de>,
     {
@@ -57,7 +74,7 @@
        struct MACVisitor {
            len: usize,
        }
 
-        impl<'de> Visitor<'de> for MACVisitor {
+        impl<'de> serde::de::Visitor<'de> for MACVisitor {
            type Value = MAC;
            fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
                write!(formatter, "a string containing at least {} bytes", self.len)
@@ -65,12 +82,15 @@
 
            fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
            where
-                E: de::Error,
+                E: serde::de::Error,
            {
                if s.len() == self.len {
                    Ok(MAC::new(s))
                } else {
-                    Err(de::Error::invalid_value(de::Unexpected::Str(s), &self))
+                    Err(serde::de::Error::invalid_value(
+                        serde::de::Unexpected::Str(s),
+                        &self,
+                    ))
                }
            }
        }
@@ -80,10 +100,13 @@
     }
 }
 
-impl Serialize for MAC {
-    fn serialize(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
+impl serde::Serialize for MAC {
+    fn serialize<S>(
+        &self,
+        serializer: S,
+    ) -> Result<<S as serde::Serializer>::Ok, <S as serde::Serializer>::Error>
     where
-        S: Serializer,
+        S: serde::Serializer,
     {
         serializer.serialize_str(self.as_str())
     }
diff --git a/kairo-common/src/point.rs b/kairo-common/src/types/point.rs
similarity index 99%
rename from kairo-common/src/point.rs
rename to kairo-common/src/types/point.rs
index 56d874e..07fecd3 100644
--- a/kairo-common/src/point.rs
+++ b/kairo-common/src/types/point.rs
@@ -61,6 +61,7 @@ impl ops::Add for Point {
         }
     }
 }
+
 impl ops::Add<&Point> for &Point {
     type Output = Point;
     fn add(self, rhs: &Point) -> Point {
@@ -70,6 +71,7 @@ impl ops::Add<&Point> for &Point {
         }
     }
 }
+
 impl ops::AddAssign<&Point> for Point {
     fn add_assign(&mut self, rhs: &Point) {
         *self = Self {
@@ -78,6 +80,7 @@ impl ops::AddAssign<&Point> for Point {
         };
     }
 }
+
 impl ops::AddAssign for Point {
     fn add_assign(&mut self, rhs: Point) {
         *self = Self {
@@ -86,6 +89,7 @@ impl ops::AddAssign for Point {
         };
     }
 }
+
 impl ops::SubAssign<&Point> for Point {
     fn sub_assign(&mut self, rhs: &Point) {
         *self = Self {
@@ -94,6 +98,7 @@ impl ops::SubAssign<&Point> for Point {
         };
     }
 }
+
 impl ops::SubAssign for Point {
     fn sub_assign(&mut self, rhs: Point) {
         *self = Self {
@@ -102,6 +107,7 @@ impl ops::SubAssign for Point {
         };
     }
 }
+
 impl ops::Sub for Point {
     type Output = Point;
     fn sub(self, rhs: Point) -> Point {
@@ -111,6 +117,7 @@ impl ops::Sub for Point {
         }
     }
 }
+
 impl ops::Sub<&Point> for &Point {
     type Output = Point;
     fn sub(self, rhs: &Point) -> Point {
@@ -130,6 +137,7 @@ impl ops::Mul<f64> for Point {
         }
     }
 }
+
 impl ops::MulAssign<f64> for Point {
     fn mul_assign(&mut self, rhs: f64) {
         *self = Point {
@@ -138,6 +146,7 @@ impl ops::MulAssign<f64> for Point {
         }
     }
 }
+
 impl ops::Mul<f64> for &Point {
     type Output = Point;
     fn mul(self, rhs: f64) -> Point {
@@ -147,6 +156,7 @@ impl ops::Mul<f64> for &Point {
         }
     }
 }
+
 impl ops::Div<f64> for Point {
     type Output = Point;
     fn div(self, rhs: f64) -> Point {
@@ -156,6 +166,7 @@ impl ops::Div<f64> for Point {
         }
     }
 }
+
 impl ops::DivAssign<f64> for Point {
     fn div_assign(&mut self, rhs: f64) {
         *self = Point {
@@ -164,6 +175,7 @@ impl ops::DivAssign<f64> for Point {
         }
     }
 }
+
 impl ops::Div<f64> for &Point {
     type Output = Point;
     fn div(self, rhs: f64) -> Point {
diff --git a/simulation-tools/src/nav_dev/error_report.rs b/simulation-tools/src/nav_dev/error_report.rs
index be90089..2faf8c6 100644
--- a/simulation-tools/src/nav_dev/error_report.rs
+++ b/simulation-tools/src/nav_dev/error_report.rs
@@ -1,10 +1,9 @@
 use chrono::{DateTime, Utc};
 use influxdb::InfluxDbWriteable;
-use kairo_common::helper::for_async::get_influx_cli;
 use serde::Serialize;
 use tokio::time;
 
-use kairo_common::{influxdb_models::KnownPosition, Point};
+use kairo_common::Point;
 
 use crate::Config;
 
@@ -14,7 +13,7 @@ pub struct Error {
     speed: f64,
     time: DateTime<Utc>,
 }
-
+#[allow(dead_code)]
 pub async fn thread(config: Config) {
     let period = time::Duration::from_millis(500);
 
@@ -23,37 +22,37 @@ pub async fn thread(config: Config) {
     position.rotate_by(f64::to_radians(config.angle_step));
     speed -= position;
-    let speed = speed.module();
+    let _speed = speed.module();
 
     loop {
         let start = time::Instant::now();
-        let real = KnownPosition::get_last_for("real", 1).await;
-        let calc = KnownPosition::get_last_for(config.id.as_str(), 1).await;
-        if real.is_ok() && calc.is_ok() {
-            let real = real.unwrap();
-            let calc = calc.unwrap();
+        // let real = KnownPosition::get_last_for("real", 1).await;
+        // let calc = KnownPosition::get_last_for(config.id.as_str(), 1).await;
+        // if real.is_ok() && calc.is_ok() {
+        //     let real = real.unwrap();
+        //     let calc = calc.unwrap();
 
-            if real.is_some() && calc.is_some() {
-                let real = real.unwrap();
-                let calc = calc.unwrap();
-                #[allow(non_snake_case)]
-                let Δx = real.x - calc.x;
-                #[allow(non_snake_case)]
-                let Δy = real.y - calc.y;
-                let error = Error {
-                    speed,
-                    error: f64::sqrt(Δx.powi(2) + Δy.powi(2)),
-                    time: chrono::Utc::now(),
-                };
+        //     if real.is_some() && calc.is_some() {
+        //         let real = real.unwrap();
+        //         let calc = calc.unwrap();
+        //         #[allow(non_snake_case)]
+        //         let Δx = real.x - calc.x;
+        //         #[allow(non_snake_case)]
+        //         let Δy = real.y - calc.y;
+        //         let error = Error {
+        //             speed,
+        //             error: f64::sqrt(Δx.powi(2) + Δy.powi(2)),
+        //             time: chrono::Utc::now(),
+        //         };
 
-                let table_name = format!("error_{}", config.id.as_str());
-                get_influx_cli()
-                    .query(error.into_query(table_name.as_str()))
-                    .await
-                    .unwrap();
-            }
-            time::sleep(period - start.elapsed()).await;
-        }
+        //         let table_name = format!("error_{}", config.id.as_str());
+        //         get_influx_cli()
+        //             .query(error.into_query(table_name.as_str()))
+        //             .await
+        //             .unwrap();
+        //     }
+        time::sleep(period - start.elapsed()).await;
+        // }
     }
 }
diff --git a/simulation-tools/src/nav_dev/main.rs b/simulation-tools/src/nav_dev/main.rs
index 981ade5..fb44024 100644
--- a/simulation-tools/src/nav_dev/main.rs
+++ b/simulation-tools/src/nav_dev/main.rs
@@ -4,10 +4,7 @@ use std::{thread, time};
 mod error_report;
 
 use kairo_common::helper::for_sync::{get_mqtt_cli, mqtt_pub};
-use kairo_common::{
-    influxdb_models::{BeaconMeasure, KnownPosition},
-    Antenna, DeviceReport, Point,
-};
+use kairo_common::{Antenna, BeaconMeasure, DeviceReport, Point};
 
 #[derive(Clone)]
 pub struct Config {
@@ -25,12 +22,12 @@ async fn main() {
     let period = time::Duration::from_millis(config.period_ms);
     let noise_gen = Normal::new(0.0, config.noise_level).unwrap();
 
-    if config.real {
-        let config = config.clone();
-        tokio::spawn(async move {
-            error_report::thread(config).await;
-        });
-    }
+    // if config.real {
+    //     let config = config.clone();
+    //     tokio::spawn(async move {
+    //         error_report::thread(config).await;
+    //     });
+    // }
 
     let client = get_mqtt_cli();
 
@@ -55,14 +52,16 @@ async fn main() {
             let noise: f64 = noise_gen.sample(&mut rand::thread_rng());
 
-            report.data.push(BeaconMeasure::new(&ant.id, rssi + noise));
+            report
+                .data
+                .push(BeaconMeasure::new(&config.id, &ant.id, rssi + noise));
         }
 
         let payload = serde_json::to_string(&report).unwrap_or_else(|_| "".to_string());
         mqtt_pub(&client, topic.as_str(), payload.as_str()).expect("Pub error");
 
-        if config.real {
-            let _r = KnownPosition::new(position).write_for("real").await;
-        }
+        // if config.real {
+        //     let _r = KnownPosition::new(position).write_for("real").await;
+        // }
 
         position.rotate_by(f64::to_radians(config.angle_step));
         thread::sleep(period - start.elapsed());
diff --git a/xyz-engine/src/handler.rs b/xyz-engine/src/handler.rs
index 704dfc1..57c9003 100644
--- a/xyz-engine/src/handler.rs
+++ b/xyz-engine/src/handler.rs
@@ -1,23 +1,29 @@
 pub mod device {
-    use kairo_common::influxdb_models::BeaconMeasure;
-    use kairo_common::{unit_conversion::UnitsConversion, DeviceReport, MAC};
+    use kairo_common::{
+        influx::{self, Bucket},
+        unit_conversion::UnitsConversion,
+        BeaconMeasure, DeviceReport, MAC,
+    };
 
     use crate::position_solver::solve_for;
 
     pub async fn report(device_id: &str, payload: &str) {
         if let Ok(device_report) = serde_json::from_str::<DeviceReport>(payload) {
-            // device_report.data.sort_by(|a, b| b.pwr.cmp(&a.pwr));
+            // split the report into individual measures
+            let measures = device_report
+                .data
+                .iter()
+                .map(|f| BeaconMeasure::new(device_id, &f.beacon_id, f.rssi.dBm_to_W()))
+                // sort them as a vector of write queries
+                .collect::<Vec<BeaconMeasure>>();
 
-            let mut count = 0;
-            for beacon in device_report.data.iter() {
-                let measure = BeaconMeasure::new(&beacon.beacon_id, beacon.rssi.dBm_to_W());
-                if (measure.write_for(device_id).await).is_ok() {
-                    count += 1;
-                }
-            }
+            let more_than_three = measures.len() >= 3;
+            let result = influx::Client::get()
+                .write(Bucket::Tmp, futures::stream::iter(measures))
+                .await;
 
             // If I added more than 3 valid measures it's worth to process the position
-            if count >= 3 {
+            if result.is_ok() && more_than_three {
                 let device_id = MAC::new(device_id);
                 tokio::spawn(async move {
                     let _r = solve_for(device_id).await;
diff --git a/xyz-engine/src/position_solver.rs b/xyz-engine/src/position_solver.rs
index 11c2c12..8bd6938 100644
--- a/xyz-engine/src/position_solver.rs
+++ b/xyz-engine/src/position_solver.rs
@@ -2,8 +2,8 @@
 use itertools::Itertools;
 use std::collections::HashMap;
 
 use kairo_common::{
-    influxdb_models::{BeaconMeasure, KnownPosition},
-    Antenna, Point, MAC,
+    influx::{self, Bucket},
+    Antenna, BeaconMeasure, KnownPosition, Point, MAC,
 };
 
 struct KnownDistance {
     point: Point,
@@ -13,7 +13,21 @@
 pub async fn solve_for(device_id: MAC) -> Result {
     let antennas = anntennas_hashmap();
 
-    let measure = BeaconMeasure::get_for(device_id.as_str()).await.unwrap();
+    // let measure = BeaconMeasure::get_for(device_id.as_str()).await.unwrap();
+
+    let query = format!(
+        "
+        |> range(start: -1s)
+        |> filter(fn: (r) => r[\"_measurement\"] == \"beacon_measures\")
+        |> filter(fn: (r) => r[\"device_id\"] == \"{}\" )
+        ",
+        device_id
+    );
+
+    let measure = influx::Client::get()
+        .query::<BeaconMeasure>(Bucket::Tmp, query)
+        .await
+        .unwrap();
 
     let known_distance = measure
         .iter()
@@ -38,13 +52,25 @@ pub async fn solve_for(device_id: MAC) -> Result {
 
     print!("Old len(): {} \t", posible_positions.len());
 
-    if let Some(last_position) = KnownPosition::get_last_for(device_id.as_str(), 2)
+    let query = format!(
+        "|> range(start: -1s)
+        |> filter(fn: (r) => r[\"_measurement\"] == \"known_positions\")
+        |> filter(fn: (r) => r[\"device_id\"] == \"{}\" )
+        |> last()
+        ",
+        device_id
+    );
+
+    if let Ok(last_position) = influx::Client::get()
+        .query::<KnownPosition>(Bucket::Perm, query)
         .await
-        .unwrap()
     {
-        let last_position = Point::new(last_position.x, last_position.y);
-        posible_positions.retain(|p| last_position.distance_to(&p.point) < 3.0);
+        if !last_position.is_empty() {
+            let last_position = Point::new(last_position[0].x, last_position[0].y);
+            posible_positions.retain(|p| last_position.distance_to(&p.point) < 3.0);
+        }
     }
+
     println!("New len(): {}", posible_positions.len());
 
     let mut pos = Point::new(0.0, 0.0);
@@ -57,8 +83,11 @@ pub async fn solve_for(device_id: MAC) -> Result {
 
     pos /= divisor;
 
-    // println!("Pos: {}", pos);
-    let _r = KnownPosition::new(pos).write_for(device_id.as_str()).await;
+    println!("Pos: {}", pos);
+    let known_pos = KnownPosition::new(device_id.as_str(), pos);
+    let _r = influx::Client::get()
+        .write(Bucket::Perm, futures::stream::iter([known_pos]))
+        .await;
 
     Ok(pos)
 }
@@ -132,16 +161,16 @@ fn trilat(a: &KnownDistance, b: &KnownDistance, c: &KnownDistance) -> Option<Point> {
     }
 }
 
-fn anntennas_hashmap() -> HashMap<MAC, Antenna> {
+fn anntennas_hashmap() -> HashMap<String, Antenna> {
     let data = vec![
         Antenna::new("e6:ad:0b:2e:d7:11", 30.0, Point::new(15.0, 15.0)),
         Antenna::new("c2:b5:f5:cc:e6:88", 30.0, Point::new(15.0, -15.0)),
         Antenna::new("e6:2e:e6:88:f5:cc", 30.0, Point::new(-15.0, 15.0)),
         Antenna::new("c2:ad:0b:b5:11:d7", 30.0, Point::new(-15.0, -15.0)),
     ];
-    let mut map: HashMap<MAC, Antenna> = HashMap::new();
+    let mut map: HashMap<String, Antenna> = HashMap::new();
     for a in data.iter() {
-        map.insert(a.id, a.clone());
+        map.insert(a.id.clone(), a.clone());
     }
     map
 }