From 19c9b7739b6101704a91a61a821808c048d46faf Mon Sep 17 00:00:00 2001
From: Philipp Hofer
Date: Mon, 13 Oct 2025 08:52:48 +0200
Subject: [PATCH] generate rss feed

---
 Cargo.lock      |  10 ++++
 Cargo.toml      |   1 +
 src/main.rs     |  21 -------
 src/state.rs    | 166 ++++++++++++++----------------------------------
 src/streamer.rs |  93 ++++++++++++---------------
 5 files changed, 90 insertions(+), 201 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 198e7b0..9fb1264 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -820,6 +820,7 @@ dependencies = [
  "axum",
  "bytes",
  "chrono",
+ "quick-xml",
  "reqwest",
  "serde_json",
  "stream-download",
@@ -855,6 +856,15 @@ dependencies = [
  "unicode-ident",
 ]
 
+[[package]]
+name = "quick-xml"
+version = "0.38.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
+dependencies = [
+ "memchr",
+]
+
 [[package]]
 name = "quinn"
 version = "0.11.8"
diff --git a/Cargo.toml b/Cargo.toml
index 6923d55..e5dd283 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,3 +14,4 @@ serde_json = "1"
 stream-download = "0.22"
 chrono = "0.4"
 tokio-cron-scheduler = "0.14"
+quick-xml = "0.38"
diff --git a/src/main.rs b/src/main.rs
index a715e7c..43494f0 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,10 +2,8 @@ mod state;
 mod streamer;
 
 use axum::{routing::get, Router};
-use chrono::Utc;
 use state::AppState;
 use std::sync::Arc;
-use tokio_cron_scheduler::{Job, JobScheduler};
 
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -15,25 +13,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         .route("/", get(streamer::stream_handler))
         .with_state(state.clone());
 
-    //let scheduler = JobScheduler::new().await.unwrap();
-    //scheduler
-    //    .add(
-    //        Job::new_async(
-    //            "30 0 7 * * Mon,Tue,Wed,Thu,Fri,Sat",
-    //            move |_uuid, _locked| {
-    //                let state_for_task = state.clone();
-    //                Box::pin(async move {
-    //                    state_for_task.check_update().await;
-    //                    println!("Task executed at: {}", Utc::now());
-    //                })
-    //            },
-    //        )
-    //        .unwrap(),
-    //    )
-    //    .await
-    //    .unwrap();
-    //scheduler.start().await.unwrap();
-
     println!("Streaming server running on http://localhost:3029");
 
     let listener = tokio::net::TcpListener::bind("0.0.0.0:3029").await?;
diff --git a/src/state.rs b/src/state.rs
index 0874dfc..5c23f25 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,143 +1,49 @@
-use bytes::Bytes;
 use chrono::{Local, NaiveDate};
-use std::{io::Read, sync::Arc, time::Duration};
-use stream_download::{
-    source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload,
-};
-use tokio::sync::{mpsc, Notify, RwLock};
+use std::sync::Arc;
+use tokio::sync::RwLock;
 
 pub struct AppState {
-    pub chunks: RwLock<Vec<Bytes>>,
-    pub complete: RwLock<bool>,
-    pub notify: Notify,
-    pub downloaded_on_day: RwLock<Option<NaiveDate>>,
+    pub urls: RwLock<Vec<String>>,
+    pub last_download_on_day: RwLock<Option<NaiveDate>>,
 }
 
 impl AppState {
     pub fn new() -> Self {
         Self {
-            chunks: RwLock::new(Vec::new()),
-            complete: RwLock::new(false),
-            notify: Notify::new(),
-            downloaded_on_day: RwLock::new(None),
+            urls: RwLock::new(Vec::new()),
+            last_download_on_day: RwLock::new(None),
         }
     }
 
     pub async fn check_update(self: Arc<Self>) {
         let today = Local::now().date_naive();
-        if let Some(downloaded_on_day) = *self.downloaded_on_day.read().await
-            && today == downloaded_on_day {
-            return;
-        }
-
-        self.reset().await;
-        *self.downloaded_on_day.write().await = Some(today);
-
-        let latest_url = player::newest_morning_journal_streaming_url()
-            .await
-            .unwrap();
-
-        let self_clone = Arc::clone(&self);
-        tokio::spawn(async move {
-            if let Err(e) = self_clone.download_stream(&latest_url).await {
-                eprintln!("Download failed: {e:?}");
-            }
-        });
-    }
-
-    async fn download_stream(&self, url: &str) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-        println!("Starting download from: {url}");
-
-        let reader = match StreamDownload::new_http(
-            url.parse()?,
-            TempStorageProvider::new(),
-            Settings::default(),
-        )
-        .await
-        {
-            Ok(reader) => reader,
-            Err(e) => return Err(e.decode_error().await.into()),
-        };
-
-        let (tx, mut rx) = mpsc::unbounded_channel::<Bytes>();
-
-        // Spawn blocking task to read from the stream
-        let read_handle = tokio::task::spawn_blocking(
-            move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
-                let mut reader = reader;
-                let mut buffer = vec![0u8; 8192]; // 8KB buffer
-
-                loop {
-                    match reader.read(&mut buffer) {
-                        Ok(0) => {
-                            // End of stream
-                            println!("End of stream reached");
-                            break;
-                        }
-                        Ok(n) => {
-                            let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
-                            if tx.send(chunk).is_err() {
-                                // Receiver dropped, exit
-                                break;
-                            }
-                        }
-                        Err(e) => {
-                            println!("Error reading from stream: {e:?}");
-                            // Short delay before retrying
-                            std::thread::sleep(Duration::from_millis(100));
-                        }
-                    }
-                }
-
-                Ok(())
-            },
-        );
-
-        // Process received chunks
-        while let Some(bytes) = rx.recv().await {
-            self.add_chunk(bytes).await;
-        }
-
-        // Wait for the reading task to complete
-        match read_handle.await {
-            Ok(result) => result.unwrap(),
-            Err(e) => return Err(e.into()),
-        }
-
-        self.mark_complete().await;
-        println!("Download complete!");
-
-        Ok(())
-    }
-
-    async fn reset(&self) {
-        // Clear all chunks
-        self.chunks.write().await.clear();
-
-        // Reset completion status
-        *self.complete.write().await = false;
-
-        *self.downloaded_on_day.write().await = None;
-
-        // Notify any waiting tasks about the reset
-        self.notify.notify_waiters();
-    }
-
-    async fn add_chunk(&self, chunk: Bytes) {
-        self.chunks.write().await.push(chunk);
-        self.notify.notify_waiters();
-    }
-
-    async fn mark_complete(&self) {
-        *self.complete.write().await = true;
-        self.notify.notify_waiters();
-    }
-
-    pub async fn is_complete(&self) -> bool {
-        *self.complete.read().await
-    }
-
-    pub async fn chunk_count(&self) -> usize {
-        self.chunks.read().await.len()
-    }
-}
+
+        // Fast path: the feed was already refreshed today.
+        if *self.last_download_on_day.read().await == Some(today) {
+            return;
+        }
+
+        // Fetch first and only record the day on success, so a failed
+        // fetch is retried on the next request instead of being skipped
+        // for the rest of the day.
+        let latest_url = match player::newest_morning_journal_streaming_url().await {
+            Ok(url) => url,
+            Err(e) => {
+                eprintln!("fetching newest stream URL failed: {e:?}");
+                return;
+            }
+        };
+
+        let mut last = self.last_download_on_day.write().await;
+        if *last == Some(today) {
+            // Another request refreshed the feed while we were fetching.
+            return;
+        }
+        *last = Some(today);
+
+        // Keep only the 10 newest URLs, newest first.
+        let mut urls = self.urls.write().await;
+        urls.insert(0, latest_url);
+        urls.truncate(10);
+    }
+}
diff --git a/src/streamer.rs b/src/streamer.rs
index b3bbf3f..e4f66e0 100644
--- a/src/streamer.rs
+++ b/src/streamer.rs
@@ -1,56 +1,49 @@
 use crate::state::AppState;
-use axum::{
-    body::Body,
-    extract::State,
-    response::{Redirect, Response},
-};
-use player::newest_morning_journal_streaming_url;
+use axum::{extract::State, http::{header, HeaderMap}, response::IntoResponse};
 use std::sync::Arc;
-use tokio_stream::Stream;
 
-pub async fn stream_handler(State(state): State<Arc<AppState>>) -> Redirect {
-    let url = newest_morning_journal_streaming_url().await.unwrap();
-    Redirect::temporary(&url)
-
-    //state.clone().check_update().await;
-
-    //let stream = create_chunk_stream(state);
-    //let body = Body::from_stream(stream);
-
-    //Response::builder()
-    //    .header("Content-Type", "audio/mpeg")
-    //    .header("Cache-Control", "no-cache, no-store, must-revalidate")
-    //    .header("Pragma", "no-cache")
-    //    .header("Expires", "0")
-    //    .header("Accept-Ranges", "none")
-    //    .header("Transfer-Encoding", "chunked")
-    //    .header("X-Content-Duration", "infinity")
-    //    .body(body)
-    //    .unwrap()
+/// Serves the podcast RSS feed; refreshes the stored URLs at most once per day.
+pub async fn stream_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
+    state.clone().check_update().await;
+
+    let content = feed(&state.urls.read().await);
+
+    let mut headers = HeaderMap::new();
+    headers.insert(
+        header::CONTENT_TYPE,
+        header::HeaderValue::from_static("application/rss+xml"),
+    );
+    (headers, content)
 }
 
-fn create_chunk_stream(
-    state: Arc<AppState>,
-) -> impl Stream<Item = Result<Bytes, std::io::Error>> {
-    async_stream::stream! {
-        let mut position = 0;
-
-        loop {
-            // Send available chunks
-            let chunks = state.chunks.read().await;
-            while position < chunks.len() {
-                yield Ok(chunks[position].clone());
-                position += 1;
-            }
-            drop(chunks);
-
-            // Exit if download complete and all chunks sent
-            if state.is_complete().await && position >= state.chunk_count().await {
-                break;
-            }
-
-            // Wait for new chunks
-            state.notify.notified().await;
-        }
-    }
+/// Renders the RSS 2.0 document for the given stream URLs (newest first).
+fn feed(urls: &[String]) -> String {
+    let mut ret = String::new();
+    ret.push_str(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
+    ret.push_str(
+        r#"<rss version="2.0" xmlns:itunes="http://www.itunes.com/dtds/podcast-1.0.dtd">"#,
+    );
+    ret.push_str("<channel>");
+    ret.push_str("<title>Ö1 Morgenjournal Feed</title>");
+    ret.push_str("<link>https://news.hofer.link</link>");
+    ret.push_str("<description>Feed für Ö1 Morgenjournal. Live.</description>");
+
+    for url in urls {
+        let escaped = quick_xml::escape::escape(url);
+        ret.push_str("<item>");
+        ret.push_str("<title>Morgenjournal</title>");
+        ret.push_str(&format!("<link>{escaped}</link>"));
+        ret.push_str(&format!(
+            "<enclosure url=\"{escaped}\" type=\"audio/mpeg\"/>\n"
+        ));
+        // The stream URL is the only per-item unique value; a constant guid
+        // would make feed readers collapse all episodes into one item.
+        ret.push_str(&format!("<guid isPermaLink=\"false\">{escaped}</guid>"));
+        ret.push_str("</item>");
+    }
+
+    ret.push_str("</channel>");
+    ret.push_str("</rss>");
+
+    ret
 }