From 67fb02894fd961abdf15d2ed46262fd36ecfb63c Mon Sep 17 00:00:00 2001 From: Philipp Hofer Date: Fri, 1 Aug 2025 19:04:25 +0200 Subject: [PATCH] remove need to call /new --- Cargo.lock | 122 ++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + src/downloader.rs | 85 -------------------------------- src/main.rs | 14 +----- src/state.rs | 105 +++++++++++++++++++++++++++++++++++++-- src/streamer.rs | 2 + 6 files changed, 227 insertions(+), 102 deletions(-) delete mode 100644 src/downloader.rs diff --git a/Cargo.lock b/Cargo.lock index 55e73b7..7609589 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,21 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" +[[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -159,6 +174,26 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "js-sys", + "num-traits", + "wasm-bindgen", + "windows-link", +] + +[[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + [[package]] name = "displaydoc" version = "0.2.5" @@ -434,6 +469,30 @@ dependencies = [ "tracing", ] +[[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + [[package]] name = "icu_collections" version = "2.0.0" @@ -668,6 +727,15 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + [[package]] name = "object" version = "0.36.7" @@ -731,6 +799,7 @@ dependencies = [ "async-stream", "axum", "bytes", + "chrono", "reqwest", "serde_json", "stream-download", @@ -1550,12 +1619,65 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "windows-link" version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/Cargo.toml b/Cargo.toml index 8f39364..f374949 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,3 +13,4 @@ async-stream = "0.3" serde_json = "1" tracing = "0.1" stream-download = "0.22" +chrono = "0.4.41" diff --git a/src/downloader.rs b/src/downloader.rs deleted file mode 100644 index 1b40797..0000000 --- a/src/downloader.rs +++ /dev/null @@ -1,85 +0,0 @@ -use crate::state::AppState; -use std::{io::Read, sync::Arc, time::Duration}; -use stream_download::{ - Settings, StreamDownload, source::DecodeError, storage::temp::TempStorageProvider, -}; -use tokio::sync::mpsc; - -pub async fn spawn_download_task(url: &str, state: Arc) { - state.reset().await; - - let url = url.to_string(); - tokio::spawn(async move { - if let Err(e) = download_stream(&url, state).await { - eprintln!("Download failed: {e:?}"); - } - }); -} - -async fn download_stream( - url: &str, - state: Arc, -) -> Result<(), Box> { - println!("Starting download from: {url}"); - - let reader = match StreamDownload::new_http( - url.parse()?, - TempStorageProvider::new(), - Settings::default(), - ) - .await - { - Ok(reader) => reader, - Err(e) => return Err(e.decode_error().await.into()), - }; - - let (tx, mut rx) = mpsc::unbounded_channel::(); - - // Spawn blocking task to read from the stream - let read_handle = tokio::task::spawn_blocking( - move || -> Result<(), Box> { - let mut reader = reader; - let mut buffer = vec![0u8; 8192]; // 8KB buffer - - loop { - match reader.read(&mut buffer) { - Ok(0) => { - // End of stream - println!("End of stream reached"); - break; - } - Ok(n) => { - let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]); - if tx.send(chunk).is_err() { - // Receiver dropped, exit - break; - } - } - Err(e) => { - println!("Error reading from stream: {e:?}"); - // Short delay before retrying - std::thread::sleep(Duration::from_millis(100)); - } - } - } - - Ok(()) - }, - ); - - // Process received chunks - while let Some(bytes) = rx.recv().await { - state.add_chunk(bytes).await; - } - - // Wait for the reading task to complete - match read_handle.await { - Ok(result) => result.unwrap(), - Err(e) => return Err(e.into()), - } - - state.mark_complete().await; - println!("Download complete!"); - - Ok(()) -} diff --git a/src/main.rs b/src/main.rs index d2037b0..5b31760 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,28 +1,16 @@ -mod downloader; mod state; mod streamer; -use axum::{Router, extract::State, routing::get}; +use axum::{routing::get, Router}; use state::AppState; use std::sync::Arc; -async fn new(State(state): State>) -> &'static str { - let Ok(url) = player::newest_morning_journal_streaming_url().await else { - return "Failed getting latest url"; - }; - - downloader::spawn_download_task(&url, state.clone()).await; - - "Download started. Access / to stream the new content." -} - #[tokio::main] async fn main() -> Result<(), Box> { let state = Arc::new(AppState::new()); let app = Router::new() .route("/", get(streamer::stream_handler)) - .route("/new", get(new)) .with_state(state); println!("Streaming server running on http://localhost:3029"); diff --git a/src/state.rs b/src/state.rs index e7fb40d..dc19cea 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,10 +1,16 @@ use bytes::Bytes; -use tokio::sync::{Notify, RwLock}; +use chrono::{Local, NaiveDate}; +use std::{io::Read, sync::Arc, time::Duration}; +use stream_download::{ + source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload, +}; +use tokio::sync::{mpsc, Notify, RwLock}; pub struct AppState { pub chunks: RwLock>, pub complete: RwLock, pub notify: Notify, + pub downloaded_on_day: RwLock>, } impl AppState { @@ -13,26 +19,117 @@ impl AppState { chunks: RwLock::new(Vec::new()), complete: RwLock::new(false), notify: Notify::new(), + downloaded_on_day: RwLock::new(None), } } - pub async fn reset(&self) { + pub async fn check_update(self: Arc) { + let today = Local::now().date_naive(); + if let Some(downloaded_on_day) = *self.downloaded_on_day.read().await { + if today == downloaded_on_day { + return; + } + } + + self.reset().await; + *self.downloaded_on_day.write().await = Some(today); + + let latest_url = player::newest_morning_journal_streaming_url() + .await + .unwrap(); + + let self_clone = Arc::clone(&self); + tokio::spawn(async move { + if let Err(e) = self_clone.download_stream(&latest_url).await { + eprintln!("Download failed: {e:?}"); + } + }); + } + + async fn download_stream(&self, url: &str) -> Result<(), Box> { + println!("Starting download from: {url}"); + + let reader = match StreamDownload::new_http( + url.parse()?, + TempStorageProvider::new(), + Settings::default(), + ) + .await + { + Ok(reader) => reader, + Err(e) => return Err(e.decode_error().await.into()), + }; + + let (tx, mut rx) = mpsc::unbounded_channel::(); + + // Spawn blocking task to read from the stream + let read_handle = tokio::task::spawn_blocking( + move || -> Result<(), Box> { + let mut reader = reader; + let mut buffer = vec![0u8; 8192]; // 8KB buffer + + loop { + match reader.read(&mut buffer) { + Ok(0) => { + // End of stream + println!("End of stream reached"); + break; + } + Ok(n) => { + let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]); + if tx.send(chunk).is_err() { + // Receiver dropped, exit + break; + } + } + Err(e) => { + println!("Error reading from stream: {e:?}"); + // Short delay before retrying + std::thread::sleep(Duration::from_millis(100)); + } + } + } + + Ok(()) + }, + ); + + // Process received chunks + while let Some(bytes) = rx.recv().await { + self.add_chunk(bytes).await; + } + + // Wait for the reading task to complete + match read_handle.await { + Ok(result) => result.unwrap(), + Err(e) => return Err(e.into()), + } + + self.mark_complete().await; + println!("Download complete!"); + + Ok(()) + } + + async fn reset(&self) { // Clear all chunks self.chunks.write().await.clear(); // Reset completion status *self.complete.write().await = false; + *self.downloaded_on_day.write().await = None; + // Notify any waiting tasks about the reset self.notify.notify_waiters(); } - pub async fn add_chunk(&self, chunk: Bytes) { + async fn add_chunk(&self, chunk: Bytes) { self.chunks.write().await.push(chunk); self.notify.notify_waiters(); } - pub async fn mark_complete(&self) { + async fn mark_complete(&self) { *self.complete.write().await = true; self.notify.notify_waiters(); } diff --git a/src/streamer.rs b/src/streamer.rs index ac54395..8a9a8c6 100644 --- a/src/streamer.rs +++ b/src/streamer.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use tokio_stream::Stream; pub async fn stream_handler(State(state): State>) -> Response { + state.clone().check_update().await; + let stream = create_chunk_stream(state); let body = Body::from_stream(stream);