generate rss feed
Some checks failed
CI/CD Pipeline / test (push) Successful in 1m49s
CI/CD Pipeline / deploy (push) Has been cancelled

This commit is contained in:
Philipp Hofer
2025-10-13 08:52:48 +02:00
parent 2411320522
commit 19c9b7739b
5 changed files with 58 additions and 188 deletions

10
Cargo.lock generated
View File

@@ -820,6 +820,7 @@ dependencies = [
"axum", "axum",
"bytes", "bytes",
"chrono", "chrono",
"quick-xml",
"reqwest", "reqwest",
"serde_json", "serde_json",
"stream-download", "stream-download",
@@ -855,6 +856,15 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "quick-xml"
version = "0.38.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42a232e7487fc2ef313d96dde7948e7a3c05101870d8985e4fd8d26aedd27b89"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "quinn" name = "quinn"
version = "0.11.8" version = "0.11.8"

View File

@@ -14,3 +14,4 @@ serde_json = "1"
stream-download = "0.22" stream-download = "0.22"
chrono = "0.4" chrono = "0.4"
tokio-cron-scheduler = "0.14" tokio-cron-scheduler = "0.14"
quick-xml = "0.38"

View File

@@ -2,10 +2,8 @@ mod state;
mod streamer; mod streamer;
use axum::{routing::get, Router}; use axum::{routing::get, Router};
use chrono::Utc;
use state::AppState; use state::AppState;
use std::sync::Arc; use std::sync::Arc;
use tokio_cron_scheduler::{Job, JobScheduler};
#[tokio::main] #[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> { async fn main() -> Result<(), Box<dyn std::error::Error>> {
@@ -15,25 +13,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
.route("/", get(streamer::stream_handler)) .route("/", get(streamer::stream_handler))
.with_state(state.clone()); .with_state(state.clone());
//let scheduler = JobScheduler::new().await.unwrap();
//scheduler
// .add(
// Job::new_async(
// "30 0 7 * * Mon,Tue,Wed,Thu,Fri,Sat",
// move |_uuid, _locked| {
// let state_for_task = state.clone();
// Box::pin(async move {
// state_for_task.check_update().await;
// println!("Task executed at: {}", Utc::now());
// })
// },
// )
// .unwrap(),
// )
// .await
// .unwrap();
//scheduler.start().await.unwrap();
println!("Streaming server running on http://localhost:3029"); println!("Streaming server running on http://localhost:3029");
let listener = tokio::net::TcpListener::bind("0.0.0.0:3029").await?; let listener = tokio::net::TcpListener::bind("0.0.0.0:3029").await?;

View File

@@ -1,143 +1,38 @@
use bytes::Bytes;
use chrono::{Local, NaiveDate}; use chrono::{Local, NaiveDate};
use std::{io::Read, sync::Arc, time::Duration}; use std::sync::Arc;
use stream_download::{ use tokio::sync::RwLock;
source::DecodeError, storage::temp::TempStorageProvider, Settings, StreamDownload,
};
use tokio::sync::{mpsc, Notify, RwLock};
pub struct AppState { pub struct AppState {
pub chunks: RwLock<Vec<Bytes>>, pub urls: RwLock<Vec<String>>,
pub complete: RwLock<bool>, pub last_download_on_day: RwLock<Option<NaiveDate>>,
pub notify: Notify,
pub downloaded_on_day: RwLock<Option<NaiveDate>>,
} }
impl AppState { impl AppState {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
chunks: RwLock::new(Vec::new()), urls: RwLock::new(Vec::new()),
complete: RwLock::new(false), last_download_on_day: RwLock::new(None),
notify: Notify::new(),
downloaded_on_day: RwLock::new(None),
} }
} }
pub async fn check_update(self: Arc<Self>) { pub async fn check_update(self: Arc<Self>) {
let today = Local::now().date_naive(); let today = Local::now().date_naive();
if let Some(downloaded_on_day) = *self.downloaded_on_day.read().await if let Some(downloaded_on_day) = *self.last_download_on_day.read().await
&& today == downloaded_on_day { && today == downloaded_on_day
{
return; return;
} }
self.reset().await; *self.last_download_on_day.write().await = Some(today);
*self.downloaded_on_day.write().await = Some(today);
let latest_url = player::newest_morning_journal_streaming_url() let latest_url = player::newest_morning_journal_streaming_url()
.await .await
.unwrap(); .unwrap();
let self_clone = Arc::clone(&self); let mut old = self.urls.read().await.clone();
tokio::spawn(async move { old.push(latest_url);
if let Err(e) = self_clone.download_stream(&latest_url).await { let new = old.into_iter().rev().take(10).collect(); // only keep last 10
eprintln!("Download failed: {e:?}");
}
});
}
async fn download_stream(&self, url: &str) -> Result<(), Box<dyn std::error::Error>> { *self.urls.write().await = new;
println!("Starting download from: {url}");
let reader = match StreamDownload::new_http(
url.parse()?,
TempStorageProvider::new(),
Settings::default(),
)
.await
{
Ok(reader) => reader,
Err(e) => return Err(e.decode_error().await.into()),
};
let (tx, mut rx) = mpsc::unbounded_channel::<bytes::Bytes>();
// Spawn blocking task to read from the stream
let read_handle = tokio::task::spawn_blocking(
move || -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut reader = reader;
let mut buffer = vec![0u8; 8192]; // 8KB buffer
loop {
match reader.read(&mut buffer) {
Ok(0) => {
// End of stream
println!("End of stream reached");
break;
}
Ok(n) => {
let chunk = bytes::Bytes::copy_from_slice(&buffer[..n]);
if tx.send(chunk).is_err() {
// Receiver dropped, exit
break;
}
}
Err(e) => {
println!("Error reading from stream: {e:?}");
// Short delay before retrying
std::thread::sleep(Duration::from_millis(100));
}
}
}
Ok(())
},
);
// Process received chunks
while let Some(bytes) = rx.recv().await {
self.add_chunk(bytes).await;
}
// Wait for the reading task to complete
match read_handle.await {
Ok(result) => result.unwrap(),
Err(e) => return Err(e.into()),
}
self.mark_complete().await;
println!("Download complete!");
Ok(())
}
async fn reset(&self) {
// Clear all chunks
self.chunks.write().await.clear();
// Reset completion status
*self.complete.write().await = false;
*self.downloaded_on_day.write().await = None;
// Notify any waiting tasks about the reset
self.notify.notify_waiters();
}
async fn add_chunk(&self, chunk: Bytes) {
self.chunks.write().await.push(chunk);
self.notify.notify_waiters();
}
async fn mark_complete(&self) {
*self.complete.write().await = true;
self.notify.notify_waiters();
}
pub async fn is_complete(&self) -> bool {
*self.complete.read().await
}
pub async fn chunk_count(&self) -> usize {
self.chunks.read().await.len()
} }
} }

View File

@@ -1,56 +1,41 @@
use crate::state::AppState; use crate::state::AppState;
use axum::{ use axum::{extract::State, http::HeaderMap, response::IntoResponse};
body::Body, use reqwest::header;
extract::State,
response::{Redirect, Response},
};
use player::newest_morning_journal_streaming_url;
use std::sync::Arc; use std::sync::Arc;
use tokio_stream::Stream;
pub async fn stream_handler(State(state): State<Arc<AppState>>) -> Redirect { pub async fn stream_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let url = newest_morning_journal_streaming_url().await.unwrap(); state.clone().check_update().await;
Redirect::temporary(&url)
//state.clone().check_update().await; let content = feed(&state.urls.read().await.to_vec());
//let stream = create_chunk_stream(state); let mut headers = HeaderMap::new();
//let body = Body::from_stream(stream); headers.insert(header::CONTENT_TYPE, "application/rss+xml".parse().unwrap());
(headers, content)
//Response::builder()
// .header("Content-Type", "audio/mpeg")
// .header("Cache-Control", "no-cache, no-store, must-revalidate")
// .header("Pragma", "no-cache")
// .header("Expires", "0")
// .header("Accept-Ranges", "none")
// .header("Transfer-Encoding", "chunked")
// .header("X-Content-Duration", "infinity")
// .body(body)
// .unwrap()
} }
fn create_chunk_stream( fn feed(urls: &Vec<String>) -> String {
state: Arc<AppState>, let mut ret = String::new();
) -> impl Stream<Item = Result<bytes::Bytes, std::io::Error>> { ret.push_str(r#"<?xml version="1.0" encoding="UTF-8"?>"#);
async_stream::stream! { ret.push_str(r#"<rss version="2.0">"#);
let mut position = 0; ret.push_str("<channel>");
ret.push_str("<title>Ö1 Morgenjournal Feed</title>");
ret.push_str("<link>https://news.hofer.link</link>");
ret.push_str("<description>Feed für Ö1 Morgenjournal. Live.</description>");
loop { for url in urls {
// Send available chunks ret.push_str("<item>");
let chunks = state.chunks.read().await; ret.push_str(&format!("<title>Morgenjournal</title>"));
while position < chunks.len() { ret.push_str(&format!("<link>{}</link>", quick_xml::escape::escape(url)));
yield Ok(chunks[position].clone()); ret.push_str(&format!(
position += 1; "<enclosure url=\"{}\" length=\"0\" type=\"audio/mpeg\"/>\n",
} quick_xml::escape::escape(url)
drop(chunks); ));
ret.push_str(&format!("<description>Morgenjournal</description>"));
// Exit if download complete and all chunks sent ret.push_str("</item>");
if state.is_complete().await && position >= state.chunk_count().await {
break;
} }
// Wait for new chunks ret.push_str(" </channel>");
state.notify.notified().await; ret.push_str("</rss>");
}
} ret
} }