Create workspace from previous implementation files

This commit is contained in:
2023-05-21 13:52:18 +02:00
parent b84f46ed57
commit a5976252e8
23 changed files with 1374 additions and 15 deletions

8
.env Normal file
View File

@@ -0,0 +1,8 @@
DATABASE_URL=postgres://kairo:AJzYhFltZXRiGQ@localhost/kairoXYZ_db
INFLUX_URL=http://localhost:8086
INFLUX_DB=db0
INFLUX_USER=mmRTLS
INFLUX_PASSWORD=Lkj9s2iAnd7Gxg
MQTT_BROKER=tcp://localhost:1883

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
target
Cargo.lock
data

22
Cargo.toml Normal file
View File

@@ -0,0 +1,22 @@
[workspace]
members = [
# Well of course, all the common/shared source code among services is going to end up somewhere:
"kairo-common",
# The intended backend application to expose a REST API:
"kairo-core",
# The intended frontend application for GUI navigation:
"kairo-nav",
# Tools and whatnots for testing or simulating other components:
"simulation-tools",
# The service doing the calculations:
"xyz-engine"
]
[workspace.dependencies]
tokio = { version = "1.28.1", features = ["rt-multi-thread", "macros"] }
dotenv = "0.15.0"
chrono = "0.4.24"
paho-mqtt = "0.12.1"
serde = "1.0.162"
serde_json = { version = "1.0.95" }
influxdb = { version = "0.6.0", default-features = false, features = ["derive", "use-serde", "reqwest-client"] }

52
docker-compose.yml Normal file
View File

@@ -0,0 +1,52 @@
version: "3"
services:
postgres:
image: "postgres:latest"
container_name: "postgres"
ports:
- 5432:5432
environment:
- POSTGRES_USER=kairo
- POSTGRES_PASSWORD=AJzYhFltZXRiGQ
- POSTGRES_DB=kairoXYZ_db
# volumes:
# - ./data/postgres:/var/lib/postgresql/data/
mqtt_broker:
image: "eclipse-mosquitto:latest"
container_name: "mosquitto"
network_mode: host
influx:
container_name: "influxdb"
tmpfs:
- /var/lib/influxdb
ports:
- 8086:8086
image: "influxdb:1.8.10"
environment:
- INFLUXDB_HTTP_AUTH_ENABLED=true
- INFLUXDB_ADMIN_USER=admin
- INFLUXDB_ADMIN_PASSWORD=xHKvboa0Qlvhtg
- INFLUXDB_USER=mmRTLS
- INFLUXDB_USER_PASSWORD=Lkj9s2iAnd7Gxg
- INFLUXDB_DB=db0
# image: "influxdb:latest"
# environment:
# - DOCKER_INFLUXDB_INIT_MODE=setup
# - DOCKER_INFLUXDB_INIT_USERNAME=mmRTLS
# - DOCKER_INFLUXDB_INIT_PASSWORD=Lkj9s2iAnd7Gxg
# - DOCKER_INFLUXDB_INIT_ORG=glb
# - DOCKER_INFLUXDB_INIT_BUCKET=db0
# - DOCKER_INFLUXDB_INIT_CLI_CONFIG_NAME=default
# - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=xHKvboa0Qlvhtg
chronograf:
image: "chronograf"
container_name: "chronograf"
ports:
- 8888:8888
volumes:
- ./data/chronograf:/var/lib/chronograf

View File

@@ -3,6 +3,15 @@ name = "kairo-common"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
# [[test]]
# name = "all"
# path = "test/all.rs"
[dependencies]
paho-mqtt = { workspace = true }
influxdb = { workspace = true }
tokio = { workspace = true }
dotenv = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }

View File

@@ -0,0 +1,73 @@
use std::{f64::consts::PI, str::FromStr};
use crate::{unit_conversion::UnitsConversion, Point, MAC};
#[derive(Debug, Clone, Default)]
pub struct Antenna {
pub id: MAC,
pub tssi: f64,
pub coord: Point,
}
impl Antenna {
const C: f64 = 2.99e8;
const F: f64 = 2.4e9;
#[allow(non_upper_case_globals)]
const λ: f64 = Self::C / Self::F;
pub fn new(id: &str, tssi: f64, coord: Point) -> Antenna {
Antenna {
id: MAC::from_str(id).unwrap(),
coord,
tssi,
}
}
pub fn get_rssi(&self, distance: f64) -> f64 {
#[allow(non_snake_case)]
// Free Space Path Loss
let FSPL = (((distance * 4.0 * PI) / Self::λ).powi(2)).to_dB();
self.tssi - FSPL
}
#[allow(non_snake_case)]
pub fn get_distance_with_dBm(&self, rssi_dBm: f64) -> f64 {
let loss = self.tssi.dBm_to_W() / rssi_dBm.dBm_to_W();
let distance = (loss.sqrt() * Self::λ) / (4.0 * PI);
distance.abs()
}
#[allow(non_snake_case)]
pub fn get_distance_with_W(&self, rssi_W: f64) -> f64 {
let loss = self.tssi.dBm_to_W() / rssi_W;
let distance = (loss.sqrt() * Self::λ) / (4.0 * PI);
distance.abs()
}
}
#[test]
fn test() {
let tssi = 0.0; // dBm
let a = Antenna::new("AB:CD:EF:12:34:56", tssi, Point { x: 0.0, y: 0.0 });
// Known Attenuation values for 2.4GHz
// 5 meter = 54.02 dB = 3.96e-9 W
// 10 meter = 60.04 dB = 9.91e-10 W
// 20 meter = 66.06 dB = 2.48e-10 W
print!("Testing Antenna::get_rssi()");
assert!(f64::abs(-54.02 - a.get_rssi(5.0)) < 0.1);
assert!(f64::abs(-60.04 - a.get_rssi(10.0)) < 0.1);
assert!(f64::abs(-66.06 - a.get_rssi(20.0)) < 0.1);
println!(" ... ok");
print!("Testing Antenna::get_distance_with_dBm()");
assert!(f64::abs(5.0 - a.get_distance_with_dBm(-54.02)) < 0.5);
assert!(f64::abs(10.0 - a.get_distance_with_dBm(-60.04)) < 0.5);
assert!(f64::abs(20.0 - a.get_distance_with_dBm(-66.06)) < 0.5);
println!(" ... ok");
print!("Testing Antenna::get_distance_with_W()");
assert!(f64::abs(5.0 - a.get_distance_with_W(3.98e-9)) < 0.5);
assert!(f64::abs(10.0 - a.get_distance_with_W(9.91e-10)) < 0.5);
assert!(f64::abs(20.0 - a.get_distance_with_W(2.48e-10)) < 0.5);
println!(" ... ok");
}

126
kairo-common/src/helper.rs Normal file
View File

@@ -0,0 +1,126 @@
pub mod for_async {
use influxdb::Client;
use std::cell::RefCell;
thread_local! {
static INFLUX_CLIENT : RefCell<influxdb::Client> = RefCell::new( init_influx_cli() );
}
pub fn get_influx_cli() -> influxdb::Client {
INFLUX_CLIENT.with(|rc| rc.borrow().clone())
}
fn init_influx_cli() -> influxdb::Client {
let host = dotenv::var("INFLUX_URL").unwrap_or_else(|_| {
println! {"INFLUX_URL not found in .evn file, using default: http://localhost:8086"};
"http://localhost:8086".to_string()
});
let db = dotenv::var("INFLUX_DB").expect("INFLUX_DB not defined in .env file");
let user = dotenv::var("INFLUX_USER").expect("INFLUX_USER not defined in .env file");
let pass =
dotenv::var("INFLUX_PASSWORD").expect("INFLUX_PASSWORD not defined in .env file");
Client::new(host, db).with_auth(user, pass)
}
use mqtt::{AsyncClient, Message};
use paho_mqtt as mqtt;
use std::{process, time::Duration};
pub async fn get_mqtt_cli_and_stream(
) -> (mqtt::AsyncClient, mqtt::AsyncReceiver<Option<Message>>) {
let host = dotenv::var("MQTT_BROKER").unwrap_or_else(|_| {
println! {"MQTT_BROKER not found in .evn file, using default: tcp://localhost:1883"};
"tcp://localhost:1883".to_string()
});
// Create the client. Use an ID for a persistent session.
// A real system should try harder to use a unique ID.
let mqtt_options = mqtt::CreateOptionsBuilder::new()
.server_uri(host)
.client_id("mmRTLS_async_subscribe")
.finalize();
// Create the client connection
let mut client = AsyncClient::new(mqtt_options).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
// Get message stream before connecting.
let stream = client.get_stream(25);
// Define the set of options for the connection
let conn_opts = mqtt::ConnectOptionsBuilder::new()
.keep_alive_interval(Duration::from_secs(30))
.clean_session(false)
.finalize();
// Make the connection to the broker
println!("Connecting to the MQTT server...");
client.connect(conn_opts).await.unwrap_or_else(|e| {
println!("Error connecting to the broker: {:?}", e);
process::exit(1);
});
(client, stream)
}
pub async fn mqtt_cli_reconnect(client: &mqtt::AsyncClient) {
println!("Lost connection. Attempting reconnect.");
while let Err(err) = client.reconnect().await {
println!("Error reconnecting: {}", err);
tokio::time::sleep(Duration::from_millis(1000)).await;
}
}
pub async fn mqtt_subscribe(client: &mqtt::AsyncClient, topic: &str) {
client
.subscribe(topic, 1)
.await
.expect("Unable to subscribe");
}
}
pub mod for_sync {
use paho_mqtt as mqtt;
use std::{process, time::Duration};
pub fn get_mqtt_cli() -> mqtt::Client {
let host = dotenv::var("MQTT_BROKER").unwrap_or_else(|_| {
println! {"MQTT_BROKER not found in .evn file, using default: tcp://localhost:1883"};
"tcp://localhost:1883".to_string()
});
let mut cli = mqtt::Client::new(host).unwrap_or_else(|e| {
println!("Error creating the client: {:?}", e);
process::exit(1);
});
// Use 5sec timeouts for sync calls.
cli.set_timeout(Duration::from_secs(5));
// Connect and wait for it to complete or fail
if let Err(e) = cli.connect(None) {
println!("Unable to connect: {:?}", e);
process::exit(1);
}
cli
}
pub fn mqtt_pub(
client: &mqtt::Client,
topic: &str,
payload: &str,
) -> Result<(), paho_mqtt::Error> {
let msg = mqtt::MessageBuilder::new()
.topic(topic)
.payload(payload)
.qos(0)
.finalize();
client.publish(msg)
}
}

View File

@@ -0,0 +1,80 @@
use chrono::{DateTime, Utc};
use influxdb::{InfluxDbWriteable, ReadQuery};
use serde::{Deserialize, Serialize};
use crate::helper::for_async::get_influx_cli;
use crate::influxdb_models::BEACONMEASURE_TIME_WINDOW;
use crate::MAC;
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, InfluxDbWriteable)]
pub struct BeaconMeasure {
#[influxdb(tag)]
pub beacon_id: MAC,
pub rssi: f64,
pub time: DateTime<Utc>,
}
#[derive(Serialize, Deserialize)]
struct Tags {
beacon_id: MAC,
}
impl BeaconMeasure {
#[allow(non_snake_case)]
pub fn new(beacon_id: &MAC, rssi_W: f64) -> BeaconMeasure {
BeaconMeasure {
beacon_id: *beacon_id,
rssi: rssi_W,
time: chrono::Utc::now(),
}
}
pub async fn write_for(self, device_id: &str) -> Result<String, influxdb::Error> {
let table_name = format!("measure_{}", device_id);
get_influx_cli()
.query(self.into_query(table_name.as_str()))
.await
}
pub async fn get_for(device_id: &str) -> Result<Vec<BeaconMeasure>, influxdb::Error> {
let query = format!( "SELECT mean(rssi) FROM /measure_{}/ WHERE time > now() - {}s AND time < now() GROUP BY beacon_id;", device_id, BEACONMEASURE_TIME_WINDOW);
let mut database_result = get_influx_cli().json_query(ReadQuery::new(query)).await?;
#[derive(Deserialize)]
struct Value {
time: DateTime<Utc>,
mean: f64,
}
let vect = database_result
.deserialize_next_tagged::<Tags, Value>()?
.series
.into_iter()
.map(|measure| BeaconMeasure {
beacon_id: measure.tags.beacon_id,
rssi: measure.values[0].mean,
time: measure.values[0].time,
})
.collect::<Vec<BeaconMeasure>>();
Ok(vect)
}
}
#[tokio::test]
async fn beacon_measure_test() {
print!("Testing BeaconMeasure::* read/write methods");
let bm1 = BeaconMeasure::new(&MAC::new("AB:CD:EF:12:34:56"), 0.0);
let bm = bm1.clone();
let _result = bm.write_for("AB:CD:EF:12:34:56").await;
let bm2 = BeaconMeasure::get_for("AB:CD:EF:12:34:56").await.unwrap();
assert_eq!(bm2.len(), 1);
assert_eq!(bm1.beacon_id, bm2[0].beacon_id);
assert_eq!(bm1.rssi, bm2[0].rssi);
//wait for the time window to pass
let delay = BEACONMEASURE_TIME_WINDOW * 1000 + 500;
tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
let bm2 = BeaconMeasure::get_for("AB:CD:EF:12:34:56").await.unwrap();
assert_eq!(bm2.len(), 0);
println!(" ... ok");
}

View File

@@ -0,0 +1,95 @@
use influxdb::{ReadQuery, WriteQuery};
use serde::{Deserialize, Serialize};
use crate::{helper::for_async::get_influx_cli, MAC};
const TABLE_NAME: &str = "device_status";
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceStatus {
device_id: MAC,
pos_x: f64,
pos_y: f64,
error: f64,
speed: f64,
}
impl DeviceStatus {
fn new(device_id: MAC) -> DeviceStatus {
DeviceStatus {
device_id,
pos_x: 0.0,
pos_y: 0.0,
error: 0.0,
speed: 0.0,
}
}
pub async fn get(device_id: MAC) -> Result<Box<DeviceStatus>, influxdb::Error> {
let query = ReadQuery::new(format!(
"SELECT last(*) FROM /{}/ WHERE device_id = '{}';",
TABLE_NAME, device_id
));
let mut database_result = get_influx_cli().json_query(query).await?;
#[derive(Debug, Deserialize)]
struct Value {
last_pos_x: f64,
last_pos_y: f64,
last_error: f64,
last_speed: f64,
}
let vec = database_result.deserialize_next::<Value>()?.series;
if !vec.is_empty() && !vec[0].values.is_empty() {
Ok(Box::new(DeviceStatus {
device_id,
pos_x: vec[0].values[0].last_pos_x,
pos_y: vec[0].values[0].last_pos_y,
error: vec[0].values[0].last_error,
speed: vec[0].values[0].last_speed,
}))
} else {
Ok(Box::new(DeviceStatus::new(device_id)))
}
}
fn as_query(&self) -> influxdb::WriteQuery {
WriteQuery::new(influxdb::Timestamp::from(chrono::Utc::now()), TABLE_NAME)
.add_tag("device_id", self.device_id)
.add_field("pos_x", self.pos_x)
.add_field("pos_y", self.pos_y)
.add_field("error", self.error)
.add_field("speed", self.speed)
}
async fn update(query: influxdb::WriteQuery) -> Result<String, influxdb::Error> {
println!("update");
get_influx_cli().query(query).await
}
}
impl Drop for DeviceStatus {
fn drop(&mut self) {
println!("drop");
let query = self.as_query();
tokio::runtime::Handle::current().spawn(async move { Self::update(query).await });
}
}
#[tokio::test]
async fn test() {
use std::time::Duration;
// create context to call drop
{
let mut a = DeviceStatus::get(MAC::new("15:23:45:ab:cd:ef"))
.await
.unwrap();
a.pos_x += 2.0;
a.pos_y += 3.0;
println!("{:?}", a);
} //here and then wait
tokio::time::sleep(Duration::from_millis(150)).await;
}

View File

@@ -0,0 +1,48 @@
use chrono::{DateTime, Utc};
use influxdb::{InfluxDbWriteable, ReadQuery};
use serde::{Deserialize, Serialize};
use crate::helper::for_async::get_influx_cli;
use crate::Point;
#[derive(Debug, PartialEq, Clone, Serialize, Deserialize, InfluxDbWriteable)]
pub struct KnownPosition {
pub x: f64,
pub y: f64,
pub time: DateTime<Utc>,
}
impl KnownPosition {
pub fn new(pos: Point) -> KnownPosition {
KnownPosition {
x: pos.x,
y: pos.y,
time: chrono::Utc::now(),
}
}
pub async fn write_for(self, device_id: &str) -> Result<String, influxdb::Error> {
let table_name = format!("position_{}", device_id);
get_influx_cli()
.query(self.into_query(table_name.as_str()))
.await
}
pub async fn get_last_for(
device_id: &str,
time_window: i32,
) -> Result<Option<KnownPosition>, influxdb::Error> {
let query = format!(
"SELECT mean(x) as x, mean(y) as y FROM /position_{}/ WHERE time > now() - {}s AND time < now();",
device_id, time_window
);
let mut database_result = get_influx_cli().json_query(ReadQuery::new(query)).await?;
let series = &database_result.deserialize_next::<KnownPosition>()?.series;
if series.is_empty() {
Ok(None)
} else {
let vec = &series[0].values;
Ok(Some(vec[0].clone()))
}
}
}

View File

@@ -0,0 +1,10 @@
// pub mod multiple_measures;
mod beacon_measure;
mod device_status;
mod known_position;
// Renaming types for ease of use outside the scope of this module
pub const BEACONMEASURE_TIME_WINDOW: u64 = 4;
pub type BeaconMeasure = beacon_measure::BeaconMeasure;
pub type KnownPosition = known_position::KnownPosition;
pub type DeviceStatus = device_status::DeviceStatus;

View File

@@ -1,14 +1,21 @@
pub fn add(left: usize, right: usize) -> usize {
left + right
}
#![allow(confusable_idents)]
#![allow(mixed_script_confusables)]
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
#[test]
fn it_works() {
let result = add(2, 2);
assert_eq!(result, 4);
}
pub mod helper;
pub mod influxdb_models;
mod mac;
pub mod unit_conversion;
pub type Antenna = antenna::Antenna;
pub type Point = point::Point;
pub type MAC = mac::MAC;
mod antenna;
mod point;
#[derive(Debug, Serialize, Deserialize)]
pub struct DeviceReport {
pub data: Vec<influxdb_models::BeaconMeasure>,
}

90
kairo-common/src/mac.rs Normal file
View File

@@ -0,0 +1,90 @@
use std::fmt::{Debug, Display, Formatter};
use std::str::FromStr;
use influxdb::Type;
use serde::de::{self, Visitor};
use serde::{Deserialize, Serialize, Serializer};
#[allow(clippy::upper_case_acronyms)]
#[derive(Default, Clone, Copy, Hash, PartialEq, Eq)]
pub struct MAC {
s: [u8; 17],
}
impl MAC {
pub fn new(s: &str) -> MAC {
std::str::FromStr::from_str(s).unwrap()
}
pub fn as_str(&self) -> &str {
let a = std::str::from_utf8(&self.s);
a.unwrap()
}
}
impl FromStr for MAC {
type Err = std::string::ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let mut m = MAC::default();
m.s.copy_from_slice(s.as_bytes());
Ok(m)
}
}
impl Display for MAC {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{}", String::from_utf8_lossy(&self.s))
}
}
impl Debug for MAC {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{}", String::from_utf8_lossy(&self.s))
}
}
impl From<MAC> for Type {
fn from(val: MAC) -> Self {
Type::Text(val.to_string())
}
}
impl<'de> Deserialize<'de> for MAC {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct MACVisitor {
len: usize,
}
impl<'de> Visitor<'de> for MACVisitor {
type Value = MAC;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(formatter, "a string containing at least {} bytes", self.len)
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
if s.len() == self.len {
Ok(MAC::new(s))
} else {
Err(de::Error::invalid_value(de::Unexpected::Str(s), &self))
}
}
}
let visitor = MACVisitor { len: 17 };
deserializer.deserialize_str(visitor)
}
}
impl Serialize for MAC {
fn serialize<S>(&self, serializer: S) -> Result<<S as Serializer>::Ok, <S as Serializer>::Error>
where
S: Serializer,
{
serializer.serialize_str(self.as_str())
}
}

243
kairo-common/src/point.rs Normal file
View File

@@ -0,0 +1,243 @@
use std::{
fmt::{Display, Formatter},
ops,
};
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct Point {
pub x: f64,
pub y: f64,
}
impl Point {
pub fn new(x: f64, y: f64) -> Point {
Point { x, y }
}
pub fn zero() -> Point {
Point { x: 0.0, y: 0.0 }
}
pub fn is_valid(&self) -> bool {
!self.x.is_nan() && !self.y.is_nan()
}
pub fn module(&self) -> f64 {
f64::sqrt(self.x * self.x + self.y * self.y)
}
pub fn phase(&self) -> f64 {
f64::atan2(self.y, self.x)
}
pub fn distance(a: &Point, b: &Point) -> f64 {
(a - b).module()
}
pub fn distance_to(&self, other: &Point) -> f64 {
(self - other).module()
}
pub fn as_versor(&self) -> Option<Point> {
if self.x == 0.0 && self.y == 0.0 {
None
} else {
Some(self / self.module())
}
}
pub fn rotate_by(&mut self, α: f64) {
let m = self.module();
let (sin, cos) = f64::sin_cos(self.phase() + α);
self.x = m * cos;
self.y = m * sin;
}
}
impl Display for Point {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "({:.2},{:.2})", &self.x, &self.y)
}
}
impl ops::Add<Point> for Point {
type Output = Point;
fn add(self, rhs: Point) -> Point {
Point {
x: self.x + rhs.x,
y: self.y + rhs.y,
}
}
}
impl ops::Add<&Point> for &Point {
type Output = Point;
fn add(self, rhs: &Point) -> Point {
Point {
x: self.x + rhs.x,
y: self.y + rhs.y,
}
}
}
impl ops::AddAssign<&Point> for Point {
fn add_assign(&mut self, rhs: &Point) {
*self = Self {
x: self.x + rhs.x,
y: self.y + rhs.y,
};
}
}
impl ops::AddAssign<Point> for Point {
fn add_assign(&mut self, rhs: Point) {
*self = Self {
x: self.x + rhs.x,
y: self.y + rhs.y,
};
}
}
impl ops::SubAssign<&Point> for Point {
fn sub_assign(&mut self, rhs: &Point) {
*self = Self {
x: self.x - rhs.x,
y: self.y - rhs.y,
};
}
}
impl ops::SubAssign<Point> for Point {
fn sub_assign(&mut self, rhs: Point) {
*self = Self {
x: self.x - rhs.x,
y: self.y - rhs.y,
};
}
}
impl ops::Sub<Point> for Point {
type Output = Point;
fn sub(self, rhs: Point) -> Point {
Point {
x: self.x - rhs.x,
y: self.y - rhs.y,
}
}
}
impl ops::Sub<&Point> for &Point {
type Output = Point;
fn sub(self, rhs: &Point) -> Point {
Point {
x: self.x - rhs.x,
y: self.y - rhs.y,
}
}
}
impl ops::Mul<f64> for Point {
type Output = Point;
fn mul(self, rhs: f64) -> Point {
Point {
x: self.x * rhs,
y: self.y * rhs,
}
}
}
impl ops::MulAssign<f64> for Point {
fn mul_assign(&mut self, rhs: f64) {
*self = Point {
x: self.x * rhs,
y: self.y * rhs,
}
}
}
impl ops::Mul<f64> for &Point {
type Output = Point;
fn mul(self, rhs: f64) -> Point {
Point {
x: self.x * rhs,
y: self.y * rhs,
}
}
}
impl ops::Div<f64> for Point {
type Output = Point;
fn div(self, rhs: f64) -> Point {
Point {
x: self.x / rhs,
y: self.y / rhs,
}
}
}
impl ops::DivAssign<f64> for Point {
fn div_assign(&mut self, rhs: f64) {
*self = Point {
x: self.x / rhs,
y: self.y / rhs,
}
}
}
impl ops::Div<f64> for &Point {
type Output = Point;
fn div(self, rhs: f64) -> Point {
Point {
x: self.x / rhs,
y: self.y / rhs,
}
}
}
#[test]
fn test() {
use std::f64::consts::{FRAC_1_SQRT_2, FRAC_PI_2, FRAC_PI_4, SQRT_2};
// New
let p = Point::new(0.0, 0.0);
print!("Testing Point::new()");
assert_eq!(p, Point { x: 0.0, y: 0.0 });
assert_ne!(p, Point { x: -1.0, y: 1.0 });
println!(" ... ok");
// is_valid
let n = Point::new(std::f64::NAN, std::f64::NAN);
let nn = Point::new(std::f64::NAN, 0.0);
print!("Testing Point::is_valid()");
assert_eq!(p.is_valid(), true);
assert_eq!(n.is_valid(), false);
assert_eq!(nn.is_valid(), false);
println!(" ... ok");
// module
let p = Point::new(1.0, 1.0);
let r = Point::new(2.0, 0.0);
print!("Testing Point::module()");
assert!(f64::abs(p.module() - SQRT_2) < 1e-10);
assert!(f64::abs(r.module() - 2.0) < 1e-10);
println!(" ... ok");
// phase
let p = Point::new(1.0, 1.0);
let r = Point::new(2.0, 0.0);
let q = Point::new(2.0, -2.0);
print!("Testing Point::phase()");
assert!(f64::abs(p.phase() - FRAC_PI_4) < 1e-6);
assert!(f64::abs(r.phase() - 0.0) < 1e-6);
assert!(f64::abs(q.phase() + FRAC_PI_4) < 1e-6);
println!(" ... ok");
//distance
let z = Point::zero();
let p = Point::new(1.0, 0.0);
let q = Point::new(1.0, 1.0);
print!("Testing Point::distance() and distance_to()");
assert_eq!(z.distance_to(&p), 1.0);
assert_eq!(Point::distance(&z, &p), 1.0);
assert!(f64::abs(Point::distance(&z, &q) - SQRT_2) < 1e-10);
println!(" ... ok");
//versor
print!("Testing Point::as_versor()");
assert_eq!(z.as_versor(), None);
assert_eq!(p, p.as_versor().unwrap());
let q_ver = q.as_versor().unwrap();
assert!(f64::abs(q_ver.x - FRAC_1_SQRT_2) < 1e-10);
assert!(f64::abs(q_ver.y - FRAC_1_SQRT_2) < 1e-10);
println!(" ... ok");
//rotate_by
let mut p = Point::new(1.0, 0.0);
print!("Testing Point::rotate_by()");
p.rotate_by(FRAC_PI_2);
assert!(f64::abs(p.x - 0.0) < 1e-10);
assert!(f64::abs(p.y - 1.0) < 1e-10);
p.rotate_by(-FRAC_PI_4);
assert!(f64::abs(p.x - FRAC_1_SQRT_2) < 1e-10);
assert!(f64::abs(p.y - FRAC_1_SQRT_2) < 1e-10);
println!(" ... ok");
}

View File

@@ -0,0 +1,60 @@
pub trait UnitsConversion {
#[allow(non_snake_case)]
fn dBm_to_W(&self) -> f64;
#[allow(non_snake_case)]
fn W_to_dBm(&self) -> f64;
#[allow(non_snake_case, clippy::wrong_self_convention)]
fn from_dB(&self) -> f64;
#[allow(non_snake_case)]
fn to_dB(&self) -> f64;
}
impl UnitsConversion for f64 {
fn dBm_to_W(&self) -> f64 {
10.0_f64.powf((self - 30.0) / 10.0)
}
fn W_to_dBm(&self) -> f64 {
30.0 + 10.0 * f64::log10(*self)
}
fn from_dB(&self) -> f64 {
10.0_f64.powf((*self) / 10.0)
}
fn to_dB(&self) -> f64 {
10.0 * f64::log10(*self)
}
}
#[test]
fn test_unit_conversion() {
print!("Testing conversion from W to dBm");
assert_eq!(1.0_f64.W_to_dBm(), 30.0);
assert_eq!(0.001_f64.W_to_dBm(), 0.0);
assert!(f64::abs(2.0_f64.W_to_dBm() - 33.0) < 0.1);
assert!(f64::abs(0.002_f64.W_to_dBm() - 3.0) < 0.1);
println!(" ... ok");
print!("Testing conversion from dBm to W");
assert_eq!(1.0, 30.0_f64.dBm_to_W());
assert_eq!(0.001, 0.0_f64.dBm_to_W());
assert!(f64::abs(2.0 - 33.0_f64.dBm_to_W()) < 0.1);
assert!(f64::abs(0.002 - 3.0_f64.dBm_to_W()) < 0.1);
println!(" ... ok");
print!("Testing conversion from dB to scalar");
assert_eq!(1.0, 0.0_f64.from_dB());
assert_eq!(10.0, 10.0_f64.from_dB());
assert_eq!(100.0, 20.0_f64.from_dB());
assert!(f64::abs(2.0 - 3.0_f64.from_dB()) < 0.1);
assert!(f64::abs(20.0 - 13_f64.from_dB()) < 0.1);
assert!(f64::abs(200.0 - 23_f64.from_dB()) < 0.5);
println!(" ... ok");
print!("Testing conversion from scalar to dB");
assert_eq!(1.0_f64.to_dB(), 0.0);
assert_eq!(10.0_f64.to_dB(), 10.0);
assert_eq!(100.0_f64.to_dB(), 20.0);
assert!(f64::abs(2.0_f64.to_dB() - 3.0) < 0.1);
assert!(f64::abs(20.0_f64.to_dB() - 13.0) < 0.1);
assert!(f64::abs(200.0_f64.to_dB() - 23.0) < 0.5);
println!(" ... ok");
}

View File

@@ -0,0 +1,23 @@
[package]
name = "simulation-tools"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[bin]]
name = "nav_dev"
path = "src/nav_dev/main.rs"
[dependencies]
paho-mqtt = { workspace = true }
influxdb = { workspace = true }
tokio = { workspace = true }
dotenv = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
kairo-common = {path = "../kairo-common" }
rand = "0.8.5"
rand_distr = "0.4.3"

View File

@@ -0,0 +1,59 @@
use chrono::{DateTime, Utc};
use influxdb::InfluxDbWriteable;
use kairo_common::helper::for_async::get_influx_cli;
use serde::Serialize;
use tokio::time;
use kairo_common::{influxdb_models::KnownPosition, Point};
use crate::Config;
#[derive(Debug, Serialize, InfluxDbWriteable)]
pub struct Error {
error: f64,
speed: f64,
time: DateTime<Utc>,
}
pub async fn thread(config: Config) {
let period = time::Duration::from_millis(500);
let mut position = Point::new(config.radius, 0.0);
let mut speed = position;
position.rotate_by(f64::to_radians(config.angle_step));
speed -= position;
let speed = speed.module();
loop {
let start = time::Instant::now();
let real = KnownPosition::get_last_for("real", 1).await;
let calc = KnownPosition::get_last_for(config.id.as_str(), 1).await;
if real.is_ok() && calc.is_ok() {
let real = real.unwrap();
let calc = calc.unwrap();
if real.is_some() && calc.is_some() {
let real = real.unwrap();
let calc = calc.unwrap();
#[allow(non_snake_case)]
let Δx = real.x - calc.x;
#[allow(non_snake_case)]
let Δy = real.y - calc.y;
let error = Error {
speed,
error: f64::sqrt(Δx.powi(2) + Δy.powi(2)),
time: chrono::Utc::now(),
};
let table_name = format!("error_{}", config.id.as_str());
get_influx_cli()
.query(error.into_query(table_name.as_str()))
.await
.unwrap();
}
time::sleep(period - start.elapsed()).await;
}
}
}

View File

@@ -0,0 +1,106 @@
use rand_distr::{Distribution, Normal};
use std::{thread, time};
mod error_report;
use kairo_common::helper::for_sync::{get_mqtt_cli, mqtt_pub};
use kairo_common::{
influxdb_models::{BeaconMeasure, KnownPosition},
Antenna, DeviceReport, Point,
};
#[derive(Clone)]
pub struct Config {
period_ms: u64,
radius: f64,
noise_level: f64,
angle_step: f64,
id: String,
real: bool,
}
#[tokio::main]
async fn main() {
let config = parse_cli();
let period = time::Duration::from_millis(config.period_ms);
let noise_gen = Normal::new(0.0, config.noise_level).unwrap();
if config.real {
let config = config.clone();
tokio::spawn(async move {
error_report::thread(config).await;
});
}
let client = get_mqtt_cli();
let mut position = Point::new(config.radius, 0.0);
let antenna = vec![
Antenna::new("e6:ad:0b:2e:d7:11", 30.0, Point::new(15.0, 15.0)),
Antenna::new("c2:b5:f5:cc:e6:88", 30.0, Point::new(15.0, -15.0)),
Antenna::new("e6:2e:e6:88:f5:cc", 30.0, Point::new(-15.0, 15.0)),
Antenna::new("c2:ad:0b:b5:11:d7", 30.0, Point::new(-15.0, -15.0)),
];
let topic = format!("device/{}/report", config.id);
loop {
let start = time::Instant::now();
let mut report = DeviceReport { data: vec![] };
for ant in (antenna).iter() {
let d = ant.coord.distance_to(&position);
let rssi = ant.get_rssi(d);
let noise: f64 = noise_gen.sample(&mut rand::thread_rng());
report.data.push(BeaconMeasure::new(&ant.id, rssi + noise));
}
let payload = serde_json::to_string(&report).unwrap_or_else(|_| "".to_string());
mqtt_pub(&client, topic.as_str(), payload.as_str()).expect("Pub error");
if config.real {
let _r = KnownPosition::new(position).write_for("real").await;
}
position.rotate_by(f64::to_radians(config.angle_step));
thread::sleep(period - start.elapsed());
}
}
fn parse_cli() -> Config {
use std::env;
let mut config = Config {
period_ms: 1000,
radius: 12.0,
noise_level: 0.0,
angle_step: 3.6,
id: "60:f2:62:01:a9:28".to_string(),
real: true,
};
let args = env::args().collect::<Vec<String>>();
for (i, arg) in args.iter().enumerate() {
match arg.as_str() {
"--noise" | "--noise-level" | "-n" => {
config.noise_level = args[i + 1].parse::<f64>().unwrap();
}
"--rad" | "--radious" | "-r" => {
config.radius = args[i + 1].parse::<f64>().unwrap();
}
"--period" | "-p" => {
config.period_ms = args[i + 1].parse::<u64>().unwrap();
}
"--angle" | "--step" => {
config.angle_step = args[i + 1].parse::<f64>().unwrap();
}
"--id" => {
config.id = args[i + 1].clone();
config.real = false;
}
_ => {}
}
}
config
}

View File

@@ -3,6 +3,19 @@ name = "xyz-engine"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[[test]]
name = "all"
path = "test/all.rs"
[dependencies]
paho-mqtt = { workspace = true }
influxdb = { workspace = true }
tokio = { workspace = true }
dotenv = { workspace = true }
chrono = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
kairo-common = {path = "../kairo-common" }
itertools = "0.10.3"
futures = "0.3"

30
xyz-engine/src/handler.rs Normal file
View File

@@ -0,0 +1,30 @@
pub mod device {
use kairo_common::influxdb_models::BeaconMeasure;
use kairo_common::{unit_conversion::UnitsConversion, DeviceReport, MAC};
use crate::position_solver::solve_for;
pub async fn report(device_id: &str, payload: &str) {
if let Ok(device_report) = serde_json::from_str::<DeviceReport>(payload) {
// device_report.data.sort_by(|a, b| b.pwr.cmp(&a.pwr));
let mut count = 0;
for beacon in device_report.data.iter() {
let measure = BeaconMeasure::new(&beacon.beacon_id, beacon.rssi.dBm_to_W());
if (measure.write_for(device_id).await).is_ok() {
count += 1;
}
}
// If I added more than 3 valid measures it's worth to process the position
if count >= 3 {
let device_id = MAC::new(device_id);
tokio::spawn(async move {
let _r = solve_for(device_id).await;
});
}
} else {
println!("Unable to parse: {}", payload);
}
}
}

View File

@@ -1,3 +1,37 @@
fn main() {
println!("Hello, world!");
use futures::stream::StreamExt;
mod handler;
mod position_solver;
use kairo_common::helper::for_async::{
get_mqtt_cli_and_stream, mqtt_cli_reconnect, mqtt_subscribe,
};
#[tokio::main]
async fn main() {
let (mqtt_cli, mut stream) = get_mqtt_cli_and_stream().await;
let topic = "device/+/report";
println!("Subscribing to topic: {:?}", topic);
mqtt_subscribe(&mqtt_cli, topic).await;
while let Some(msg) = stream.next().await {
if let Some(msg) = msg {
// split the topic first
let topic: Vec<&str> = msg.topic().splitn(3, '/').collect();
match topic[0] {
"device" => match topic[2] {
"report" => handler::device::report(topic[1], &msg.payload_str()).await,
_ => println!("Unhandled topic for device: {}", topic[2]),
},
_ => println!("Unhandled topic: {}", msg.topic()),
}
} else {
// A "None" means we were disconnected. Try to reconnect...
mqtt_cli_reconnect(&mqtt_cli).await;
mqtt_subscribe(&mqtt_cli, topic).await;
}
}
}

View File

@@ -0,0 +1,169 @@
use itertools::Itertools;
use std::collections::HashMap;
use kairo_common::{
influxdb_models::{BeaconMeasure, KnownPosition},
Antenna, Point, MAC,
};
struct KnownDistance {
point: Point,
dist: f64,
}
pub async fn solve_for(device_id: MAC) -> Result<Point, ()> {
let antennas = anntennas_hashmap();
let measure = BeaconMeasure::get_for(device_id.as_str()).await.unwrap();
let known_distance = measure
.iter()
.filter_map(|m| {
if let Some(a) = antennas.get(&m.beacon_id) {
let kd = KnownDistance {
point: a.coord,
dist: a.get_distance_with_W(m.rssi),
};
Some(kd)
} else {
None
}
})
.collect::<Vec<KnownDistance>>();
let mut posible_positions = known_distance
.iter()
.permutations(3)
.filter_map(|per| trilat(per[0], per[1], per[2]))
.collect::<Vec<KnownDistance>>();
print!("Old len(): {} \t", posible_positions.len());
if let Some(last_position) = KnownPosition::get_last_for(device_id.as_str(), 2)
.await
.unwrap()
{
let last_position = Point::new(last_position.x, last_position.y);
posible_positions.retain(|p| last_position.distance_to(&p.point) < 3.0);
}
println!("New len(): {}", posible_positions.len());
let mut pos = Point::new(0.0, 0.0);
let mut divisor = 0.0;
for p in posible_positions.iter() {
pos.x += p.point.x / p.dist;
pos.y += p.point.y / p.dist;
divisor += 1.0 / p.dist;
}
pos /= divisor;
// println!("Pos: {}", pos);
let _r = KnownPosition::new(pos).write_for(device_id.as_str()).await;
Ok(pos)
}
fn trilat(a: &KnownDistance, b: &KnownDistance, c: &KnownDistance) -> Option<KnownDistance> {
#![allow(non_snake_case)]
let points = vec![a.point, b.point, c.point];
for &p in points.iter() {
if !p.is_valid() {
return None;
}
}
// We have two triangles that share a side,
// Da and Db are both a hypotenuse,
// h is the shared side
// D is the lineal sum of both coaxial sides.
// P
// /|\
// / | \
// Da/ |h \Db
// / | \
// / d1 | d2 \
// *-----------*
// A B => D = BA
let D = (b.point - a.point).module();
let d1 = (D.powi(2) + a.dist.powi(2) - b.dist.powi(2)) / (2.0 * D);
let h = f64::sqrt(a.dist.powi(2) - d1.powi(2));
if h.is_nan() {
return None;
}
// With points A and B, we can find the Position P, but we the fact is that there are
// two posible solutions, we build a rhombus with both posible P:
let D_ver = (b.point - a.point).as_versor().unwrap();
let mut upper = D_ver * a.dist;
let mut downer = D_ver * a.dist;
// we need to rotate that direction by alpha and -alpha
let alpha = f64::tan(h / d1);
upper.rotate_by(alpha);
downer.rotate_by(-alpha);
// Now we have two vectors with |Da| that point from A where the two posible positions are
let P = [a.point + upper, a.point + downer];
//Now we need to see which P[0] or P[1] is at distance Dc from pointC.
//But since all numbers we got (Da,Db and Dc) cointain a lot of error and noise
// we know that they won't be the same number so we need to pick the point that makes the distance to pointC the closest to Dc
let dist_to_C = [P[0].distance_to(&c.point), P[1].distance_to(&c.point)];
let error = [
f64::abs(dist_to_C[0] - c.dist),
f64::abs(dist_to_C[1] - c.dist),
];
if error[0] < error[1] {
Some(KnownDistance {
point: P[0],
dist: error[0],
})
} else {
Some(KnownDistance {
point: P[1],
dist: error[1],
})
}
}
fn anntennas_hashmap() -> HashMap<MAC, Antenna> {
let data = vec![
Antenna::new("e6:ad:0b:2e:d7:11", 30.0, Point::new(15.0, 15.0)),
Antenna::new("c2:b5:f5:cc:e6:88", 30.0, Point::new(15.0, -15.0)),
Antenna::new("e6:2e:e6:88:f5:cc", 30.0, Point::new(-15.0, 15.0)),
Antenna::new("c2:ad:0b:b5:11:d7", 30.0, Point::new(-15.0, -15.0)),
];
let mut map: HashMap<MAC, Antenna> = HashMap::new();
for a in data.iter() {
map.insert(a.id, a.clone());
}
map
}
#[test]
fn test_trilat() {
let a = KnownDistance {
dist: 6.3,
point: Point::new(0.0, 0.0),
};
let b = KnownDistance {
dist: 3.1,
point: Point::new(5.0, 6.5),
};
let c = KnownDistance {
dist: 5.5,
point: Point::new(9.0, 0.0),
};
let pos = trilat(&a, &b, &c).unwrap();
let expected = Point::new(5.0, 3.5);
assert!(f64::abs(pos.point.x - expected.x) < 0.5);
assert!(f64::abs(pos.point.y - expected.y) < 0.5);
}

1
xyz-engine/test/all.rs Normal file
View File

@@ -0,0 +1 @@