show waterlevel for the next days

2024-04-30 11:59:33 +02:00
parent 3323807e46
commit 3a39315a01
11 changed files with 369 additions and 4 deletions


@@ -8,6 +8,8 @@ pub mod tera;
#[cfg(feature = "rest")]
pub mod rest;
pub mod scheduled;
#[cfg(test)]
#[macro_export]
macro_rules! testdb {


@@ -4,6 +4,7 @@ use std::str::FromStr;
#[cfg(feature = "rest")]
use rot::rest;
use rot::scheduled;
#[cfg(feature = "rowing-tera")]
use rot::tera;
@@ -26,6 +27,8 @@ async fn rocket() -> _ {
        .await
        .unwrap();
    scheduled::schedule(&db).await;
    let rocket = rocket::build().manage(db);
    #[cfg(feature = "rowing-tera")]


@@ -5,6 +5,7 @@ use sqlx::SqlitePool;
use self::{
    planned_event::{PlannedEvent, PlannedEventWithUserAndTriptype},
    trip::{Trip, TripWithUserAndType},
    waterlevel::Waterlevel,
};

pub mod boat;
@@ -27,6 +28,7 @@ pub mod tripdetails;
pub mod triptype;
pub mod user;
pub mod usertrip;
pub mod waterlevel;

#[derive(Serialize, Debug)]
pub struct Day {
@@ -34,6 +36,7 @@ pub struct Day {
    planned_events: Vec<PlannedEventWithUserAndTriptype>,
    trips: Vec<TripWithUserAndType>,
    is_pinned: bool,
    max_waterlevel: Option<i64>,
}

impl Day {
@@ -44,6 +47,7 @@ impl Day {
                planned_events: PlannedEvent::get_pinned_for_day(db, day).await,
                trips: Trip::get_pinned_for_day(db, day).await,
                is_pinned,
                max_waterlevel: Waterlevel::max_waterlevel_for_day(db, day).await,
            }
        } else {
            Self {
@@ -51,6 +55,7 @@ impl Day {
                planned_events: PlannedEvent::get_for_day(db, day).await,
                trips: Trip::get_for_day(db, day).await,
                is_pinned,
                max_waterlevel: Waterlevel::max_waterlevel_for_day(db, day).await,
            }
        }
    }
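
Because a day may have no stored forecast rows, the new field is an Option. A minimal sketch of how that surfaces after serialization; the DaySketch struct and the use of plain serde/serde_json are hypothetical illustrations, not the project's real Day type:

use serde::Serialize;

#[derive(Serialize)]
struct DaySketch {
    is_pinned: bool,
    max_waterlevel: Option<i64>,
}

fn main() {
    let with_forecast = DaySketch { is_pinned: true, max_waterlevel: Some(325) };
    let without_forecast = DaySketch { is_pinned: false, max_waterlevel: None };
    // {"is_pinned":true,"max_waterlevel":325}
    println!("{}", serde_json::to_string(&with_forecast).unwrap());
    // {"is_pinned":false,"max_waterlevel":null}
    println!("{}", serde_json::to_string(&without_forecast).unwrap());
}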

src/model/waterlevel.rs Normal file (72 lines added)

@@ -0,0 +1,72 @@
use std::ops::DerefMut;

use chrono::NaiveDate;
use rocket::serde::{Deserialize, Serialize};
use sqlx::{FromRow, Sqlite, SqlitePool, Transaction};

/// One water level forecast entry, stored per day and time of day.
#[derive(FromRow, Debug, Serialize, Deserialize, Eq, Hash, PartialEq, Clone)]
pub struct Waterlevel {
    pub id: i64,
    pub day: NaiveDate,
    pub time: String,
    pub max: i64,
    pub min: i64,
    pub mittel: i64,
    pub tumax: i64,
    pub tumin: i64,
    pub tumittel: i64,
}

impl Waterlevel {
    pub async fn find_by_id(db: &SqlitePool, id: i32) -> Option<Self> {
        sqlx::query_as!(Self, "SELECT * FROM waterlevel WHERE id like ?", id)
            .fetch_one(db)
            .await
            .ok()
    }

    pub async fn find_by_id_tx(db: &mut Transaction<'_, Sqlite>, id: i32) -> Option<Self> {
        sqlx::query_as!(Self, "SELECT * FROM waterlevel WHERE id like ?", id)
            .fetch_one(db.deref_mut())
            .await
            .ok()
    }

    pub async fn create(
        db: &mut Transaction<'_, Sqlite>,
        day: NaiveDate,
        time: String,
        max: i64,
        min: i64,
        mittel: i64,
        tumax: i64,
        tumin: i64,
        tumittel: i64,
    ) -> Result<(), String> {
        sqlx::query!(
            "INSERT INTO waterlevel(day, time, max, min, mittel, tumax, tumin, tumittel) VALUES (?,?,?,?,?,?,?,?)",
            day, time, max, min, mittel, tumax, tumin, tumittel
        )
        .execute(db.deref_mut())
        .await
        .map_err(|e| e.to_string())?;

        Ok(())
    }

    /// Highest mean value (`MAX(mittel)`) stored for `day`; `None` if there are no rows for that date.
    pub async fn max_waterlevel_for_day(db: &SqlitePool, day: NaiveDate) -> Option<i64> {
        sqlx::query!(
            "SELECT MAX(mittel) as max FROM waterlevel WHERE day = ?",
            day
        )
        .fetch_one(db)
        .await
        .unwrap()
        .max
    }

    /// Remove every stored forecast row; the scheduler calls this before inserting a fresh forecast.
    pub async fn delete_all(db: &mut Transaction<'_, Sqlite>) {
        sqlx::query!("DELETE FROM waterlevel;")
            .execute(db.deref_mut())
            .await
            .unwrap();
    }
}
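
A minimal usage sketch of the API added above; the calling function below is assumed for illustration and is not part of this commit. It inserts one forecast row inside a transaction, then reads back the day's maximum mean level.

use chrono::NaiveDate;
use sqlx::SqlitePool;

use crate::model::waterlevel::Waterlevel;

// Hypothetical helper, only to show the new API surface.
async fn store_and_query(db: &SqlitePool) -> Result<(), String> {
    let day = NaiveDate::from_ymd_opt(2024, 4, 30).unwrap();

    // Insert a single forecast row for 12:00 inside its own transaction.
    let mut tx = db.begin().await.map_err(|e| e.to_string())?;
    Waterlevel::create(&mut tx, day, "12:00".into(), 350, 300, 325, 10, 5, 7).await?;
    tx.commit().await.map_err(|e| e.to_string())?;

    // MAX(mittel) for the day; None if no rows exist for that date.
    assert_eq!(Waterlevel::max_waterlevel_for_day(db, day).await, Some(325));
    Ok(())
}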

src/scheduled/mod.rs Normal file (33 lines added)

@@ -0,0 +1,33 @@
mod waterlevel;

use std::time::Duration;

use job_scheduler_ng::{Job, JobScheduler};
use rocket::tokio::{self, task};
use sqlx::SqlitePool;

pub async fn schedule(db: &SqlitePool) {
    let db = db.clone();

    // Fetch once at startup so data is available immediately; panics if the first update fails.
    waterlevel::update(&db).await.unwrap();

    tokio::task::spawn(async {
        let mut sched = JobScheduler::new();

        // Every hour
        sched.add(Job::new("0 0 * * * * *".parse().unwrap(), move || {
            let db_clone = db.clone();
            // Use block_in_place to run async code in the synchronous job callback
            task::block_in_place(|| {
                tokio::runtime::Handle::current().block_on(async {
                    waterlevel::update(&db_clone).await.unwrap();
                });
            });
        }));

        // Check once per minute whether a job is due.
        loop {
            sched.tick();
            std::thread::sleep(Duration::from_secs(60));
        }
    });
}
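
The schedule string uses the seven-field cron syntax of the cron crate that job_scheduler_ng builds on (second, minute, hour, day of month, month, day of week, year), so "0 0 * * * * *" fires at the top of every hour. A minimal sketch to inspect the firing times, assuming the cron and chrono crates are available as direct dependencies (this commit does not state that):

use std::str::FromStr;

use chrono::Utc;
use cron::Schedule;

fn main() {
    // sec  min  hour  day-of-month  month  day-of-week  year
    let schedule = Schedule::from_str("0 0 * * * * *").unwrap();
    for next in schedule.upcoming(Utc).take(3) {
        println!("next waterlevel refresh: {next}"); // top of each of the next three hours
    }
}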

src/scheduled/waterlevel.rs Normal file (112 lines added)

@@ -0,0 +1,112 @@
use chrono::{DateTime, FixedOffset, NaiveDate, NaiveTime};
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;

use crate::model::waterlevel::Waterlevel;

pub async fn update(db: &SqlitePool) -> Result<(), String> {
    let mut tx = db.begin().await.unwrap();

    // 1. Delete all existing water level entries (the forecast is replaced wholesale)
    Waterlevel::delete_all(&mut tx).await;

    // 2. Fetch the forecast and insert one row per timestamp
    let station = fetch()?;
    for d in station.data {
        let (Some(max), Some(min), Some(mittel), Some(tumax), Some(tumin), Some(tumittel)) =
            (d.max, d.min, d.mittel, d.tumax, d.tumin, d.tumittel)
        else {
            println!("Ignored invalid values: {d:?}");
            continue;
        };

        let Ok(datetime): Result<DateTime<FixedOffset>, _> = d.timestamp.parse() else {
            return Err("Failed to parse datetime from hydro json".into());
        };
        let date: NaiveDate = datetime.naive_utc().date();

        // Extract the time component and format it as a string
        let time: NaiveTime = datetime.naive_utc().time();
        let time_str = time.format("%H:%M").to_string();

        Waterlevel::create(
            &mut tx, date, time_str, max, min, mittel, tumax, tumin, tumittel,
        )
        .await?;
    }

    // 3. Commit everything in one transaction
    tx.commit().await.unwrap();

    Ok(())
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Station {
    station_no: String,
    station_latitude: String,
    station_longitude: String,
    parametertype_name: String,
    ts_shortname: String,
    ts_name: String,
    ts_unitname: String,
    ts_unitsymbol: String,
    ts_precision: String,
    rows: String,
    columns: String,
    data: Vec<Data>,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
struct Data {
    timestamp: String,
    max: Option<i64>,
    min: Option<i64>,
    mittel: Option<i64>,
    tumax: Option<i64>,
    tumin: Option<i64>,
    tumittel: Option<i64>,
}

fn fetch() -> Result<Station, String> {
    // Forecast endpoint for station 207068 (Linz)
    let url = "https://hydro.ooe.gv.at/daten/internet/stations/OG/207068/S/forecast.json";
    match ureq::get(url).call() {
        Ok(response) => {
            let forecast: Result<Vec<Station>, _> = response.into_json();
            if let Ok(data) = forecast {
                if data.len() == 1 {
                    return Ok(data[0].clone());
                } else {
                    return Err(format!(
                        "Expected 1 station (Linz); got {} while fetching from {url}. Maybe the hydro data format changed?",
                        data.len()
                    ));
                }
            } else {
                return Err(format!(
                    "Failed to parse the json received by {url}: {}",
                    forecast.err().unwrap()
                ));
            }
        }
        Err(_) => {
            return Err(format!(
                "Could not fetch {url}, do you have internet? Maybe their server is down?"
            ));
        }
    }
}

//#[cfg(test)]
//mod test {
//    use crate::testdb;
//
//    use super::*;
//
//    #[sqlx::test]
//    fn test_fetch_succ() {
//        let pool = testdb!();
//        fetch();
//    }
//}
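
The commented-out test above would hit both the test database and the live endpoint; an offline alternative is to feed a handcrafted sample into the same structs. A minimal sketch, where the JSON values are made up and serde_json is assumed as a dev-dependency (neither is part of this commit):

#[cfg(test)]
mod parse_test {
    use super::*;

    #[test]
    fn parses_minimal_forecast_json() {
        // Hypothetical sample shaped like the hydro forecast: one station, one data row.
        let sample = r#"[{
            "station_no": "207068",
            "station_latitude": "48.3",
            "station_longitude": "14.3",
            "parametertype_name": "W",
            "ts_shortname": "forecast",
            "ts_name": "forecast",
            "ts_unitname": "Zentimeter",
            "ts_unitsymbol": "cm",
            "ts_precision": "0",
            "rows": "1",
            "columns": "8",
            "data": [{
                "timestamp": "2024-04-30T12:00:00+02:00",
                "max": 350, "min": 300, "mittel": 325,
                "tumax": 10, "tumin": 5, "tumittel": 7
            }]
        }]"#;

        let stations: Vec<Station> = serde_json::from_str(sample).unwrap();
        assert_eq!(stations.len(), 1);
        assert_eq!(stations[0].data[0].mittel, Some(325));
    }
}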