Update to InfluxDB 2.0 and update all interfaces to work with it. MAC has been deprecated, since current `influxdb2` doesn't support non built-in types for read/write into the DB. Co-authored-by: Felipe Diniello <felipediniello@pm.me> Reviewed-on: #1
124 lines
3.4 KiB
Rust
124 lines
3.4 KiB
Rust
use std::cell::RefCell;
|
|
|
|
#[derive(Debug, Clone)]
|
|
pub struct Client {
|
|
client: influxdb2::Client,
|
|
|
|
// We should get two buckets, one temp and a permanent, but for now we'll use just one
|
|
bucket: String,
|
|
}
|
|
|
|
pub enum Bucket {
|
|
Tmp,
|
|
Perm,
|
|
}
|
|
|
|
thread_local! {
|
|
static INFLUX_CLIENT : RefCell<Client> = Client::new();
|
|
}
|
|
|
|
impl Client {
|
|
fn new() -> RefCell<Client> {
|
|
let host = dotenv::var("INFLUX_HOST").unwrap_or_else(|_| {
|
|
println! {"INFLUX_HOST not found in .env file, using default: http://localhost:8086"};
|
|
"http://localhost:8086".to_string()
|
|
});
|
|
let bucket = dotenv::var("INFLUX_BUCKET").expect("INFLUX_BUCKET not defined in .env file");
|
|
let org = dotenv::var("INFLUX_ORG").expect("INFLUX_ORG not defined in .env file");
|
|
let token = dotenv::var("INFLUX_TOKEN").expect("INFLUX_TOKEN not defined in .env file");
|
|
|
|
RefCell::new(Client {
|
|
client: influxdb2::Client::new(host, org, token),
|
|
bucket,
|
|
})
|
|
}
|
|
pub fn get() -> Client {
|
|
INFLUX_CLIENT.with(|rc| rc.borrow().clone())
|
|
}
|
|
|
|
pub async fn write(
|
|
&self,
|
|
_bucket: Bucket,
|
|
body: impl futures::Stream<Item = impl influxdb2::models::WriteDataPoint>
|
|
+ Send
|
|
+ Sync
|
|
+ 'static,
|
|
) -> Result<(), influxdb2::RequestError> {
|
|
// TODO: use _bucket to choose from internal list
|
|
|
|
self.client.write(self.bucket.as_str(), body).await
|
|
}
|
|
|
|
pub async fn query<T>(
|
|
&self,
|
|
_bucket: Bucket,
|
|
q: String,
|
|
) -> Result<Vec<T>, influxdb2::RequestError>
|
|
where
|
|
T: influxdb2_structmap::FromMap,
|
|
{
|
|
// TODO: use _bucket to choose from internal list
|
|
let from_bucket = format!("from(bucket: \"{}\")", self.bucket);
|
|
let query = from_bucket + &q;
|
|
let query = influxdb2::models::Query::new(query);
|
|
|
|
self.client.query::<T>(Some(query)).await
|
|
}
|
|
}
|
|
|
|
#[cfg(test)]
|
|
mod test {
|
|
use crate::influx::{Bucket, Client};
|
|
|
|
#[tokio::test]
|
|
async fn test_new_get_cli() {
|
|
let health = Client::get().client.health().await;
|
|
assert!(health.is_ok())
|
|
}
|
|
|
|
use influxdb2_derive::{FromDataPoint, WriteDataPoint};
|
|
#[derive(Default, Debug, PartialEq, FromDataPoint, WriteDataPoint)]
|
|
#[measurement = "stock_prices"]
|
|
struct StockPrice {
|
|
#[influxdb(tag)]
|
|
ticker: String,
|
|
#[influxdb(field)]
|
|
value: f64,
|
|
#[influxdb(timestamp)]
|
|
time: i64,
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_write_then_query() {
|
|
let time = chrono::Utc::now().timestamp_nanos();
|
|
let w = StockPrice {
|
|
ticker: "ASDF".into(),
|
|
value: 150.5,
|
|
time: time,
|
|
};
|
|
|
|
let res = Client::get()
|
|
.write(Bucket::Perm, futures::stream::iter([w]))
|
|
.await;
|
|
assert!(res.is_ok());
|
|
|
|
let query = format!(
|
|
"
|
|
|> range(start: -1s)
|
|
|> filter(fn: (r) => r[\"_measurement\"] == \"stock_prices\")
|
|
|> filter(fn: (r) => r[\"ticker\"] == \"ASDF\")
|
|
|> sort(columns: [\"time\"], desc: true)
|
|
"
|
|
);
|
|
|
|
let r = Client::get()
|
|
.query::<StockPrice>(Bucket::Perm, query)
|
|
.await
|
|
.unwrap();
|
|
|
|
assert!(r.len() > 0);
|
|
assert_eq!(r[0].ticker, "ASDF");
|
|
assert_eq!(r[0].value, 150.5);
|
|
}
|
|
}
|