Refactor Models for InfluxDB (#1)

Update to InfluxDB 2.0 and update all interfaces to work with it.

MAC has been deprecated, since current `influxdb2` doesn't support non built-in types for read/write into the DB.

Co-authored-by: Felipe Diniello <felipediniello@pm.me>
Reviewed-on: #1
This commit was merged in pull request #1.
This commit is contained in:
2023-06-18 18:43:15 +02:00
parent a5976252e8
commit 962b90e1b8
22 changed files with 440 additions and 397 deletions

123
kairo-common/src/influx.rs Normal file
View File

@@ -0,0 +1,123 @@
use std::cell::RefCell;
#[derive(Debug, Clone)]
pub struct Client {
client: influxdb2::Client,
// We should get two buckets, one temp and a permanent, but for now we'll use just one
bucket: String,
}
pub enum Bucket {
Tmp,
Perm,
}
thread_local! {
static INFLUX_CLIENT : RefCell<Client> = Client::new();
}
impl Client {
fn new() -> RefCell<Client> {
let host = dotenv::var("INFLUX_HOST").unwrap_or_else(|_| {
println! {"INFLUX_HOST not found in .env file, using default: http://localhost:8086"};
"http://localhost:8086".to_string()
});
let bucket = dotenv::var("INFLUX_BUCKET").expect("INFLUX_BUCKET not defined in .env file");
let org = dotenv::var("INFLUX_ORG").expect("INFLUX_ORG not defined in .env file");
let token = dotenv::var("INFLUX_TOKEN").expect("INFLUX_TOKEN not defined in .env file");
RefCell::new(Client {
client: influxdb2::Client::new(host, org, token),
bucket,
})
}
pub fn get() -> Client {
INFLUX_CLIENT.with(|rc| rc.borrow().clone())
}
pub async fn write(
&self,
_bucket: Bucket,
body: impl futures::Stream<Item = impl influxdb2::models::WriteDataPoint>
+ Send
+ Sync
+ 'static,
) -> Result<(), influxdb2::RequestError> {
// TODO: use _bucket to choose from internal list
self.client.write(self.bucket.as_str(), body).await
}
pub async fn query<T>(
&self,
_bucket: Bucket,
q: String,
) -> Result<Vec<T>, influxdb2::RequestError>
where
T: influxdb2_structmap::FromMap,
{
// TODO: use _bucket to choose from internal list
let from_bucket = format!("from(bucket: \"{}\")", self.bucket);
let query = from_bucket + &q;
let query = influxdb2::models::Query::new(query);
self.client.query::<T>(Some(query)).await
}
}
#[cfg(test)]
mod test {
use crate::influx::{Bucket, Client};
#[tokio::test]
async fn test_new_get_cli() {
let health = Client::get().client.health().await;
assert!(health.is_ok())
}
use influxdb2_derive::{FromDataPoint, WriteDataPoint};
#[derive(Default, Debug, PartialEq, FromDataPoint, WriteDataPoint)]
#[measurement = "stock_prices"]
struct StockPrice {
#[influxdb(tag)]
ticker: String,
#[influxdb(field)]
value: f64,
#[influxdb(timestamp)]
time: i64,
}
#[tokio::test]
async fn test_write_then_query() {
let time = chrono::Utc::now().timestamp_nanos();
let w = StockPrice {
ticker: "ASDF".into(),
value: 150.5,
time: time,
};
let res = Client::get()
.write(Bucket::Perm, futures::stream::iter([w]))
.await;
assert!(res.is_ok());
let query = format!(
"
|> range(start: -1s)
|> filter(fn: (r) => r[\"_measurement\"] == \"stock_prices\")
|> filter(fn: (r) => r[\"ticker\"] == \"ASDF\")
|> sort(columns: [\"time\"], desc: true)
"
);
let r = Client::get()
.query::<StockPrice>(Bucket::Perm, query)
.await
.unwrap();
assert!(r.len() > 0);
assert_eq!(r[0].ticker, "ASDF");
assert_eq!(r[0].value, 150.5);
}
}